jtmzheng opened a new issue #2408:
URL: https://github.com/apache/hudi/issues/2408


   **Describe the problem you faced**
   
We have a Spark Streaming application running on EMR 5.31.0 that reads from a Kinesis stream (batch interval of 30 minutes) and upserts to a MOR dataset partitioned by date. The dataset is ~2.2 TB in size and holds ~6 billion records. The application now crashes persistently with an OutOfMemoryError regardless of the batch input size (the stacktrace attached below is for an input of ~1 million records, ~250 MB). For debugging we tried replacing the Hudi upsert with a simple count; with that change the Spark UI shows minimal memory usage by the application.
   
   ```
   df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(OUTPUT_PATH)
   ```
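   The debugging swap described above is essentially the following (a sketch only: `process_batch`, `write_hudi`, and the `debug_count_only` toggle are illustrative names, with `write_hudi` standing in for the `df.write` call above):

```python
# Sketch of the A/B experiment: the same batch-processing function with the
# Hudi upsert swapped for a plain count. `write_hudi` stands in for the
# df.write.format("org.apache.hudi")... call; all names are illustrative.
def process_batch(df, write_hudi, debug_count_only=False):
    if debug_count_only:
        # Count-only path: the Spark UI showed minimal memory usage here.
        return ("count", df.count())
    # Real path: this is where the OutOfMemoryError is raised.
    return ("upsert", write_hudi(df))
```

   With the count-only path the application stays well within memory limits, which is what points us at the Hudi write path specifically.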
   
   This looks similar to https://github.com/apache/hudi/issues/1491#issuecomment-610626104, since the stacktrace suggests it runs out of memory while reading old records. There are a lot of large log files in the dataset:
   
   ```
   ...
   2021-01-01 00:28:08  910487106 hudi/production/transactions_v9/2020/12/30/.0ef05182-e0ad-44b5-b52d-0412a6b44a01-1_20210101033018.log.1_3774-34-62086
   2021-01-01 21:03:26  910490317 hudi/production/transactions_v9/2020/12/30/.0ef05182-e0ad-44b5-b52d-0412a6b44a01-1_20210101033018.log.11_3774-34-62109
   2021-01-01 16:52:39  910495970 hudi/production/transactions_v9/2020/12/30/.0ef05182-e0ad-44b5-b52d-0412a6b44a01-1_20210101033018.log.9_3774-34-62083
   ```
   
   Our Hudi configs are:
   
   ```
   hudi_options = {
           "hoodie.table.name": "transactions",
           "hoodie.datasource.write.recordkey.field": "id.value",
        "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
           "hoodie.datasource.write.partitionpath.field": "year,month,day",
           "hoodie.datasource.write.table.name": "transactions",
           "hoodie.datasource.write.table.type": "MERGE_ON_READ",
           "hoodie.datasource.write.operation": "upsert",
           "hoodie.consistency.check.enabled": "true",
           "hoodie.datasource.write.precombine.field": "publishedAtUnixNano",
           "hoodie.compact.inline": True,
           "hoodie.compact.inline.max.delta.commits": 10,
           "hoodie.cleaner.commits.retained": 1,
   }
   ```
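   Since the stacktrace below bottoms out in the timeline archival path (`HoodieTimelineArchiveLog` growing a `ByteArrayOutputStream` past capacity), one thing we may experiment with is bounding archival and merge memory explicitly. A sketch of the extra options we are considering (the keys are standard Hudi configs, but the values are untested guesses on our part, not a recommendation):

```python
# Hypothetical additions to hudi_options above; values are untested guesses.
archival_tuning_options = {
    # Archive commits to the log in smaller batches, so a single archival
    # block is less likely to outgrow the JVM byte-array limit.
    "hoodie.commits.archival.batch": 10,
    # Bound heap used by the merge path before spilling to disk (bytes).
    "hoodie.memory.merge.max.size": 1024 * 1024 * 1024,
    # Control when commits become eligible for archival at all.
    "hoodie.keep.min.commits": 20,
    "hoodie.keep.max.commits": 30,
}
```

   These would simply be merged into `hudi_options` before the write.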
   
   Our Spark configs, with CORES_PER_EXECUTOR = 5 and NUM_EXECUTORS = 30, for a cluster of 10 r5.4xlarge instances:
   
   ```
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.hive.convertMetastoreParquet": "false",
        # Recommended by https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/
        "spark.executor.cores": CORES_PER_EXECUTOR,
        "spark.executor.memory": "36g",
        "spark.yarn.executor.memoryOverhead": "4g",
        "spark.driver.cores": CORES_PER_EXECUTOR,
        "spark.driver.memory": "36g",
        "spark.executor.instances": NUM_EXECUTORS,
        "spark.default.parallelism": NUM_EXECUTORS * CORES_PER_EXECUTOR * 2,
        "spark.dynamicAllocation.enabled": "false",
        "spark.streaming.dynamicAllocation.enabled": "false",
        "spark.streaming.backpressure.enabled": "true",
        # Set max rate limit per receiver to limit memory usage
        "spark.streaming.receiver.maxRate": "10",
        # Shut down gracefully on JVM shutdown
        "spark.streaming.stopGracefullyOnShutdown": "true",
        # GC options (https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html)
        "spark.executor.defaultJavaOptions": "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12",
        "spark.driver.defaultJavaOptions": "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12",
        "spark.streaming.kinesis.retry.waitTime": "1000ms",  # default is 100ms
        "spark.streaming.kinesis.retry.maxAttempts": "10",  # default is 10
        # Set max retries on S3 rate limits
        "spark.hadoop.fs.s3.maxRetries": "20",
   ```
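   For reference, the sizing arithmetic behind those numbers (r5.4xlarge: 16 vCPU and 128 GiB RAM per node). This is just a sanity check on the layout, not part of the job:

```python
# Sanity-check that the executor layout fits the cluster.
# r5.4xlarge: 16 vCPU, 128 GiB RAM per node; 10 nodes total.
NODES = 10
NODE_MEM_GIB = 128
CORES_PER_EXECUTOR = 5
NUM_EXECUTORS = 30
EXECUTOR_MEM_GIB = 36   # spark.executor.memory
OVERHEAD_GIB = 4        # spark.yarn.executor.memoryOverhead

executors_per_node = NUM_EXECUTORS // NODES                                # 3
mem_per_node_gib = executors_per_node * (EXECUTOR_MEM_GIB + OVERHEAD_GIB)  # 120
default_parallelism = NUM_EXECUTORS * CORES_PER_EXECUTOR * 2               # 300

# Leaves ~8 GiB per node for YARN, the OS, and other daemons.
assert mem_per_node_gib <= NODE_MEM_GIB
print(executors_per_node, mem_per_node_gib, default_parallelism)
```

   So the allocation itself fits within the nodes; the OOM is on the JVM heap, not a YARN container kill.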
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Spark version : 2.4.6 (EMR 5.31.0)
   
   * Hive version : Hive 2.3.7
   
   * Hadoop version : Amazon 2.10.0
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Although the records are partitioned by date, the Kinesis stream also contains many backfill records, so a single batch may upsert to many partitions.
   
   Please let me know what other context I can provide that would be useful. 
Thank you!
   
   **Stacktrace**
   
   ```
   Traceback (most recent call last):
     File "stream_transactions.py", line 965, in <module>
       ssc.awaitTermination()
     File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
     File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
     File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
     File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling o89.awaitTermination.
   : org.apache.spark.SparkException: An exception was raised by Python:
   Traceback (most recent call last):
     File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/streaming/util.py", line 68, in call
       r = self.func(t, *rdds)
     File "stream_transactions.py", line 954, in process
       df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(OUTPUT_PATH)
     File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/sql/readwriter.py", line 739, in save
       self._jwrite.save(path)
     File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
       answer, self.gateway_client, self.target_id, self.name)
     File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
       return f(*a, **kw)
     File "/mnt2/yarn/usercache/hadoop/appcache/application_1609869637845_0001/container_1609869637845_0001_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
       format(target_id, ".", name), value)
   py4j.protocol.Py4JJavaError: An error occurred while calling o1902.save.
   : java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.serializeRecords(HoodieAvroDataBlock.java:114)
        at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:95)
        at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlock(HoodieLogFormatWriter.java:139)
        at org.apache.hudi.table.HoodieTimelineArchiveLog.writeToFile(HoodieTimelineArchiveLog.java:309)
        at org.apache.hudi.table.HoodieTimelineArchiveLog.archive(HoodieTimelineArchiveLog.java:282)
        at org.apache.hudi.table.HoodieTimelineArchiveLog.archiveIfRequired(HoodieTimelineArchiveLog.java:133)
        at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:381)
        at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:126)
        at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:99)
        at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:90)
        at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:395)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:205)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:125)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:194)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:112)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
        at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
        at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   

