wuzhenhua01 commented on PR #10220:
URL: https://github.com/apache/hudi/pull/10220#issuecomment-1838133272
> Can you elaborate a little more about the operation procedure that can reproduce this exception? That would help a lot for understanding the fix.
Step 1: I used bootstrapping to convert a Parquet table to a Hudi table, like this:
```shell
spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/cluster/hudi/hudi-utilities-bundle_2.11-0.12.2.jar \
--target-base-path /tmp/t1 --table-type MERGE_ON_READ \
--target-table t1 \
--run-bootstrap \
--bootstrap-overwrite \
--hoodie-conf hoodie.bootstrap.base.path=hdfs://hacluster/user/hive/warehouse/t1 \
--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--hoodie-conf hoodie.datasource.write.keygenerator.type=NON_PARTITION \
--hoodie-conf hoodie.bootstrap.parallelism=2
```
Step 2: I used the Flink DataStream writer to write to this Hudi table with compaction turned on, like this:
```scala
// `conf` (the Flink Configuration), `env` (the execution environment) and
// `rowDataDataStream` are created earlier in the job setup.
val hudiProps = DFSPropertiesConfiguration.getGlobalProps
val hudiConf = Configuration.fromMap(hudiProps.asInstanceOf[JMap[String, String]])
conf.addAll(hudiConf)
OptionsInference.setupSinkTasks(conf, env.getParallelism)
val rowType = RowType.of(false,
  Array(new TimestampType, new BigIntType, new VarCharType, new VarCharType),
  Array("_origin_op_ts", "id", "content", "date"))
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, AvroSchemaConverter.convertToSchema(rowType).toString)
conf.setString(FlinkOptions.PATH, "/tmp/t1")
conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
conf.setString(FlinkOptions.TABLE_NAME, "t1")
conf.setString(FlinkOptions.PRECOMBINE_FIELD, "_origin_op_ts")
conf.setString(FlinkOptions.RECORD_KEY_FIELD, "id")
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true)
conf.setString(FlinkOptions.KEYGEN_TYPE, KeyGeneratorType.NON_PARTITION.name)
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false)
val hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, rowDataDataStream.javaStream)
val pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream)
// Trigger compaction on elapsed time, every 120 seconds.
conf.setString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY, FlinkOptions.TIME_ELAPSED)
conf.setInteger(FlinkOptions.COMPACTION_DELTA_SECONDS, TimeUnit.SECONDS.toSeconds(120).toInt)
Pipelines.compact(conf, pipeline)
env.execute
```