[ https://issues.apache.org/jira/browse/HUDI-7016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ethan Guo updated HUDI-7016:
----------------------------
    Description: 
When running the Spark quickstart with deletes using the Hudi Spark bundle and
turning on position encoding for deletes and updates
("hoodie.write.record.positions=true"), the write fails with the following
exception:
{code:java}
Caused by: java.lang.NoSuchMethodError: org.roaringbitmap.longlong.Roaring64NavigableMap.serializePortable(Ljava/io/DataOutput;)V
        at org.apache.hudi.common.table.log.LogReaderUtils.encodePositions(LogReaderUtils.java:128)
        at org.apache.hudi.common.table.log.LogReaderUtils.encodePositions(LogReaderUtils.java:109)
        at org.apache.hudi.common.table.log.block.HoodieLogBlock.addRecordPositionsToHeader(HoodieLogBlock.java:147)
        at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.<init>(HoodieDeleteBlock.java:88)
        at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:480)
        at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:454)
        at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:83)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:313)
        ... 28 more {code}
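The method in the trace, Roaring64NavigableMap#serializePortable(DataOutput), appears only in newer RoaringBitmap releases, so the NoSuchMethodError suggests that at runtime an older RoaringBitmap jar (for example one pulled in by Spark itself, or one left unshaded in the bundle) wins over the version Hudi compiles against. A quick probe, pasteable into the same spark-shell session, can confirm which jar is loaded and whether the method is visible; this probe is illustrative and not part of the original repro:
{code:java}
// Hypothetical classpath probe: shows which jar supplies
// Roaring64NavigableMap and whether serializePortable is present.
val clazz = Class.forName("org.roaringbitmap.longlong.Roaring64NavigableMap")
// Location of the jar that actually got loaded; an older roaringbitmap
// resolved ahead of the bundle's copy would show up here.
println(clazz.getProtectionDomain.getCodeSource.getLocation)
val hasPortable = clazz.getMethods.exists(_.getName == "serializePortable")
println(s"serializePortable present: $hasPortable")
{code}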
Spark quickstart run:

{code:java}
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_mor"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(1000))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.logfile.data.block.format", "parquet").
  option("hoodie.write.record.positions", "true").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName + "true").
  mode(Overwrite).
  save(basePath + "true")
df.write.format("hudi").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.logfile.data.block.format", "parquet").
  option("hoodie.write.record.positions", "false").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName + "false").
  mode(Overwrite).
  save(basePath + "false")
spark.read.format("hudi").load(basePath + 
"true").createOrReplaceTempView("hudi_trips_snapshot")
val ds = spark.sql("select uuid, partitionpath from 
hudi_trips_snapshot").sample(0.01)
// issue deletes
val deletes = dataGen.generateDeletes(ds.collectAsList())
val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
hardDeleteDf.write.format("hudi").
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.logfile.data.block.format", "parquet").
  option("hoodie.write.record.positions", "true").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName + "true").
  mode(Append).
  save(basePath + "true")
hardDeleteDf.write.format("hudi").
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option("hoodie.logfile.data.block.format", "parquet").
  option("hoodie.write.record.positions", "false").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option(TABLE_NAME, tableName + "false").
  mode(Append).
  save(basePath + "false")
spark.read.format("hudi").
    option("hoodie.datasource.read.use.new.parquet.file.format", "true").
    option("hoodie.file.group.reader.enabled", "true").
    load(basePath + "true").count()
spark.read.format("hudi").
    option("hoodie.datasource.read.use.new.parquet.file.format", "true").
    option("hoodie.file.group.reader.enabled", "true").
    load(basePath + "false").count() {code}
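For context, the failing frame LogReaderUtils.encodePositions builds a Roaring64NavigableMap of record positions and writes it out via serializePortable. Below is a minimal sketch of that call path, runnable in spark-shell when a recent RoaringBitmap is on the classpath; the bitmap and stream types come from the stack trace, while the helper name and the rest are illustrative, not Hudi's actual code:
{code:java}
// Sketch of the failing call path (assumes a RoaringBitmap new enough to
// ship Roaring64NavigableMap#serializePortable).
import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.roaringbitmap.longlong.Roaring64NavigableMap

def encodePositionsSketch(positions: Seq[Long]): Array[Byte] = {
  val bitmap = new Roaring64NavigableMap()
  positions.foreach(p => bitmap.addLong(p))
  val baos = new ByteArrayOutputStream()
  // This is the call that throws NoSuchMethodError when an older
  // RoaringBitmap without the portable format is loaded instead.
  bitmap.serializePortable(new DataOutputStream(baos))
  baos.toByteArray
}

encodePositionsSketch(Seq(0L, 5L, 42L)).length
{code}
Only the writes with "hoodie.write.record.positions=true" reach this code path, which matches the repro above, where the "false" variants act as a control.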

> Fix bundling of RoaringBitmap dependency
> ----------------------------------------
>
>                 Key: HUDI-7016
>                 URL: https://issues.apache.org/jira/browse/HUDI-7016
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Ethan Guo
>            Assignee: Ethan Guo
>            Priority: Blocker
>             Fix For: 1.0.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
