meatheadmike opened a new issue, #11960:
URL: https://github.com/apache/hudi/issues/11960
**Describe the problem you faced**
Hi folks. I'm looking for advice on how to better handle a large upsert dataset.
The data has a very wide key space and no great/obvious columns to partition on. I'm aiming for a record key with roughly 1.5 billion unique values, and any one of those records could be updated at any time. The pipeline so far is a PySpark-based streaming application. The eventual source will be a Kafka topic, but at the moment it's a data generator set to emit 10,000 rows per second (specifically the dbldatagen Python module). The sink is an S3 bucket holding a MOR Hudi table. The Spark cluster is hosted on Kubernetes on AWS. I'm using the latest Hudi beta, the latest production Spark (3.5.2), Hive Metastore 3.x, and ZooKeeper for locking (latest production version). The executors are currently set to 10, with 8 GB RAM and 1 core each. While I could feasibly increase executor RAM further, I previously doubled it (from 4 GB to 8 GB) and it didn't buy me much. I seem to top out at roughly 500 million records (with 500 million distinct values on the record key).
The issue is that while I can get the pipeline to run, it eventually reaches a point where the executors simply OOM during a union on the upsert. I've tried numerous memory and parallelism tuning options to no avail.
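As a rough back-of-envelope (the per-entry byte count below is a guess for illustration, not something I've measured), the target key space simply dwarfs the current executor heaps if per-key state ever has to be held in memory:

```python
# Back-of-envelope sizing -- per-entry size is an assumed value, not a measurement.
unique_keys = 1_500_000_000   # target record-key cardinality
bytes_per_entry = 100         # assumed: key string + location info + JVM object overhead
executors = 10
heap_gb = 8                   # heap per executor

total_gb = unique_keys * bytes_per_entry / 1e9
per_executor_gb = total_gb / executors
print(f"~{total_gb:.0f} GB total, ~{per_executor_gb:.0f} GB per executor vs {heap_gb} GB heap")
```

Even with generous assumptions, that's an order of magnitude more than the cluster can hold, which matches what I'm seeing.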
Is there a way to make this work without scaling memory indefinitely? Is a wide key space like this a dead end? I did try partitioning by applying an MD5 to the record key and using the first two characters. That gave me 256 more-or-less evenly distributed partitions, but it led to drastically increased batch times.
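The bucketing I tried looks like this in plain Python (`md5_bucket` is just an illustrative helper name; in the actual PySpark job the equivalent would be something like `F.substring(F.md5("name"), 1, 2)`):

```python
import hashlib

def md5_bucket(record_key: str) -> str:
    """First two hex chars of the key's MD5 digest: 256 buckets, '00'..'ff'."""
    return hashlib.md5(record_key.encode("utf-8")).hexdigest()[:2]

# With 10,000 synthetic keys, essentially every one of the 256 buckets is hit,
# so the partitions come out more-or-less evenly distributed.
buckets = {md5_bucket(f"key-{i}") for i in range(10_000)}
print(len(buckets))
```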
Record-level indexing is enabled, and it seems to help on the write path. On the read path, I've noticed that queries actually take significantly longer when data skipping is enabled. So perhaps my index has grown unmanageable? Maybe that is partially responsible for the OOMs on the write side.
**To Reproduce**
Steps to reproduce the behavior:
hudi config:
```
{
  "hoodie.insert.shuffle.parallelism": "500",
  "hoodie.upsert.shuffle.parallelism": "500",
  "hoodie.delete.shuffle.parallelism": "500",
  "hoodie.bulkinsert.shuffle.parallelism": "500",
  "hoodie.clean.commits.retained": 5,
  "hoodie.clean.policy": "KEEP_LATEST_COMMITS",
  "hoodie.clean.fileversions.retained": "1",
  "hoodie.keep.min.commits": "10",
  "hoodie.datasource.compaction.async.enable": "true",
  "hoodie.logfile.data.block.max.size": 8388608,
  "hoodie.logfile.max.size": 8388608,
  "hoodie.datasource.hive_sync.db": "default",
  "hoodie.datasource.hive_sync.enable": true,
  "hoodie.datasource.hive_sync.ignore_exceptions": true,
  "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://hive-metastore.default:10000",
  "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore.default:9083",
  "hoodie.datasource.hive_sync.mode": "hms",
  "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
  "hoodie.datasource.hive_sync.password": "",
  "hoodie.datasource.hive_sync.recreate_table_on_error": true,
  "hoodie.datasource.hive_sync.skip_ro_suffix": true,
  "hoodie.datasource.hive_sync.table": "hudi_sink",
  "hoodie.datasource.hive_sync.username": "hive",
  "hoodie.archive.async": true,
  "hoodie.clean.async.enabled": true,
  "hoodie.clean.automatic": true,
  "hoodie.cleaner.policy.failed.writes": "LAZY",
  "hoodie.clustering.async.enabled": false,
  "hoodie.clustering.async.max.commits": 0,
  "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
  "hoodie.clustering.inline": true,
  "hoodie.clustering.inline.max.commits": 0,
  "hoodie.clustering.plan.strategy.class": "org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
  "hoodie.clustering.plan.strategy.sort.columns": "name",
  "hoodie.clustering.preserve.commit.metadata": true,
  "hoodie.clustering.rollback.pending.replacecommit.on.conflict": true,
  "hoodie.clustering.updates.strategy": "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy",
  "hoodie.compact.inline.max.delta.commits": 2,
  "hoodie.datasource.meta.sync.enable": "true",
  "hoodie.datasource.read.incr.fallback.fulltablescan.enable": true,
  "hoodie.datasource.read.use.new.parquet.file.format": true,
  "hoodie.datasource.write.hive_style_partitioning": "true",
  "hoodie.datasource.write.operation": "upsert",
  "hoodie.datasource.write.precombine.field": "timestamp",
  "hoodie.datasource.write.record.merger.impls": "org.apache.hudi.HoodieSparkRecordMerger",
  "hoodie.datasource.write.recordkey.field": "name",
  "hoodie.datasource.write.row.writer.enable": true,
  "hoodie.datasource.write.streaming.checkpoint.identifier": "datagen-writer1",
  "hoodie.datasource.write.table.name": "hudi_sink",
  "hoodie.datasource.write.table.type": "MERGE_ON_READ",
  "hoodie.enable.data.skipping": true,
  "hoodie.index.type": "RECORD_INDEX",
  "hoodie.logfile.data.block.format": "parquet",
  "hoodie.merge.use.record.positions": true,
  "hoodie.metadata.auto.initialize": true,
  "hoodie.metadata.enable": true,
  "hoodie.metadata.clean.async": true,
  "hoodie.metadata.index.async": false,
  "hoodie.metadata.index.column.stats.columns": "name",
  "hoodie.metadata.index.column.stats.column.list": "name",
  "hoodie.metadata.index.column.stats.enable": true,
  "hoodie.metadata.record.index.enable": true,
  "hoodie.parquet.compression.codec": "snappy",
  "hoodie.record.index.use.caching": true,
  "hoodie.table.name": "hudi_sink",
  "hoodie.table.services.enabled": true,
  "hoodie.write.concurrency.early.conflict.detection.enable": true,
  "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
  "hoodie.write.executor.type": "DISRUPTOR",
  "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
  "hoodie.write.lock.zookeeper.base_path": "/hudi/locl/table",
  "hoodie.write.lock.zookeeper.lock_key": "hudi_sink",
  "hoodie.write.lock.zookeeper.port": "2181",
  "hoodie.write.lock.zookeeper.url": "zk-cs.default",
  "checkpointLocation": "s3a://XXXXX/hudi_checkpoints",
  "path": "s3a://XXXXX/hudi_out_data"
}
```
spark config:
```
spark.checkpoint.compress true
spark.driver.maxResultSize 2g
spark.dynamicAllocation.shuffleTracking.enabled true
spark.eventLog.dir s3a://XXXXX/spark_logs/
spark.eventLog.enabled true
spark.eventLog.rolling.enabled true
spark.eventLog.rolling.maxFileSize 20m
spark.executor.memoryOverhead 2g
spark.hadoop.fs.s3a.aws.credentials.provider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
spark.hadoop.fs.s3a.fast.upload true
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.hive.metastore.warehouse.dir s3a://XXXXX/warehouse
spark.hadoop.parquet.avro.write-old-list-structure false
spark.history.fs.logDirectory s3a://XXXXX/spark_logs/
spark.hive.metastore.warehouse.dir s3a://XXXXX/warehouse
spark.io.compression.codec snappy
spark.kryo.registrator org.apache.spark.HoodieSparkKryoRegistrar
spark.memory.fraction 0.2
spark.memory.storageFraction 0.2
spark.rdd.compress true
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled false  # leave disabled as Kubernetes doesn't support the external shuffle service
spark.speculation false
spark.streaming.stopGracefullyOnShutdown true
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.catalogImplementation hive
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.hive.convertMetastoreParquet false
spark.sql.hive.metastore.jars path
spark.sql.hive.metastore.jars.path file:///opt/apache-hive-3.1.3-bin/lib/*.jar
spark.sql.hive.metastore.version 3.1.3
spark.sql.parquet.enableVectorizedReader true
spark.sql.shuffle.partitions 500
spark.sql.streaming.stateStore.compression.codec snappy
spark.sql.streaming.stateStore.providerClass org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.warehouse.dir s3a://XXXXX/warehouse
spark.ui.prometheus.enabled true
#
# S3 optimization settings (see https://spark.apache.org/docs/latest/cloud-integration.html):
#
spark.hadoop.fs.s3a.committer.name directory
spark.hadoop.parquet.enable.summary-metadata false
spark.sql.hive.metastorePartitionPruning true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.streaming.checkpointFileManagerClass org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager
```
**Expected behavior**
The pipeline should run without OOMing on the executors.
**Environment Description**
* Hudi version : 1.0.0-beta2
* Spark version : 3.5.2
* Hive version : 3.1.3
* Hadoop version : 3.3.4
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : yes
**Stacktrace**
```
24/09/18 18:10:29 WARN TaskSetManager: Lost task 5.0 in stage 68.0 (TID 3753) (10.239.14.87 executor 8): java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:64)
    at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:363)
    at org.apache.spark.storage.memory.SerializedValuesHolder.$anonfun$allocator$1(MemoryStore.scala:715)
    at org.apache.spark.storage.memory.SerializedValuesHolder.$anonfun$allocator$1$adapted(MemoryStore.scala:715)
    at org.apache.spark.storage.memory.SerializedValuesHolder$$Lambda$1216/0x00007995a8a1fa28.apply(Unknown Source)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
    at org.apache.spark.storage.memory.RedirectableOutputStream.write(MemoryStore.scala:809)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:185)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:164)
    at com.esotericsoftware.kryo.io.UnsafeOutput.writeLong(UnsafeOutput.java:124)
    at com.esotericsoftware.kryo.io.UnsafeOutput.writeLong(UnsafeOutput.java:160)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.write(DefaultSerializers.java:151)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.write(DefaultSerializers.java:145)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:629)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:86)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:225)
    at org.apache.hudi.common.model.HoodieAvroRecord.writeRecordPayload(HoodieAvroRecord.java:48)
    at org.apache.hudi.common.model.HoodieRecord.write(HoodieRecord.java:365)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:514)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:512)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:278)
    at org.apache.spark.storage.memory.SerializedValuesHolder.storeValue(MemoryStore.scala:729)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
```
Any help / tips would be greatly appreciated!