peng-xin opened a new issue #2448:
URL: https://github.com/apache/hudi/issues/2448
**Environment Description**
* Hudi version :
0.6.0
* Spark version :
spark-2.4.4-bin-hadoop2.7
* Hive version :
hive-2.3.4
* Hadoop version :
hadoop2.7.3
* Storage (HDFS/S3/GCS..) :
hdfs
* Running on Docker? (yes/no) :
no
1. When I write data to Hudi, I get the following error:
`Logical Plan:
RepartitionByExpression [dbName#23, tblName#24], 6
+- Project [row#21.dbName AS dbName#23, row#21.tblName AS tblName#24,
row#21.opr AS opr#25, row#21.datalakeLogicalDeletion AS
datalakeLogicalDeletion#26, row#21.etlTime AS etlTime#27L, row#21.jsonData AS
jsonData#28]
+- Project [jsontostructs(StructField(dbName,StringType,true),
StructField(tblName,StringType,true), StructField(opr,StringType,true),
StructField(datalakeLogicalDeletion,IntegerType,true),
StructField(etlTime,LongType,true), StructField(jsonData,StringType,true),
cast(value#8 as string), Some(PRC)) AS row#21]
+- StreamingExecutionRelation KafkaV2[Subscribe[datalake_advertise]],
[key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12,
timestampType#13]
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.hudi.exception.HoodieIOException: Failed to create
file
/user/datalake/hudi/hbase/f_mid_business_card/.hoodie/20210114202219.deltacommit
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:449)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:333)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:449)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:333)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:308)
at
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:143)
at
org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:124)
at
org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:124)
at
org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:99)
at
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:397)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:205)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:125)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
`
2. My HoodieWriteConfig:
`hoodie.filesystem.view.incr.timeline.sync.enable -> false,
hoodie.bulkinsert.sort.mode -> GLOBAL_SORT,
hoodie.avro.schema.externalTransformation -> false,
hoodie.bootstrap.parallelism -> 1500,
hoodie.delete.shuffle.parallelism -> 1500,
hoodie.simple.index.use.caching -> true,
hoodie.bloom.index.filter.type -> DYNAMIC_V0,
hoodie.filesystem.view.remote.port -> 26754,
hoodie.datasource.write.operation -> upsert,
hoodie.cleaner.parallelism -> 200,
hoodie.global.simple.index.parallelism -> 100,
hoodie.bootstrap.mode.selector.regex -> .*,
hoodie.parquet.page.size -> 1048576,
hoodie.datasource.write.table.type -> MERGE_ON_READ,
hoodie.datasource.hive_sync.table -> f_mid_business_card,
hoodie.compaction.daybased.target.partitions -> 10,
hoodie.metrics.reporter.class -> ,
hoodie.parquet.block.size -> 125829120,
hoodie.cleaner.delete.bootstrap.base.file -> false,
hoodie.consistency.check.max_interval_ms -> 300000,
hoodie.insert.shuffle.parallelism -> 100,
hoodie.upsert.shuffle.parallelism -> 100,
hoodie.bulkinsert.shuffle.parallelism -> 1500,
hoodie.write.commit.callback.on -> false,
hoodie.cleaner.fileversions.retained -> 3,
hoodie.datasource.hive_sync.partition_extractor_class ->
org.apache.hudi.hive.NonPartitionedExtractor,
hoodie.parquet.compression.codec -> gzip,
hoodie.datasource.write.hive_style_partitioning -> true,
hoodie.copyonwrite.insert.split.size -> 500000,
hoodie.optimistic.consistency.guard.sleep_time_ms -> 500,
hoodie.datasource.hive_sync.use_jdbc -> true,
hoodie.metrics.reporter.type -> GRAPHITE,
hoodie.bootstrap.index.class ->
org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex,
hoodie.filesystem.remote.backup.view.enable -> true,
hoodie.logfile.to.parquet.compression.ratio -> 0.35,
hoodie.filesystem.view.spillable.mem -> 104857600,
hoodie.write.status.storage.level -> MEMORY_AND_DISK_SER,
hoodie.write.commit.callback.http.timeout.seconds -> 3,
hoodie.copyonwrite.insert.auto.split -> true,
hoodie.logfile.data.block.max.size -> 268435456,
hoodie.index.type -> BLOOM,
hoodie.keep.min.commits -> 6,
hoodie.memory.spillable.map.path -> /tmp/,
hoodie.filesystem.view.rocksdb.base.path -> /tmp/hoodie_timeline_rocksdb,
hoodie.compact.inline -> false,
hoodie.clean.async -> true,
hoodie.record.size.estimation.threshold -> 1.0,
hoodie.metrics.graphite.host -> localhost,
hoodie.simple.index.update.partition.path -> false,
hoodie.bloom.index.filter.dynamic.max.entries -> 100000,
hoodie.compaction.reverse.log.read -> false,
hoodie.metrics.jmx.port -> 9889,
hoodie.writestatus.class -> org.apache.hudi.client.WriteStatus,
hoodie.datasource.hive_sync.enable -> true,
hoodie.finalize.write.parallelism -> 1500,
hoodie.rollback.parallelism -> 100,
hoodie.index.bloom.num_entries -> 60000,
hoodie.memory.merge.max.size -> 131072,
hoodie.bootstrap.mode.selector.regex.mode -> METADATA_ONLY,
hoodie.rollback.using.markers -> false,
hoodie.copyonwrite.record.size.estimate -> 1024,
hoodie.bloom.index.input.storage.level -> MEMORY_AND_DISK_SER,
hoodie.simple.index.parallelism -> 50,
hoodie.consistency.check.enabled -> false,
hoodie.bloom.index.use.caching -> true,
hoodie.metrics.on -> false,
hoodie.memory.compaction.max.size -> 1048576,
hoodie.parquet.small.file.limit -> 104857600,
hoodie.combine.before.insert -> false,
hoodie.cleaner.commits.retained -> 2,
hoodie.embed.timeline.server -> true,
hoodie.bootstrap.mode.selector ->
org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector,
hoodie.filesystem.view.secondary.type -> MEMORY,
_.hoodie.allow.multi.write.on.same.instant -> false,
hoodie.datasource.write.partitionpath.field -> ,
_hoodie.optimistic.consistency.guard.enable -> true,
hoodie.datasource.hive_sync.database -> hbase,
hoodie.bloom.index.update.partition.path -> true,
hoodie.fail.on.timeline.archiving -> true,
hoodie.markers.delete.parallelism -> 100,
hoodie.filesystem.view.type -> MEMORY,
hoodie.parquet.max.file.size -> 125829120,
hoodie.datasource.write.keygenerator.class ->
org.apache.hudi.keygen.NonpartitionedKeyGenerator,
hoodie.bootstrap.partitionpath.translator.class ->
org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator,
hoodie.bloom.index.prune.by.ranges -> true,
hoodie.base.path -> /user/datalake/hudi/hbase/f_mid_business_card,
hoodie.index.class -> ,
hoodie.clean.automatic -> true,
hoodie.filesystem.view.remote.host -> localhost,
hoodie.compaction.lazy.block.read -> false,
hoodie.memory.writestatus.failure.fraction -> 0.1,
hoodie.metrics.graphite.port -> 4756,
hoodie.cleaner.policy -> KEEP_LATEST_COMMITS,
hoodie.logfile.max.size -> 1073741824,
hoodie.filesystem.view.spillable.compaction.mem.fraction -> 0.01,
hoodie.datasource.write.recordkey.field -> datalake_rowkey,
hoodie.avro.schema.validate -> false,
hoodie.simple.index.input.storage.level -> MEMORY_AND_DISK_SER,
hoodie.timeline.layout.version -> 1,
hoodie.consistency.check.max_checks -> 7,
hoodie.consistency.check.initial_interval_ms -> 2000,
hoodie.keep.max.commits -> 8,
hoodie.compact.inline.max.delta.commits -> 5,
hoodie.parquet.compression.ratio -> 0.1,
hoodie.memory.dfs.buffer.max.size -> 16777216,
hoodie.auto.commit -> true,
hoodie.write.commit.callback.http.api.key -> hudi_write_commit_http_callback,
hoodie.assume.date.partitioning -> false,
hoodie.filesystem.view.spillable.dir -> /tmp/view_map/,
hoodie.compaction.strategy ->
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy,
hoodie.combine.before.upsert -> true,
hoodie.bloom.index.keys.per.bucket -> 10000000,
hoodie.write.commit.callback.class ->
org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback,
hoodie.bloom.index.parallelism -> 0,
hoodie.cleaner.incremental.mode -> true,
hoodie.commits.archival.batch -> 5,
hoodie.datasource.hive_sync.partition_fields -> ,
hoodie.compaction.target.io -> 512000,
hoodie.table.name -> f_mid_business_card,
hoodie.bloom.index.bucketized.checking -> true,
hoodie.compaction.payload.class ->
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload,
hoodie.combine.before.delete -> true,
hoodie.datasource.write.precombine.field -> ts,
hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction -> 0.05,
hoodie.metrics.jmx.host -> localhost,
hoodie.index.bloom.fpp -> 0.000000001,
hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://172.16.116.102:10000,
hoodie.bloom.index.use.treebased.filter -> true`
**To Reproduce**
Key code:
`val hudiDF = buildHudiDF(tableConfig, partitionDF, dbNameSource, tableNameSource)
val hoodieWriteConfigMap = buildHudiWriteConfig(tableConfig)
// Table location taken from the same option map; "/tmp" is used only when BASE_PATH_PROP is absent.
val targetPath = hoodieWriteConfigMap.getOrElse(BASE_PATH_PROP, "/tmp")
hudiDF.write
  .format("hudi")
  .options(hoodieWriteConfigMap)
  .mode(SaveMode.Append)
  .save(targetPath)`
` /**
 * Projects the incoming rows into the typed column layout of the target Hudi table.
 *
 * The method works in these steps:
 *  - Parse "jsonData" with an all-string schema built from the table's data-lake columns.
 *  - Cast every column to its configured type.
 *  - Carry "datalakeLogicalDeletion" through as "datalake_logical_deletion".
 *  - Keep only rows for the requested db/table.
 *  - Add a "ts" column when no usable precombine field is configured.
 *  - Drop rows whose record-key fields are all null or empty.
 *
 * @param tableConfig     per-table settings (columns, precombine field, record key fields)
 * @param sourceDataFrame input that must contain "jsonData" and "datalakeLogicalDeletion"
 * @param dbNameKafka     database name rows must match (rows whose dbName is null are also kept)
 * @param tblNameKafka    table name rows must match
 * @return the filtered, typed DataFrame to hand to the Hudi writer
 */
private def buildHudiDF(tableConfig: TableConfig, sourceDataFrame: DataFrame,
                        dbNameKafka: String, tblNameKafka: String): DataFrame = {
  val dataLakeColumns = tableConfig.getDataLakeColumns.asScala.toArray
  // Parse everything as strings first; the typed casts are applied by selectExpr below.
  val commonSchema = new StructType(dataLakeColumns.map(column =>
    StructField(column.getColumnName, StringType)))
  val castArray = dataLakeColumns.map(column =>
    col(column.getColumnName).cast(column.getColumnType).toString())

  val projected = sourceDataFrame
    .select(from_json(col("jsonData"), commonSchema).alias("row"), col("datalakeLogicalDeletion"))
    .select(col("row.*"), col("datalakeLogicalDeletion"))
    .selectExpr(castArray :+
      (col("datalakeLogicalDeletion").alias("datalake_logical_deletion").toString()): _*)
    .filter(col("dbName").isNull.or(col("dbName").equalTo(dbNameKafka))
      .and(col("tblName").equalTo(tblNameKafka)))

  // Without a usable precombine column, fall back to a processing-time column named "ts".
  val precombineField = tableConfig.getPrecombineFieldOptKey
  val withPrecombine =
    if (StringUtils.isEmpty(precombineField) ||
        !commonSchema.contains(StructField(precombineField, StringType))) {
      projected.withColumn("ts", current_timestamp())
    } else {
      projected
    }

  val complexRecordKey = tableConfig.getRecordkeyFieldOptKey
  if (StringUtils.isEmpty(complexRecordKey)) {
    withPrecombine
  } else {
    // Keep a row when at least one record-key field is non-null and non-empty.
    // Column.or returns a NEW Column, so the per-key predicates must be folded together;
    // calling or(...) inside a foreach discards each result and only checks the first key.
    // Blank entries (e.g. from "a,,b" or a trailing comma) are skipped instead of failing.
    val keyPresent = complexRecordKey.split(StringUtils.COMMA_SYMBOL)
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(key => col(key).isNotNull.and(col(key).notEqual(StringUtils.EMPTY)))
      .reduceOption(_ or _)
    keyPresent.fold(withPrecombine)(condition => withPrecombine.filter(condition))
  }
}`
` /**
 * Assembles the Hudi write options for one table.
 *
 * Options come from two places:
 *  - Datasource and Hive-sync options are read from tableConfig.
 *  - Compaction, index, memory, parallelism, table name and base path are set through the
 *    HoodieWriteConfig builders.
 * All resulting properties are returned as a mutable map for DataFrameWriter.options.
 *
 * @param tableConfig per-table settings
 * @return every property of the built HoodieWriteConfig, including the base path
 */
private def buildHudiWriteConfig(tableConfig: TableConfig): mutable.Map[String, String] = {
  val dbName = tableConfig.getDbName
  val tblName = tableConfig.getTableName
  val partitionField = tableConfig.getPartitionpathFieldOptKey
  // Hive partition columns are the partition-path fields with any ":suffix" stripped.
  val hivePartitionFields = partitionField.split(",").map(pair => pair.split(":")(0)).mkString(",")

  val options = new mutable.HashMap[String, String]
  options += (TABLE_TYPE_OPT_KEY -> tableConfig.getTableTypeOptKey)
  options += (OPERATION_OPT_KEY -> tableConfig.getOperationOptKey)
  options += (RECORDKEY_FIELD_OPT_KEY -> tableConfig.getRecordkeyFieldOptKey)
  options += (PRECOMBINE_FIELD_OPT_KEY -> tableConfig.getPrecombineFieldOptKey)
  options += (HIVE_SYNC_ENABLED_OPT_KEY -> "true")
  options += (INDEX_TYPE_PROP -> tableConfig.getIndexTypeProp)
  options += (BLOOM_INDEX_FILTER_TYPE -> BloomFilterTypeCode.DYNAMIC_V0.name())
  options += (KEYGENERATOR_CLASS_OPT_KEY -> tableConfig.getKeygeneratorClassOptKey)
  options += (HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> tableConfig.getHivePartitionExtractorClassOptKey)
  options += (HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")
  options += (PARTITIONPATH_FIELD_OPT_KEY -> partitionField)
  options += (HIVE_PARTITION_FIELDS_OPT_KEY -> hivePartitionFields)
  options += (HIVE_DATABASE_OPT_KEY -> dbName)
  options += (HIVE_TABLE_OPT_KEY -> tblName)
  options += (HIVE_USE_JDBC_OPT_KEY -> tableConfig.getHiveUseJdbcOptKey)
  options += (HIVE_URL_OPT_KEY -> tableConfig.getHiveUrlOptKey)

  val hoodieCompactionConfig = HoodieCompactionConfig.newBuilder()
    .retainCommits(2)
    .withCommitsArchivalBatchSize(5)
    .archiveCommitsWith(6, 8)
    .withAsyncClean(true)
    .build()
  val hoodieIndexConfig = HoodieIndexConfig.newBuilder()
    .withBloomIndexUpdatePartitionPath(true)
    .build()
  // NOTE(review): these surface as hoodie.memory.merge.max.size -> 131072 and
  // hoodie.memory.compaction.max.size -> 1048576; if the unit is bytes that is only
  // 128 KiB / 1 MiB. Confirm larger values (e.g. MiB/GiB) were not intended.
  val hoodieMemoryConfig = HoodieMemoryConfig.newBuilder()
    .withMaxMemoryMaxSize(128 * 1024, 1024 * 1024)
    .build()

  val shuffleParallelism = tableConfig.getShuffleParallelism.toInt
  // Hudi base paths are Hadoop paths, which always use "/" as the separator.
  // File.separator is the driver's local OS separator and would be wrong on Windows.
  val basePath = StringUtils.concat(tableConfig.getBasePath, "/",
    tableConfig.getDbName, "/", tableConfig.getTableName)

  val hoodieWriteConfig = HoodieWriteConfig.newBuilder()
    .withParallelism(shuffleParallelism, shuffleParallelism)
    .withIndexConfig(hoodieIndexConfig)
    .withCompactionConfig(hoodieCompactionConfig)
    .withMemoryConfig(hoodieMemoryConfig)
    .withProps(options.asJava)
    .forTable(tblName)
    .withPath(basePath)
    .build()
  logger.info(s"hoodieWriteConfig -> {${hoodieWriteConfig.getProps.asScala.toString()}}")
  hoodieWriteConfig.getProps.asScala
}`
However, with the HoodieWriteConfig below the write succeeds, but the log files grow too large (2 GB+),
there are too many small data files (under 1 MB), and an OOM error occurs every hour.
`hoodie.datasource.hive_sync.table->f_mid_order_details,
hoodie.bloom.index.update.partition.path->true,
hoodie.bloom.index.filter.type->DYNAMIC_V0,
hoodie.datasource.write.keygenerator.class->org.apache.hudi.keygen.SimpleKeyGenerator,
hoodie.datasource.hive_sync.database->hudi,
hoodie.datasource.write.table.type->MERGE_ON_READ,
hoodie.datasource.write.partitionpath.field->payment_date,
hoodie.datasource.hive_sync.partition_fields->payment_date,
hoodie.datasource.hive_sync.partition_extractor_class->org.apache.hudi.hive.MultiPartKeysValueExtractor,
hoodie.datasource.write.recordkey.field->key,
hoodie.datasource.hive_sync.enable->true,
hoodie.upsert.shuffle.parallelism->100,
hoodie.index.type->GLOBAL_BLOOM,
hoodie.datasource.hive_sync.jdbcurl->jdbc:hive2://172.16.117.73:10000,
hoodie.compact.inline->true,
hoodie.datasource.write.precombine.field->ts,
hoodie.table.name->f_mid_order_details,
hoodie.datasource.write.hive_style_partitioning->true,
hoodie.datasource.write.operation->upsert`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org