wqwl611 commented on issue #10882:
URL: https://github.com/apache/hudi/issues/10882#issuecomment-2006766143

   > @danny0405 Can you give more details on how did you ingested this table? 
What writer configuration you used and did you changed index type for this 
table?
   
   @ad1happy2go 
   I write this table with Spark like 
"df.write.format("org.apache.hudi").save(basePath)", and I use multi-writer 
mode and an independent compactor service.
   
   The following is my full write config:
   
   // Full Hudi write configuration used by the multi-writer ingestion job
   // (as quoted from the email; line wrapping is from the mail client).
   df.write.format("org.apache.hudi").
         // base info: table type/name, operation, and write parallelism
         option(TABLE_TYPE.key(), tableType).
         option(OPERATION.key(), opType).
         option(TBL_NAME.key, tableNameWithSuffix).
         option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, 
upsertParallelism).
         option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, 
upsertParallelism).
         option(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key, 
upsertParallelism).
         // the same field drives precombine, event-time, and ordering
         option(PRECOMBINE_FIELD.key, preCombineField).
         option(EVENT_TIME_FIELD.key, preCombineField).
         option(ORDERING_FIELD.key, preCombineField).
         option(PARTITIONPATH_FIELD.key, partitionFields).
         option(RECORDKEY_FIELD.key, recordKeyField).
         // key generator: complex (multi-field) record key
         option(HoodieWriteConfig.KEYGENERATOR_TYPE.key, 
KeyGeneratorType.COMPLEX.name).
         option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, 
classOf[ComplexKeyGenerator].getName).
         // hive info: HMS sync settings for the _rt/_ro tables
         option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), syncHive).
         option(HiveSyncConfig.HIVE_SYNC_TABLE_STRATEGY.key(), "RT").
         option(HIVE_STYLE_PARTITIONING.key(), true).
         option(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key(), false).
         option(HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key(), false).
         
option(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key(), 
"true").
         option(HiveSyncConfig.HIVE_SYNC_MODE.key(), "hms").
         option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, 
classOf[MultiPartKeysValueExtractor].getName).
         option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), hiveDb).
         option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), hiveTableName).
         // metadata table: disables the metadata table
         // (exact options come from metatableOptions, defined elsewhere)
         options(metatableOptions).
         // compact info: compaction is scheduled inline but executed by the
         // separate async compactor service
         option(ASYNC_COMPACT_ENABLE.key, true).
         option(INLINE_COMPACT.key, inLineCompact).
         option(SCHEDULE_INLINE_COMPACT.key, scheduleInlineCompact).
         option(COMPACTION_LAZY_BLOCK_READ_ENABLE.key, true).
         option(COMPACTION_LOG_FILE_SIZE_THRESHOLD.key, 
CompactorParamUtils.getLogSizeThreshold(tableNameWithSuffix)).
         option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, maxFileSize).
         option(PARQUET_SMALL_FILE_LIMIT.key, minLimitFileSize).
         option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key, "gzip").
         // archive: keep between 120 and 180 commits on the timeline
         option(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key, "180").
         option(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key, "120").
         option(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE.key, "100").
         option(HoodieArchivalConfig.AUTO_ARCHIVE.key, "true").
         option(HoodieArchivalConfig.ASYNC_ARCHIVE.key, "false").
         // important payload: event-time-based merge semantics
         option(HoodiePayloadConfig.PAYLOAD_CLASS_NAME.key, 
classOf[EventTimeAvroPayload].getName).
         option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, 
classOf[EventTimeAvroPayload].getName).
         // compaction trigger: by number of delta commits (per-table value)
         option(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key, 
CompactionTriggerStrategy.NUM_COMMITS.name).
         option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, 
CompactorParamUtils.getMaxDeltaCommits(tableNameWithSuffix)).
         option(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS.key, 
3600 * 3).
         // index info: bucket index with a single bucket per partition
         options(buildCleanerOptions(df.sparkSession)).
         option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.BUCKET.name()).
         option(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key, 1).
         option(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key, hashKey).
         option(HoodieLayoutConfig.LAYOUT_TYPE.key, "BUCKET").
         option(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key, 
classOf[SparkBucketIndexPartitioner[_]].getName).
         // concurrency: multi-writer (concurrency-control) options,
         // built per table elsewhere
         options(buildConcurrentOptions(tableNameWithSuffix)).
         // heartbeat: tolerate up to 10 missed 60s heartbeats
         option(HoodieWriteConfig.CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.key, 
10).
         option(HoodieWriteConfig.CLIENT_HEARTBEAT_INTERVAL_IN_MS.key, 60000).
         mode(Append).
         save(basePath)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to