psendyk opened a new issue, #12652:
URL: https://github.com/apache/hudi/issues/12652
**Describe the problem you faced**
After enabling the timeline server and using the `REMOTE_ONLY` file system view, Spark Structured Streaming ingestion into Hudi fails on the second micro-batch with `org.apache.hudi.exception.HoodieRemoteException: Connect to localhost:26754 [localhost/127.0.0.1] failed: Connection refused`.
**To Reproduce**
Steps to reproduce the behavior:
1. Start Spark Structured Streaming ingestion into Hudi with the following file system view config:
```
HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key() -> "true",
FileSystemViewStorageConfig.VIEW_TYPE.key() -> FileSystemViewStorageType.REMOTE_ONLY.name(),
FileSystemViewStorageConfig.REMOTE_BACKUP_VIEW_ENABLE.key() -> "false"
```
2. Wait for the second micro-batch; the job fails immediately during the `Getting small files from partitions` stage.
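For concreteness, a minimal sketch of such a streaming write, using the raw config keys behind the constants in step 1; the source, table name, key fields, trigger, and paths are hypothetical placeholders, not part of the report:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Hypothetical repro harness; only the three timeline/view options below
// come from step 1, everything else is a placeholder.
val spark = SparkSession.builder().appName("hudi-timeline-repro").getOrCreate()

spark.readStream
  .format("rate").option("rowsPerSecond", "10").load()
  .writeStream
  .format("hudi")
  .option("hoodie.table.name", "timeline_repro")                   // hypothetical
  .option("hoodie.datasource.write.recordkey.field", "value")      // hypothetical
  .option("hoodie.datasource.write.precombine.field", "timestamp") // hypothetical
  // the config under test (raw keys for the constants in step 1):
  .option("hoodie.embed.timeline.server", "true")
  .option("hoodie.filesystem.view.type", "REMOTE_ONLY")
  .option("hoodie.filesystem.view.remote.backup.enable", "false")
  .option("checkpointLocation", "/tmp/timeline-repro/ckpt")        // hypothetical
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start("/tmp/timeline-repro/table")                              // hypothetical
  .awaitTermination()
```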
**Expected behavior**
The application should continue without failure.
**Environment Description**
* Hudi version : 0.15.0
* Spark version : 3.3.0
* Hive version :
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
**Additional context**
This issue occurs because the timeline service is stopped after the first write and is never restarted. Specifically, the write client is closed, which in turn shuts down the timeline server in the `finally` block of `HoodieSparkSqlWriterInternal.writeInternal`. Here's a stack trace showing how this part of the code is reached:
```
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:508)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$3(HoodieStreamingSink.scala:141)
at org.apache.hudi.HoodieStreamingSink$$Lambda$3032/943647142.apply(Unknown Source)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:133)
at org.apache.hudi.HoodieStreamingSink$$Lambda$3031/1476625006.apply(Unknown Source)
at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:237)
at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:132)
- locked <0x0000ffefe71a1808> (a org.apache.hudi.HoodieStreamingSink)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)
```
The timeline server is only started when a `BaseHoodieClient` is instantiated, via `startEmbeddedServerView`, but the write client is reused across batches within `HoodieStreamingSink.addBatch`. The second batch therefore uses a client that has already closed its timeline server. The error may at first look like a config issue, but only because the view storage config is overridden with the timeline service's address when the service starts, then reset to the client-provided config when the service stops. We can see the correct config, pointing at the timeline service, is used for the first write:
```
25/01/16 15:18:13 INFO FileSystemViewManager: Creating remote view for basePath <REDACTED>. Server=ip-<REDACTED>:37393, Timeout=300
```
but not for the second write:
```
25/01/16 15:20:15 INFO FileSystemViewManager: Creating remote view for basePath <REDACTED>. Server=localhost:26754, Timeout=300
```
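The override-then-reset behavior those two log lines show can be sketched abstractly (a toy model with made-up names, not Hudi's actual `FileSystemViewStorageConfig` API):

```scala
// Toy model of the described behavior: starting the embedded server overrides
// the remote host/port in the view config; stopping it restores the
// client-provided value, leaving later views pointed at a dead endpoint.
case class ViewConfig(host: String, port: Int)

val clientProvided = ViewConfig("localhost", 26754) // what the client configured
var activeConfig: ViewConfig = clientProvided

def onServerStart(host: String, port: Int): Unit = activeConfig = ViewConfig(host, port)
def onServerStop(): Unit = activeConfig = clientProvided

onServerStart("ip-10-0-0-1", 37393) // first write: server is up (hypothetical host)
val firstWriteView = activeConfig   // points at the live server

onServerStop()                      // client closed after the first write
val secondWriteView = activeConfig  // back to localhost: connection refused
```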
I was able to work around this failure mode by modifying `HoodieSparkSqlWriterInternal.writeInternal` to restart the timeline server at the beginning of each write, but as you can imagine this is only a partial improvement (the server survives within a batch but still not across batches). Is there a reason the write client, and with it the timeline server, is stopped after each batch?
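The lifecycle problem described in this section can be reduced to a few lines (hypothetical classes standing in for the write client and embedded server, not Hudi's API):

```scala
// The server's lifetime is tied to the client's close(), but the client
// object itself is cached and reused for the next batch.
class EmbeddedTimelineServer {
  var running: Boolean = true
  def stop(): Unit = running = false
}

class WriteClient(server: EmbeddedTimelineServer) extends AutoCloseable {
  // stands in for the remote file-system-view call in "Getting small files"
  def getSmallFiles(): String =
    if (server.running) "file-slices"
    else throw new IllegalStateException("Connection refused")
  override def close(): Unit = server.stop() // mirrors the finally block in writeInternal
}

val client = new WriteClient(new EmbeddedTimelineServer)

// Batch 1: the write succeeds, then the finally block closes the client.
val batch1 = try client.getSmallFiles() finally client.close()

// Batch 2: the cached client is reused, but its server never restarts.
val batch2Failed =
  try { client.getSmallFiles(); false }
  catch { case _: IllegalStateException => true }
```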
**Stacktrace**
```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 47 in stage 11.0 failed 4 times, most recent failure: Lost task 47.3 in stage 11.0 (TID 12380) (ip-10-18-138-81.heap executor 6): org.apache.hudi.exception.HoodieRemoteException: Connect to localhost:26754 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getLatestFileSlicesStreamFromParams(RemoteHoodieTableFileSystemView.java:313)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getLatestFileSlicesBeforeOrOn(RemoteHoodieTableFileSystemView.java:347)
at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates(SparkUpsertDeltaCommitPartitioner.java:106)
at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFiles(SparkUpsertDeltaCommitPartitioner.java:66)
at org.apache.hudi.table.action.commit.UpsertPartitioner.lambda$getSmallFilesForPartitions$f1d92f9e$1(UpsertPartitioner.java:285)
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at scala.collection.AbstractIterator.to(Iterator.scala:1431)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2269)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]