Repository: spark
Updated Branches:
  refs/heads/master 381a967a7 -> 810d59ce4


[SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests.

## What changes were proposed in this pull request?

Fix flaky synchronization in Kafka tests: to identify the stream's current
configuration, we need to use the scan config that the query persisted rather
than reconstructing it from the logical plan.

We caught most instances of this in the original PR, but this one slipped
through.
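
For reference, the heart of the change (shown in full in the diff below) is to
read the `ScanConfig` the running query persisted in its executed physical plan
instead of rebuilding one from the logical plan, which can race with the stream
reconfiguring itself. A condensed sketch of the two patterns, using the
identifiers and imports from this suite (`query` comes from the surrounding
test harness):

```scala
// Before (flaky): rebuild a scan config from the logical plan; the result
// may not match the configuration the query is actually running with.
val rebuilt = query.lastExecution.logical.collectFirst {
  case r: StreamingDataSourceV2Relation
      if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
    r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
}

// After (fixed): take the ScanConfig the query persisted in its executed plan.
val persisted = query.lastExecution.executedPlan.collectFirst {
  case scan: DataSourceV2ScanExec
      if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
    scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}
```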

## How was this patch tested?

n/a

Closes #22245 from jose-torres/fixflake.

Authored-by: Jose Torres <torres.joseph.f+git...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/810d59ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/810d59ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/810d59ce

Branch: refs/heads/master
Commit: 810d59ce44e43f725d1b6d822166c2d97ff49929
Parents: 381a967
Author: Jose Torres <torres.joseph.f+git...@gmail.com>
Authored: Mon Aug 27 11:04:39 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Mon Aug 27 11:04:39 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaContinuousSourceSuite.scala   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/810d59ce/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index 3216650..5d68a14 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
 import org.apache.spark.sql.streaming.Trigger
 
 // Run tests in KafkaSourceSuiteBase in continuous execution mode.
@@ -60,10 +60,10 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
         testUtils.createTopic(topic2, partitions = 5)
         eventually(timeout(streamingTimeout)) {
           assert(
-            query.lastExecution.logical.collectFirst {
-              case r: StreamingDataSourceV2Relation
-                  if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
-                r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
+            query.lastExecution.executedPlan.collectFirst {
+              case scan: DataSourceV2ScanExec
+                if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
+                scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
             }.exists { config =>
               // Ensure the new topic is present and the old topic is gone.
               config.knownPartitions.exists(_.topic == topic2)

