zyclove opened a new issue, #9973:
URL: https://github.com/apache/hudi/issues/9973

   
   
   **Describe the problem you faced**

   When inserting into a Hudi 0.14.0 MOR table on S3 through spark-sql, the post-commit clean planning step repeatedly times out while calling the timeline server (`RemoteHoodieTableFileSystemView.loadAllPartitions`), and the driver eventually dies with `java.lang.OutOfMemoryError: Java heap space`. See the stacktrace below.

   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. SQL script:
   ```sql
   CREATE TABLE IF NOT EXISTS bi_ods_real.smart_datapoint_report_rw_clear_rt(
     id STRING COMMENT 'id',
     uuid STRING COMMENT 'log uuid',
     data_id STRING,
     dev_id STRING COMMENT 'device id',
     gw_id STRING,
     product_id STRING,
     uid STRING COMMENT 'user ID',
     dp_code STRING,
     dp_id STRING COMMENT 'dp id',
     gmtModified STRING,
     dp_mode STRING,
     dp_name STRING,
     dp_time STRING,
     dp_type STRING,
     dp_value STRING,
     gmt_modified bigint COMMENT 'ct time',
     dt STRING COMMENT 'time partition field'
   )
   using hudi
   tblproperties (
     type = 'mor',
     primaryKey = 'id',
     hoodie.combine.before.upsert='false',
     hoodie.bucket.index.num.buckets=50,
     preCombineField = 'gmt_modified',
     hoodie.compact.inline='false',
     hoodie.common.spillable.diskmap.type='ROCKS_DB',
     hoodie.datasource.write.partitionpath.field='dt,dp_mode'
   )
   PARTITIONED BY (dt,dp_mode)
   COMMENT '';
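   -- Note (assumption added for context, not in the original DDL):
   -- hoodie.bucket.index.num.buckets only takes effect when
   -- hoodie.index.type='BUCKET', which this DDL does not set.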
   
   add jar /opt/resource/sucx/hadoop-analyse-udf-0.0.100-jar-with-dependencies.jar;

   set hoodie.write.lock.zookeeper.lock_key=bi_ods_real.smart_datapoint_report_rw_clear_rt;
   set hoodie.insert.shuffle.parallelism = 400;
   set hoodie.upsert.shuffle.parallelism = 400;
   set hoodie.delete.shuffle.parallelism = 400;
   set hoodie.write.markers.type=TIMELINE_SERVER_BASED;

   create temporary function toJsonArray as 'com.udf.ToJsonArray';

   create temporary function dataPointExplode as 'com.udf.DataPointExplode';

   call copy_to_temp_view(
     table => 'bi_ods_real.ods_log_smart_datapoint_report_batch_rt',
     view_name => 'report_view',
     query_type => 'incremental',
     begin_instance_time => '20231031093500000',
     end_instance_time => '20231031100500000');

   set hoodie.sql.insert.mode=non-strict;
   insert into bi_ods_real.smart_datapoint_report_rw_clear_rt
   select
        /*+ coalesce(400) */
        md5(concat(
          coalesce(data_id,''), coalesce(dev_id,''), coalesce(gw_id,''),
          coalesce(product_id,''), coalesce(uid,''), coalesce(dp_code,''),
          coalesce(dp_id,''), coalesce(gmtModified,''),
          if(dp_mode in ('ro','rw','wr'), dp_mode, 'un'),
          coalesce(dp_name,''), coalesce(dp_time,''), coalesce(dp_type,''),
          coalesce(dp_value,''), coalesce(ct,''))) as id,
        _hoodie_record_key as uuid,
        data_id, dev_id, gw_id, product_id, uid,
        dp_code, dp_id, gmtModified,
        if(dp_mode in ('ro','rw','wr'), dp_mode, 'un') as dp_mode,
        dp_name, dp_time, dp_type, dp_value,
        ct as gmt_modified,
        case
            when length(ct)=10 then date_format(from_unixtime(ct),'yyyyMMddHH')
            when length(ct)=13 then date_format(from_unixtime(ct/1000),'yyyyMMddHH')
            else '1970010100'
        end as dt
   from
       report_view
       lateral view dataPointExplode(split(value,'\001')[0]) dps as
         ct, data_id, dev_id, gw_id, product_id, uid, dp_code, dp_id,
         gmtModified, dp_mode, dp_name, dp_time, dp_type, dp_value
   where _hoodie_commit_time > 20231031093500000
     and _hoodie_commit_time <= 20231031100500000
   ``` 
   2. Submit the script with spark-sql:
   ```shell
   spark-sql -f /tmp/VOLCANO_JOB_1698198652813_020309.sql --master yarn \
     --driver-memory 8g --conf spark.driver.memoryOverhead=8G \
     --num-executors 10 --conf spark.dynamicAllocation.maxExecutors=20 \
     --executor-memory 8G --executor-cores 2 \
     --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.0 \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
     --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
     --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar \
     --conf spark.sql.autoBroadcastJoinThreshold=2G \
     --conf spark.memory.storageFraction=0.7 \
     --conf spark.memory.storageFraction=0.75 \
     --conf spark.sql.broadcastTimeout=60000 \
     --conf spark.yarn.priority=5 \
     --conf spark.sql.broadcastTimeout=600000 \
     --conf spark.network.timeout=600000 \
     --conf spark.eventLog.enable=false \
     --conf spark.driver.maxResultSize=4g
   ```
   (Kept verbatim from the job: note that `spark.memory.storageFraction` and `spark.sql.broadcastTimeout` are each passed twice, so the later value wins, and `spark.eventLog.enable` is likely a typo for `spark.eventLog.enabled`.)
   
   
   
   **Expected behavior**

   The insert should commit, clean, and archive without the driver running out of memory.

   **Stacktrace**
   ```
   23/11/02 02:20:16 INFO S3NativeFileSystem: Opening 's3://big-data-us/hudi/bi/bi_ods_real/smart_datapoint_report_rw_clear_rt/.hoodie/metadata/files/files-0000_0-4-4_20231101184035767001.hfile' for reading
   23/11/02 02:21:12 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@4de4ff12)) by listener SQLAppStatusListener took 3.068272976s.
   23/11/02 02:23:08 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@141289a6)) by listener SQLAppStatusListener took 8.345486202s.
   23/11/02 02:24:28 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@c08b320)) by listener ExecutorMonitor took 6.109781102s.
   23/11/02 02:25:19 WARN RetryHelper: Catch Exception for Sending request, will retry after 415 ms.
   java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
        at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
        at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
        at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
        at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
        at org.apache.hudi.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
        at org.apache.hudi.org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
        at org.apache.hudi.org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
        at org.apache.hudi.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
        at org.apache.hudi.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
        at org.apache.hudi.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
        at org.apache.hudi.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
        at org.apache.hudi.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
        at org.apache.hudi.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
        at org.apache.hudi.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
        at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
        at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
        at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
        at org.apache.hudi.org.apache.http.client.fluent.Request.execute(Request.java:151)
        at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.get(RemoteHoodieTableFileSystemView.java:597)
        at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.lambda$executeRequest$8cadb3e1$1(RemoteHoodieTableFileSystemView.java:193)
        at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:84)
        at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.executeRequest(RemoteHoodieTableFileSystemView.java:193)
        at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.loadAllPartitions(RemoteHoodieTableFileSystemView.java:496)
        at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:69)
        at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.loadAllPartitions(PriorityBasedFileSystemView.java:172)
        at org.apache.hudi.table.action.clean.CleanPlanner.<init>(CleanPlanner.java:110)
        at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.requestClean(CleanPlanActionExecutor.java:105)
        at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.requestClean(CleanPlanActionExecutor.java:151)
        at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.execute(CleanPlanActionExecutor.java:177)
        at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.scheduleCleaning(HoodieSparkCopyOnWriteTable.java:217)
        at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:628)
        at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:751)
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:861)
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:834)
        at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:865)
        at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:599)
        at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:578)
        at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:248)
        at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:104)
        at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1059)
        at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:441)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
        at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:108)
        at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
        at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
        at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:490)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:213)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   23/11/02 02:27:23 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@3944e740)) by listener SQLAppStatusListener took 6.412619567s.
   23/11/02 02:29:59 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@5ce8e788)) by listener AppStatusListener took 4.160406911s.
   23/11/02 02:30:35 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@361e9f68)) by listener AppStatusListener took 8.347071876s.
   23/11/02 02:30:57 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@36416fb4)) by listener AppStatusListener took 9.053349954s.
   23/11/02 02:32:05 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@2821e0db)) by listener AppStatusListener took 6.192278739s.
   23/11/02 02:33:34 WARN DataStreamer: Exception for BP-452038477-172.16.13.57-1696927694357:blk_1074472233_2628950
   java.io.EOFException: Unexpected EOF while trying to read response from server
        at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:567)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213)
        at org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1086)
   #
   # java.lang.OutOfMemoryError: Java heap space
   # -XX:OnOutOfMemoryError="kill -9 %p"
   #   Executing /bin/sh -c "kill -9 23856"...
   ``` 
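
   The stack shows the clean planner loading all partitions through `RemoteHoodieTableFileSystemView` (driver-side, via the timeline server) right before the heap-space OOM. A possible mitigation sketch follows; these are standard Hudi 0.14 options and procedures, but whether they resolve this particular failure is an assumption, not something verified here.

   ```sql
   -- Sketch only (assumes standard Hudi 0.14 configs/procedures apply to this job):
   -- give the remote file system view more headroom per request (default is 300s)
   set hoodie.filesystem.view.remote.timeout.secs=1200;

   -- or take clean planning off the write path entirely...
   set hoodie.clean.automatic=false;
   -- ...and clean out of band with the built-in procedure:
   call run_clean(table => 'bi_ods_real.smart_datapoint_report_rw_clear_rt');
   ```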
   
   **Environment Description**

   * Hudi version : 0.14.0

   * Spark version : 3.2.1

   * Hive version : 3.1.3

   * Hadoop version : 3.2.2

   * Storage (HDFS/S3/GCS..) : S3

   * Running on Docker? (yes/no) : no
   
   
   
   

