Lin Liu created HUDI-7451:
-----------------------------

             Summary: Fix flaky CDC streaming test
                 Key: HUDI-7451
                 URL: https://issues.apache.org/jira/browse/HUDI-7451
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Lin Liu
            Assignee: Lin Liu

{{TestCDCStreamingSuite.cdcStreaming}} (parameterized over {{HoodieCDCSupplementalLoggingMode}}) is flaky: {{checkAnswer}} intermittently sees only 3 of the 4 expected rows, suggesting the sink table is read before the final micro-batch of CDC updates has been committed. The trailing {{RpcEnvStoppedException}} / {{RejectedExecutionException}} noise indicates the stream execution thread was still running while the Spark session was being torn down. Failing run:
{code}
[ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 20.372 
s <<< FAILURE! - in org.apache.hudi.functional.cdc.TestCDCStreamingSuite
[ERROR] cdcStreaming{HoodieCDCSupplementalLoggingMode}[3]  Time elapsed: 6.127 
s  <<< ERROR!
org.scalatest.exceptions.TestFailedException: Results do not match for query:
Timezone: 
sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
Timezone Env: == Parsed Logical Plan ==
'Sort ['country ASC NULLS FIRST], true
+- Project [country#162815, population#162816L]
   +- Relation 
[_hoodie_commit_time#162810,_hoodie_commit_seqno#162811,_hoodie_record_key#162812,_hoodie_partition_path#162813,_hoodie_file_name#162814,country#162815,population#162816L,ts#162817]
 parquet== Analyzed Logical Plan ==
country: string, population: bigint
Sort [country#162815 ASC NULLS FIRST], true
+- Project [country#162815, population#162816L]
   +- Relation 
[_hoodie_commit_time#162810,_hoodie_commit_seqno#162811,_hoodie_record_key#162812,_hoodie_partition_path#162813,_hoodie_file_name#162814,country#162815,population#162816L,ts#162817]
 parquet== Optimized Logical Plan ==
Sort [country#162815 ASC NULLS FIRST], true
+- Project [country#162815, population#162816L]
   +- Relation 
[_hoodie_commit_time#162810,_hoodie_commit_seqno#162811,_hoodie_record_key#162812,_hoodie_partition_path#162813,_hoodie_file_name#162814,country#162815,population#162816L,ts#162817]
 parquet== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Sort [country#162815 ASC NULLS FIRST], true, 0
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0
         +- Exchange rangepartitioning(country#162815 ASC NULLS FIRST, 4), 
ENSURE_REQUIREMENTS, [plan_id=71307]
            +- FileScan parquet [country#162815,population#162816L] Batched: 
false, DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 
paths)[/tmp/junit8132202571330653449/dataset/country_to_population_table], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<country:string,population:bigint>
+- == Initial Plan ==
   Sort [country#162815 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(country#162815 ASC NULLS FIRST, 4), 
ENSURE_REQUIREMENTS, [plan_id=71307]
      +- FileScan parquet [country#162815,population#162816L] Batched: false, 
DataFilters: [], Format: Parquet, Location: HoodieFileIndex(1 
paths)[/tmp/junit8132202571330653449/dataset/country_to_population_table], 
PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<country:string,population:bigint>== Results ==== Results ==
!== Correct Answer - 4 ==   == Spark Answer - 3 ==
!struct<>                   struct<country:string,population:bigint>
![Canada,1]                 [China,50]
![China,50]                 [Singapore,20]
![Singapore,22]             [US,205]
![US,204]                          
   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
   at 
org.apache.spark.sql.QueryTest$.newAssertionFailedException(QueryTest.scala:233)
   at org.scalatest.Assertions.fail(Assertions.scala:933)
   at org.scalatest.Assertions.fail$(Assertions.scala:929)
   at org.apache.spark.sql.QueryTest$.fail(QueryTest.scala:233)
   at org.apache.spark.sql.QueryTest$.checkAnswer(QueryTest.scala:243)
   at 
org.apache.hudi.functional.cdc.TestCDCStreamingSuite.cdcStreaming(TestCDCStreamingSuite.scala:207)
1284938 [stream execution thread for [id = 
63e38842-d368-4717-a1b7-322997a146c9, runId = 
db9ade39-841c-40a7-8780-9fbd0ea5ca76]] WARN  
org.apache.spark.rpc.netty.NettyRpcEnv [] - Ignored failure: 
java.util.concurrent.RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@56c72779 
rejected from 
java.util.concurrent.ScheduledThreadPoolExecutor@38c2932e[Terminated, pool size 
= 0, active threads = 0, queued tasks = 0, completed tasks = 0]
Exception in thread "stream execution thread for [id = 
63e38842-d368-4717-a1b7-322997a146c9, runId = 
db9ade39-841c-40a7-8780-9fbd0ea5ca76]" org.apache.spark.SparkException: 
Exception thrown in awaitResult: 
   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
   at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
   at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
   at 
org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef.deactivateInstances(StateStoreCoordinator.scala:119)
   at 
org.apache.spark.sql.streaming.StreamingQueryManager.notifyQueryTermination(StreamingQueryManager.scala:402)
   at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$3(StreamExecution.scala:352)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
   at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
   at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
Caused by: org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
   at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:176)
   at 
org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:144)
   at org.apache.spark.rpc.netty.NettyRpcEnv.askAbortable(NettyRpcEnv.scala:242)
   at 
org.apache.spark.rpc.netty.NettyRpcEndpointRef.askAbortable(NettyRpcEnv.scala:554)
   at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:558)
   at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:102)
   ... 8 more {code}
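A plausible stabilization, sketched under the assumption that the assertion races the last micro-batch (not verified against the actual test): block on {{StreamingQuery.processAllAvailable()}} before calling {{checkAnswer}}, so the sink is only read once every record already available at the sources has been committed.

{code:scala}
// Sketch only: hypothetical helper; assumes the test holds a reference to
// the running StreamingQuery that feeds country_to_population_table.
import org.apache.spark.sql.streaming.StreamingQuery

def assertAfterStreamSettles(query: StreamingQuery)(assertion: => Unit): Unit = {
  // Blocks until all data available at the sources has been processed and
  // committed to the sink, so the assertion cannot race the final
  // micro-batch and observe a partial result (e.g. 3 of the 4 expected rows).
  query.processAllAvailable()
  assertion
}
{code}

In the failing test this would wrap the {{checkAnswer}} call at {{TestCDCStreamingSuite.scala:207}}.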



