zyclove opened a new issue, #9973:
URL: https://github.com/apache/hudi/issues/9973
**Describe the problem you faced**
**To Reproduce**
Steps to reproduce the behavior:
1. SQL
```sql
CREATE TABLE if NOT EXISTS bi_ods_real.smart_datapoint_report_rw_clear_rt(
id STRING COMMENT 'id',
uuid STRING COMMENT 'log uuid',
data_id STRING,
dev_id STRING COMMENT 'device id',
gw_id STRING ,
product_id STRING,
uid STRING COMMENT 'user ID',
dp_code STRING,
dp_id STRING COMMENT 'dp point',
gmtModified STRING,
dp_mode STRING ,
dp_name STRING ,
dp_time STRING ,
dp_type STRING ,
dp_value STRING ,
gmt_modified bigint COMMENT 'ct time',
dt STRING COMMENT 'time partition field'
)
using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
hoodie.combine.before.upsert='false',
hoodie.bucket.index.num.buckets=50,
preCombineField = 'gmt_modified',
hoodie.compact.inline='false',
hoodie.common.spillable.diskmap.type='ROCKS_DB',
hoodie.datasource.write.partitionpath.field='dt,dp_mode'
)
PARTITIONED BY (dt,dp_mode)
COMMENT '';
add jar /opt/resource/sucx/hadoop-analyse-udf-0.0.100-jar-with-dependencies.jar;
set hoodie.write.lock.zookeeper.lock_key=bi_ods_real.smart_datapoint_report_rw_clear_rt;
set hoodie.insert.shuffle.parallelism = 400;
set hoodie.upsert.shuffle.parallelism = 400;
set hoodie.delete.shuffle.parallelism = 400;
set hoodie.write.markers.type=TIMELINE_SERVER_BASED;
create temporary function toJsonArray as 'com.udf.ToJsonArray';
create temporary function dataPointExplode as 'com.udf.DataPointExplode';
call copy_to_temp_view(table=>'bi_ods_real.ods_log_smart_datapoint_report_batch_rt',view_name=>'report_view',query_type=>'incremental',begin_instance_time=>'20231031093500000',end_instance_time=>'20231031100500000');
set hoodie.sql.insert.mode=non-strict;
insert into bi_ods_real.smart_datapoint_report_rw_clear_rt
select
/*+ coalesce(400) */
md5(concat(coalesce(data_id,''),coalesce(dev_id,''),coalesce(gw_id,''),coalesce(product_id,''),coalesce(uid,''),coalesce(dp_code,''),coalesce(dp_id,''),coalesce(gmtModified,''),if(dp_mode in ('ro','rw','wr'),dp_mode,'un'),coalesce(dp_name,''),coalesce(dp_time,''),coalesce(dp_type,''),coalesce(dp_value,''),coalesce(ct,''))) as id,
_hoodie_record_key as uuid,
data_id,dev_id,gw_id,product_id,uid,
dp_code,dp_id,gmtModified,
if(dp_mode in ('ro','rw','wr'),dp_mode,'un') as dp_mode,
dp_name,dp_time,dp_type,dp_value,
ct as gmt_modified,
case
when length(ct)=10 then date_format(from_unixtime(ct),'yyyyMMddHH')
when length(ct)=13 then date_format(from_unixtime(ct/1000),'yyyyMMddHH')
else '1970010100' end as dt
from report_view
lateral view dataPointExplode(split(value,'\001')[0]) dps as ct,
data_id, dev_id, gw_id, product_id, uid, dp_code, dp_id, gmtModified, dp_mode,
dp_name, dp_time, dp_type, dp_value
where _hoodie_commit_time >20231031093500000
and _hoodie_commit_time<=20231031100500000
```
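
For anyone reading the query, the `dt` partition value is derived from `ct` by string length: 10 characters are treated as epoch seconds, 13 as epoch milliseconds, and anything else falls back to `1970010100`. Below is a minimal Python sketch of that rule for checking which partition a given `ct` should land in. The function name `ct_to_dt` is hypothetical, and the sketch assumes UTC; Spark's `from_unixtime` uses the session time zone, so the hour on the cluster may differ.

```python
from datetime import datetime, timezone


def ct_to_dt(ct: str) -> str:
    """Mirror the CASE expression in the query (assumes UTC, numeric ct)."""
    if len(ct) == 10:
        # 10 characters: treated as epoch seconds
        seconds = int(ct)
    elif len(ct) == 13:
        # 13 characters: treated as epoch milliseconds
        seconds = int(ct) // 1000
    else:
        # Any other length falls into the catch-all partition
        return "1970010100"
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y%m%d%H")


print(ct_to_dt("1698814800"))     # seconds
print(ct_to_dt("1698814800123"))  # milliseconds
print(ct_to_dt("n/a"))            # fallback
```

Unlike the SQL, this sketch raises `ValueError` for a non-numeric `ct` of length 10 or 13 instead of producing a null.
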
2.
spark-sql -f /tmp/VOLCANO_JOB_1698198652813_020309.sql --master yarn
--driver-memory 8g --conf spark.driver.memoryOverhead=8G --num-executors 10
--conf spark.dynamicAllocation.maxExecutors=20 --executor-memory 8G
--executor-cores 2 --packages
org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.0 --conf
spark.serializer=org.apache.spark.serializer.KryoSerializer --conf
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
--conf
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
--conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar --conf
spark.sql.autoBroadcastJoinThreshold=2G --conf spark.memory.storageFraction=0.7
--conf spark.memory.storageFraction=0.75 --conf
spark.sql.broadcastTimeout=60000 --conf spark.yarn.priority=5 --conf
spark.sql.broadcastTimeout=600000 --conf spark.network.timeout=600000 --conf
spark.eventLog.enable=false --conf spark.driver.maxResultSize=4g
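
Note that the command above passes `spark.memory.storageFraction` and `spark.sql.broadcastTimeout` twice with different values, so it may be worth confirming which value actually takes effect. A small Python sketch for spotting repeated `--conf` keys in a submit command follows; the helper name `duplicate_confs` is hypothetical and the command string is shortened to the relevant flags.

```python
import shlex
from collections import defaultdict


def duplicate_confs(command: str) -> dict:
    """Return every --conf key that appears more than once, with its values in order."""
    tokens = shlex.split(command)
    seen = defaultdict(list)
    # Walk adjacent token pairs so each "--conf" is matched with its argument
    for flag, arg in zip(tokens, tokens[1:]):
        if flag == "--conf" and "=" in arg:
            key, value = arg.split("=", 1)
            seen[key].append(value)
    return {key: values for key, values in seen.items() if len(values) > 1}


cmd = (
    "spark-sql --conf spark.memory.storageFraction=0.7 "
    "--conf spark.memory.storageFraction=0.75 "
    "--conf spark.sql.broadcastTimeout=60000 "
    "--conf spark.sql.broadcastTimeout=600000 "
    "--conf spark.network.timeout=600000"
)
print(duplicate_confs(cmd))
```
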
**Expected behavior**
```
23/11/02 02:20:16 INFO S3NativeFileSystem: Opening 's3://big-data-us/hudi/bi/bi_ods_real/smart_datapoint_report_rw_clear_rt/.hoodie/metadata/files/files-0000_0-4-4_20231101184035767001.hfile' for reading
23/11/02 02:21:12 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@4de4ff12)) by listener SQLAppStatusListener took 3.068272976s.
23/11/02 02:23:08 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@141289a6)) by listener SQLAppStatusListener took 8.345486202s.
23/11/02 02:24:28 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@c08b320)) by listener ExecutorMonitor took 6.109781102s.
23/11/02 02:25:19 WARN RetryHelper: Catch Exception for Sending request, will retry after 415 ms.
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
at org.apache.hudi.org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.hudi.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
at org.apache.hudi.org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
at org.apache.hudi.org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
at org.apache.hudi.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
at org.apache.hudi.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
at org.apache.hudi.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
at org.apache.hudi.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at org.apache.hudi.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
at org.apache.hudi.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
at org.apache.hudi.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107)
at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
at org.apache.hudi.org.apache.http.client.fluent.Request.execute(Request.java:151)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.get(RemoteHoodieTableFileSystemView.java:597)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.lambda$executeRequest$8cadb3e1$1(RemoteHoodieTableFileSystemView.java:193)
at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:84)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.executeRequest(RemoteHoodieTableFileSystemView.java:193)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.loadAllPartitions(RemoteHoodieTableFileSystemView.java:496)
at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.execute(PriorityBasedFileSystemView.java:69)
at org.apache.hudi.common.table.view.PriorityBasedFileSystemView.loadAllPartitions(PriorityBasedFileSystemView.java:172)
at org.apache.hudi.table.action.clean.CleanPlanner.<init>(CleanPlanner.java:110)
at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.requestClean(CleanPlanActionExecutor.java:105)
at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.requestClean(CleanPlanActionExecutor.java:151)
at org.apache.hudi.table.action.clean.CleanPlanActionExecutor.execute(CleanPlanActionExecutor.java:177)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.scheduleCleaning(HoodieSparkCopyOnWriteTable.java:217)
at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:628)
at org.apache.hudi.client.BaseHoodieTableServiceClient.clean(BaseHoodieTableServiceClient.java:751)
at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:861)
at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:834)
at org.apache.hudi.client.BaseHoodieWriteClient.clean(BaseHoodieWriteClient.java:865)
at org.apache.hudi.client.BaseHoodieWriteClient.autoCleanOnCommit(BaseHoodieWriteClient.java:599)
at org.apache.hudi.client.BaseHoodieWriteClient.mayBeCleanAndArchive(BaseHoodieWriteClient.java:578)
at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:248)
at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:104)
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:1059)
at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:441)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:108)
at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:115)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:490)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:213)
at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/11/02 02:27:23 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@3944e740)) by listener SQLAppStatusListener took 6.412619567s.
23/11/02 02:29:59 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@5ce8e788)) by listener AppStatusListener took 4.160406911s.
23/11/02 02:30:35 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@361e9f68)) by listener AppStatusListener took 8.347071876s.
23/11/02 02:30:57 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@36416fb4)) by listener AppStatusListener took 9.053349954s.
23/11/02 02:32:05 INFO AsyncEventQueue: Process of event SparkListenerExecutorMetricsUpdate(driver,WrappedArray(),Map((-1,-1) -> org.apache.spark.executor.ExecutorMetrics@2821e0db)) by listener AppStatusListener took 6.192278739s.
23/11/02 02:33:34 WARN DataStreamer: Exception for BP-452038477-172.16.13.57-1696927694357:blk_1074472233_2628950
java.io.EOFException: Unexpected EOF while trying to read response from server
at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:567)
at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213)
at org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1086)
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 23856"...
```
**Environment Description**
* Hudi version : 0.14
* Spark version : 3.2.1
* Hive version : 3.1.3
* Hadoop version : 3.2.2
* Storage (HDFS/S3/GCS..) : s3
* Running on Docker? (yes/no) : no