zyclove commented on issue #10407:
URL: https://github.com/apache/hudi/issues/10407#issuecomment-1870952480

   @parisni @beyond1920 
   Thank you very much for helping me take a look. The code changes before and 
after the upgrade are as follows. Is there any good way to merge layers into 
new formats now?
   
   Before upgrading
   ```java
    private void upsertAllAction(Dataset<Row> jsonDataSet, long maxUseMemory, 
String tempPath) {
           int dataKeepTime = 3 * 24 * 60 / config.getTriggerTime();
           jsonDataSet.write()
                   .format("org.apache.hudi")
                   .option(HoodieTableConfig.TYPE.key(), 
HoodieTableType.MERGE_ON_READ.name())
                   .option(DataSourceWriteOptions.OPERATION().key(), 
WriteOperationType.UPSERT.value())
                   .option(DataSourceWriteOptions.TABLE_TYPE().key(), 
HoodieTableType.MERGE_ON_READ.name())
                   .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
config.getIdName())
                   .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
Constants.DT)
                   
.option(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true)
                   .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), 
Constants.UPDATE_TIME)
                   .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), false)
                   .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 
config.getUpsertParallelism())
                   
.option(HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key(), 
config.getUpsertParallelism())
                   .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), 
DefaultHoodieRecordPayload.class.getName())
                   
.option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(), 
true)
                   .option(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), 
true)
   //                
.option(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), true)
                   .option(HoodieCommonConfig.RECONCILE_SCHEMA.key(), true)
                   .option(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
true)
                   .option(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.toString())
                   .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), true)
                   .option(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(), 
DefaultHoodieRecordPayload.class.getName())
                   .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), 
dataKeepTime)
                   .option(HoodieCleanConfig.AUTO_CLEAN.key(), true)
                   
.option(HoodieCleanConfig.CLEANER_INCREMENTAL_MODE_ENABLE.key(), true)
                   .option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), 
dataKeepTime + 1)
                   .option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), 
dataKeepTime * 3)
                   
.option(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB.key(), 500 * 1024)
                   .option(HoodieCleanConfig.CLEANER_POLICY.key(), 
HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name())
                   .option(HoodieCleanConfig.CLEANER_HOURS_RETAINED.key(), 72)
                   
.option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 128 * 1024 * 
1024)
                   .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), 256 
* 1024 * 1024)
                   .option(HoodieCompactionConfig.INLINE_COMPACT.key(), true)
                   
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 0)
                   .option(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), 
tempPath)
                   .option(HoodieMetadataConfig.ENABLE.key(), true)
                   .option(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP.key(), 
dataKeepTime + 1)
                   .option(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP.key(), 
dataKeepTime + 2)
                   .option(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED.key(), 
dataKeepTime)
                   .option(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
maxUseMemory)
                   .option(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION.key(), 
maxUseMemory)
                   .option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, 
Constants.UPDATE_TIME)
                   
.option(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
Constants.UPDATE_TIME)
                   .option(HoodieTableConfig.NAME.key(), config.getName())
   //                .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), 
ComplexKeyGenerator.class.getName())
                   .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), 
SimpleKeyGenerator.class.getName())
                   .option(HoodieIndexConfig.INDEX_TYPE.key(), 
HoodieIndex.IndexType.SIMPLE.name())
   //                .partitionBy(HoodieIndexConfig.INDEX_CLASS_NAME.key(), 
HoodieSparkConsistentBucketIndex.class.getName())
                   
.option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH().key(),
 true)
                   .option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 20)
                   .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), 
HoodieIndex.BucketIndexEngineType.SIMPLE.name())
   //                .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), 
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())
                   .option(HoodieLayoutConfig.LAYOUT_TYPE.key(), 
HoodieStorageLayout.LayoutType.DEFAULT.name())
                   
.option(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key(), 
SparkBucketIndexPartitioner.class.getName())
   //                .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
                   .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.SINGLE_WRITER.name())
                   
.option(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), 
HoodieFailedWritesCleaningPolicy.LAZY.name())
   //                .option("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
                   .option("hoodie.write.lock.provider", 
HiveMetastoreBasedLockProvider.class.getName())
                   .option("hoodie.write.lock.hivemetastore.database", 
"bi_ods_real")
                   .option("hoodie.write.lock.hivemetastore.table", 
getTableName())
                   .mode(SaveMode.Append)
                   .save(config.getSinkPath());
       }
   
   }
   ```   
   
   The code is now as follows:
   ```java
   private void upsertAllAction(Dataset<Row> jsonDataSet, long maxUseMemory, 
String tempPath) {
           int dataKeepTime = 3 * 24 * 60 / config.getTriggerTime();
           jsonDataSet.write()
                   .format("org.apache.hudi")
                   .option(HoodieTableConfig.TYPE.key(), 
HoodieTableType.MERGE_ON_READ.name())
                   .option(DataSourceWriteOptions.OPERATION().key(), 
WriteOperationType.UPSERT.value())
                   .option(DataSourceWriteOptions.TABLE_TYPE().key(), 
HoodieTableType.MERGE_ON_READ.name())
                   .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
config.getIdName())
                   .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
Constants.DT)
                   
.option(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true)
                   .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), 
Constants.UPDATE_TIME)
                   .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key(), false)
                   .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 
config.getUpsertParallelism())
                   
.option(HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key(), 
config.getUpsertParallelism())
                   .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), 
DefaultHoodieRecordPayload.class.getName())
   //                
.option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(), 
true)
   //                
.option(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), true)
   //                
.option(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), true)
   //                .option(HoodieCommonConfig.RECONCILE_SCHEMA.key(), true)
   //                .option(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
true)
                   .option(HoodieWriteConfig.MARKERS_TYPE.key(), 
MarkerType.DIRECT.toString())
   //                .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), 
true)
                   .option(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key(), 
DefaultHoodieRecordPayload.class.getName())
                   .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), 
dataKeepTime)
                   .option(HoodieCleanConfig.AUTO_CLEAN.key(), true)
                   
.option(HoodieCleanConfig.CLEANER_INCREMENTAL_MODE_ENABLE.key(), true)
                   .option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), 
dataKeepTime + 1)
                   .option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), 
dataKeepTime * 3)
                   
.option(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB.key(), 500 * 1024)
                   .option(HoodieCleanConfig.CLEANER_POLICY.key(), 
HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name())
                   .option(HoodieCleanConfig.CLEANER_HOURS_RETAINED.key(), 72)
                   
.option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 128 * 1024 * 
1024)
                   .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), 256 
* 1024 * 1024)
                   .option(HoodieCompactionConfig.INLINE_COMPACT.key(), true)
                   
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 0)
                   .option(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), 
tempPath)
                   .option(HoodieMetadataConfig.ENABLE.key(), true)
                   .option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), 
dataKeepTime + 1)
                   .option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), 
dataKeepTime + 2)
                   .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), 
dataKeepTime)
                   .option(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
maxUseMemory)
                   .option(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION.key(), 
maxUseMemory)
                   .option(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, 
Constants.UPDATE_TIME)
                   
.option(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
Constants.UPDATE_TIME)
                   .option(HoodieTableConfig.NAME.key(), config.getName())
   //                .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), 
ComplexKeyGenerator.class.getName())
                   .option(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), 
SimpleKeyGenerator.class.getName())
                   .option(HoodieIndexConfig.INDEX_TYPE.key(), 
HoodieIndex.IndexType.SIMPLE.name())
   //                .option(HoodieIndexConfig.INDEX_TYPE.key(), 
HoodieIndex.IndexType.BUCKET.name())
   //                
.option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), true)
   //                .option(HoodieIndexConfig.INDEX_TYPE.key(), 
HoodieIndex.IndexType.RECORD_INDEX.name())
   //                .partitionBy(HoodieIndexConfig.INDEX_CLASS_NAME.key(), 
HoodieSparkConsistentBucketIndex.class.getName())
                   
.option(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH().key(),
 true)
                   .option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key(), 
config.getBucketSize())
                   .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), 
HoodieIndex.BucketIndexEngineType.SIMPLE.name())
   //                .option(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key(), 
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name())
                   .option(HoodieLayoutConfig.LAYOUT_TYPE.key(), 
HoodieStorageLayout.LayoutType.DEFAULT.name())
                   
.option(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key(), 
SparkBucketIndexPartitioner.class.getName())
   //                .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())
                   .option(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.SINGLE_WRITER.name())
                   
.option(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), 
HoodieFailedWritesCleaningPolicy.LAZY.name())
   //                .option("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
   //                .option("hoodie.write.lock.provider", 
HiveMetastoreBasedLockProvider.class.getName())
                   .option("hoodie.write.lock.hivemetastore.database", 
"bi_ods_real")
                   .option("hoodie.write.lock.hivemetastore.table", 
getTableName())
                   .option(HoodieLockConfig.ZK_LOCK_KEY.key(), "bi_ods_real." + 
getTableName())
                   .mode(SaveMode.Append)
                   .save(config.getSinkPath());
       }
   ``` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to