meatheadmike opened a new issue, #11960:
URL: https://github.com/apache/hudi/issues/11960

   **Describe the problem you faced**
   
   Hi folks. I'm looking for advice on how to better handle a large upsert dataset.
   
   The data has a very wide key space and no great or obvious partition columns. I'm aiming for a record key with roughly 1.5 billion unique values, any of which could be updated at any time. The pipeline so far is a PySpark-based streaming application. The eventual source will be a Kafka topic, but for now it's a data generator (specifically the dbldatagen Python module) emitting 10,000 rows per second. The sink is a MOR Hudi table in an S3 bucket. The Spark cluster is hosted on Kubernetes on AWS. I'm using the latest Hudi beta, the latest production Spark (3.5.2), Hive Metastore 3.x, and ZooKeeper (latest production version) for locking. There are currently 10 executors with 8 GB of RAM and 1 core each. I could feasibly give the executors more memory, but I previously raised it from 4 GB to 8 GB and it didn't buy me much. I seem to top out at roughly 500 million records (and 500 million distinct record key values).
   
   The problem is that while I can get the pipeline to run, it eventually reaches a point where the executors simply OOM during a union in the upsert. I've tried numerous memory and parallelism tuning options, to no avail.
   
   Is there a way to make this work without scaling memory indefinitely? Is a wide key space like this a dead end? I did try partitioning by applying MD5 to the record key and using the first two characters of the hash. That gave me 256 unique, more or less evenly distributed partitions, but it drastically increased batch times.
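For reference, the MD5-prefix bucketing described above can be sketched in plain Python. This is a minimal sketch that assumes the bucket is the first two characters of the lowercase MD5 hex digest of the record key; the helper name and the keys are hypothetical. In PySpark, the equivalent column expression would presumably be `F.substring(F.md5(F.col("name")), 1, 2)` (with `from pyspark.sql import functions as F`).

```python
import hashlib
from collections import Counter


def md5_prefix_bucket(record_key: str, prefix_len: int = 2) -> str:
    """Hypothetical helper: first `prefix_len` hex characters of MD5(record_key).

    Each hex character has 16 possible values, so prefix_len=2 yields
    16 * 16 = 256 possible buckets.
    """
    digest = hashlib.md5(record_key.encode("utf-8")).hexdigest()
    return digest[:prefix_len]


if __name__ == "__main__":
    # Made-up keys standing in for values of the "name" record key field.
    keys = [f"key-{i}" for i in range(100_000)]
    counts = Counter(md5_prefix_bucket(k) for k in keys)
    print(f"buckets used: {len(counts)}")
    print(f"rows per bucket (min/max): {min(counts.values())}/{max(counts.values())}")
```

Because MD5 spreads keys roughly uniformly, each bucket receives about 1/256 of the keys. If updates are also spread uniformly across keys, a micro-batch of a few thousand rows will likely touch nearly all 256 partitions, which may be related to the longer batch times.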
   
   Record level indexing is enabled, and it seems to help on the write path. On the read path, however, I've noticed that queries take significantly longer when data skipping is enabled. Perhaps my index has grown unmanageable? Maybe that is partly responsible for the OOMs on the write side.
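One way to quantify the read-path slowdown is to time the same query with data skipping toggled. This is a minimal sketch: the helper names are hypothetical, the table path is taken from the config below, and the point lookup on `name` uses a placeholder value. `hoodie.metadata.enable` and `hoodie.enable.data.skipping` are the same keys as in the writer config, passed here as read options. The PySpark part is left as comments because it needs a live session.

```python
import time
from typing import Callable, Dict, Tuple, TypeVar

T = TypeVar("T")


def hudi_read_options(data_skipping: bool) -> Dict[str, str]:
    """Hypothetical helper: Hudi read options with data skipping on or off.

    The metadata table is enabled explicitly in both cases so that only the
    data-skipping flag differs between the two runs.
    """
    return {
        "hoodie.metadata.enable": "true",
        "hoodie.enable.data.skipping": "true" if data_skipping else "false",
    }


def time_call(fn: Callable[[], T]) -> Tuple[T, float]:
    """Run fn() once and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


# Usage with PySpark (assumes an active SparkSession named `spark`;
# "some-key" is a placeholder value):
#
# for skipping in (False, True):
#     df = (spark.read.format("hudi")
#           .options(**hudi_read_options(skipping))
#           .load("s3a://XXXXX/hudi_out_data"))
#     count, secs = time_call(lambda: df.filter("name = 'some-key'").count())
#     print(f"data skipping={skipping}: {count} rows in {secs:.1f}s")
```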
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   Hudi config:
   ```
   {
       "hoodie.insert.shuffle.parallelism": "500",
       "hoodie.upsert.shuffle.parallelism": "500",
       "hoodie.delete.shuffle.parallelism": "500",
       "hoodie.bulkinsert.shuffle.parallelism": "500",
       "hoodie.clean.commits.retained": 5,
       "hoodie.clean.policy": "KEEP_LATEST_COMMITS",
       "hoodie.clean.fileversions.retained": "1",
       "hoodie.keep.min.commits": "10",
       "hoodie.datasource.compaction.async.enable": "true",
       "hoodie.logfile.data.block.max.size": 8388608,
       "hoodie.logfile.max.size": 8388608,
       "hoodie.datasource.hive_sync.db": "default",
       "hoodie.datasource.hive_sync.enable": true,
       "hoodie.datasource.hive_sync.ignore_exceptions": true,
       "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://hive-metastore.default:10000",
       "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore.default:9083",
       "hoodie.datasource.hive_sync.mode": "hms",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
       "hoodie.datasource.hive_sync.password": "",
       "hoodie.datasource.hive_sync.recreate_table_on_error": true,
       "hoodie.datasource.hive_sync.skip_ro_suffix": true,
       "hoodie.datasource.hive_sync.table": "hudi_sink",
       "hoodie.datasource.hive_sync.username": "hive",
       "hoodie.archive.async": true,
       "hoodie.clean.async.enabled": true,
       "hoodie.clean.automatic": true,
       "hoodie.cleaner.policy.failed.writes": "LAZY",
       "hoodie.clustering.async.enabled": false,
       "hoodie.clustering.async.max.commits": 0,
       "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
       "hoodie.clustering.inline": true,
       "hoodie.clustering.inline.max.commits": 0,
       "hoodie.clustering.plan.strategy.class": "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
       "hoodie.clustering.plan.strategy.sort.columns": "name",
       "hoodie.clustering.preserve.commit.metadata": true,
       "hoodie.clustering.rollback.pending.replacecommit.on.conflict": true,
       "hoodie.clustering.updates.strategy": "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy",
       "hoodie.compact.inline.max.delta.commits": 2,
       "hoodie.datasource.meta.sync.enable": "true",
       "hoodie.datasource.read.incr.fallback.fulltablescan.enable": true,
       "hoodie.datasource.read.use.new.parquet.file.format": true,
       "hoodie.datasource.write.hive_style_partitioning": "true",
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "timestamp",
       "hoodie.datasource.write.record.merger.impls": "org.apache.hudi.HoodieSparkRecordMerger",
       "hoodie.datasource.write.recordkey.field": "name",
       "hoodie.datasource.write.row.writer.enable": true,
       "hoodie.datasource.write.streaming.checkpoint.identifier": "datagen-writer1",
       "hoodie.datasource.write.table.name": "hudi_sink",
       "hoodie.datasource.write.table.type": "MERGE_ON_READ",
       "hoodie.enable.data.skipping": true,
       "hoodie.index.type": "RECORD_INDEX",
       "hoodie.logfile.data.block.format": "parquet",
       "hoodie.merge.use.record.positions": true,
       "hoodie.metadata.auto.initialize": true,
       "hoodie.metadata.enable": true,
       "hoodie.metadata.clean.async": true,
       "hoodie.metadata.index.async": false,
       "hoodie.metadata.index.column.stats.columns": "name",
       "hoodie.metadata.index.column.stats.column.list": "name",
       "hoodie.metadata.index.column.stats.enable": true,
       "hoodie.metadata.record.index.enable": true,
       "hoodie.parquet.compression.codec": "snappy",
       "hoodie.record.index.use.caching": true,
       "hoodie.table.name": "hudi_sink",
       "hoodie.table.services.enabled": true,
       "hoodie.write.concurrency.early.conflict.detection.enable": true,
       "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
       "hoodie.write.executor.type": "DISRUPTOR",
       "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
       "hoodie.write.lock.zookeeper.base_path": "/hudi/locl/table",
       "hoodie.write.lock.zookeeper.lock_key": "hudi_sink",
       "hoodie.write.lock.zookeeper.port": "2181",
       "hoodie.write.lock.zookeeper.url": "zk-cs.default",
       "checkpointLocation": "s3a://XXXXX/hudi_checkpoints",
       "path": "s3a://XXXXX/hudi_out_data"
   }
   ```
   Spark config:
   ```
   spark.checkpoint.compress true
   spark.driver.maxResultSize 2g
   spark.dynamicAllocation.shuffleTracking.enabled true
   spark.eventLog.dir s3a://XXXXX/spark_logs/
   spark.eventLog.enabled true
   spark.eventLog.rolling.enabled true
   spark.eventLog.rolling.maxFileSize 20m
   spark.executor.memoryOverhead 2g
   spark.hadoop.fs.s3a.aws.credentials.provider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
   spark.hadoop.fs.s3a.fast.upload true
   spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
   spark.hadoop.fs.s3a.path.style.access true
   spark.hadoop.hive.metastore.warehouse.dir s3a://XXXXX/warehouse
   spark.hadoop.parquet.avro.write-old-list-structure false
   spark.history.fs.logDirectory s3a://XXXXX/spark_logs/
   spark.hive.metastore.warehouse.dir s3a://XXXXX/warehouse
   spark.io.compression.codec snappy
   spark.kryo.registrator org.apache.spark.HoodieSparkKryoRegistrar
   spark.memory.fraction 0.2
   spark.memory.storageFraction 0.2
   spark.rdd.compress true
   spark.serializer org.apache.spark.serializer.KryoSerializer
   # Leave disabled, as Kubernetes doesn't support the external shuffle service.
   spark.shuffle.service.enabled false
   spark.speculation false
   spark.streaming.stopGracefullyOnShutdown true
   spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
   spark.sql.catalogImplementation hive
   spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
   spark.sql.hive.convertMetastoreParquet false
   spark.sql.hive.metastore.jars path
   spark.sql.hive.metastore.jars.path file:///opt/apache-hive-3.1.3-bin/lib/*.jar
   spark.sql.hive.metastore.version 3.1.3
   spark.sql.parquet.enableVectorizedReader true
   spark.sql.shuffle.partitions 500
   spark.sql.streaming.stateStore.compression.codec snappy
   spark.sql.streaming.stateStore.providerClass org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
   spark.sql.warehouse.dir s3a://XXXXX/warehouse
   spark.ui.prometheus.enabled true
   #
   # S3 optimization settings (see https://spark.apache.org/docs/latest/cloud-integration.html):
   #
   spark.hadoop.fs.s3a.committer.name directory
   spark.hadoop.parquet.enable.summary-metadata false
   spark.sql.hive.metastorePartitionPruning true
   spark.sql.orc.cache.stripe.details.size 10000
   spark.sql.orc.filterPushdown true
   spark.sql.orc.splits.include.file.footer true
   spark.sql.parquet.filterPushdown true
   spark.sql.parquet.mergeSchema false
   spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
   spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
   spark.sql.streaming.checkpointFileManagerClass org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager
   
   ```
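Since the failure is a heap OOM, it may help to spell out how much unified memory these settings leave per executor. This is a rough sketch based on the formula in Spark's memory-tuning guide: M = (heap - 300 MiB) * `spark.memory.fraction` is shared by execution and storage, and R = M * `spark.memory.storageFraction` is the storage share that execution cannot evict. It assumes `spark.executor.memory` is 8g (the issue mentions 8 GB per executor, but that setting isn't in the listing above). `spark.executor.memoryOverhead` lives outside the heap and is not counted. The function name is hypothetical.

```python
RESERVED_MIB = 300  # heap Spark sets aside before applying spark.memory.fraction


def unified_memory_mib(heap_mib: float, memory_fraction: float,
                       storage_fraction: float) -> tuple:
    """Return (M, R) in MiB following Spark's memory-tuning formula.

    M = (heap - 300 MiB) * spark.memory.fraction   (execution + storage)
    R = M * spark.memory.storageFraction           (storage immune to eviction)
    """
    m = (heap_mib - RESERVED_MIB) * memory_fraction
    r = m * storage_fraction
    return m, r


if __name__ == "__main__":
    heap_mib = 8 * 1024  # assumption: spark.executor.memory=8g
    # 0.2/0.2 are the values in this config; 0.6/0.5 are Spark's defaults.
    for fraction, storage in ((0.2, 0.2), (0.6, 0.5)):
        m, r = unified_memory_mib(heap_mib, fraction, storage)
        print(f"fraction={fraction}, storageFraction={storage}: "
              f"M = {m:.0f} MiB, R = {r:.0f} MiB")
```

With the listed values this works out to roughly 1.5 GiB of unified memory per executor (about 316 MiB of it protected for storage), versus roughly 4.6 GiB with the defaults. The rest of the heap is left for user data structures and internal metadata.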
   **Expected behavior**
   
   The pipeline should run without OOMs.
   
   **Environment Description**
   
   * Hudi version : 1.0.0-beta2
   
   * Spark version : 3.5.2
   
   * Hive version : 3.1.3
   
   * Hadoop version : 3.3.4
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes
   
   
   **Stacktrace**
   
   ```
   24/09/18 18:10:29 WARN TaskSetManager: Lost task 5.0 in stage 68.0 (TID 3753) (10.239.14.87 executor 8): java.lang.OutOfMemoryError: Java heap space
       at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
       at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
       at org.apache.spark.storage.memory.SerializedValuesHolder.$anonfun$allocator$1(MemoryStore.scala:715)
       at org.apache.spark.storage.memory.SerializedValuesHolder.$anonfun$allocator$1$adapted(MemoryStore.scala:715)
       at org.apache.spark.storage.memory.SerializedValuesHolder$$Lambda$1216/0x00007995a8a1fa28.apply(Unknown Source)
       at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
       at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
       at org.apache.spark.storage.memory.RedirectableOutputStream.write(MemoryStore.scala:809)
       at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
       at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
       at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
       at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
       at com.esotericsoftware.kryo.io.UnsafeOutput.writeLong(UnsafeOutput.java:124)
       at com.esotericsoftware.kryo.io.UnsafeOutput.writeLong(UnsafeOutput.java:160)
       at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.write(DefaultSerializers.java:151)
       at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.write(DefaultSerializers.java:145)
       at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
       at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
       at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
       at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
       at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
       at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
       at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
       at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:225)
       at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:48)
       at org.apache.hudi.common.model.HoodieRecord.write(HoodieRecord.java:365)
       at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:514)
       at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:512)
       at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
       at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:278)
       at org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:729)
       at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
   ```
   
   Any help / tips would be greatly appreciated!

