peng-xin opened a new issue #2448:
URL: https://github.com/apache/hudi/issues/2448


   **Environment Description**
   * Hudi version : 0.6.0
   * Spark version : spark-2.4.4-bin-hadoop2.7
   * Hive version : hive-2.3.4
   * Hadoop version : hadoop-2.7.3
   * Storage (HDFS/S3/GCS..) : HDFS
   * Running on Docker? (yes/no) : no
   
   1. When I write data to Hudi, the error is:
   ```
   Logical Plan:
   RepartitionByExpression [dbName#23, tblName#24], 6
   +- Project [row#21.dbName AS dbName#23, row#21.tblName AS tblName#24, row#21.opr AS opr#25, row#21.datalakeLogicalDeletion AS datalakeLogicalDeletion#26, row#21.etlTime AS etlTime#27L, row#21.jsonData AS jsonData#28]
      +- Project [jsontostructs(StructField(dbName,StringType,true), StructField(tblName,StringType,true), StructField(opr,StringType,true), StructField(datalakeLogicalDeletion,IntegerType,true), StructField(etlTime,LongType,true), StructField(jsonData,StringType,true), cast(value#8 as string), Some(PRC)) AS row#21]
         +- StreamingExecutionRelation KafkaV2[Subscribe[datalake_advertise]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
   Caused by: org.apache.hudi.exception.HoodieIOException: Failed to create file /user/datalake/hudi/hbase/f_mid_business_card/.hoodie/20210114202219.deltacommit
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.createImmutableFileInPath(HoodieActiveTimeline.java:449)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:333)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:308)
        at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:143)
        at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:124)
        at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:99)
        at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:397)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:205)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:125)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
   ```
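   The failing frame is `HoodieActiveTimeline.createImmutableFileInPath`, which (as far as I can tell) does not overwrite an existing instant file, so the create can fail if `20210114202219.deltacommit` is already present, e.g. from a retried or concurrent commit. A minimal diagnostic sketch to check this on HDFS (the path is the one from the exception message; everything else is illustrative, not from my job):
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, Path}

   object TimelineCheck {
     def main(args: Array[String]): Unit = {
       // Assumes this runs on a host with HDFS client configuration available.
       val fs = FileSystem.get(new Configuration())
       val hoodieDir = new Path("/user/datalake/hudi/hbase/f_mid_business_card/.hoodie")
       // List timeline files for the failing instant to see whether a
       // .deltacommit (or leftover .inflight/.requested) already exists.
       fs.listStatus(hoodieDir)
         .filter(_.getPath.getName.startsWith("20210114202219"))
         .foreach(s => println(s"${s.getPath.getName} length=${s.getLen}"))
     }
   }
   ```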
   2. My hoodieWriteConfig is:
   ```
   hoodie.filesystem.view.incr.timeline.sync.enable -> false,
   hoodie.bulkinsert.sort.mode -> GLOBAL_SORT,
   hoodie.avro.schema.externalTransformation -> false,
   hoodie.bootstrap.parallelism -> 1500,
   hoodie.delete.shuffle.parallelism -> 1500,
   hoodie.simple.index.use.caching -> true,
   hoodie.bloom.index.filter.type -> DYNAMIC_V0,
   hoodie.filesystem.view.remote.port -> 26754,
   hoodie.datasource.write.operation -> upsert,
   hoodie.cleaner.parallelism -> 200,
   hoodie.global.simple.index.parallelism -> 100,
   hoodie.bootstrap.mode.selector.regex -> .*,
   hoodie.parquet.page.size -> 1048576,
   hoodie.datasource.write.table.type -> MERGE_ON_READ,
   hoodie.datasource.hive_sync.table -> f_mid_business_card,
   hoodie.compaction.daybased.target.partitions -> 10,
   hoodie.metrics.reporter.class -> ,
   hoodie.parquet.block.size -> 125829120,
   hoodie.cleaner.delete.bootstrap.base.file -> false,
   hoodie.consistency.check.max_interval_ms -> 300000,
   hoodie.insert.shuffle.parallelism -> 100,
   hoodie.upsert.shuffle.parallelism -> 100,
   hoodie.bulkinsert.shuffle.parallelism -> 1500,
   hoodie.write.commit.callback.on -> false,
   hoodie.cleaner.fileversions.retained -> 3,
   hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.NonPartitionedExtractor,
   hoodie.parquet.compression.codec -> gzip,
   hoodie.datasource.write.hive_style_partitioning -> true,
   hoodie.copyonwrite.insert.split.size -> 500000,
   hoodie.optimistic.consistency.guard.sleep_time_ms -> 500,
   hoodie.datasource.hive_sync.use_jdbc -> true,
   hoodie.metrics.reporter.type -> GRAPHITE,
   hoodie.bootstrap.index.class -> org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex,
   hoodie.filesystem.remote.backup.view.enable -> true,
   hoodie.logfile.to.parquet.compression.ratio -> 0.35,
   hoodie.filesystem.view.spillable.mem -> 104857600,
   hoodie.write.status.storage.level -> MEMORY_AND_DISK_SER,
   hoodie.write.commit.callback.http.timeout.seconds -> 3,
   hoodie.copyonwrite.insert.auto.split -> true,
   hoodie.logfile.data.block.max.size -> 268435456,
   hoodie.index.type -> BLOOM,
   hoodie.keep.min.commits -> 6,
   hoodie.memory.spillable.map.path -> /tmp/,
   hoodie.filesystem.view.rocksdb.base.path -> /tmp/hoodie_timeline_rocksdb,
   hoodie.compact.inline -> false,
   hoodie.clean.async -> true,
   hoodie.record.size.estimation.threshold -> 1.0,
   hoodie.metrics.graphite.host -> localhost,
   hoodie.simple.index.update.partition.path -> false,
   hoodie.bloom.index.filter.dynamic.max.entries -> 100000,
   hoodie.compaction.reverse.log.read -> false,
   hoodie.metrics.jmx.port -> 9889,
   hoodie.writestatus.class -> org.apache.hudi.client.WriteStatus,
   hoodie.datasource.hive_sync.enable -> true,
   hoodie.finalize.write.parallelism -> 1500,
   hoodie.rollback.parallelism -> 100,
   hoodie.index.bloom.num_entries -> 60000,
   hoodie.memory.merge.max.size -> 131072,
   hoodie.bootstrap.mode.selector.regex.mode -> METADATA_ONLY,
   hoodie.rollback.using.markers -> false,
   hoodie.copyonwrite.record.size.estimate -> 1024,
   hoodie.bloom.index.input.storage.level -> MEMORY_AND_DISK_SER,
   hoodie.simple.index.parallelism -> 50,
   hoodie.consistency.check.enabled -> false,
   hoodie.bloom.index.use.caching -> true,
   hoodie.metrics.on -> false,
   hoodie.memory.compaction.max.size -> 1048576,
   hoodie.parquet.small.file.limit -> 104857600,
   hoodie.combine.before.insert -> false,
   hoodie.cleaner.commits.retained -> 2,
   hoodie.embed.timeline.server -> true,
   hoodie.bootstrap.mode.selector -> org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector,
   hoodie.filesystem.view.secondary.type -> MEMORY,
   _.hoodie.allow.multi.write.on.same.instant -> false,
   hoodie.datasource.write.partitionpath.field -> ,
   _hoodie.optimistic.consistency.guard.enable -> true,
   hoodie.datasource.hive_sync.database -> hbase,
   hoodie.bloom.index.update.partition.path -> true,
   hoodie.fail.on.timeline.archiving -> true,
   hoodie.markers.delete.parallelism -> 100,
   hoodie.filesystem.view.type -> MEMORY,
   hoodie.parquet.max.file.size -> 125829120,
   hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.NonpartitionedKeyGenerator,
   hoodie.bootstrap.partitionpath.translator.class -> org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPartitionPathTranslator,
   hoodie.bloom.index.prune.by.ranges -> true,
   hoodie.base.path -> /user/datalake/hudi/hbase/f_mid_business_card,
   hoodie.index.class -> ,
   hoodie.clean.automatic -> true,
   hoodie.filesystem.view.remote.host -> localhost,
   hoodie.compaction.lazy.block.read -> false,
   hoodie.memory.writestatus.failure.fraction -> 0.1,
   hoodie.metrics.graphite.port -> 4756,
   hoodie.cleaner.policy -> KEEP_LATEST_COMMITS,
   hoodie.logfile.max.size -> 1073741824,
   hoodie.filesystem.view.spillable.compaction.mem.fraction -> 0.01,
   hoodie.datasource.write.recordkey.field -> datalake_rowkey,
   hoodie.avro.schema.validate -> false,
   hoodie.simple.index.input.storage.level -> MEMORY_AND_DISK_SER,
   hoodie.timeline.layout.version -> 1,
   hoodie.consistency.check.max_checks -> 7,
   hoodie.consistency.check.initial_interval_ms -> 2000,
   hoodie.keep.max.commits -> 8,
   hoodie.compact.inline.max.delta.commits -> 5,
   hoodie.parquet.compression.ratio -> 0.1,
   hoodie.memory.dfs.buffer.max.size -> 16777216,
   hoodie.auto.commit -> true,
   hoodie.write.commit.callback.http.api.key -> hudi_write_commit_http_callback,
   hoodie.assume.date.partitioning -> false,
   hoodie.filesystem.view.spillable.dir -> /tmp/view_map/,
   hoodie.compaction.strategy -> org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy,
   hoodie.combine.before.upsert -> true,
   hoodie.bloom.index.keys.per.bucket -> 10000000,
   hoodie.write.commit.callback.class -> org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback,
   hoodie.bloom.index.parallelism -> 0,
   hoodie.cleaner.incremental.mode -> true,
   hoodie.commits.archival.batch -> 5,
   hoodie.datasource.hive_sync.partition_fields -> ,
   hoodie.compaction.target.io -> 512000,
   hoodie.table.name -> f_mid_business_card,
   hoodie.bloom.index.bucketized.checking -> true,
   hoodie.compaction.payload.class -> org.apache.hudi.common.model.OverwriteWithLatestAvroPayload,
   hoodie.combine.before.delete -> true,
   hoodie.datasource.write.precombine.field -> ts,
   hoodie.filesystem.view.spillable.bootstrap.base.file.mem.fraction -> 0.05,
   hoodie.metrics.jmx.host -> localhost,
   hoodie.index.bloom.fpp -> 0.000000001,
   hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://172.16.116.102:10000,
   hoodie.bloom.index.use.treebased.filter -> true
   ```
   
   **To Reproduce**
   Key code is:
   ```scala
   val hudiDF = buildHudiDF(tableConfig, partitionDF, dbNameSource, tableNameSource)
   val hoodieWriteConfigMap = buildHudiWriteConfig(tableConfig)
   hudiDF.write.format("hudi")
     .options(hoodieWriteConfigMap)
     .mode(SaveMode.Append)
     .save(hoodieWriteConfigMap.getOrElse(BASE_PATH_PROP, "/tmp"))
   ```
   
   ```scala
   private def buildHudiDF(tableConfig: TableConfig, sourceDataFrame: DataFrame, dbNameKafka: String, tblNameKafka: String): DataFrame = {
     val dataLakeColumns = tableConfig.getDataLakeColumns.asScala.toArray
     val commonSchema = new StructType(dataLakeColumns.map(column => StructField(column.getColumnName, StringType)))
     val castArray = dataLakeColumns.map(column => col(column.getColumnName).cast(column.getColumnType).toString())
     var targetDataFrame = sourceDataFrame
       .select(from_json(col("jsonData"), commonSchema).alias("row"), col("datalakeLogicalDeletion"))
       .select(col("row.*"), col("datalakeLogicalDeletion"))
       .selectExpr(castArray :+ (col("datalakeLogicalDeletion").alias("datalake_logical_deletion").toString()): _*)
       .filter(col("dbName").isNull.or(col("dbName").equalTo(dbNameKafka)).and(col("tblName").equalTo(tblNameKafka)))

     val precombineField = tableConfig.getPrecombineFieldOptKey
     if (StringUtils.isEmpty(precombineField) || !commonSchema.contains(StructField(precombineField, StringType))) {
       targetDataFrame = targetDataFrame.withColumn("ts", current_timestamp())
     }
     val complexRecordKey = tableConfig.getRecordkeyFieldOptKey
     if (StringUtils.isNotEmpty(complexRecordKey)) {
       val keyArray = complexRecordKey.split(StringUtils.COMMA_SYMBOL)
       var conditionChain = col(keyArray(0)).isNotNull.and(col(keyArray(0)).notEqual(StringUtils.EMPTY))
       // reassign: Column.or returns a new Column rather than mutating the receiver
       keyArray.tail.foreach(key => conditionChain = conditionChain.or(col(key).isNotNull.and(col(key).notEqual(StringUtils.EMPTY))))
       targetDataFrame = targetDataFrame.filter(conditionChain)
     }

     targetDataFrame
   }
   ```
   
   ```scala
   private def buildHudiWriteConfig(tableConfig: TableConfig): mutable.Map[String, String] = {
     val options = new mutable.HashMap[String, String]
     val dbName = tableConfig.getDbName
     val tblName = tableConfig.getTableName
     val partition_field = tableConfig.getPartitionpathFieldOptKey

     options += (TABLE_TYPE_OPT_KEY -> tableConfig.getTableTypeOptKey)
     options += (OPERATION_OPT_KEY -> tableConfig.getOperationOptKey)
     options += (RECORDKEY_FIELD_OPT_KEY -> tableConfig.getRecordkeyFieldOptKey)
     options += (PRECOMBINE_FIELD_OPT_KEY -> tableConfig.getPrecombineFieldOptKey)
     options += (HIVE_SYNC_ENABLED_OPT_KEY -> "true")
     options += (INDEX_TYPE_PROP -> tableConfig.getIndexTypeProp)
     options += (BLOOM_INDEX_FILTER_TYPE -> BloomFilterTypeCode.DYNAMIC_V0.name())
     options += (KEYGENERATOR_CLASS_OPT_KEY -> tableConfig.getKeygeneratorClassOptKey)
     options += (HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> tableConfig.getHivePartitionExtractorClassOptKey)
     options += (HIVE_STYLE_PARTITIONING_OPT_KEY -> "true")
     options += (PARTITIONPATH_FIELD_OPT_KEY -> partition_field)
     options += (HIVE_PARTITION_FIELDS_OPT_KEY -> partition_field.split(",").map(pair => pair.split(":")(0)).mkString(","))
     options += (HIVE_DATABASE_OPT_KEY -> dbName)
     options += (HIVE_TABLE_OPT_KEY -> tblName)
     options += (HIVE_USE_JDBC_OPT_KEY -> tableConfig.getHiveUseJdbcOptKey)
     options += (HIVE_URL_OPT_KEY -> tableConfig.getHiveUrlOptKey)

     val hoodieCompactionConfig = HoodieCompactionConfig.newBuilder()
       .retainCommits(2)
       .withCommitsArchivalBatchSize(5)
       .archiveCommitsWith(6, 8)
       .withAsyncClean(true)
       .build()

     val hoodieIndexConfig = HoodieIndexConfig.newBuilder()
       .withBloomIndexUpdatePartitionPath(true)
       .build()

     val hoodieMemoryConfig = HoodieMemoryConfig.newBuilder()
       .withMaxMemoryMaxSize(128 * 1024, 1024 * 1024)
       .build()

     val hoodieWriteConfig = HoodieWriteConfig.newBuilder()
       .withParallelism(tableConfig.getShuffleParallelism.toInt, tableConfig.getShuffleParallelism.toInt)
       .withIndexConfig(hoodieIndexConfig)
       .withCompactionConfig(hoodieCompactionConfig)
       .withMemoryConfig(hoodieMemoryConfig)
       .withProps(options.asJava)
       .forTable(tblName)
       .withPath(StringUtils.concat(tableConfig.getBasePath, File.separator, tableConfig.getDbName, File.separator, tableConfig.getTableName))
       .build()

     logger.info(s"hoodieWriteConfig -> {${hoodieWriteConfig.getProps.asScala.toString()}}")
     hoodieWriteConfig.getProps.asScala
   }
   ```
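   Note the arithmetic in `withMaxMemoryMaxSize`: 128 * 1024 = 131072 bytes (~128 KB) of merge memory and 1024 * 1024 = 1048576 bytes (1 MiB) of compaction memory, which is exactly what appears in the dump above as `hoodie.memory.merge.max.size -> 131072` and `hoodie.memory.compaction.max.size -> 1048576`.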
   
   
   However, when the hoodieWriteConfig is as below, the write succeeds. But the log files grow too big (2 GB+), there are too many small data files (under 1 MB), and an OOM error occurs every hour. (The compaction knobs involved are sketched after this config.)
   ```
   hoodie.datasource.hive_sync.table -> f_mid_order_details,
   hoodie.bloom.index.update.partition.path -> true,
   hoodie.bloom.index.filter.type -> DYNAMIC_V0,
   hoodie.datasource.write.keygenerator.class -> org.apache.hudi.keygen.SimpleKeyGenerator,
   hoodie.datasource.hive_sync.database -> hudi,
   hoodie.datasource.write.table.type -> MERGE_ON_READ,
   hoodie.datasource.write.partitionpath.field -> payment_date,
   hoodie.datasource.hive_sync.partition_fields -> payment_date,
   hoodie.datasource.hive_sync.partition_extractor_class -> org.apache.hudi.hive.MultiPartKeysValueExtractor,
   hoodie.datasource.write.recordkey.field -> key,
   hoodie.datasource.hive_sync.enable -> true,
   hoodie.upsert.shuffle.parallelism -> 100,
   hoodie.index.type -> GLOBAL_BLOOM,
   hoodie.datasource.hive_sync.jdbcurl -> jdbc:hive2://172.16.117.73:10000,
   hoodie.compact.inline -> true,
   hoodie.datasource.write.precombine.field -> ts,
   hoodie.table.name -> f_mid_order_details,
   hoodie.datasource.write.hive_style_partitioning -> true,
   hoodie.datasource.write.operation -> upsert
   ```
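   For reference, these are the MOR options that bound log-file growth and small-file creation; the values below simply mirror the dumps above and are shown for illustration, not as a verified fix:
   ```scala
   // Illustrative sketch only: the knobs controlling log growth / small files
   // on a MERGE_ON_READ table in Hudi 0.6.0. Values mirror the configs above.
   val compactionTuning = Map(
     "hoodie.compact.inline" -> "true",                 // compact as part of the write
     "hoodie.compact.inline.max.delta.commits" -> "5",  // trigger after N delta commits
     "hoodie.logfile.max.size" -> "1073741824",         // roll log files at 1 GB
     "hoodie.parquet.small.file.limit" -> "104857600"   // route inserts into files < 100 MB
   )
   ```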
   
   

