Ambarish-Giri commented on issue #3605:
URL: https://github.com/apache/hudi/issues/3605#issuecomment-926577602


   Hi @nsivabalan ,
   
   1# Correct. I was considering {segmentId, uuid} with ComplexKeyGenerator as the record key, since the combined key uniquely identifies records; but given that partitioning is done on segmentId, it makes sense to have just uuid as the record key. I have taken care of the orthogonal issue you pointed out.
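   With uuid alone as the record key, the key generator can drop down from ComplexKeyGenerator to Hudi's SimpleKeyGenerator. A minimal sketch of that setup (the field names uuid/segmentId come from this thread; the rest mirrors the 0.x option names used below and is illustrative, not the exact code):

   ```scala
   // Sketch: single-field record key, with segmentId as the partition path.
   df.write
     .format("hudi")
     .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY,
       "org.apache.hudi.keygen.SimpleKeyGenerator")
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
     .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "segmentId")
     .option(HoodieWriteConfig.TABLE_NAME, tableName)
     .mode(SaveMode.Append)
     .save(s"$basePath/$tableName/")
   ```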
   
   2# Partitioning the data by segmentId seems appropriate, and its cardinality is not that low: for example, 50 GB of data has nearly 3000 unique segments, and each consecutive upsert of an equivalent data size will add roughly 1000 more.
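   As a quick sanity check on file sizing under that partitioning (pure arithmetic on the figures above):

   ```scala
   // Average data volume per segmentId partition, using the numbers above.
   val dataMb   = 50.0 * 1024              // 50 GB expressed in MB
   val segments = 3000                     // observed unique segmentIds
   val avgMbPerSegment = dataMb / segments // ~17 MB per partition on average
   ```

   So most partitions stay well under the 50 MB parquet max-file-size configured below.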
   
   3# I am using the MOR (merge-on-read) table type.
   
   4# Below is my cluster configuration:
   1x r5.2xlarge master node and 100x r5.4xlarge core nodes
   
   5# spark-submit command:

   ```shell
   spark-submit --master yarn --deploy-mode client --num-executors 100 \
       --driver-memory 12G --executor-memory 48G \
       --conf spark.yarn.executor.memoryOverhead=8192 \
       --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
       --conf spark.shuffle.io.numConnectionsPerPeer=3 \
       --conf spark.shuffle.file.buffer=512k \
       --conf spark.memory.fraction=0.7 \
       --conf spark.memory.storageFraction=0.5 \
       --conf spark.kryo.unsafe=true \
       --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
       --conf spark.hadoop.fs.s3a.connection.maximum=2000 \
       --conf spark.hadoop.fs.s3a.fast.upload=true \
       --conf spark.hadoop.fs.s3a.connection.establish.timeout=500 \
       --conf spark.hadoop.fs.s3a.connection.timeout=5000 \
       --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
       --conf spark.hadoop.com.amazonaws.services.s3.enableV4=true \
       --conf spark.hadoop.com.amazonaws.services.s3.enforceV4=true \
       --conf spark.yarn.nodemanager.pmem-check-enabled=true \
       --conf spark.yarn.nodemanager.vmem-check-enabled=true \
       --conf spark.driver.cores=4 \
       --conf spark.executor.cores=3 \
       --conf spark.yarn.driver.memoryOverhead=4096 \
       --conf spark.yarn.max.executor.failures=100 \
       --conf spark.task.cpus=1 \
       --conf spark.rdd.compress=true \
       --conf spark.yarn.maxAppAttempts=3 \
       --conf spark.segment.etl.numexecutors=100 \
       --conf spark.network.timeout=800 \
       --conf spark.shuffle.service.enabled=true \
       --conf spark.sql.hive.convertMetastoreParquet=false \
       --conf spark.task.maxFailures=4 \
       --conf spark.shuffle.minNumPartitionsToHighlyCompress=32 \
       --conf spark.segment.processor.partition.count=1536 \
       --conf spark.segment.processor.output-shard.count=60 \
       --conf spark.segment.processor.binseg.partition.threshold.bytes=500000000000 \
       --conf spark.driver.maxResultSize=2g \
       --conf spark.hadoop.fs.s3.maxRetries=2 \
       --conf spark.kryoserializer.buffer.max=512m \
       --conf spark.kryo.registrationRequired=false \
       --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
       --conf spark.sql.shuffle.partitions=1536 \
       --class <class-name> \
       --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
       <jar-file-name>.jar
   ```
   
   
   
   6# Below are the benchmarking metrics:
   - BulkInsert MOR (54 GB data): 1 hr
   - Upsert MOR (44 GB data): 1.6 hr
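   The implied ingest throughput (pure arithmetic on the numbers above):

   ```scala
   // Effective throughput from the benchmark figures above.
   val bulkInsertGbPerHr = 54.0 / 1.0   // 54 GB/hr
   val upsertGbPerHr     = 44.0 / 1.6   // ~27.5 GB/hr
   ```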
   
   7# Below are the Hudi configs (note the operation type and save mode in each block):

   Upsert:

   ```scala
   Df.write
     .format("hudi")
     .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
     .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenClass)
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
     .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKey)
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
     .option(HoodieWriteConfig.TABLE_NAME, tableName)
     .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.SIMPLE.toString)
     .option(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP, 100)
     .option(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL, "DISK_ONLY")
     .option(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL, "DISK_ONLY")
     .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
       DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
     .option(HoodieWriteConfig.UPSERT_PARALLELISM, 2000)
     .option(HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP, "false")
     .option(HoodieStorageConfig.LOGFILE_SIZE_MAX_BYTES, 256 * 1024 * 1024)
     .option(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO, 0.35)
     .option(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 1024)
     .option(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_AUTO_SPLIT_INSERTS, "false")
     .option(HoodieCompactionConfig.COPY_ON_WRITE_TABLE_INSERT_SPLIT_SIZE, 200 * 1000)
     .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, 0)
     .option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, 50 * 1024 * 1024)
     .option(HoodieStorageConfig.PARQUET_BLOCK_SIZE_BYTES, 50 * 1024 * 1024)
     .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "false")
     .mode(SaveMode.Append)
     .save(s"$basePath/$tableName/")
   ```

   BulkInsert:

   ```scala
   Df.write
     .format("hudi")
     .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
     .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenClass)
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key)
     .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionKey)
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
     .option(HoodieWriteConfig.TABLE_NAME, tableName)
     .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.SIMPLE.toString)
     .option(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP, 100)
     .option(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL, "DISK_ONLY")
     .option(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL, "DISK_ONLY")
     .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
       DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
     .option(HoodieWriteConfig.BULKINSERT_PARALLELISM, 2000)
     .option(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP, false)
     .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, false)
     .mode(SaveMode.Overwrite)
     .save(s"$basePath/$tableName/")
   ```
   

