jtmzheng opened a new issue #2408:
URL: https://github.com/apache/hudi/issues/2408
**Describe the problem you faced**

We have a Spark Streaming application running on EMR 5.31.0 that reads from a Kinesis stream (batch interval of 30 minutes) and upserts to a MOR dataset partitioned by date. The dataset is ~2.2 TB in size and holds ~6 billion records. The problem is that the application now persistently crashes with an OutOfMemoryError regardless of the batch input size (the stacktrace attached below is for an input of ~1 million records, ~250 MB). For debugging we tried replacing the Hudi upsert with a simple count, after which the application showed minimal memory usage in the Spark UI.
```
df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(OUTPUT_PATH)
```
This seems similar to https://github.com/apache/hudi/issues/1491#issuecomment-610626104, since based on the stacktrace it appears to run out of memory while reading old records. There are a lot of large log files in the dataset:
```
...
2021-01-01 00:28:08  910487106 hudi/production/transactions_v9/2020/12/30/.0ef05182-e0ad-44b5-b52d-0412a6b44a01-1_20210101033018.log.1_3774-34-62086
2021-01-01 21:03:26  910490317 hudi/production/transactions_v9/2020/12/30/.0ef05182-e0ad-44b5-b52d-0412a6b44a01-1_20210101033018.log.11_3774-34-62109
2021-01-01 16:52:39  910495970 hudi/production/transactions_v9/2020/12/30/.0ef05182-e0ad-44b5-b52d-0412a6b44a01-1_20210101033018.log.9_3774-34-62083
```
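For reference, the log sizes above can be tallied from an `aws s3 ls --recursive`-style listing. The helper below is just an illustrative sketch (not part of our application) and assumes the four-column format shown: date, time, size in bytes, key.

```python
# Sketch: sum Hudi log-file sizes from an `aws s3 ls --recursive`-style listing.
# Assumes each line is: date, time, size-in-bytes, key (no spaces in keys).
def total_log_bytes(listing: str) -> int:
    total = 0
    for line in listing.strip().splitlines():
        parts = line.split()
        # Hudi delta log files contain ".log." in the file name
        if len(parts) == 4 and ".log." in parts[3]:
            total += int(parts[2])
    return total

listing = """\
2021-01-01 00:28:08 910487106 hudi/production/transactions_v9/2020/12/30/.0ef05182-e0ad-44b5-b52d-0412a6b44a01-1_20210101033018.log.1_3774-34-62086
2021-01-01 21:03:26 910490317 hudi/production/transactions_v9/2020/12/30/.0ef05182-e0ad-44b5-b52d-0412a6b44a01-1_20210101033018.log.11_3774-34-62109
"""
print(total_log_bytes(listing))  # 1820977423 (~1.8 GB across just two log files)
```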
Our Hudi configs are:
```
hudi_options = {
    "hoodie.table.name": "transactions",
    "hoodie.datasource.write.recordkey.field": "id.value",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.partitionpath.field": "year,month,day",
    "hoodie.datasource.write.table.name": "transactions",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.consistency.check.enabled": "true",
    "hoodie.datasource.write.precombine.field": "publishedAtUnixNano",
    "hoodie.compact.inline": True,
    "hoodie.compact.inline.max.delta.commits": 10,
    "hoodie.cleaner.commits.retained": 1,
}
```
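Since the stacktrace below ends inside the timeline archival path, one thing we could experiment with is Hudi's memory- and archival-related knobs. A minimal sketch, assuming the Hudi 0.6.x config names `hoodie.memory.merge.max.size`, `hoodie.memory.compaction.max.size`, and `hoodie.commits.archival.batch`; the specific values are illustrative assumptions, not tested recommendations:

```python
# Illustrative sketch only: memory-related knobs that exist in Hudi 0.6.x.
# The values are assumptions to experiment with, not recommendations.
base_hudi_options = {
    # Small subset of the options quoted above, for a self-contained example
    "hoodie.table.name": "transactions",
    "hoodie.datasource.write.operation": "upsert",
}
memory_tuning = {
    # Bound the spillable map used when merging records (bytes)
    "hoodie.memory.merge.max.size": str(512 * 1024 * 1024),
    # Bound compaction memory similarly (bytes)
    "hoodie.memory.compaction.max.size": str(512 * 1024 * 1024),
    # Archive fewer commits per log block during timeline archival
    "hoodie.commits.archival.batch": "5",
}
# Later keys win, so the tuning overrides any duplicates in the base options
hudi_options = {**base_hudi_options, **memory_tuning}
```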
Our Spark configs are (CORES_PER_EXECUTOR = 5, NUM_EXECUTORS = 30) for a cluster running on 10 r5.4xlarge instances:
```
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
# Recommended from https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
"spark.executor.cores": CORES_PER_EXECUTOR,
"spark.executor.memory": "36g",
"spark.yarn.executor.memoryOverhead": "4g",
"spark.driver.cores": CORES_PER_EXECUTOR,
"spark.driver.memory": "36g",
"spark.executor.instances": NUM_EXECUTORS,
"spark.default.parallelism": NUM_EXECUTORS * CORES_PER_EXECUTOR * 2,
"spark.dynamicAllocation.enabled": "false",
"spark.streaming.dynamicAllocation.enabled": "false",
"spark.streaming.backpressure.enabled": "true",
# Set max rate limit per receiver to limit memory usage
"spark.streaming.receiver.maxRate": "10",
# Shut down gracefully on JVM shutdown
"spark.streaming.stopGracefullyOnShutdown": "true",
# GC options (https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html)
"spark.executor.defaultJavaOptions": "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12",
"spark.driver.defaultJavaOptions": "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12",
"spark.streaming.kinesis.retry.waitTime": "1000ms",  # default is 100 ms
"spark.streaming.kinesis.retry.maxAttempts": "10",  # default is 10
# Set max retries on S3 rate limits
"spark.hadoop.fs.s3.maxRetries": "20",
```
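For clarity, the derived values in that config resolve as follows. The sketch below is only an illustration of how the numeric entries flatten into the strings Spark ultimately receives; it is not code from our application:

```python
# Sketch: resolve the derived Spark conf values quoted above into strings.
CORES_PER_EXECUTOR = 5
NUM_EXECUTORS = 30

spark_conf = {
    "spark.executor.cores": CORES_PER_EXECUTOR,
    "spark.executor.instances": NUM_EXECUTORS,
    # 30 executors * 5 cores * 2 = 300 default partitions
    "spark.default.parallelism": NUM_EXECUTORS * CORES_PER_EXECUTOR * 2,
}
# Spark conf values are strings, so stringify before building the SparkConf
resolved = {k: str(v) for k, v in spark_conf.items()}
print(resolved["spark.default.parallelism"])  # 300
```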
**Environment Description**
* Hudi version : 0.6.0
* Spark version : 2.4.6 (EMR 5.31.0)
* Hive version : Hive 2.3.7
* Hadoop version : Amazon 2.10.0
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Additional context**

Although the records are partitioned by date, the Kinesis stream also contains many backfill records, so many partitions may be upserted to in a single batch.
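To quantify that fan-out, a hypothetical helper (illustrative record schema, not code from our application) could count the distinct `(year, month, day)` partitions one micro-batch touches:

```python
# Sketch of a hypothetical check: count distinct (year, month, day) partitions
# touched by one micro-batch, since backfill records fan out across many dates.
def touched_partitions(records):
    """records: iterable of dicts with year/month/day keys (illustrative schema)."""
    return {(r["year"], r["month"], r["day"]) for r in records}

batch = [
    {"year": 2020, "month": 12, "day": 30, "id": "a"},
    {"year": 2020, "month": 12, "day": 30, "id": "b"},
    {"year": 2019, "month": 6, "day": 1, "id": "c"},  # backfill record
]
print(len(touched_partitions(batch)))  # 2
```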
Please let me know what other context I can provide that would be useful.
Thank you!
**Stacktrace**
```
Traceback (most recent call last):
  File "stream_transactions.py", line 965, in <module>
    ssc.awaitTermination()
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o89.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "stream_transactions.py", line 954, in process
    df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(OUTPUT_PATH)
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 739, in save
    self._jwrite.save(path)
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o1902.save.
: java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.serializeRecords(HoodieAvroDataBlock.java:114)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:95)
    at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlock(HoodieLogFormatWriter.java:139)
    at org.apache.hudi.table.HoodieTimelineArchiveLog.writeToFile(HoodieTimelineArchiveLog.java:309)
    at org.apache.hudi.table.HoodieTimelineArchiveLog.archive(HoodieTimelineArchiveLog.java:282)
    at org.apache.hudi.table.HoodieTimelineArchiveLog.archiveIfRequired(HoodieTimelineArchiveLog.java:133)
    at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:381)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:126)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:99)
    at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:90)
    at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:395)
    at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:205)
    at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:125)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:112)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
    at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```