Repository: spark
Updated Branches:
  refs/heads/branch-2.4 2c700ee30 -> 0a70afdc0


[SPARK-25644][SS] Fix java foreachBatch in DataStreamWriter

## What changes were proposed in this pull request?

The Java `foreachBatch` API in `DataStreamWriter` should accept
`java.lang.Long` rather than `scala.Long`.

## How was this patch tested?

New java test.

Closes #22633 from zsxwing/fix-java-foreachbatch.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a70afdc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a70afdc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a70afdc

Branch: refs/heads/branch-2.4
Commit: 0a70afdc08d76f84c59ec50f2f92144f54271602
Parents: 2c700ee
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Fri Oct 5 10:45:15 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Oct 5 11:18:49 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamWriter.scala  |  2 +-
 .../JavaDataStreamReaderWriterSuite.java        | 89 ++++++++++++++++++++
 2 files changed, 90 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a70afdc/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 735fd17..4eb2918 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -379,7 +379,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
    * @since 2.4.0
    */
   @InterfaceStability.Evolving
-  def foreachBatch(function: VoidFunction2[Dataset[T], Long]): 
DataStreamWriter[T] = {
+  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): 
DataStreamWriter[T] = {
     foreachBatch((batchDs: Dataset[T], batchId: Long) => 
function.call(batchDs, batchId))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0a70afdc/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
 
b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
new file mode 100644
index 0000000..48cdb26
--- /dev/null
+++ 
b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.streaming;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.util.Utils;
+
+public class JavaDataStreamReaderWriterSuite {
+  private SparkSession spark;
+  private String input;
+
+  @Before
+  public void setUp() {
+    spark = new TestSparkSession();
+    input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), 
"input").toString();
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      Utils.deleteRecursively(new File(input));
+    } finally {
+      spark.stop();
+      spark = null;
+    }
+  }
+
+  @Test
+  public void testForeachBatchAPI() {
+    StreamingQuery query = spark
+      .readStream()
+      .textFile(input)
+      .writeStream()
+      .foreachBatch(new VoidFunction2<Dataset<String>, Long>() {
+        @Override
+        public void call(Dataset<String> v1, Long v2) throws Exception {}
+      })
+      .start();
+    query.stop();
+  }
+
+  @Test
+  public void testForeachAPI() {
+    StreamingQuery query = spark
+      .readStream()
+      .textFile(input)
+      .writeStream()
+      .foreach(new ForeachWriter<String>() {
+        @Override
+        public boolean open(long partitionId, long epochId) {
+          return true;
+        }
+
+        @Override
+        public void process(String value) {}
+
+        @Override
+        public void close(Throwable errorOrNull) {}
+      })
+      .start();
+    query.stop();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to