xiarixiaoyao edited a comment on pull request #2721:
URL: https://github.com/apache/hudi/pull/2721#issuecomment-806582989
Test steps:
Before the patch:
Step 1:
val df = spark.range(0, 1000000).toDF("keyid")
  .withColumn("col3", expr("keyid + 10000000"))
  .withColumn("p", lit(0))
  .withColumn("p1", lit(0))
  .withColumn("p2", lit(7))
  .withColumn("a1", lit(Array[String]("sb1", "rz")))
  .withColumn("a2", lit(Array[String]("sb1", "rz")))
// bulk_insert 1,000,000 rows (keyid from 0 to 1000000)
merge(df, 4, "default", "hive_9b",
  DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
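As a quick sanity check (not part of the original run), the synced realtime view can be queried right after the bulk insert; this assumes Hive sync registered the table as default.hive_9b_rt, the same table queried in step 3:

// hypothetical verification query, same Spark session
spark.sql("select count(col3) from default.hive_9b_rt").show()
// expected: 1000000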
Step 2:
val df = spark.range(0, 900000).toDF("keyid")
  .withColumn("col3", expr("keyid + 10000000"))
  .withColumn("p", lit(0))
  .withColumn("p1", lit(0))
  .withColumn("p2", lit(7))
  .withColumn("a1", lit(Array[String]("sb1", "rz")))
  .withColumn("a2", lit(Array[String]("sb1", "rz")))
// delete 900,000 rows (keyid from 0 to 900000)
delete(df, 4, "default", "hive_9b")
Step 3:
Query on beeline / spark-sql: select count(col3) from hive_9b_rt
2021-03-25 15:33:29,029 | INFO  | main | RECORDS_OUT_OPERATOR_RS_3:1, RECORDS_OUT_INTERMEDIATE:1, | Operator.java:1038
2021-03-25 15:33:29,029 | INFO  | main | RECORDS_OUT_OPERATOR_RS_3:1, RECORDS_OUT_INTERMEDIATE:1, | Operator.java:1038
2021-03-25 15:33:29,029 | ERROR | main | Error running child : java.lang.StackOverflowError
    at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:83)
    at org.apache.parquet.column.values.plain.BinaryPlainValuesReader.readBytes(BinaryPlainValuesReader.java:39)
    at org.apache.parquet.column.impl.ColumnReaderBase$2$6.read(ColumnReaderBase.java:344)
    at org.apache.parquet.column.impl.ColumnReaderBase.readValue(ColumnReaderBase.java:503)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:30)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:409)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:159)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:41)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:84)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    ... (RealtimeCompactedRecordReader.next at line 106 repeats until the stack overflows)
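The repeated RealtimeCompactedRecordReader.next frames at line 106 indicate the reader calls itself once for every record whose merged value turns out to be a delete, so a long run of deletes (900,000 consecutive keys here) keeps growing the call stack; Java performs no tail-call elimination, so each skipped record costs a frame. Below is a minimal sketch of the two patterns, written in Scala purely for illustration; it is not the Hudi code or the actual patch.

// Recursive skip: mirrors the pattern visible in the trace. In Java each self-call
// adds a stack frame, so skipping 900,000 deleted records overflows the stack.
// (Scalac would tail-call-optimize this exact Scala form; the Java reader is not optimized.)
def nextRecursive(base: Iterator[Long], isDeleted: Long => Boolean): Option[Long] = {
  if (!base.hasNext) None
  else {
    val rec = base.next()
    if (isDeleted(rec)) nextRecursive(base, isDeleted) // one frame per skipped record
    else Some(rec)
  }
}

// Iterative skip: constant stack depth no matter how many records are deleted.
def nextIterative(base: Iterator[Long], isDeleted: Long => Boolean): Option[Long] = {
  while (base.hasNext) {
    val rec = base.next()
    if (!isDeleted(rec)) return Some(rec)
  }
  None
}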
After the patch:
select count(col3) from hive_9b_rt
+---------+
| _c0 |
+---------+
| 100000 |
+---------+
Step 4:
val df = spark.range(900000, 1000000).toDF("keyid")
  .withColumn("col3", expr("keyid + 10000000"))
  .withColumn("p", lit(0))
  .withColumn("p1", lit(0))
  .withColumn("p2", lit(7))
  .withColumn("a1", lit(Array[String]("sb1", "rz")))
  .withColumn("a2", lit(Array[String]("sb1", "rz")))
// delete the remaining 100,000 rows (keyid from 900000 to 1000000)
delete(df, 4, "default", "hive_9b")
spark-sql / hive beeline:
select count(col3) from hive_9b_rt;
+------+
| _c0 |
+------+
| 0 |
+------+
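For reference, the helper functions below assume roughly the following imports (Hudi 0.7/0.8-era Spark datasource API; adjust to the actual build used):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode.{Append, Overwrite}
import org.apache.spark.sql.functions.{expr, lit}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.hudi.keygen.ComplexKeyGenerator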
The delete function:
def delete(df: DataFrame, par: Int, db: String, tableName: String,
    tableType: String = DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL): Unit = {
  // issue a Hudi "delete" for every record key in df, then re-sync the Hive table
  df.write.format("hudi").
    option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
    option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
    option(PRECOMBINE_FIELD_OPT_KEY, "col3").
    option(RECORDKEY_FIELD_OPT_KEY, "keyid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
    option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[ComplexKeyGenerator].getName).
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete").
    option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
    option("hoodie.insert.shuffle.parallelism", par.toString).
    option("hoodie.upsert.shuffle.parallelism", par.toString).
    option("hoodie.delete.shuffle.parallelism", par.toString).
    option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
      "org.apache.hudi.hive.MultiPartKeysValueExtractor").
    option("hoodie.datasource.hive_sync.support_timestamp", "true").
    option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    option(HIVE_USE_JDBC_OPT_KEY, "false").
    option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
    option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
    option(TABLE_NAME, tableName).
    mode(Append).save(s"/tmp/${db}/${tableName}")
}
The merge function:
def merge(df: org.apache.spark.sql.DataFrame, par: Int, db: String, tableName: String,
    tableType: String = DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
    hivePartitionExtract: String = "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    op: String = "upsert"): Unit = {
  // bulk_insert starts a fresh table; any other operation appends to the existing one
  val mode = if (op.equals("bulk_insert")) {
    Overwrite
  } else {
    Append
  }
  df.write.format("hudi").
    option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
    option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
    option(PRECOMBINE_FIELD_OPT_KEY, "col3").
    option(RECORDKEY_FIELD_OPT_KEY, "keyid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, op).
    option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP, classOf[ComplexKeyGenerator].getName).
    option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
    option("hoodie.metadata.enable", "false").
    option("hoodie.insert.shuffle.parallelism", par.toString).
    option("hoodie.upsert.shuffle.parallelism", par.toString).
    option("hoodie.delete.shuffle.parallelism", par.toString).
    option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
    option("hoodie.datasource.hive_sync.support_timestamp", "true").
    option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, hivePartitionExtract).
    option(HIVE_USE_JDBC_OPT_KEY, "false").
    option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
    option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
    option(TABLE_NAME, tableName).
    mode(mode).save(s"/tmp/${db}/${tableName}")
}
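Putting it together, the four steps above can be replayed roughly as follows; runRepro is a hypothetical wrapper added here only for readability, with the same database, table, and column names as above:

// Hypothetical driver that strings steps 1-4 together using the helpers above.
def runRepro(spark: org.apache.spark.sql.SparkSession): Unit = {
  def testDf(start: Long, end: Long): DataFrame =
    spark.range(start, end).toDF("keyid")
      .withColumn("col3", expr("keyid + 10000000"))
      .withColumn("p", lit(0))
      .withColumn("p1", lit(0))
      .withColumn("p2", lit(7))
      .withColumn("a1", lit(Array[String]("sb1", "rz")))
      .withColumn("a2", lit(Array[String]("sb1", "rz")))

  // step 1: bulk_insert 1,000,000 rows
  merge(testDf(0, 1000000), 4, "default", "hive_9b",
    DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
  // step 2: delete keyid 0 .. 900000
  delete(testDf(0, 900000), 4, "default", "hive_9b")
  // step 3: expect 100000 with the patch (StackOverflowError without it)
  spark.sql("select count(col3) from default.hive_9b_rt").show()
  // step 4: delete the remaining keys, then expect 0
  delete(testDf(900000, 1000000), 4, "default", "hive_9b")
  spark.sql("select count(col3) from default.hive_9b_rt").show()
}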