Lin Liu created HUDI-7451:
-----------------------------
Summary: Fix flaky test from CDC
Key: HUDI-7451
URL: https://issues.apache.org/jira/browse/HUDI-7451
Project: Apache Hudi
Issue Type: Bug
Reporter: Lin Liu
Assignee: Lin Liu
{code:java}
[ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 20.372 s <<< FAILURE! - in org.apache.hudi.functional.cdc.TestCDCStreamingSuite
[ERROR] cdcStreaming{HoodieCDCSupplementalLoggingMode}[3] Time elapsed: 6.127 s <<< ERROR!
org.scalatest.exceptions.TestFailedException: Results do not match for query:
Timezone: sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
Timezone Env:

== Parsed Logical Plan ==
'Sort ['country ASC NULLS FIRST], true
+- Project [country#162815, population#162816L]
   +- Relation [_hoodie_commit_time#162810,_hoodie_commit_seqno#162811,_hoodie_record_key#162812,_hoodie_partition_path#162813,_hoodie_file_name#162814,country#162815,population#162816L,ts#162817] parquet

== Analyzed Logical Plan ==
country: string, population: bigint
Sort [country#162815 ASC NULLS FIRST], true
+- Project [country#162815, population#162816L]
   +- Relation [_hoodie_commit_time#162810,_hoodie_commit_seqno#162811,_hoodie_record_key#162812,_hoodie_partition_path#162813,_hoodie_file_name#162814,country#162815,population#162816L,ts#162817] parquet

== Optimized Logical Plan ==
Sort [country#162815 ASC NULLS FIRST], true
+- Project [country#162815, population#162816L]
   +- Relation [_hoodie_commit_time#162810,_hoodie_commit_seqno#162811,_hoodie_record_key#162812,_hoodie_partition_path#162813,_hoodie_file_name#162814,country#162815,population#162816L,ts#162817] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Sort [country#162815 ASC NULLS FIRST], true, 0
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0
         +- Exchange rangepartitioning(country#162815 ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=71307]
            +- FileScan parquet [country#162815,population#162816L] Batched: false, DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 paths)[/tmp/junit8132202571330653449/dataset/country_to_population_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<country:string,population:bigint>
+- == Initial Plan ==
   Sort [country#162815 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(country#162815 ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=71307]
      +- FileScan parquet [country#162815,population#162816L] Batched: false, DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 paths)[/tmp/junit8132202571330653449/dataset/country_to_population_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<country:string,population:bigint>

== Results ==
== Results ==
!== Correct Answer - 4 ==   == Spark Answer - 3 ==
!struct<>                   struct<country:string,population:bigint>
![Canada,1]                 [China,50]
![China,50]                 [Singapore,20]
![Singapore,22]             [US,205]
![US,204]
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
at org.apache.spark.sql.QueryTest$.newAssertionFailedException(QueryTest.scala:233)
at org.scalatest.Assertions.fail(Assertions.scala:933)
at org.scalatest.Assertions.fail$(Assertions.scala:929)
at org.apache.spark.sql.QueryTest$.fail(QueryTest.scala:233)
at org.apache.spark.sql.QueryTest$.checkAnswer(QueryTest.scala:243)
at org.apache.hudi.functional.cdc.TestCDCStreamingSuite.cdcStreaming(TestCDCStreamingSuite.scala:207)
1284938 [stream execution thread for [id = 63e38842-d368-4717-a1b7-322997a146c9, runId = db9ade39-841c-40a7-8780-9fbd0ea5ca76]] WARN org.apache.spark.rpc.netty.NettyRpcEnv [] - Ignored failure: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@56c72779 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@38c2932e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
Exception in thread "stream execution thread for [id = 63e38842-d368-4717-a1b7-322997a146c9, runId = db9ade39-841c-40a7-8780-9fbd0ea5ca76]" org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
at org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.deactivateInstances(StateStoreCoordinator.scala:119)
at org.apache.spark.sql.streaming.StreamingQueryManager.notifyQueryTermination(StreamingQueryManager.scala:402)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$3(StreamExecution.scala:352)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:176)
at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:144)
at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:242)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:554)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:558)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102)
... 8 more
{code}