bkosuru opened a new issue #4864:
URL: https://github.com/apache/hudi/issues/4864
Hello,
Insert with INSERT_DROP_DUPS_OPT_KEY fails after several hours. Any
suggestions to make it work? See the details below.
We want to prevent inserting duplicate records.
Hudi table size: 13.4 TB
Data size to insert: 3.8TB (uncompressed) {failed for 200GB input also}
The table has 2 partition levels (g and p), e.g. spog/g=g1/p=p1
The data to be inserted belongs to one partition g=g2
The partition size for g=g2 is 2TB
g2 has 44 p partitions with sizes ranging from 1.3 M to 270G
Environment Description:
Hudi version : 0.8.0
Spark version : 2.4.4
Storage (HDFS/S3/GCS..) : HDFS
Running on Docker? (yes/no) : No
Table type: COW
Spark settings:
// Builds the SparkConf for the job; the five settings are applied in the order listed.
new SparkConf().setAll(
  Seq(
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    "spark.ui.enabled" -> "false",
    "spark.sql.parquet.mergeSchema" -> "false",
    "spark.sql.files.ignoreCorruptFiles" -> "true",
    "spark.sql.hive.convertMetastoreParquet" -> "false"
  )
)
--driver-memory 25G \
--executor-memory 50G \
--executor-cores 2 \
--num-executors 400 \
--conf spark.dynamicAllocation.enabled=False \
--conf spark.network.timeout=240s \
--conf spark.shuffle.sasl.timeout=60000 \
--conf spark.driver.maxResultSize=20g \
--conf spark.port.maxRetries=60 \
--conf spark.shuffle.service.enabled=True \
--conf spark.sql.shuffle.partitions=3000 \
--conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \
--conf "spark.executor.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2
-XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" \
--conf spark.driver.memoryOverhead=1024 \
--conf spark.executor.memoryOverhead=3072 \
--conf spark.yarn.max.executor.failures=100 \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.task.maxFailures=4 \
--conf spark.rdd.compress=True \
/** Approximate size in bytes of our average record, contra Hudi's default assumption of 1024. */
private val AVG_RECORD_SIZE: Int = 256

/** 1024^3 bytes (2^30, which still fits in an Int); used for Parquet file size & block size. */
private val ONE_GIGABYTE: Int = 1024 * 1024 * 1024

/** Bloom filter entry budget: ONE_GIGABYTE / (2 * AVG_RECORD_SIZE) = 2097152. */
private val BLOOM_MAX_ENTRIES: Int = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)
// Appends `df` to the Hudi table "spog" using the insert operation with
// duplicate dropping switched on.
// NOTE(review): the chain stops at `.mode(...)`; no terminal `.save(...)` is
// visible in this snippet — presumably it was trimmed from the report.
df.write
.format("hudi")
// DataSourceWriteOptions
.option(HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
// Custom key generator — presumably the SpoKeyGenerator class shown later in this report.
.option( KEYGENERATOR_CLASS_OPT_KEY,"com.xyz.SpoKeyGenerator")
.option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL)
.option(INSERT_DROP_DUPS_OPT_KEY, value = true)
.option(INSERT_PARALLELISM, 2000)
// Two partition fields (g, then p) and a two-field record key (s, o).
.option(PARTITIONPATH_FIELD_OPT_KEY, "g,p")
.option(PRECOMBINE_FIELD_OPT_KEY, "isDeleted")
.option(RECORDKEY_FIELD_OPT_KEY, "s,o")
.option(URL_ENCODE_PARTITIONING_OPT_KEY, value = true)
// HoodieIndexConfig
// BLOOM_MAX_ENTRIES is ONE_GIGABYTE / (2 * AVG_RECORD_SIZE).
.option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES)
.option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)
// HoodieCompactionConfig
// NOTE(review): 64 differs from AVG_RECORD_SIZE (256) declared with the
// constants — confirm which record-size estimate is intended.
.option(COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 64)
// HoodieStorageConfig
// Int / Double yields a Double here (~3.07e9, not a whole number).
// NOTE(review): confirm Hudi can parse a non-integer value for this byte-size
// setting; a whole number of bytes is presumably what was meant.
.option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
.option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
.option(PARQUET_FILE_MAX_BYTES,ONE_GIGABYTE)
// Commit history
// The three limits are staggered (MAX_VALUE - 2 < MAX_VALUE - 1 < MAX_VALUE),
// presumably so that the full commit history is retained — TODO confirm.
.option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
.option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
.option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)
// HoodieWriteConfig
.option(EMBEDDED_TIMELINE_SERVER_ENABLED, "false")
.option(TABLE_NAME, "spog")
.mode(SaveMode.Append)
/**
 * Key generator whose record key is a 128-bit MurmurHash3 digest of the
 * concatenated `s` and `o` field values.
 *
 * Only the record-key methods are overridden; everything else comes from
 * ComplexKeyGenerator.
 *
 * @param props key-generator configuration, forwarded to ComplexKeyGenerator
 */
class SpoKeyGenerator(props: TypedProperties)
    extends ComplexKeyGenerator(props) {

  /**
   * Hashes `s` with MurmurHash3 (128-bit) and renders the two resulting longs
   * as decimal strings, concatenated.
   *
   * @param s text to hash
   * @return decimal rendering of both hash words, first word first
   */
  def hash128(s: String): String = {
    // Pin the charset: a bare getBytes uses the JVM default encoding, so the
    // same non-ASCII value could hash differently on differently configured
    // nodes and defeat duplicate detection.
    // NOTE(review): if existing data was keyed on JVMs whose default charset is
    // not UTF-8, non-ASCII keys will change — confirm before reusing the table.
    val h: Array[Long] =
      MurmurHash3.hash128(s.getBytes(java.nio.charset.StandardCharsets.UTF_8))
    // NOTE(review): signed, variable-width decimals are concatenated without a
    // separator, so distinct hash pairs can render to the same string. Kept
    // as-is because changing the format would invalidate keys already written.
    h(0).toString + h(1).toString
  }

  /** Builds the record key from the `s` and `o` fields of an Avro record. */
  override def getRecordKey(record: GenericRecord): String = {
    val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false)
    val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false)
    genKey(s, o)
  }

  // NOTE(review): `s + o` has no delimiter, so ("ab", "c") and ("a", "bc")
  // produce the same key. Kept as-is for compatibility with existing keys.
  private def genKey(s: String, o: String): String = hash128(s + o)

  /**
   * Builds the record key from the `s` and `o` columns of a Spark Row.
   *
   * Columns are looked up by name so this agrees with the Avro overload
   * regardless of column order; the row must therefore carry a schema. The
   * explicit type argument avoids `getAs` being inferred as `getAs[Nothing]`.
   */
  override def getRecordKey(row: Row): String = {
    val s = row.getAs[Any]("s").toString
    val o = row.getAs[Any]("o").toString
    genKey(s, o)
  }
}
Thanks,
Bindu
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]