xiarixiaoyao edited a comment on pull request #2721: URL: https://github.com/apache/hudi/pull/2721#issuecomment-806582989
Test steps:

Before patch:

step1:
```scala
val df = spark.range(0, 1000000).toDF("keyid")
  .withColumn("col3", expr("keyid + 10000000"))
  .withColumn("p", lit(0))
  .withColumn("p1", lit(0))
  .withColumn("p2", lit(7))
  .withColumn("a1", lit(Array[String]("sb1", "rz")))
  .withColumn("a2", lit(Array[String]("sb1", "rz")))

// bulk_insert 1,000,000 rows (keyid from 0 to 1000000)
merge(df, 4, "default", "hive_9b", DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
```

step2:
```scala
val df = spark.range(0, 900000).toDF("keyid")
  .withColumn("col3", expr("keyid + 10000000"))
  .withColumn("p", lit(0))
  .withColumn("p1", lit(0))
  .withColumn("p2", lit(7))
  .withColumn("a1", lit(Array[String]("sb1", "rz")))
  .withColumn("a2", lit(Array[String]("sb1", "rz")))

// delete 900,000 rows (keyid from 0 to 900000)
delete(df, 4, "default", "hive_9b")
```

step3: query on beeline/spark-sql:
```sql
select count(col3) from hive_9b_rt
```

The query fails with a StackOverflowError:
```
2021-03-25 15:33:29,029 | INFO  | main | RECORDS_OUT_OPERATOR_RS_3:1, RECORDS_OUT_INTERMEDIATE:1, | Operator.java:1038
2021-03-25 15:33:29,029 | INFO  | main | RECORDS_OUT_OPERATOR_RS_3:1, RECORDS_OUT_INTERMEDIATE:1, | Operator.java:1038
2021-03-25 15:33:29,029 | ERROR | main | Error running child : java.lang.StackOverflowError
    at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:83)
    at org.apache.parquet.column.values.plain.BinaryPlainValuesReader.readBytes(BinaryPlainValuesReader.java:39)
    at org.apache.parquet.column.impl.ColumnReaderBase$2$6.read(ColumnReaderBase.java:344)
    at org.apache.parquet.column.impl.ColumnReaderBase.readValue(ColumnReaderBase.java:503)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:30)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:409)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:159)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:41)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:84)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    ... (this frame repeats until the stack overflows)
```
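The repeated `RealtimeCompactedRecordReader.java:106` frames indicate that `next()` re-invokes itself once for every record it has to skip (here, every deleted record), so a long run of consecutive deletes exhausts the stack. Below is a minimal Scala sketch of that pattern and of the usual remedy (skipping in a loop instead of recursing); it is an illustration only, with made-up names, not the actual Hudi reader code.

```scala
// Illustration only: RecordSource, nextRecord and isDeleted are hypothetical names.
trait RecordSource {
  def nextRecord(): Option[String]   // None => no more records
  def isDeleted(r: String): Boolean  // true => record was deleted by a log block
}

// Recursive skip: one invocation per deleted record. scalac happens to eliminate
// this simple tail call, but the equivalent Java code in the reader cannot be
// optimized that way, so each skipped record costs a stack frame and ~900k
// consecutive deletes end in StackOverflowError.
def nextRecursive(src: RecordSource): Option[String] =
  src.nextRecord() match {
    case Some(r) if src.isDeleted(r) => nextRecursive(src)
    case other                       => other
  }

// Iterative skip: constant stack depth no matter how many records are deleted.
def nextIterative(src: RecordSource): Option[String] = {
  var cur = src.nextRecord()
  while (cur.exists(src.isDeleted)) {
    cur = src.nextRecord()
  }
  cur
}
```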
After patch:
```
select count(col3) from hive_9b_rt
+---------+
|   _c0   |
+---------+
| 100000  |
+---------+
```

step4:
```scala
val df = spark.range(900000, 1000000).toDF("keyid")
  .withColumn("col3", expr("keyid + 10000000"))
  .withColumn("p", lit(0))
  .withColumn("p1", lit(0))
  .withColumn("p2", lit(7))
  .withColumn("a1", lit(Array[String]("sb1", "rz")))
  .withColumn("a2", lit(Array[String]("sb1", "rz")))

// delete the remaining 100,000 rows (keyid from 900000 to 1000000)
delete(df, 4, "default", "hive_9b")
```

spark-sql / hive-beeline:
```
select count(col3) from hive_9b_rt;
+------+
| _c0  |
+------+
|  0   |
+------+
```
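Not part of the original steps, but since Hive sync is enabled, the same counts can also be cross-checked from the Spark session that ran the writes. This assumes the session was built with Hive support, and the Hudi docs suggest disabling Spark's built-in parquet conversion so the realtime view is actually merged:

```scala
// Optional sanity check (hypothetical addition, not part of the original repro).
spark.sql("set spark.sql.hive.convertMetastoreParquet=false")
spark.sql("select count(col3) from default.hive_9b_rt").show()
// expected: 100000 after step2 (with the patch applied), 0 after step4
```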
delete function:
```scala
def delete(df: DataFrame, par: Int, db: String, tableName: String,
           tableType: String = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL): Unit = {
  df.write.format("hudi").
    option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
    option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
    option(PRECOMBINE_FIELD_OPT_KEY, "col3").
    option(RECORDKEY_FIELD_OPT_KEY, "keyid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
    option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[ComplexKeyGenerator].getName).
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete").
    option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
    option("hoodie.insert.shuffle.parallelism", par.toString).
    option("hoodie.upsert.shuffle.parallelism", par.toString).
    option("hoodie.delete.shuffle.parallelism", par.toString).
    option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.support_timestamp", "true").
    option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    option(HIVE_USE_JDBC_OPT_KEY, "false").
    option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
    option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
    option(TABLE_NAME, tableName).
    mode(Append).
    save(s"/tmp/${db}/${tableName}")
}
```

merge function:
```scala
def merge(df: org.apache.spark.sql.DataFrame, par: Int, db: String, tableName: String,
          tableType: String = DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
          hivePartitionExtract: String = "org.apache.hudi.hive.MultiPartKeysValueExtractor",
          op: String = "upsert"): Unit = {
  val mode = if (op.equals("bulk_insert")) {
    Overwrite
  } else {
    Append
  }
  df.write.format("hudi").
    option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
    option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
    option(PRECOMBINE_FIELD_OPT_KEY, "col3").
    option(RECORDKEY_FIELD_OPT_KEY, "keyid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, op).
    option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[ComplexKeyGenerator].getName).
    option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
    option("hoodie.metadata.enable", "false").
    option("hoodie.insert.shuffle.parallelism", par.toString).
    option("hoodie.upsert.shuffle.parallelism", par.toString).
    option("hoodie.delete.shuffle.parallelism", par.toString).
    option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
    option("hoodie.datasource.hive_sync.support_timestamp", "true").
    option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option(HIVE_USE_JDBC_OPT_KEY, "false").
    option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
    option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
    option(TABLE_NAME, tableName).
    mode(mode).
    save(s"/tmp/${db}/${tableName}")
}
```
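For completeness, the helpers above appear to rely on imports along the following lines. This is a reconstruction, and the exact packages and constant names can differ between Hudi versions:

```scala
// Reconstructed imports for the delete/merge helpers above (adjust to your Hudi version).
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.functions.{expr, lit}
```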