Repository: spark
Updated Branches:
  refs/heads/master 5a617ec4e -> 9cbf105ab


[SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build error due to foreachBatch

## What changes were proposed in this pull request?

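This PR fixes the Scala 2.12 build errors caused by an ambiguous reference to
the overloaded `foreachBatch` method in test cases. With Scala 2.12, a lambda
whose parameter types are not annotated matches both the Scala-function
overload and the Java `VoidFunction2` overload of
`DataStreamWriter.foreachBatch`, so the affected tests now declare the
parameter types explicitly (e.g. `(_: Dataset[Int], _: Long) => {}`).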
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/428/console
```scala
[error] 
/home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:102:
 ambiguous reference to overloaded definition,
[error] both method foreachBatch in class DataStreamWriter of type (function: 
org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[Int],Long])org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] and  method foreachBatch in class DataStreamWriter of type (function: 
(org.apache.spark.sql.Dataset[Int], Long) => 
Unit)org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] match argument types ((org.apache.spark.sql.Dataset[Int], Any) => Unit)
[error]       ds.writeStream.foreachBatch((_, _) => 
{}).trigger(Trigger.Continuous("1 second")).start()
[error]                      ^
[error] 
/home/jenkins/workspace/spark-master-test-maven-hadoop-2.7-ubuntu-scala-2.12/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala:106:
 ambiguous reference to overloaded definition,
[error] both method foreachBatch in class DataStreamWriter of type (function: 
org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[Int],Long])org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] and  method foreachBatch in class DataStreamWriter of type (function: 
(org.apache.spark.sql.Dataset[Int], Long) => 
Unit)org.apache.spark.sql.streaming.DataStreamWriter[Int]
[error] match argument types ((org.apache.spark.sql.Dataset[Int], Any) => Unit)
[error]       ds.writeStream.foreachBatch((_, _) => 
{}).partitionBy("value").start()
[error]                      ^
```

## How was this patch tested?

Manual.

Since this failure occurs in the Scala 2.12 profile and in test cases, Jenkins
will not test this change. We need to build with Scala 2.12 and run the tests
manually.

Closes #22649 from dongjoon-hyun/SPARK-SCALA212.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cbf105a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cbf105a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cbf105a

Branch: refs/heads/master
Commit: 9cbf105ab1256d65f027115ba5505842ce8fffe3
Parents: 5a617ec
Author: Dongjoon Hyun <[email protected]>
Authored: Sat Oct 6 09:40:42 2018 -0700
Committer: Dongjoon Hyun <[email protected]>
Committed: Sat Oct 6 09:40:42 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 4 ++--
 .../sql/execution/streaming/sources/ForeachBatchSinkSuite.scala | 5 +++--
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9cbf105a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 39c2cde..5ee7699 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -33,7 +33,7 @@ import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.sql.{ForeachWriter, SparkSession}
+import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
 import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.streaming._
@@ -900,7 +900,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
     }
     testUtils.waitUntilOffsetAppears(topicPartition, 5)
 
-    val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+    val q = ds.writeStream.foreachBatch { (ds: Dataset[String], epochId: Long) 
=>
       if (epochId == 0) {
         // Send more message before the tasks of the current batch start 
reading the current batch
         // data, so that the executors will prefetch messages in the next 
batch and drop them. In

http://git-wip-us.apache.org/repos/asf/spark/blob/9cbf105a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
index 71dff44..3e9ccb0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
@@ -99,11 +99,12 @@ class ForeachBatchSinkSuite extends StreamTest {
     }
     assert(ex1.getMessage.contains("foreachBatch function cannot be null"))
     val ex2 = intercept[AnalysisException] {
-      ds.writeStream.foreachBatch((_, _) => {}).trigger(Trigger.Continuous("1 
second")).start()
+      ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => {})
+        .trigger(Trigger.Continuous("1 second")).start()
     }
     assert(ex2.getMessage.contains("'foreachBatch' is not supported with 
continuous trigger"))
     val ex3 = intercept[AnalysisException] {
-      ds.writeStream.foreachBatch((_, _) => {}).partitionBy("value").start()
+      ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => 
{}).partitionBy("value").start()
     }
     assert(ex3.getMessage.contains("'foreachBatch' does not support 
partitioning"))
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to