FelixKJose opened a new issue #4891:
URL: https://github.com/apache/hudi/issues/4891
I have a large partitioned MOR Hudi table. I tried to perform async
clustering using the Hudi clustering utility, but it failed without any stack
trace. I then tried inline clustering, but that job failed with an OOM error.
The clustering covered 365 partitions; each partition has about 518 million
records, and each record (uncompressed) is 3-4 KB. I then tried clustering on
only 10 partitions and it worked, but it appears that clustering pulls all the
data for those partitions into driver memory after sorting and then
repartitions it back to the worker nodes for writing.
1. How do people normally perform inline or async clustering on partitions
with a large amount of data? Is the driver memory expected to be larger than
the data being clustered?
2. What configurations should I use to perform clustering on tables this
large?
3. In PROD I will have 1.8 billion records (each record 3-4 KB in memory);
is it advisable to cluster frequently (every 10 to 20 commits) or daily?
4. Does the MOR table support async clustering with OCC guarantees?
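For scale, a rough back-of-envelope calculation (my own sketch, assuming ~3.5 KB per uncompressed record as the midpoint of the 3-4 KB stated above) shows why a driver-side sort of all 365 partitions cannot fit in memory:

```python
# Back-of-envelope sizing for the clustering workload described above.
# Assumes ~3.5 KB per uncompressed record (midpoint of the stated 3-4 KB).
RECORD_BYTES = 3.5 * 1024          # bytes per record, uncompressed
RECORDS_PER_PARTITION = 518_000_000
PARTITIONS = 365
TARGET_FILE_BYTES = 1_073_741_824  # hoodie.clustering.plan.strategy.target.file.max.bytes

bytes_per_partition = RECORD_BYTES * RECORDS_PER_PARTITION
total_bytes = bytes_per_partition * PARTITIONS
files_per_partition = bytes_per_partition / TARGET_FILE_BYTES

print(f"per partition: {bytes_per_partition / 1024**4:.1f} TiB")   # ~1.7 TiB
print(f"all 365 partitions: {total_bytes / 1024**5:.2f} PiB")      # ~0.60 PiB
print(f"~{files_per_partition:.0f} target files per partition")    # ~1729
```

Roughly 1.7 TiB of uncompressed data per partition, so clustering the full table in one pass would need to shuffle on the order of half a petabyte.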
My config:
```
"hoodie.datasource.write.table.type": "MERGE_ON_READ",
"hoodie.datasource.write.precombine.field": "eventDateTime",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.datasource.write.operation": "bulk_insert",
"hoodie.table.name": "flattened_calculations_mor_awstest_clust",
"hoodie.datasource.write.recordkey.field": "identifier",
"hoodie.datasource.hive_sync.table": "flattened_calculations_mor_awstest_clust",
"hoodie.datasource.write.partitionpath.field": "observationEndDate",
"hoodie.datasource.hive_sync.partition_fields": "observationEndDate",
"hoodie.insert.shuffle.parallelism": 7050,
"hoodie.bulkinsert.shuffle.parallelism": 7050,
"hoodie.parquet.small.file.limit": 0,
"hoodie.datasource.clustering.inline.enable": "true",
"hoodie.clustering.inline.max.commits": 1,
"hoodie.clustering.plan.strategy.target.file.max.bytes": 1073741824,
"hoodie.clustering.plan.strategy.small.file.limit": 629145600,
"hoodie.cleaner.commits.retained": 1,
"hoodie.keep.min.commits": 2,
"hoodie.compact.inline": "true",
"hoodie.clustering.plan.strategy.sort.columns": "patientIdentifier_identifier_value",
"hoodie.clustering.plan.strategy.daybased.lookback.partitions": 365
```
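For reference, this is roughly how the options above would be passed on the write path in PySpark (a minimal sketch, not my actual job; `df` and the source read are assumed to exist, and the table/column names are taken from the config above):

```python
# Minimal sketch of an inline-clustering bulk_insert write in PySpark.
# Assumes a SparkSession `spark` and a source DataFrame `df` already exist;
# option keys and values are copied from the config listed above.
hudi_options = {
    "hoodie.table.name": "flattened_calculations_mor_awstest_clust",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.datasource.write.recordkey.field": "identifier",
    "hoodie.datasource.write.precombine.field": "eventDateTime",
    "hoodie.datasource.write.partitionpath.field": "observationEndDate",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.clustering.inline.enable": "true",
    "hoodie.clustering.inline.max.commits": "1",
    "hoodie.clustering.plan.strategy.sort.columns": "patientIdentifier_identifier_value",
}

# With a running Spark session this would be:
# df.write.format("hudi").options(**hudi_options).mode("append").save(base_path)
```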
**Environment Description**
* Hudi version : 0.9.0
* Spark version : 3.1.0
* AWS EMR: 6.5.0
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : NO
**Additional context**
For async clustering via the Hudi utility:
```
sudo -s spark-submit \
  --class org.apache.hudi.utilities.HoodieClusteringJob \
  /usr/lib/hudi/hudi-utilities-bundle.jar \
  --props s3://******/aws/config/clusteringjob.properties \
  --mode scheduleAndExecute \
  --base-path s3://******/aws/ss2/device_observations/flattened_calculations_mor_awstest2_s/data/ \
  --table-name flattened_calculations_mor_awstest2_s \
  --spark-memory 12g
```
==clusteringjob.properties==
```
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=patientIdentifier_identifier_value
```
I am getting the following error:
**Stacktrace**
```
22/02/08 20:17:52 INFO Javalin: Starting Javalin ...
22/02/08 20:17:52 INFO Javalin: Listening on http://localhost:46705/
22/02/08 20:17:52 INFO Javalin: Javalin started in 192ms 💃
22/02/08 20:17:52 INFO S3NativeFileSystem: Opening 's3://*****/aws/ss2/device_observations/flattened_calculations_mor_awstest2_s/data/.hoodie/hoodie.properties' for reading
22/02/08 20:17:52 INFO Javalin: Stopping Javalin ...
22/02/08 20:17:52 INFO Javalin: Javalin has stopped
22/02/08 20:17:52 ERROR HoodieClusteringJob: Clustering with basePath: s3://*****/aws/ss2/device_observations/flattened_calculations_mor_awstest2_s/data/, tableName: flattened_calculations_mor_awstest2_s, runningMode: scheduleAndExecute failed
22/02/08 20:17:52 INFO AbstractConnector: Stopped Spark@5d66941f{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
22/02/08 20:17:52 INFO SparkUI: Stopped Spark web UI at http://ip-10-57-102-186.ec2.internal:4040/
22/02/08 20:17:52 INFO YarnClientSchedulerBackend: Interrupting monitor thread
22/02/08 20:17:52 INFO YarnClientSchedulerBackend: Shutting down all executors
22/02/08 20:17:52 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
22/02/08 20:17:52 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
22/02/08 20:17:52 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/02/08 20:17:52 INFO MemoryStore: MemoryStore cleared
22/02/08 20:17:52 INFO BlockManager: BlockManager stopped
22/02/08 20:17:52 INFO BlockManagerMaster: BlockManagerMaster stopped
22/02/08 20:17:52 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/02/08 20:17:52 INFO SparkContext: Successfully stopped SparkContext
```
There is no stack trace other than this error:
`ERROR HoodieClusteringJob: Clustering with basePath: s3://*****/aws/ss2/device_observations/flattened_calculations_mor_awstest2_s/data/, tableName: flattened_calculations_mor_awstest2_s, runningMode: scheduleAndExecute failed`