This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 92a104e680bb03a7ba16068ae80f055bbd82ea3a
Author: Etienne Chauchot <[email protected]>
AuthorDate: Fri Dec 28 10:28:18 2018 +0100

    Add ReadSourceTranslatorStreaming
---
 ...mingSource.java => DatasetSourceStreaming.java} |  2 +-
 .../streaming/ReadSourceTranslatorStreaming.java   | 76 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 1 deletion(-)

diff --git 
a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
similarity index 99%
rename from 
runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
rename to 
runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index 6947b6d..fad68d3 100644
--- 
a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetStreamingSource.java
+++ 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -55,7 +55,7 @@ import scala.collection.immutable.Map;
  * This is a spark structured streaming {@link DataSourceV2} implementation. 
As Continuous streaming
  * is tagged experimental in spark, this class does no implement {@link 
ContinuousReadSupport}.
  */
-public class DatasetStreamingSource<T> implements DataSourceV2, 
MicroBatchReadSupport{
+public class DatasetSourceStreaming<T> implements DataSourceV2, 
MicroBatchReadSupport{
 
   private int numPartitions;
   private Long bundleSize;
diff --git 
a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
new file mode 100644
index 0000000..6066822
--- /dev/null
+++ 
b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.beam.runners.spark.structuredstreaming.translation.streaming;
+
+import java.io.IOException;
+import org.apache.beam.runners.core.construction.ReadTranslation;
+import 
org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
+import 
org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import 
org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+class ReadSourceTranslatorStreaming<T>
+    implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
+
+  private String SOURCE_PROVIDER_CLASS = 
DatasetSourceStreaming.class.getCanonicalName();
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void translateTransform(
+      PTransform<PBegin, PCollection<T>> transform, TranslationContext 
context) {
+    AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>> rootTransform =
+        (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, 
PCollection<T>>>)
+            context.getCurrentTransform();
+
+    UnboundedSource<T, UnboundedSource.CheckpointMark> source;
+    try {
+       source = ReadTranslation
+          .unboundedSourceFromTransform(rootTransform);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    SparkSession sparkSession = context.getSparkSession();
+
+    Dataset<Row> rowDataset = 
sparkSession.readStream().format(SOURCE_PROVIDER_CLASS).load();
+
+    //TODO pass the source and the translation context serialized as string to 
the DatasetSource
+    MapFunction<Row, WindowedValue> func = new MapFunction<Row, 
WindowedValue>() {
+      @Override public WindowedValue call(Row value) throws Exception {
+        //there is only one value put in each Row by the InputPartitionReader
+        return value.<WindowedValue>getAs(0);
+      }
+    };
+    //TODO: is there a better way than using the raw WindowedValue? Can an 
Encoder<WindowedVAlue<T>>
+    // be created ?
+    Dataset<WindowedValue> dataset = rowDataset.map(func, 
Encoders.kryo(WindowedValue.class));
+
+    PCollection<T> output = (PCollection<T>) context.getOutput();
+    context.putDatasetRaw(output, dataset);
+  }
+}

Reply via email to