Ambarish-Giri commented on issue #3605:
URL: https://github.com/apache/hudi/issues/3605#issuecomment-926577602
Hi @nsivabalan ,
1# Correct, I was considering {segmentId, uuid} with ComplexKey as the record key, since the combined key uniquely identifies records. Given that partitioning is already done on segmentId, it makes sense to use just uuid as the record key. I have taken care of the orthogonal issue you pointed out.
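For reference, a minimal sketch of what dropping segmentId from the record key could look like, assuming the stock `SimpleKeyGenerator` and using "uuid"/"segmentId" as illustrative field names (not necessarily my exact column names):

```scala
// Sketch only: record key = uuid, partition path = segmentId.
// SimpleKeyGenerator is Hudi's key generator for a single record-key field.
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
  "org.apache.hudi.keygen.SimpleKeyGenerator")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "segmentId")
```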
2# Partitioning by segmentId seems appropriate for this data, and its cardinality is not that low: for example, 50 GB of data will have nearly 3000 unique segments, and each consecutive upsert of an equivalent data size will probably add around 1000 more.
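As a rough sanity check on what that cardinality implies for file sizes (simple averaging, assuming data is spread evenly across segments, which it likely is not):

```scala
// Rough arithmetic based on the numbers above, not a measurement:
val totalBytes = 50L * 1024 * 1024 * 1024      // ~50 GB bulk insert
val segments   = 3000L                         // ~3000 unique segmentIds
val avgPartitionBytes = totalBytes / segments  // ~18 MB per partition on average
```

So each partition averages under 20 MB, i.e. well below the 50 MB parquet file max I have configured, which means roughly one small file per partition.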
3# I am using the MOR (merge-on-read) write strategy.
4# Below is my cluster configuration:
1 x r5.2xlarge master node and 100 x r5.4xlarge core nodes
5# spark-submit command:
```shell
spark-submit --master yarn --deploy-mode client --num-executors 100 \
  --driver-memory 12G --executor-memory 48G \
  --conf spark.yarn.executor.memoryOverhead=8192 \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
  --conf spark.shuffle.io.numConnectionsPerPeer=3 \
  --conf spark.shuffle.file.buffer=512k \
  --conf spark.memory.fraction=0.7 \
  --conf spark.memory.storageFraction=0.5 \
  --conf spark.kryo.unsafe=true \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.hadoop.fs.s3a.connection.maximum=2000 \
  --conf spark.hadoop.fs.s3a.fast.upload=true \
  --conf spark.hadoop.fs.s3a.connection.establish.timeout=500 \
  --conf spark.hadoop.fs.s3a.connection.timeout=5000 \
  --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
  --conf spark.hadoop.com.amazonaws.services.s3.enableV4=true \
  --conf spark.hadoop.com.amazonaws.services.s3.enforceV4=true \
  --conf spark.yarn.nodemanager.pmem-check-enabled=true \
  --conf spark.yarn.nodemanager.vmem-check-enabled=true \
  --conf spark.driver.cores=4 \
  --conf spark.executor.cores=3 \
  --conf spark.yarn.driver.memoryOverhead=4096 \
  --conf spark.yarn.max.executor.failures=100 \
  --conf spark.task.cpus=1 \
  --conf spark.rdd.compress=true \
  --conf spark.yarn.maxAppAttempts=3 \
  --conf spark.segment.etl.numexecutors=100 \
  --conf spark.network.timeout=800 \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf spark.task.maxFailures=4 \
  --conf spark.shuffle.minNumPartitionsToHighlyCompress=32 \
  --conf spark.segment.processor.partition.count=1536 \
  --conf spark.segment.processor.output-shard.count=60 \
  --conf spark.segment.processor.binseg.partition.threshold.bytes=500000000000 \
  --conf spark.driver.maxResultSize=2g \
  --conf spark.hadoop.fs.s3.maxRetries=2 \
  --conf spark.kryoserializer.buffer.max=512m \
  --conf spark.kryo.registrationRequired=false \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.shuffle.partitions=1536 \
  --class <class-name> \
  --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
  <jar-file-name>.jar
```
6# Below are the benchmarking metrics:
BulkInsert MoR (54 GB data) : 1 hr
Upsert MoR (44 GB data) : 1.6 hr
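For context, the approximate end-to-end throughput these timings imply (simple division over the quoted sizes and wall-clock times, no per-stage breakdown):

```scala
// Approximate throughput implied by the benchmark numbers above:
val bulkInsertMBps = 54.0 * 1024 / 3600          // 54 GB in 1 hr   -> ~15 MB/s
val upsertMBps     = 44.0 * 1024 / (1.6 * 3600)  // 44 GB in 1.6 hr -> ~8 MB/s
```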
7# Below are the Hudi configs:
BulkInsert:
```scala
Df.write
  .format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenClass)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKey)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.SIMPLE.toString)
  .option(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP, 100)
  .option(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL, "DISK_ONLY")
  .option(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL, "DISK_ONLY")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
  .option(HoodieWriteConfig.BULKINSERT_PARALLELISM, 2000)
  .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP, false)
  .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, false)
  .mode(SaveMode.Overwrite)
  .save(s"$basePath/$tableName/")
```
Upsert:
```scala
Df.write
  .format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenClass)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKey)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.SIMPLE.toString)
  .option(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP, 100)
  .option(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL, "DISK_ONLY")
  .option(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL, "DISK_ONLY")
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(HoodieWriteConfig.UPSERT_PARALLELISM, 2000)
  .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP, "false")
  .option(HoodieStorageConfig.LOGFILE_SIZE_MAX_BYTES, 256 * 1024 * 1024)
  .option(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO, 0.35)
  .option(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 1024)
  .option(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS, "false")
  .option(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE, 200 * 1000)
  .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, 0)
  .option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, 50 * 1024 * 1024)
  .option(HoodieStorageConfig.PARQUET_BLOCK_SIZE_BYTES, 50 * 1024 * 1024)
  .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "false")
  .mode(SaveMode.Append)
  .save(s"$basePath/$tableName/")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]