Repository: spark Updated Branches: refs/heads/branch-2.4 2c700ee30 -> 0a70afdc0
[SPARK-25644][SS] Fix java foreachBatch in DataStreamWriter ## What changes were proposed in this pull request? The java `foreachBatch` API in `DataStreamWriter` should accept `java.lang.Long` rather `scala.Long`. ## How was this patch tested? New java test. Closes #22633 from zsxwing/fix-java-foreachbatch. Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a70afdc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a70afdc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a70afdc Branch: refs/heads/branch-2.4 Commit: 0a70afdc08d76f84c59ec50f2f92144f54271602 Parents: 2c700ee Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Fri Oct 5 10:45:15 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Fri Oct 5 11:18:49 2018 -0700 ---------------------------------------------------------------------- .../spark/sql/streaming/DataStreamWriter.scala | 2 +- .../JavaDataStreamReaderWriterSuite.java | 89 ++++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0a70afdc/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 735fd17..4eb2918 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -379,7 +379,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * @since 2.4.0 */ @InterfaceStability.Evolving - def foreachBatch(function: VoidFunction2[Dataset[T], Long]): DataStreamWriter[T] = { + def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): DataStreamWriter[T] = { foreachBatch((batchDs: Dataset[T], batchId: Long) => function.call(batchDs, batchId)) } http://git-wip-us.apache.org/repos/asf/spark/blob/0a70afdc/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java new file mode 100644 index 0000000..48cdb26 --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.streaming; + +import java.io.File; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.function.VoidFunction2; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.ForeachWriter; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.util.Utils; + +public class JavaDataStreamReaderWriterSuite { + private SparkSession spark; + private String input; + + @Before + public void setUp() { + spark = new TestSparkSession(); + input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString(); + } + + @After + public void tearDown() { + try { + Utils.deleteRecursively(new File(input)); + } finally { + spark.stop(); + spark = null; + } + } + + @Test + public void testForeachBatchAPI() { + StreamingQuery query = spark + .readStream() + .textFile(input) + .writeStream() + .foreachBatch(new VoidFunction2<Dataset<String>, Long>() { + @Override + public void call(Dataset<String> v1, Long v2) throws Exception {} + }) + .start(); + query.stop(); + } + + @Test + public void testForeachAPI() { + StreamingQuery query = spark + .readStream() + .textFile(input) + .writeStream() + .foreach(new ForeachWriter<String>() { + @Override + public boolean open(long partitionId, long epochId) { + return true; + } + + @Override + public void process(String value) {} + + @Override + public void close(Throwable errorOrNull) {} + }) + .start(); + query.stop(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org