MrAladdin commented on issue #11178:
URL: https://github.com/apache/hudi/issues/11178#issuecomment-2103729103

   > [@MrAladdin](https://github.com/MrAladdin) Can you please share the 
timeline and writer configurations.
   
   df
     .writeStream
         .format("hudi")
         .option("hoodie.table.base.file.format", "PARQUET")
         .option("hoodie.allow.empty.commit", "true")
         .option("hoodie.datasource.write.drop.partition.columns", "false")
         .option("hoodie.table.services.enabled", "true")
         .option("hoodie.datasource.write.streaming.checkpoint.identifier", "lakehouse-dwd-social-kbi-beauty-lower-v1-writer-1")
         .option(PRECOMBINE_FIELD.key(), "date_kbiudate")
         .option(RECORDKEY_FIELD.key(), "records_key")
         .option(PARTITIONPATH_FIELD.key(), "partition_index_date")
         .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
         .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
         .option("hoodie.combine.before.upsert", "true")
         .option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")
   
         .option("hoodie.file.listing.parallelism", "200")
   
         .option("hoodie.schema.on.read.enable", "true")
   
         //markers
         .option("hoodie.write.markers.type", "DIRECT")
   
         //timeline server
         .option("hoodie.embed.timeline.server", "true")
         .option("hoodie.embed.timeline.server.async", "true")
         .option("hoodie.embed.timeline.server.gzip", "true")
         .option("hoodie.embed.timeline.server.reuse.enabled", "true")
         .option("hoodie.filesystem.view.incr.timeline.sync.enable", "true")
   
         //File System View Storage Configurations
         .option("hoodie.filesystem.view.remote.timeout.secs", "1200")
         .option("hoodie.filesystem.view.remote.retry.enable", "true")
         .option("hoodie.filesystem.view.remote.retry.initial_interval_ms", "500")
         .option("hoodie.filesystem.view.remote.retry.max_numbers", "15")
         .option("hoodie.filesystem.view.remote.retry.max_interval_ms", "8000")
         //.option("hoodie.filesystem.operation.retry.enable","true")
   
         //schema cache
         .option("hoodie.schema.cache.enable", "true")
   
         //spark write
         .option("hoodie.datasource.write.streaming.ignore.failed.batch", "false")
         .option("hoodie.datasource.write.streaming.retry.count", "6")
         .option("hoodie.datasource.write.streaming.retry.interval.ms", "3000")
   
         //metadata
         .option("hoodie.metadata.enable", "true")
         .option("hoodie.metadata.index.async", "false")
         .option("hoodie.metadata.index.check.timeout.seconds", "900")
         .option("hoodie.auto.adjust.lock.configs", "true")
         .option("hoodie.metadata.optimized.log.blocks.scan.enable", "true")
         .option("hoodie.metadata.compact.max.delta.commits", "20")
         .option("hoodie.metadata.max.reader.memory", "3221225472")
         .option("hoodie.metadata.max.reader.buffer.size", "1073741824")
   
         //index type
         .option("hoodie.metadata.record.index.enable", "true")
         .option("hoodie.index.type", "RECORD_INDEX")
         .option("hoodie.record.index.use.caching", "true")
         .option("hoodie.record.index.input.storage.level", "MEMORY_AND_DISK_SER")
         .option("hoodie.metadata.max.init.parallelism", "100000")
         .option("hoodie.metadata.record.index.min.filegroup.count", "720")
         .option("hoodie.metadata.record.index.growth.factor", "2.0")
         .option("hoodie.metadata.record.index.max.filegroup.count", "10000")
         .option("hoodie.metadata.record.index.max.filegroup.size", "1073741824")
         .option("hoodie.metadata.auto.initialize", "true")
         .option("hoodie.metadata.max.logfile.size", "2147483648")
         .option("hoodie.metadata.max.deltacommits.when_pending", "1000")
   
         //file sizing
         .option("hoodie.parquet.field_id.write.enabled", "true")
         .option("hoodie.copyonwrite.insert.auto.split", "true")
         .option("hoodie.record.size.estimation.threshold", "1.0")
         .option("hoodie.parquet.block.size", "536870912")
         .option("hoodie.parquet.max.file.size", "536870912")
         .option("hoodie.parquet.small.file.limit", "209715200")
         .option("hoodie.logfile.max.size", "536870912")
         .option("hoodie.logfile.data.block.max.size", "536870912")
         .option("hoodie.logfile.to.parquet.compression.ratio", "0.35")
   
         //archive
         .option("hoodie.keep.max.commits", "30")
         .option("hoodie.keep.min.commits", "20")
         .option("hoodie.commits.archival.batch", "10")
         .option("hoodie.archive.automatic", "true")
         .option("hoodie.archive.async", "true")
         .option("hoodie.archive.beyond.savepoint", "true")
         .option("hoodie.fail.on.timeline.archiving", "true")
         .option("hoodie.archive.merge.enable", "true")
         .option("hoodie.archive.merge.files.batch.size", "10")
         .option("hoodie.archive.merge.small.file.limit.bytes", "20971520")
   
         //cleaner
         .option("hoodie.clean.allow.multiple", "true")
         .option("hoodie.cleaner.incremental.mode", "true")
         .option("hoodie.clean.async", "true")
         .option("hoodie.cleaner.policy.failed.writes", "LAZY")
         .option("hoodie.cleaner.delete.bootstrap.base.file", "true")
         .option("hoodie.clean.automatic", "true")
         .option("hoodie.cleaner.policy", "KEEP_LATEST_BY_HOURS")
         .option("hoodie.cleaner.hours.retained", "6")
         .option("hoodie.clean.trigger.strategy", "NUM_COMMITS")
         .option("hoodie.clean.max.commits", "10")
   
         //compact
         .option("hoodie.datasource.compaction.async.enable", "true")
         .option("hoodie.compact.inline", "false")
         .option("hoodie.compact.schedule.inline", "false")
         .option("hoodie.compaction.lazy.block.read", "true")
         .option("hoodie.compaction.reverse.log.read", "false")
         .option("hoodie.compaction.logfile.size.threshold", "314572800")
         .option("hoodie.compaction.target.io", compact_limit)
         .option("hoodie.compaction.strategy", "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy")
         .option("hoodie.compact.inline.trigger.strategy", "NUM_AND_TIME")
         .option("hoodie.compact.inline.max.delta.commits", "10")
         .option("hoodie.compact.inline.max.delta.seconds", "7200")
         .option("hoodie.memory.compaction.fraction", "0.6")
   
   
         .option("hoodie.datasource.write.reconcile.schema", "true")
         .option("hoodie.write.set.null.for.missing.columns", "true")
         .option("hoodie.avro.schema.external.transformation", "true")
         .option("hoodie.avro.schema.validate", "true")
   
   
         //lock
         .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
         .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider")
         .option("hoodie.write.lock.filesystem.expire", "10")
   
   
         .option(config.HoodieWriteConfig.TBL_NAME.key(), table_name)
         .option("path", output_path + "/" + table_name)
         .option("checkpointLocation", checkpoint_path)
         .outputMode(OutputMode.Append())
         .queryName("lakehouse-dwd-social-kbi-beauty-lower-v1")
         .start()
   
   
   
   
   1. There is in fact only one writer, and all table services run inside the Structured Streaming job. I just noticed that in .option(RECORDKEY_FIELD.key(), "records_key"), records_key is unique within each partition, but a very small number of records share the same records_key across different partitions. Since the record index is a global index, could this be the cause of the exception during upsert?
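   Since the record index is global, duplicate keys in different partitions can indeed collide. If the table must keep per-partition semantics, one workaround (a sketch only; it assumes a fresh table, because the key format of an existing table cannot be changed in place) is to fold the partition column into the record key so it becomes globally unique:

   ```scala
         // Sketch: add the partition column to the record key; multiple key
         // fields require the complex key generator. Column names are taken
         // from the writer config above.
         .option(RECORDKEY_FIELD.key(), "partition_index_date,records_key")
         .option("hoodie.datasource.write.keygenerator.class",
           "org.apache.hudi.keygen.ComplexKeyGenerator")
         // Alternatively, keep the key as-is and use a partition-scoped
         // (non-global) index instead of RECORD_INDEX:
         // .option("hoodie.index.type", "BLOOM")
   ```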
   2. When writing with Spark Structured Streaming, the number of HFiles under .hoodie/metadata/record_index is twice the value set by .option("hoodie.metadata.record.index.min.filegroup.count", "720"). But when writing batches with an offline Spark DataFrame, each commit generates a corresponding batch of HFiles, leading to an excessively large number of HFiles under record_index. What causes this? How can the number of HFiles under .hoodie/metadata/record_index be controlled, what is the most reasonable size for each HFile, and which configuration parameters are involved?
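   On question 2, a hedged note: each record-index file group can hold several HFile versions until the metadata table's compaction and cleaning catch up (governed here by hoodie.metadata.compact.max.delta.commits), so the raw file count on disk can exceed the configured file group count, and each batch commit can add new slices. A small helper to watch the count (a sketch; the path layout is an assumption based on the default metadata table layout, and it only works on a locally mounted filesystem):

   ```shell
   # Count the HFiles under the record index partition of a Hudi table.
   # $1 is the table base path.
   count_record_index_hfiles() {
     find "$1/.hoodie/metadata/record_index" -type f -name '*.hfile' | wc -l
   }
   ```

   Comparing this count right after a commit and again after metadata compaction shows how many of the files are superseded versions rather than live file groups.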
   3. When writing with Spark Structured Streaming, if individual HFiles grow too large, will changing the file count afterwards via .option("hoodie.metadata.record.index.min.filegroup.count", "1000") take effect after restarting the program? If it does not take effect, how should it be changed?
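   On question 3, my understanding (an assumption worth verifying against your Hudi version) is that the record-index file group count is fixed when the index partition is first initialized, so raising hoodie.metadata.record.index.min.filegroup.count on restart will not resize an existing index. A rebuild sketch:

   ```scala
         // Sketch: rebuild the record index with a new file group count.
         // Run 1: disable the index so the writer drops the index partition.
         .option("hoodie.metadata.record.index.enable", "false")
         // Run 2 (after the partition is gone): re-enable with the new
         // sizing; the index is re-initialized from the data files.
         .option("hoodie.metadata.record.index.enable", "true")
         .option("hoodie.metadata.record.index.min.filegroup.count", "1000")
   ```

   Re-initialization scans the whole table, so it is worth scheduling during a quiet window.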
   
   Thanks

