bkosuru opened a new issue #4864:
URL: https://github.com/apache/hudi/issues/4864


   Hello,
   
   An insert with INSERT_DROP_DUPS_OPT_KEY enabled fails after several hours. Any suggestions to make it work? Details below.
   We want to prevent inserting duplicate records.
   
   Hudi table size: 13.4 TB
   Data size to insert: 3.8 TB uncompressed (it also failed with a 200 GB input)
   The table has two partition columns, g and p, laid out as spog/g=g1/p=p1
   The data to be inserted belongs to a single g partition, g=g2
   The g=g2 partition is about 2 TB
   g=g2 has 44 p sub-partitions, ranging in size from 1.3 MB to 270 GB
   
   Environment Description:
   Hudi version : 0.8.0
   Spark version : 2.4.4
   Storage (HDFS/S3/GCS..) : HDFS
   Running on Docker? (yes/no) : No
   Table type: COW
   
   Spark settings:
   new SparkConf()
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.ui.enabled", "false")
   .set("spark.sql.parquet.mergeSchema", "false")
   .set("spark.sql.files.ignoreCorruptFiles", "true")
   .set("spark.sql.hive.convertMetastoreParquet", "false")
   
   --driver-memory 25G \
   --executor-memory 50G \
   --executor-cores 2 \
   --num-executors 400 \
   --conf spark.dynamicAllocation.enabled=False \
   --conf spark.network.timeout=240s \
   --conf spark.shuffle.sasl.timeout=60000 \
   --conf spark.driver.maxResultSize=20g \
   --conf spark.port.maxRetries=60 \
   --conf spark.shuffle.service.enabled=True \
   --conf spark.sql.shuffle.partitions=3000 \
   --conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \
   --conf "spark.executor.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2 
-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC 
-XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \
   --conf spark.driver.memoryOverhead=1024 \
   --conf spark.executor.memoryOverhead=3072 \
   --conf spark.yarn.max.executor.failures=100 \
   --conf spark.kryoserializer.buffer.max=512m \
   --conf spark.task.maxFailures=4 \
   --conf spark.rdd.compress=True \
   
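   For context, the SparkConf shown above is attached to the session builder in the usual way; this is only a sketch, where "conf" refers to that SparkConf and the app name is a placeholder:
   
   // "conf" is the SparkConf built above; "spog-insert" is a placeholder app name
   val spark = org.apache.spark.sql.SparkSession.builder()
     .appName("spog-insert")
     .config(conf)
     .getOrCreate()
   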
   private val AVG_RECORD_SIZE: Int = 256 // approx bytes of our average record, contra the Hudi default assumption of 1024
   private val ONE_GIGABYTE: Int = 1024 * 1024 * 1024 // used for Parquet file size & block size
   private val BLOOM_MAX_ENTRIES: Int = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)
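   
   With these constants, BLOOM_MAX_ENTRIES works out to 1,073,741,824 / (2 * 256) = 2,097,152, i.e. each file's dynamic bloom filter is allowed to grow to roughly 2 million keys.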
   
   df.write
     .format("hudi")
     // DataSourceWriteOptions
     .option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
     .option(KEYGENERATOR_CLASS_OPT_KEY, "com.xyz.SpoKeyGenerator")
     .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL)
     .option(INSERT_DROP_DUPS_OPT_KEY, value = true)
     .option(INSERT_PARALLELISM, 2000)
     .option(PARTITIONPATH_FIELD_OPT_KEY, "g,p")
     .option(PRECOMBINE_FIELD_OPT_KEY, "isDeleted")
     .option(RECORDKEY_FIELD_OPT_KEY, "s,o")
     .option(URL_ENCODE_PARTITIONING_OPT_KEY, value = true)
     // HoodieIndexConfig
     .option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES)
     .option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)
     // HoodieCompactionConfig
     .option(COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 64)
     // HoodieStorageConfig
     .option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
     .option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
     .option(PARQUET_FILE_MAX_BYTES, ONE_GIGABYTE)
     // Commit history
     .option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
     .option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
     .option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)
     // HoodieWriteConfig
     .option(EMBEDDED_TIMELINE_SERVER_ENABLED, "false")
     .option(TABLE_NAME, "spog")
     .mode(SaveMode.Append)
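   
   After a successful run, a quick duplicate count over the written partition is how we would confirm the drop-duplicates behavior. This is only a sketch: basePath and the partition glob are placeholders for the actual layout, and it assumes an active SparkSession named spark:
   
   // basePath is a placeholder for the table's actual HDFS location; the glob assumes
   // the hive-style layout g=.../p=... described above
   val basePath = "hdfs:///path/to/spog"
   
   val dupes = spark.read.format("hudi")
     .load(basePath + "/g=g2/*")
     .groupBy("_hoodie_record_key") // Hudi metadata column holding the record key
     .count()
     .filter("count > 1")
   
   println(s"duplicate record keys in g=g2: ${dupes.count()}")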
   
   class SpoKeyGenerator(props: TypedProperties)
       extends ComplexKeyGenerator(props) {
   
     // 128-bit murmur hash of the input, rendered as the two 64-bit halves
     // concatenated in decimal form
     def hash128(s: String): String = {
       val h: Array[Long] = MurmurHash3.hash128(s.getBytes)
       h(0).toString + h(1).toString
     }
   
     // Avro path: hash the concatenation of the "s" and "o" fields
     override def getRecordKey(record: GenericRecord): String = {
       val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false)
       val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false)
       genKey(s, o)
     }
   
     private def genKey(s: String, o: String): String = hash128(s + o)
   
     // Row path (row writer): the first two columns are expected to be s and o
     override def getRecordKey(row: Row): String = {
       val s = row.getAs(0).toString
       val o = row.getAs(1).toString
       genKey(s, o)
     }
   }
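   
   The class above is shown without its imports; for reference, the classes it uses resolve as below. MurmurHash3 is assumed here to be the Apache Commons Codec implementation, whose hash128(byte[]) returns long[] as used above; substitute the actual library if a different one is in use.
   
   import org.apache.avro.generic.GenericRecord
   import org.apache.commons.codec.digest.MurmurHash3 // assumed 128-bit murmur implementation
   import org.apache.hudi.avro.HoodieAvroUtils
   import org.apache.hudi.common.config.TypedProperties
   import org.apache.hudi.keygen.ComplexKeyGenerator
   import org.apache.spark.sql.Row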
   
   Thanks,
   Bindu

