xiarixiaoyao commented on pull request #2720:
URL: https://github.com/apache/hudi/pull/2720#issuecomment-806562011
test env: spark2.4.5, hadoop 3.1.1, hive 3.1.1
before patch:
test step:
step1:
// Step 1: build 10,000 rows whose partition columns are all constant
// (p=0, p1=0, p2=6) and bulk_insert them, creating the MOR table "hive_8b"
// with a three-level partition path p/p1/p2.
val df = spark.range(0, 10000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(6))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))
// create hudi table which has three level partitions p,p1,p2
merge(df, 4, "default", "hive_8b",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
step2:
// Step 2: same 10,000 record keys, but with p2=7 instead of p2=6, upserted
// into the existing table. After this write the query below is expected to
// show rows under partition p2=7 (the point of the bug report).
val df = spark.range(0, 10000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit(Array[String] ("sb1", "rz")))
.withColumn("a2", lit(Array[String] ("sb1", "rz")))
// upsert current table
merge(df, 4, "default", "hive_8b",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")
hive beeline:
set
hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hoodie.hive_8b.consume.mode=INCREMENTAL;
set hoodie.hive_8b.consume.max.commits=3;
set hoodie.hive_8b.consume.start.timestamp=20210325141300; // this timestamp
is smaller than the earliest commit, so the query covers all commits
select `p`, `p1`, `p2`,`keyid` from hive_8b_rt where
`_hoodie_commit_time`>'20210325141300' and `keyid` < 5;
query result:
+----+-----+-----+--------+
| p | p1 | p2 | keyid |
+----+-----+-----+--------+
| 0 | 0 | 6 | 0 |
| 0 | 0 | 6 | 1 |
| 0 | 0 | 6 | 2 |
| 0 | 0 | 6 | 3 |
| 0 | 0 | 6 | 4 |
| 0 | 0 | 6 | 4 |
| 0 | 0 | 6 | 0 |
| 0 | 0 | 6 | 3 |
| 0 | 0 | 6 | 2 |
| 0 | 0 | 6 | 1 |
+----+-----+-----+--------+
this result is wrong: in the second step we inserted new data into table
partition p2=7, yet the query result contains no p2=7 rows — every row has p2=6
After patch:
+----+-----+-----+--------+
| p | p1 | p2 | keyid |
+----+-----+-----+--------+
| 0 | 0 | 6 | 0 |
| 0 | 0 | 6 | 1 |
| 0 | 0 | 6 | 2 |
| 0 | 0 | 6 | 3 |
| 0 | 0 | 6 | 4 |
| 0 | 0 | 7 | 4 |
| 0 | 0 | 7 | 0 |
| 0 | 0 | 7 | 3 |
| 0 | 0 | 7 | 2 |
| 0 | 0 | 7 | 1 |
+----+-----+-----+--------+
this result is correct.
merge function:
/**
 * Writes `df` to a Hudi table at /tmp/<db>/<tableName> and syncs it to Hive.
 *
 * A "bulk_insert" operation overwrites (recreates) the table; every other
 * operation appends to it. Partitioning is fixed to the three-level path
 * p/p1/p2 with Hive-style partitioning, record key "keyid" and precombine
 * field "col3".
 *
 * @param df                   data to write
 * @param par                  shuffle parallelism applied to the
 *                             bulk_insert/insert/upsert/delete operations
 * @param db                   Hive database name (also the first path segment)
 * @param tableName            Hudi/Hive table name
 * @param tableType            Hudi table type, COW by default
 * @param hivePartitionExtract fully-qualified class name of the Hive
 *                             partition-value extractor used during sync
 * @param op                   Hudi write operation, "upsert" by default
 */
def merge(df: org.apache.spark.sql.DataFrame, par: Int, db: String,
          tableName: String,
          tableType: String = DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
          hivePartitionExtract: String = "org.apache.hudi.hive.MultiPartKeysValueExtractor",
          op: String = "upsert"): Unit = {
  // bulk_insert starts the table from scratch; anything else appends.
  val mode = if (op.equals("bulk_insert")) Overwrite else Append
  df.write.format("hudi").
    option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
    option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
    option(PRECOMBINE_FIELD_OPT_KEY, "col3").
    option(RECORDKEY_FIELD_OPT_KEY, "keyid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, op).
    // ComplexKeyGenerator is required for the multi-field partition path.
    option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP,
      classOf[ComplexKeyGenerator].getName).
    option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
    option("hoodie.metadata.enable", "false").
    option("hoodie.insert.shuffle.parallelism", par.toString).
    option("hoodie.upsert.shuffle.parallelism", par.toString).
    option("hoodie.delete.shuffle.parallelism", par.toString).
    option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
    option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "p,p1,p2").
    option("hoodie.datasource.hive_sync.support_timestamp", "true").
    option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
    // Fix: use the hivePartitionExtract parameter instead of a hard-coded
    // extractor class. The parameter was previously declared but ignored;
    // its default value equals the old hard-coded string, so existing
    // callers see identical behavior.
    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
      hivePartitionExtract).
    option(HIVE_USE_JDBC_OPT_KEY, "false").
    option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
    option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
    option(TABLE_NAME, tableName).
    mode(mode).
    save(s"/tmp/${db}/${tableName}")
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]