felixYyu opened a new issue #3885:
URL: https://github.com/apache/iceberg/issues/3885


   spark 3.0.1
   iceberg-spark3-runtime 0.12.1
   
   MySQL binlog is captured with the Maxwell tool and streamed to Kafka.
   ```
   import java.util.concurrent.TimeUnit

   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.streaming.Trigger

   val df = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", bootstrapServers)
     .option("subscribe", topic)
     .option("failOnDataLoss", false)
     .option("maxOffsetsPerTrigger", 1000000)
     .load()

   val stream = df
     .writeStream
     .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
     .option("fanout-enabled", "true")
     .option("checkpointLocation", checkpointPath)
     .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
       batchDF.persist()
       // note: the result of this projection is never assigned or used
       batchDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").toDF("key", "value")
       // split the batch by binlog DML type: insert/update/delete
       batchDF.filter(batchDF("type") === "insert").createOrReplaceTempView("tmp_data_insert")
       batchDF.filter(batchDF("type") === "update").createOrReplaceTempView("tmp_data_update")
       batchDF.filter(batchDF("type") === "delete").createOrReplaceTempView("tmp_data_delete")
       // apply inserts
       spark.sql("MERGE INTO hadoop_prod.wms.wms_do t" +
         " USING (SELECT * FROM tmp_data_insert ORDER BY partition_col) s" +
         " ON t.id = s.id AND t.ts > '2022-01-11'" +
         " WHEN NOT MATCHED THEN INSERT *")
       // apply updates
       spark.sql("MERGE INTO hadoop_prod.wms.wms_do t" +
         " USING (SELECT * FROM tmp_data_update ORDER BY partition_col) s" +
         " ON t.id = s.id AND t.ts > '2022-01-01'" +
         " WHEN MATCHED THEN UPDATE SET *")
       // apply deletes
       spark.sql("DELETE FROM hadoop_prod.wms.wms_do t" +
         " WHERE EXISTS (SELECT * FROM tmp_data_delete WHERE t.id = id)")
       batchDF.unpersist()
       empty() // not defined in this snippet
     }.start()

   stream.awaitTermination()
   stream.stop()
   spark.stop()
   ```
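   
   Note that the raw Kafka DataFrame only exposes `key`/`value`/`topic`/`partition`/`offset`/`timestamp` columns, and the `toDF("key", "value")` projection above is never assigned, so the `type` column the filters rely on presumably comes from parsing Maxwell's JSON envelope in code not shown here. A minimal sketch of that parsing step, assuming Maxwell's standard `database`/`table`/`type`/`ts`/`data` fields (the schema and the `parseMaxwell` helper are illustrative, not the reporter's actual code):
   
   ```
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.functions.{col, from_json}
   import org.apache.spark.sql.types._

   // Illustrative schema for Maxwell's JSON envelope; the real `data` payload
   // mirrors the captured MySQL row and is simplified to a string map here.
   val maxwellSchema = new StructType()
     .add("database", StringType)
     .add("table", StringType)
     .add("type", StringType) // "insert" | "update" | "delete"
     .add("ts", LongType)
     .add("data", MapType(StringType, StringType))

   // Hypothetical helper: turn raw Kafka records into typed binlog rows.
   def parseMaxwell(raw: DataFrame): DataFrame =
     raw.selectExpr("CAST(value AS STRING) AS json")
       .select(from_json(col("json"), maxwellSchema).as("m"))
       .select("m.*")
   ```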
   
   After the job has been running for several hours, an OOM error occurs and the job fails.
   
   ```
   Application application_1641894951363_2568 failed 2 times due to AM Container for appattempt_1641894951363_2568_000002 exited with exitCode: -104
   For more detailed output, check application tracking page:http://hadoopmanager136:18088/proxy/application_1641894951363_2568/Then, click on links to logs of each attempt.
   Diagnostics: Container [pid=268684,containerID=container_e91_1641894951363_2568_02_000001] is running beyond physical memory limits. Current usage: 4.7 GB of 4.5 GB physical memory used; 6.8 GB of 9.4 GB virtual memory used. Killing container.
   Dump of the process-tree for container_e91_1641894951363_2568_02_000001 :
   |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
   |- 268689 268684 268684 268684 (java) 20942 1263 7232856064 1229083 /usr/java/jdk1.8.0_181/bin/java -server -Xmx4096m -Djava.io.tmpdir=/data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/tmp -Dspark.yarn.app.container.log.dir=/data003/yarn/container-logs/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class com.jiuye.data.lake.incubation.v2.IcebergMergeData --jar file:/opt/maintain/scripts/iceberg/iceberg-sql-jar/rds-datalake-1.0.0.jar --properties-file /data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/__spark_conf__/__spark_conf__.properties --dist-cache-conf /data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/__spark_conf__/__spark_dist_cache__.properties
   |- 268684 268681 268684 268684 (bash) 0 0 115847168 671 /bin/bash -c /usr/java/jdk1.8.0_181/bin/java -server -Xmx4096m -Djava.io.tmpdir=/data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/tmp -Dspark.yarn.app.container.log.dir=/data003/yarn/container-logs/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'com.jiuye.data.lake.incubation.v2.IcebergMergeData' --jar file:/opt/maintain/scripts/iceberg/iceberg-sql-jar/rds-datalake-1.0.0.jar --properties-file /data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/__spark_conf__/__spark_conf__.properties --dist-cache-conf /data006/yarn/nm/usercache/etl/appcache/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/__spark_conf__/__spark_dist_cache__.properties 1> /data003/yarn/container-logs/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/stdout 2> /data003/yarn/container-logs/application_1641894951363_2568/container_e91_1641894951363_2568_02_000001/stderr
   Container killed on request. Exit code is 143
   Container exited with a non-zero exit code 143
   Failing this attempt. Failing the application.
   ```
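
   Exit code 143 here is YARN killing the container after its physical-memory check failed: the AM container (which hosts the driver in yarn-cluster mode) used 4.7 GB against a 4.5 GB limit while the heap was capped at `-Xmx4096m`, so the off-heap overhead outgrew its roughly 0.5 GB allowance. Raising `spark.driver.memoryOverhead` is the usual first knob; on the application side, one possible (unconfirmed) mitigation is to persist each micro-batch with a disk-spillable storage level and release it synchronously, so cached batches cannot accumulate on-heap across triggers. A sketch, with the hypothetical `withCachedBatch` helper wrapping the body of `foreachBatch` above:

   ```
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.storage.StorageLevel

   // Spill the cached micro-batch to disk instead of keeping it purely
   // on-heap, and block until the cache is actually freed before returning.
   def withCachedBatch(batchDF: DataFrame)(body: DataFrame => Unit): Unit = {
     batchDF.persist(StorageLevel.MEMORY_AND_DISK)
     try body(batchDF)
     finally batchDF.unpersist(blocking = true)
   }
   ```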

