stym06 opened a new issue #4713:
URL: https://github.com/apache/hudi/issues/4713
**Describe the problem you faced**
While upserting Mongo oplogs from Kafka to Azure Blob storage, we are facing Executor OOM errors.
**Environment Description**
* Hudi version : 0.9.0
* Spark version : 2.4.4
* Hive version : 3.1.2
* Hadoop version : 2.7.3
* Storage (HDFS/S3/GCS..) : Azure Blob
* Running on Docker? (yes/no) : No — running on Kubernetes (via the Spark Operator)
**Additional context**
Spark K8s yaml file
```
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: hudi-ss-ah-ds-{{ ti.job_id }}
namespace: dataplatform
labels:
spark_name: hudi-ss-ah-ds-{{ ti.job_id }}
dag_name: hudi-ss-ah
task_name: ds
environment: "prod"
cloud: "aws"
tier: "t2"
team: "dataplatform"
service_type: "airflow"
k8s_cluster_name: "tapi"
plip_version: 0.1.10-dp-ev
spec:
type: Java
mode: cluster
image: "hudi-ds:4"
imagePullPolicy: Always
mainClass: org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer
mainApplicationFile:
"local:///opt/spark/hudi/hudi-utilities-bundle_2.11-0.9.0-SNAPSHOT.jar"
deps:
packages:
- org.apache.spark:spark-avro_2.11:2.4.4
sparkConf:
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
"spark.memory.fraction": "0.2"
"spark.memory.storageFraction": "0.2"
arguments:
- "--table-type"
- "COPY_ON_WRITE"
- "--props"
- "/opt/spark/hudi/config/source.properties"
- "--schemaprovider-class"
- "org.apache.hudi.utilities.schema.SchemaRegistryProvider"
- "--source-class"
- "org.apache.hudi.utilities.sources.JsonKafkaSource"
- "--target-base-path"
- "s3a://<ourbucket>/fusion/mongo/data/application_histories"
- "--target-table"
- "application_histories"
- "--op"
- "UPSERT"
- "--source-ordering-field"
- "__ts_ms"
- "--continuous"
- "--min-sync-interval-seconds"
- "60"
sparkVersion: "2.4.4"
restartPolicy:
type: Always
onFailureRetries: 100000
onFailureRetryInterval: 60
onSubmissionFailureRetries: 100000
onSubmissionFailureRetryInterval: 60
timeToLiveSeconds: 3600
volumes:
- name: hudi-ss-ah-ds
configMap:
name: hudi-ss-ah-ds
driver:
env:
- name: HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key
value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key }}
- name: HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key
value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key }}
- name: HOODIE_ENV_fs_DOT_s3a_DOT_impl
value: org.apache.hadoop.fs.s3a.S3AFileSystem
cores: 1
coreLimit: "1200m"
memory: "4G"
serviceAccount: "dataplatform"
volumeMounts:
- name: hudi-ss-ah-ds
mountPath: /opt/spark/hudi/config
subpath: config.yaml
javaOptions: "-Dnetworkaddress.cache.ttl=60 -Duser.timezone=IST
-XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof"
executor:
env:
- name: HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key
value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_access_DOT_key }}
- name: HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key
value: {{ var.value.HOODIE_ENV_fs_DOT_s3a_DOT_secret_DOT_key }}
- name: HOODIE_ENV_fs_DOT_s3a_DOT_impl
value: org.apache.hadoop.fs.s3a.S3AFileSystem
cores: 1
instances: 20
memory: "6G"
volumeMounts:
- name: hudi-ss-ah-ds
mountPath: /opt/spark/hudi/config
subpath: config.yaml
javaOptions: "-Dnetworkaddress.cache.ttl=60 -Duser.timezone=IST
-XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof"
sparkUIOptions:
ingressAnnotations:
kubernetes.io/ingress.class: nginx
monitoring:
exposeDriverMetrics: true
exposeExecutorMetrics: true
prometheus:
jmxExporterJar:
"/opt/spark/hudi/prometheus/jmx_prometheus_javaagent-0.16.1.jar"
port: 8090
```
source.properties
```
#base properties
hoodie.upsert.shuffle.parallelism=500
hoodie.insert.shuffle.parallelism=50
hoodie.delete.shuffle.parallelism=50
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false
#datasource properties
hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/mongo.self_signup.application_histories-value/versions/latest
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=
hoodie.deltastreamer.source.kafka.topic=self_signup.application_histories
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.deltastreamer.kafka.source.maxEvents=50000
#cleaning
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=1
hoodie.clean.async=true
#archival
hoodie.keep.min.commits=12
hoodie.keep.max.commits=15
#kafka props
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
schema.registry.url=http://localhost:8081
#prometheus
hoodie.metrics.on=true
hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY
hoodie.metrics.pushgateway.host=k8s-prometheus-pushgateway.observability.svc.cluster.local
hoodie.metrics.pushgateway.port=9091
hoodie.metrics.pushgateway.delete.on.shutdown=false
hoodie.metrics.pushgateway.random.job.name.suffix=false
hoodie.metrics.pushgateway.job.name=hudi-ss-ah
```
**Stacktrace**
```
22/01/28 17:40:05 INFO DAGScheduler: ShuffleMapStage 39 (countByKey at
SparkHoodieBloomIndex.java:114) failed in 5.523 s due to
org.apache.spark.shuffle.FetchFailedException: Failure while fetching
StreamChunkId{streamId=489876428219, chunkIndex=0}: java.io.IOException: Out of
memory
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.FileDispatcherImpl.read(FileDispatcherImpl.java:46)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:159)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readLong(DataInputStream.java:416)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:208)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:382)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
at
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
at
org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:87)
at
org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:130)
at
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:101)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:156)
at
org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:84)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]