xiarixiaoyao edited a comment on pull request #2716:
URL: https://github.com/apache/hudi/pull/2716#issuecomment-806535452
test env: spark2.4.5, hadoop 3.1.1, hive 3.1.1
before patch:
step1:
val df = spark.range(0, 10000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(6))
.withColumn("a1", lit( Array[String] ("sb1", "rz") ) )
.withColumn("a2", lit( Array[String] ("sb1", "rz") ) )
// bulk_insert df, partition by p,p1,p2
merge(df, 4, "default", "hive_8b",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "bulk_insert")
step2:
val df = spark.range(0, 10000).toDF("keyid")
.withColumn("col3", expr("keyid + 10000000"))
.withColumn("p", lit(0))
.withColumn("p1", lit(0))
.withColumn("p2", lit(7))
.withColumn("a1", lit( Array[String] ("sb1", "rz") ) )
.withColumn("a2", lit( Array[String] ("sb1", "rz") ) )
// upsert table hive_8b
merge(df, 4, "default", "hive_8b",
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, op = "upsert")
step3:
start hive beeline:
set
hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hoodie.hive_8b.consume.mode=INCREMENTAL;
set hoodie.hive_8b.consume.max.commits=3;
set hoodie.hive_8b.consume.start.timestamp=20210325141300; // this
timestamp is smaller than the earliest commit, so we can query all commits
select `p`, `p1`, `p2`,`keyid` from hive_8b_rt where
`_hoodie_commit_time`>'20210325141300'
2021-03-25 14:14:36,036 | INFO | AsyncDispatcher event handler |
Diagnostics report from attempt_1615883368881_0028_m_000000_3: Error:
org.apache.hudi.org.apache.avro.SchemaParseException: Illegal character in:
**p,p1,p2** 2021-03-25 14:14:36,036 | INFO | AsyncDispatcher event handler |
Diagnostics report from attempt_1615883368881_0028_m_000000_3: Error:
org.apache.hudi.org.apache.avro.SchemaParseException: Illegal character in:
**p,p1,p2** at
org.apache.hudi.org.apache.avro.Schema.validateName(Schema.java:1151) at
org.apache.hudi.org.apache.avro.Schema.access$200(Schema.java:81) at
org.apache.hudi.org.apache.avro.Schema$Field.<init>(Schema.java:403) at
org.apache.hudi.org.apache.avro.Schema$Field.<init>(Schema.java:396) at
org.apache.hudi.avro.HoodieAvroUtils.appendNullSchemaFields(HoodieAvroUtils.java:268)
at
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.addPartitionFields(HoodieRealtimeRecordReaderUtils.java:286)
at org.apache.hudi.hadoop.realtime.AbstractReal
timeRecordReader.init(AbstractRealtimeRecordReader.java:98) at
org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:67)
at
org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:53)
at
org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:70)
at
org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:47)
at
org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:123)
at
org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat$HoodieCombineFileInputFormatShim.getRecordReader(HoodieCombineHiveInputFormat.java:975)
at
org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat.getRecordReader(HoodieCombineHiveInputFormat.java:556)
at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:175)
at org.apache.hadoop.
mapred.MapTask.runOldMapper(MapTask.java:444) at
org.apache.hadoop.mapred.MapTask.run(MapTask.java:349) at
org.apache.hadoop.mapred.YarnChild$1.run(YarnChild.java:183) at
java.security.AccessController.doPrivileged(Native Method) at
javax.security.auth.Subject.doAs(Subject.java:422) at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1761)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:177)
after patch:
+----+-----+-----+--------+
| p | p1 | p2 | keyid |
+----+-----+-----+--------+
| 0 | 0 | 6 | 5068 |
| 0 | 0 | 6 | 6058 |
| 0 | 0 | 6 | 823 |
| 0 | 0 | 6 | 5031 |
| 0 | 0 | 6 | 4445 |
| 0 | 0 | 6 | 5082 |
merge function:
def merge(df: org.apache.spark.sql.DataFrame, par: Int, db: String,
tableName: String,
tableType: String =
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
hivePartitionExtract: String =
"org.apache.hudi.hive.MultiPartKeysValueExtractor", op: String = "upsert"):
Unit = {
val mode = if (op.equals("bulk_insert")) {
Overwrite
} else {
Append
}
df.write.format("hudi").
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
option(HoodieCompactionConfig.INLINE_COMPACT_PROP, false).
option(PRECOMBINE_FIELD_OPT_KEY, "col3").
option(RECORDKEY_FIELD_OPT_KEY, "keyid").
option(PARTITIONPATH_FIELD_OPT_KEY, "p,p1,p2").
option(DataSourceWriteOptions.OPERATION_OPT_KEY, op).
option(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP,
classOf[ComplexKeyGenerator].getName).
option("hoodie.bulkinsert.shuffle.parallelism", par.toString).
option("hoodie.metadata.enable", "false").
option("hoodie.insert.shuffle.parallelism", par.toString).
option("hoodie.upsert.shuffle.parallelism", par.toString).
option("hoodie.delete.shuffle.parallelism", par.toString).
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY,
"p,p1,p2").
option("hoodie.datasource.hive_sync.support_timestamp", "true").
option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
"org.apache.hudi.hive.MultiPartKeysValueExtractor").
option(HIVE_USE_JDBC_OPT_KEY, "false").
option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, db).
option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName).
option(TABLE_NAME,
tableName).mode(mode).save(s"/tmp/${db}/${tableName}")
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]