spark git commit: [SPARK-25644][SS] Fix java foreachBatch in DataStreamWriter

2018-10-05 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 2c700ee30 -> 0a70afdc0


[SPARK-25644][SS] Fix java foreachBatch in DataStreamWriter

## What changes were proposed in this pull request?

The Java `foreachBatch` API in `DataStreamWriter` should accept 
`java.lang.Long` rather than `scala.Long`.
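
For context, a minimal sketch (not part of this commit) of the Java call site the fixed signature enables. The `rate` source, output path, and session setup are illustrative assumptions, not code from this patch:

```java
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class ForeachBatchExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
      .master("local[2]")
      .appName("foreach-batch-example")  // hypothetical app name
      .getOrCreate();
    Dataset<Row> df = spark.readStream().format("rate").load();
    StreamingQuery query = df.writeStream()
      .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
        @Override
        public void call(Dataset<Row> batch, Long batchId) {
          // Each micro-batch arrives as an ordinary Dataset plus its batch id;
          // java.lang.Long in the signature is what lets this Long type
          // argument compile from Java.
          batch.write().mode("append").parquet("/tmp/batches/" + batchId);
        }
      })
      .start();
    query.awaitTermination();
  }
}
```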

## How was this patch tested?

New java test.

Closes #22633 from zsxwing/fix-java-foreachbatch.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a70afdc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a70afdc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a70afdc

Branch: refs/heads/branch-2.4
Commit: 0a70afdc08d76f84c59ec50f2f92144f54271602
Parents: 2c700ee
Author: Shixiong Zhu 
Authored: Fri Oct 5 10:45:15 2018 -0700
Committer: Shixiong Zhu 
Committed: Fri Oct 5 11:18:49 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamWriter.scala  |  2 +-
 .../JavaDataStreamReaderWriterSuite.java        | 89 ++++++++++++++++++++++++
 2 files changed, 90 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0a70afdc/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 735fd17..4eb2918 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -379,7 +379,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.4.0
    */
   @InterfaceStability.Evolving
-  def foreachBatch(function: VoidFunction2[Dataset[T], Long]): DataStreamWriter[T] = {
+  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T] = {
     foreachBatch((batchDs: Dataset[T], batchId: Long) => function.call(batchDs, batchId))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0a70afdc/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
new file mode 100644
index 0000000..48cdb26
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.streaming;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.util.Utils;
+
+public class JavaDataStreamReaderWriterSuite {
+  private SparkSession spark;
+  private String input;
+
+  @Before
+  public void setUp() {
+    spark = new TestSparkSession();
+    input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString();
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      Utils.deleteRecursively(new File(input));
+    } finally {
+      spark.stop();
+      spark = null;
+    }
+  }
+
+  @Test
+  public void testForeachBatchAPI() {
+    StreamingQuery query = spark
+      .readStream()
+      .textFile(input)
+      .writeStream()
+      .foreachBatch(new VoidFunction2<Dataset<String>, Long>() {
+        @Override
+        public void call(Dataset<String> v1, Long v2) throws Exception {}
+      })
+      .start();
+    query.stop();
+  }
+
+  @Test
+  public 

spark git commit: [SPARK-25644][SS] Fix java foreachBatch in DataStreamWriter

2018-10-05 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 434ada12a -> 7dcc90fbb


[SPARK-25644][SS] Fix java foreachBatch in DataStreamWriter

## What changes were proposed in this pull request?

The Java `foreachBatch` API in `DataStreamWriter` should accept 
`java.lang.Long` rather than `scala.Long`.
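
For context (my reading of the bug, not wording from the commit): with `scala.Long` as the type argument, the method's Java-visible generic signature is `foreachBatch(VoidFunction2<Dataset<T>, Object>)`, so the natural Java call below would not type-check before this change. `df` is a hypothetical streaming `Dataset<Row>`:

```java
// Against the old scala.Long-based signature, javac expected
// VoidFunction2<Dataset<Row>, Object> and rejected this anonymous class;
// with java.lang.Long in the signature it compiles as written.
df.writeStream()
  .foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
    @Override
    public void call(Dataset<Row> batch, Long batchId) {}
  })
  .start();
```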

## How was this patch tested?

New java test.

Closes #22633 from zsxwing/fix-java-foreachbatch.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7dcc90fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7dcc90fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7dcc90fb

Branch: refs/heads/master
Commit: 7dcc90fbb8dc75077819a5d8c42652f0c84424b5
Parents: 434ada1
Author: Shixiong Zhu 
Authored: Fri Oct 5 10:45:15 2018 -0700
Committer: Shixiong Zhu 
Committed: Fri Oct 5 10:45:15 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamWriter.scala  |  2 +-
 .../JavaDataStreamReaderWriterSuite.java        | 89 ++++++++++++++++++++++++
 2 files changed, 90 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7dcc90fb/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index e9a1521..b23e86a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -380,7 +380,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.4.0
    */
   @InterfaceStability.Evolving
-  def foreachBatch(function: VoidFunction2[Dataset[T], Long]): DataStreamWriter[T] = {
+  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T] = {
     foreachBatch((batchDs: Dataset[T], batchId: Long) => function.call(batchDs, batchId))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7dcc90fb/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
new file mode 100644
index 0000000..48cdb26
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.streaming;
+
+import java.io.File;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.ForeachWriter;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.util.Utils;
+
+public class JavaDataStreamReaderWriterSuite {
+  private SparkSession spark;
+  private String input;
+
+  @Before
+  public void setUp() {
+    spark = new TestSparkSession();
+    input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString();
+  }
+
+  @After
+  public void tearDown() {
+    try {
+      Utils.deleteRecursively(new File(input));
+    } finally {
+      spark.stop();
+      spark = null;
+    }
+  }
+
+  @Test
+  public void testForeachBatchAPI() {
+    StreamingQuery query = spark
+      .readStream()
+      .textFile(input)
+      .writeStream()
+      .foreachBatch(new VoidFunction2<Dataset<String>, Long>() {
+        @Override
+        public void call(Dataset<String> v1, Long v2) throws Exception {}
+      })
+      .start();
+    query.stop();
+  }
+
+  @Test
+  public void