xiarixiaoyao commented on pull request #2721:
URL: https://github.com/apache/hudi/pull/2721#issuecomment-806582989
Test steps:
Before patch:
step1:
val df = spark.range(0, 1000000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))
// bulk_insert 1,000,000 rows (keyid from 0 to 999999)
merge(df, 4, "default", "hive_9b",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
step2:
val df = spark.range(0, 900000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))
// delete 900,000 rows (keyid from 0 to 899999)
delete(df, 4, "default", "hive_9b")
step3:
Query on beeline/spark-sql: select count(col3) from hive_9b_rt
2021-03-25 15:33:29,029 | INFO | main | RECORDS_OUT_OPERATOR_RS_3:1, RECORDS_OUT_INTERMEDIATE:1, | Operator.java:1038
2021-03-25 15:33:29,029 | INFO | main | RECORDS_OUT_OPERATOR_RS_3:1, RECORDS_OUT_INTERMEDIATE:1, | Operator.java:1038
2021-03-25 15:33:29,029 | ERROR | main | Error running child : java.lang.StackOverflowError
    at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:83)
    at org.apache.parquet.column.values.plain.BinaryPlainValuesReader.readBytes(BinaryPlainValuesReader.java:39)
    at org.apache.parquet.column.impl.ColumnReaderBase$2$6.read(ColumnReaderBase.java:344)
    at org.apache.parquet.column.impl.ColumnReaderBase.readValue(ColumnReaderBase.java:503)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:30)
    at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:409)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:159)
    at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.next(ParquetRecordReaderWrapper.java:41)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:84)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.next(RealtimeCompactedRecordReader.java:106)
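The repeated RealtimeCompactedRecordReader.next(...:106) frames suggest that the reader re-enters itself once for every consecutive record it skips, so a long run of deleted keys can exhaust the stack. The sketch below illustrates that pattern and an iterative alternative. It is not the Hudi code: `Rec`, `nextRecursive`, `nextIterative`, and `countLive` are hypothetical names, and the link between skipped deletes and the recursion is an assumption read off the trace.

```scala
object SkipDeletedSketch {
  // Hypothetical record: `deleted` marks a key removed by a delete in the log.
  final case class Rec(key: Long, deleted: Boolean)

  // Recursive skip: each consecutive deleted record adds one stack frame.
  // The self call is wrapped in orElse so it is not in tail position and
  // the compiler cannot rewrite it into a loop.
  def nextRecursive(it: Iterator[Rec]): Option[Rec] =
    if (!it.hasNext) None
    else {
      val r = it.next()
      if (r.deleted) nextRecursive(it).orElse(None) else Some(r)
    }

  // Iterative skip: constant stack depth however many records are deleted.
  def nextIterative(it: Iterator[Rec]): Option[Rec] = {
    while (it.hasNext) {
      val r = it.next()
      if (!r.deleted) return Some(r)
    }
    None
  }

  // Same shape as the test above: keys 0..899999 deleted, 900000..999999 live.
  def records: Iterator[Rec] =
    Iterator.range(0, 1000000).map(k => Rec(k.toLong, deleted = k < 900000))

  // Counts live records by calling nextIterative until the input is exhausted.
  def countLive(): Int = {
    val it = records
    Iterator.continually(nextIterative(it)).takeWhile(_.isDefined).size
  }

  def main(args: Array[String]): Unit =
    println(s"live records via iterative skip: ${countLive()}")
}
```

Calling `nextRecursive(records)` instead needs roughly 900,000 nested frames before it reaches the first live key, which is likely to throw a StackOverflowError with a default JVM thread stack size.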
After patch:
select count(col3) from hive_9b_rt
+---------+
| _c0 |
+---------+
| 100000 |
+---------+
step4:
val df = spark.range(900000, 1000000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))
// delete the remaining 100,000 rows (keyid from 900000 to 999999)
delete(df, 4, "default", "hive_9b")
sparksql/hive-beeline:
select count(col3) from hive_9b_rt;
+------+
| _c0 |
+------+
| 0 |
+------+