http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java index 60f6788..a6e1e37 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java @@ -50,109 +50,109 @@ import org.joda.time.Duration; * */ public class JoinExamples { - static PCollection<String> joinEvents(PCollection<String> streamA, - PCollection<String> streamB) throws Exception { - - final TupleTag<String> firstInfoTag = new TupleTag<>(); - final TupleTag<String> secondInfoTag = new TupleTag<>(); - - // transform both input collections to tuple collections, where the keys are country - // codes in both cases. - PCollection<KV<String, String>> firstInfo = streamA.apply( - ParDo.of(new ExtractEventDataFn())); - PCollection<KV<String, String>> secondInfo = streamB.apply( - ParDo.of(new ExtractEventDataFn())); - - // country code 'key' -> CGBKR (<event info>, <country name>) - PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple - .of(firstInfoTag, firstInfo) - .and(secondInfoTag, secondInfo) - .apply(CoGroupByKey.<String>create()); - - // Process the CoGbkResult elements generated by the CoGroupByKey transform. - // country code 'key' -> string of <event info>, <country name> - PCollection<KV<String, String>> finalResultCollection = - kvpCollection.apply(ParDo.named("Process").of( - new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - KV<String, CoGbkResult> e = c.element(); - String key = e.getKey(); - - String defaultA = "NO_VALUE"; - - // the following getOnly is a bit tricky because it expects to have - // EXACTLY ONE value in the corresponding stream and for the corresponding key. - - String lineA = e.getValue().getOnly(firstInfoTag, defaultA); - for (String lineB : c.element().getValue().getAll(secondInfoTag)) { - // Generate a string that combines information from both collection values - c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB)); - } - } - })); - - return finalResultCollection - .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - String result = c.element().getKey() + " -> " + c.element().getValue(); - System.out.println(result); - c.output(result); - } - })); - } - - static class ExtractEventDataFn extends DoFn<String, KV<String, String>> { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - String line = c.element().toLowerCase(); - String key = line.split("\\s")[0]; - c.output(KV.of(key, line)); - } - } - - private interface Options extends WindowedWordCount.StreamingWordCountOptions { - - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - PTransform<? super PBegin, PCollection<String>> readSourceA = - Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream"); - PTransform<? super PBegin, PCollection<String>> readSourceB = - Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream"); - - WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); - - Pipeline p = Pipeline.create(options); - - // the following two 'applys' create multiple inputs to our pipeline, one for each - // of our two input sources. - PCollection<String> streamA = p.apply(readSourceA) - .apply(Window.<String>into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - PCollection<String> streamB = p.apply(readSourceB) - .apply(Window.<String>into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection<String> formattedResults = joinEvents(streamA, streamB); - formattedResults.apply(TextIO.Write.to("./outputJoin.txt")); - p.run(); - } + static PCollection<String> joinEvents(PCollection<String> streamA, + PCollection<String> streamB) throws Exception { + + final TupleTag<String> firstInfoTag = new TupleTag<>(); + final TupleTag<String> secondInfoTag = new TupleTag<>(); + + // transform both input collections to tuple collections, where the keys are country + // codes in both cases. + PCollection<KV<String, String>> firstInfo = streamA.apply( + ParDo.of(new ExtractEventDataFn())); + PCollection<KV<String, String>> secondInfo = streamB.apply( + ParDo.of(new ExtractEventDataFn())); + + // country code 'key' -> CGBKR (<event info>, <country name>) + PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple + .of(firstInfoTag, firstInfo) + .and(secondInfoTag, secondInfo) + .apply(CoGroupByKey.<String>create()); + + // Process the CoGbkResult elements generated by the CoGroupByKey transform. + // country code 'key' -> string of <event info>, <country name> + PCollection<KV<String, String>> finalResultCollection = + kvpCollection.apply(ParDo.named("Process").of( + new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + KV<String, CoGbkResult> e = c.element(); + String key = e.getKey(); + + String defaultA = "NO_VALUE"; + + // the following getOnly is a bit tricky because it expects to have + // EXACTLY ONE value in the corresponding stream and for the corresponding key. + + String lineA = e.getValue().getOnly(firstInfoTag, defaultA); + for (String lineB : c.element().getValue().getAll(secondInfoTag)) { + // Generate a string that combines information from both collection values + c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB)); + } + } + })); + + return finalResultCollection + .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String result = c.element().getKey() + " -> " + c.element().getValue(); + System.out.println(result); + c.output(result); + } + })); + } + + static class ExtractEventDataFn extends DoFn<String, KV<String, String>> { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String line = c.element().toLowerCase(); + String key = line.split("\\s")[0]; + c.output(KV.of(key, line)); + } + } + + private interface Options extends WindowedWordCount.StreamingWordCountOptions { + + } + + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + PTransform<? super PBegin, PCollection<String>> readSourceA = + Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream"); + PTransform<? super PBegin, PCollection<String>> readSourceB = + Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream"); + + WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); + + Pipeline p = Pipeline.create(options); + + // the following two 'applys' create multiple inputs to our pipeline, one for each + // of our two input sources. + PCollection<String> streamA = p.apply(readSourceA) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + PCollection<String> streamB = p.apply(readSourceB) + .apply(Window.<String>into(windowFn) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<String> formattedResults = joinEvents(streamA, streamB); + formattedResults.apply(TextIO.Write.to("./outputJoin.txt")); + p.run(); + } }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java index dba2721..b97c35c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java @@ -36,106 +36,106 @@ import java.util.Properties; public class KafkaWindowedWordCountExample { - static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from - static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact - static final String GROUP_ID = "myGroup"; // Default groupId - static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka - - public static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { - @Override - public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); - System.out.println(row); - c.output(row); - } - } - - public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { - @Description("The Kafka topic to read from") - @Default.String(KAFKA_TOPIC) - String getKafkaTopic(); - - void setKafkaTopic(String value); - - @Description("The Kafka Broker to read from") - @Default.String(KAFKA_BROKER) - String getBroker(); - - void setBroker(String value); - - @Description("The Zookeeper server to connect to") - @Default.String(ZOOKEEPER) - String getZookeeper(); - - void setZookeeper(String value); - - @Description("The groupId") - @Default.String(GROUP_ID) - String getGroup(); - - void setGroup(String value); - - } - - public static void main(String[] args) { - PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); - KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); - options.setJobName("KafkaExample"); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() ); - Pipeline pipeline = Pipeline.create(options); - - Properties p = new Properties(); - p.setProperty("zookeeper.connect", options.getZookeeper()); - p.setProperty("bootstrap.servers", options.getBroker()); - p.setProperty("group.id", options.getGroup()); - - // this is the Flink consumer that reads the input to - // the program from a kafka topic. - FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>( - options.getKafkaTopic(), - new SimpleStringSchema(), p); - - PCollection<String> words = pipeline - .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount")) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - wordCounts.apply(ParDo.of(new FormatAsStringFn())) - .apply(TextIO.Write.to("./outputKafka.txt")); - - pipeline.run(); - } + static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from + static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact + static final String GROUP_ID = "myGroup"; // Default groupId + static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka + + public static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @Override + public void processElement(ProcessContext c) { + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + System.out.println(row); + c.output(row); + } + } + + public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { + @Description("The Kafka topic to read from") + @Default.String(KAFKA_TOPIC) + String getKafkaTopic(); + + void setKafkaTopic(String value); + + @Description("The Kafka Broker to read from") + @Default.String(KAFKA_BROKER) + String getBroker(); + + void setBroker(String value); + + @Description("The Zookeeper server to connect to") + @Default.String(ZOOKEEPER) + String getZookeeper(); + + void setZookeeper(String value); + + @Description("The groupId") + @Default.String(GROUP_ID) + String getGroup(); + + void setGroup(String value); + + } + + public static void main(String[] args) { + PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); + KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); + options.setJobName("KafkaExample"); + options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() ); + Pipeline pipeline = Pipeline.create(options); + + Properties p = new Properties(); + p.setProperty("zookeeper.connect", options.getZookeeper()); + p.setProperty("bootstrap.servers", options.getBroker()); + p.setProperty("group.id", options.getGroup()); + + // this is the Flink consumer that reads the input to + // the program from a kafka topic. + FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>( + options.getKafkaTopic(), + new SimpleStringSchema(), p); + + PCollection<String> words = pipeline + .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount")) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())) + .apply(TextIO.Write.to("./outputKafka.txt")); + + pipeline.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java index 37dc39a..753cbc3 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java @@ -45,84 +45,84 @@ import java.io.IOException; * */ public class WindowedWordCount { - private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); - - static final long WINDOW_SIZE = 10; // Default window duration in seconds - static final long SLIDE_SIZE = 5; // Default window slide in seconds - - static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { - @Override - public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); - c.output(row); - } - } - - static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - public interface StreamingWordCountOptions extends com.dataartisans.flink.dataflow.examples.WordCount.Options { - @Description("Sliding window duration, in seconds") - @Default.Long(WINDOW_SIZE) - Long getWindowSize(); - - void setWindowSize(Long value); - - @Description("Window slide, in seconds") - @Default.Long(SLIDE_SIZE) - Long getSlide(); - - void setSlide(Long value); - } - - public static void main(String[] args) throws IOException { - StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class); - options.setStreaming(true); - options.setWindowSize(10L); - options.setSlide(5L); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() + - " sec. and a slide of " + options.getSlide()); - - Pipeline pipeline = Pipeline.create(options); - - PCollection<String> words = pipeline - .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount")) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize())) - .every(Duration.standardSeconds(options.getSlide()))) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - wordCounts.apply(ParDo.of(new FormatAsStringFn())) - .apply(TextIO.Write.to("./outputWordCount.txt")); - - pipeline.run(); - } + private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); + + static final long WINDOW_SIZE = 10; // Default window duration in seconds + static final long SLIDE_SIZE = 5; // Default window slide in seconds + + static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { + @Override + public void processElement(ProcessContext c) { + String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); + c.output(row); + } + } + + static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public interface StreamingWordCountOptions extends com.dataartisans.flink.dataflow.examples.WordCount.Options { + @Description("Sliding window duration, in seconds") + @Default.Long(WINDOW_SIZE) + Long getWindowSize(); + + void setWindowSize(Long value); + + @Description("Window slide, in seconds") + @Default.Long(SLIDE_SIZE) + Long getSlide(); + + void setSlide(Long value); + } + + public static void main(String[] args) throws IOException { + StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class); + options.setStreaming(true); + options.setWindowSize(10L); + options.setSlide(5L); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); + options.setRunner(FlinkPipelineRunner.class); + + LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() + + " sec. and a slide of " + options.getSlide()); + + Pipeline pipeline = Pipeline.create(options); + + PCollection<String> words = pipeline + .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount")) + .apply(ParDo.of(new ExtractWordsFn())) + .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize())) + .every(Duration.standardSeconds(options.getSlide()))) + .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()); + + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + wordCounts.apply(ParDo.of(new FormatAsStringFn())) + .apply(TextIO.Write.to("./outputWordCount.txt")); + + pipeline.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java index 90fb635..3f3492c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/io/ConsoleIO.java @@ -28,53 +28,53 @@ import com.google.cloud.dataflow.sdk.values.PDone; */ public class ConsoleIO { - /** - * A PTransform that writes a PCollection to a standard output. - */ - public static class Write { + /** + * A PTransform that writes a PCollection to a standard output. + */ + public static class Write { - /** - * Returns a ConsoleIO.Write PTransform with a default step name. - */ - public static Bound create() { - return new Bound(); - } + /** + * Returns a ConsoleIO.Write PTransform with a default step name. + */ + public static Bound create() { + return new Bound(); + } - /** - * Returns a ConsoleIO.Write PTransform with the given step name. - */ - public static Bound named(String name) { - return new Bound().named(name); - } + /** + * Returns a ConsoleIO.Write PTransform with the given step name. + */ + public static Bound named(String name) { + return new Bound().named(name); + } - /** - * A PTransform that writes a bounded PCollection to standard output. - */ - public static class Bound extends PTransform<PCollection<?>, PDone> { - private static final long serialVersionUID = 0; + /** + * A PTransform that writes a bounded PCollection to standard output. + */ + public static class Bound extends PTransform<PCollection<?>, PDone> { + private static final long serialVersionUID = 0; - Bound() { - super("ConsoleIO.Write"); - } + Bound() { + super("ConsoleIO.Write"); + } - Bound(String name) { - super(name); - } + Bound(String name) { + super(name); + } - /** - * Returns a new ConsoleIO.Write PTransform that's like this one but with the given - * step - * name. Does not modify this object. - */ - public Bound named(String name) { - return new Bound(name); - } + /** + * Returns a new ConsoleIO.Write PTransform that's like this one but with the given + * step + * name. Does not modify this object. + */ + public Bound named(String name) { + return new Bound(name); + } - @Override - public PDone apply(PCollection<?> input) { - return PDone.in(input.getPipeline()); - } - } - } + @Override + public PDone apply(PCollection<?> input) { + return PDone.in(input.getPipeline()); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java index a1e4410..82b7e97 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchPipelineTranslator.java @@ -31,119 +31,119 @@ import org.apache.flink.api.java.ExecutionEnvironment; */ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { - /** - * The necessary context in the case of a batch job. - */ - private final FlinkBatchTranslationContext batchContext; - - private int depth = 0; - - /** - * Composite transform that we want to translate before proceeding with other transforms. - */ - private PTransform<?, ?> currentCompositeTransform; - - public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) { - this.batchContext = new FlinkBatchTranslationContext(env, options); - } - - // -------------------------------------------------------------------------------------------- - // Pipeline Visitor Methods - // -------------------------------------------------------------------------------------------- - - @Override - public void enterCompositeTransform(TransformTreeNode node) { - System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node)); - - PTransform<?, ?> transform = node.getTransform(); - if (transform != null && currentCompositeTransform == null) { - - BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); - if (translator != null) { - currentCompositeTransform = transform; - if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) { - // we can only optimize CoGroupByKey for input size 2 - currentCompositeTransform = null; - } - } - } - this.depth++; - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - PTransform<?, ?> transform = node.getTransform(); - if (transform != null && currentCompositeTransform == transform) { - - BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); - if (translator != null) { - System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node)); - applyBatchTransform(transform, node, translator); - currentCompositeTransform = null; - } else { - throw new IllegalStateException("Attempted to translate composite transform " + - "but no translator was found: " + currentCompositeTransform); - } - } - this.depth--; - System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node)); - } - - @Override - public void visitTransform(TransformTreeNode node) { - System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node)); - if (currentCompositeTransform != null) { - // ignore it - return; - } - - // get the transformation corresponding to hte node we are - // currently visiting and translate it into its Flink alternative. - - PTransform<?, ?> transform = node.getTransform(); - BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); - if (translator == null) { - System.out.println(node.getTransform().getClass()); - throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); - } - applyBatchTransform(transform, node, translator); - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - // do nothing here - } - - private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) { - - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - - @SuppressWarnings("unchecked") - BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; - - // create the applied PTransform on the batchContext - batchContext.setCurrentTransform(AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); - typedTranslator.translateNode(typedTransform, batchContext); - } - - /** - * A translator of a {@link PTransform}. - */ - public interface BatchTransformTranslator<Type extends PTransform> { - void translateNode(Type transform, FlinkBatchTranslationContext context); - } - - private static String genSpaces(int n) { - String s = ""; - for (int i = 0; i < n; i++) { - s += "| "; - } - return s; - } - - private static String formatNodeName(TransformTreeNode node) { - return node.toString().split("@")[1] + node.getTransform(); - } + /** + * The necessary context in the case of a batch job. + */ + private final FlinkBatchTranslationContext batchContext; + + private int depth = 0; + + /** + * Composite transform that we want to translate before proceeding with other transforms. + */ + private PTransform<?, ?> currentCompositeTransform; + + public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) { + this.batchContext = new FlinkBatchTranslationContext(env, options); + } + + // -------------------------------------------------------------------------------------------- + // Pipeline Visitor Methods + // -------------------------------------------------------------------------------------------- + + @Override + public void enterCompositeTransform(TransformTreeNode node) { + System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node)); + + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == null) { + + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator != null) { + currentCompositeTransform = transform; + if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) { + // we can only optimize CoGroupByKey for input size 2 + currentCompositeTransform = null; + } + } + } + this.depth++; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == transform) { + + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator != null) { + System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node)); + applyBatchTransform(transform, node, translator); + currentCompositeTransform = null; + } else { + throw new IllegalStateException("Attempted to translate composite transform " + + "but no translator was found: " + currentCompositeTransform); + } + } + this.depth--; + System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node)); + } + + @Override + public void visitTransform(TransformTreeNode node) { + System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node)); + if (currentCompositeTransform != null) { + // ignore it + return; + } + + // get the transformation corresponding to hte node we are + // currently visiting and translate it into its Flink alternative. + + PTransform<?, ?> transform = node.getTransform(); + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator == null) { + System.out.println(node.getTransform().getClass()); + throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); + } + applyBatchTransform(transform, node, translator); + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + // do nothing here + } + + private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) { + + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + + @SuppressWarnings("unchecked") + BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; + + // create the applied PTransform on the batchContext + batchContext.setCurrentTransform(AppliedPTransform.of( + node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); + typedTranslator.translateNode(typedTransform, batchContext); + } + + /** + * A translator of a {@link PTransform}. + */ + public interface BatchTransformTranslator<Type extends PTransform> { + void translateNode(Type transform, FlinkBatchTranslationContext context); + } + + private static String genSpaces(int n) { + String s = ""; + for (int i = 0; i < n; i++) { + s += "| "; + } + return s; + } + + private static String formatNodeName(TransformTreeNode node) { + return node.toString().split("@")[1] + node.getTransform(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java index 0e45a21..6a8409c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTransformTranslators.java @@ -93,502 +93,502 @@ import java.util.Map; */ public class FlinkBatchTransformTranslators { - // -------------------------------------------------------------------------------------------- - // Transform Translator Registry - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("rawtypes") - private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>(); + // -------------------------------------------------------------------------------------------- + // Transform Translator Registry + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>(); - // register the known translators - static { - TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch()); + // register the known translators + static { + TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch()); - TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch()); - // we don't need this because we translate the Combine.PerKey directly - //TRANSLATORS.put(Combine.GroupedValues.class, new CombineGroupedValuesTranslator()); + TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch()); + // we don't need this because we translate the Combine.PerKey directly + //TRANSLATORS.put(Combine.GroupedValues.class, new CombineGroupedValuesTranslator()); - TRANSLATORS.put(Create.Values.class, new CreateTranslatorBatch()); + TRANSLATORS.put(Create.Values.class, new CreateTranslatorBatch()); - TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch()); + TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch()); - TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch()); - // TODO we're currently ignoring windows here but that has to change in the future - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch()); - - TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch()); - TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch()); - - TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch()); - - TRANSLATORS.put(AvroIO.Read.Bound.class, new AvroIOReadTranslatorBatch()); - TRANSLATORS.put(AvroIO.Write.Bound.class, new AvroIOWriteTranslatorBatch()); - - TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch()); - TRANSLATORS.put(Write.Bound.class, new WriteSinkTranslatorBatch()); - - TRANSLATORS.put(TextIO.Read.Bound.class, new TextIOReadTranslatorBatch()); - TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteTranslatorBatch()); - - // Flink-specific - TRANSLATORS.put(ConsoleIO.Write.Bound.class, new ConsoleIOWriteTranslatorBatch()); - - } - - - public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(PTransform<?, ?> transform) { - return TRANSLATORS.get(transform.getClass()); - } - - private static class ReadSourceTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> { - - @Override - public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) { - String name = transform.getName(); - BoundedSource<T> source = transform.getSource(); - PCollection<T> output = context.getOutput(transform); - Coder<T> coder = output.getCoder(); - - TypeInformation<T> typeInformation = context.getTypeInfo(output); - - DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(), - new SourceInputFormat<>(source, context.getPipelineOptions()), typeInformation, name); - - context.setOutputDataSet(output, dataSource); - } - } - - private static class AvroIOReadTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> { - private static final Logger LOG = LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class); - - @Override - public void translateNode(AvroIO.Read.Bound<T> transform, FlinkBatchTranslationContext context) { - String path = transform.getFilepattern(); - String name = transform.getName(); -// Schema schema = transform.getSchema(); - PValue output = context.getOutput(transform); - - TypeInformation<T> typeInformation = context.getTypeInfo(output); - - // This is super hacky, but unfortunately we cannot get the type otherwise - Class<T> extractedAvroType; - try { - Field typeField = transform.getClass().getDeclaredField("type"); - typeField.setAccessible(true); - @SuppressWarnings("unchecked") - Class<T> avroType = (Class<T>) typeField.get(transform); - extractedAvroType = avroType; - } catch (NoSuchFieldException | IllegalAccessException e) { - // we know that the field is there and it is accessible - throw new RuntimeException("Could not access type from AvroIO.Bound", e); - } - - DataSource<T> source = new DataSource<>(context.getExecutionEnvironment(), - new AvroInputFormat<>(new Path(path), extractedAvroType), - typeInformation, name); - - context.setOutputDataSet(output, source); - } - } - - private static class AvroIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> { - private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class); - - @Override - public void translateNode(AvroIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) { - DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform)); - String filenamePrefix = transform.getFilenamePrefix(); - String filenameSuffix = transform.getFilenameSuffix(); - int numShards = transform.getNumShards(); - String shardNameTemplate = transform.getShardNameTemplate(); - - // TODO: Implement these. We need Flink support for this. - LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", - filenameSuffix); - LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate); - - // This is super hacky, but unfortunately we cannot get the type otherwise - Class<T> extractedAvroType; - try { - Field typeField = transform.getClass().getDeclaredField("type"); - typeField.setAccessible(true); - @SuppressWarnings("unchecked") - Class<T> avroType = (Class<T>) typeField.get(transform); - extractedAvroType = avroType; - } catch (NoSuchFieldException | IllegalAccessException e) { - // we know that the field is there and it is accessible - throw new RuntimeException("Could not access type from AvroIO.Bound", e); - } - - DataSink<T> dataSink = inputDataSet.output(new AvroOutputFormat<>(new Path - (filenamePrefix), extractedAvroType)); - - if (numShards > 0) { - dataSink.setParallelism(numShards); - } - } - } - - private static class TextIOReadTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>> { - private static final Logger LOG = LoggerFactory.getLogger(TextIOReadTranslatorBatch.class); - - @Override - public void translateNode(TextIO.Read.Bound<String> transform, FlinkBatchTranslationContext context) { - String path = transform.getFilepattern(); - String name = transform.getName(); - - TextIO.CompressionType compressionType = transform.getCompressionType(); - boolean needsValidation = transform.needsValidation(); - - // TODO: Implement these. We need Flink support for this. - LOG.warn("Translation of TextIO.CompressionType not yet supported. Is: {}.", compressionType); - LOG.warn("Translation of TextIO.Read.needsValidation not yet supported. Is: {}.", needsValidation); - - PValue output = context.getOutput(transform); - - TypeInformation<String> typeInformation = context.getTypeInfo(output); - DataSource<String> source = new DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new Path(path)), typeInformation, name); - - context.setOutputDataSet(output, source); - } - } - - private static class TextIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> { - private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class); - - @Override - public void translateNode(TextIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) { - PValue input = context.getInput(transform); - DataSet<T> inputDataSet = context.getInputDataSet(input); - - String filenamePrefix = transform.getFilenamePrefix(); - String filenameSuffix = transform.getFilenameSuffix(); - boolean needsValidation = transform.needsValidation(); - int numShards = transform.getNumShards(); - String shardNameTemplate = transform.getShardNameTemplate(); - - // TODO: Implement these. We need Flink support for this. - LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation); - LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix); - LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate); - - //inputDataSet.print(); - DataSink<T> dataSink = inputDataSet.writeAsText(filenamePrefix); - - if (numShards > 0) { - dataSink.setParallelism(numShards); - } - } - } - - private static class ConsoleIOWriteTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> { - @Override - public void translateNode(ConsoleIO.Write.Bound transform, FlinkBatchTranslationContext context) { - PValue input = context.getInput(transform); - DataSet<?> inputDataSet = context.getInputDataSet(input); - inputDataSet.printOnTaskManager(transform.getName()); - } - } + TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch()); + // TODO we're currently ignoring windows here but that has to change in the future + TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch()); + + TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiTranslatorBatch()); + TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch()); + + TRANSLATORS.put(CoGroupByKey.class, new CoGroupByKeyTranslatorBatch()); + + TRANSLATORS.put(AvroIO.Read.Bound.class, new AvroIOReadTranslatorBatch()); + TRANSLATORS.put(AvroIO.Write.Bound.class, new AvroIOWriteTranslatorBatch()); + + TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch()); + TRANSLATORS.put(Write.Bound.class, new WriteSinkTranslatorBatch()); + + TRANSLATORS.put(TextIO.Read.Bound.class, new TextIOReadTranslatorBatch()); + TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteTranslatorBatch()); + + // Flink-specific + TRANSLATORS.put(ConsoleIO.Write.Bound.class, new ConsoleIOWriteTranslatorBatch()); + + } + + + public static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(PTransform<?, ?> transform) { + return TRANSLATORS.get(transform.getClass()); + } + + private static class ReadSourceTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> { + + @Override + public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) { + String name = transform.getName(); + BoundedSource<T> source = transform.getSource(); + PCollection<T> output = context.getOutput(transform); + Coder<T> coder = output.getCoder(); + + TypeInformation<T> typeInformation = context.getTypeInfo(output); + + DataSource<T> dataSource = new DataSource<>(context.getExecutionEnvironment(), + new SourceInputFormat<>(source, context.getPipelineOptions()), typeInformation, name); + + context.setOutputDataSet(output, dataSource); + } + } + + private static class AvroIOReadTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Read.Bound<T>> { + private static final Logger LOG = LoggerFactory.getLogger(AvroIOReadTranslatorBatch.class); + + @Override + public void translateNode(AvroIO.Read.Bound<T> transform, FlinkBatchTranslationContext context) { + String path = transform.getFilepattern(); + String name = transform.getName(); +// Schema schema = transform.getSchema(); + PValue output = context.getOutput(transform); + + TypeInformation<T> typeInformation = context.getTypeInfo(output); + + // This is super hacky, but unfortunately we cannot get the type otherwise + Class<T> extractedAvroType; + try { + Field typeField = transform.getClass().getDeclaredField("type"); + typeField.setAccessible(true); + @SuppressWarnings("unchecked") + Class<T> avroType = (Class<T>) typeField.get(transform); + extractedAvroType = avroType; + } catch (NoSuchFieldException | IllegalAccessException e) { + // we know that the field is there and it is accessible + throw new RuntimeException("Could not access type from AvroIO.Bound", e); + } + + DataSource<T> source = new DataSource<>(context.getExecutionEnvironment(), + new AvroInputFormat<>(new Path(path), extractedAvroType), + typeInformation, name); + + context.setOutputDataSet(output, source); + } + } + + private static class AvroIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<AvroIO.Write.Bound<T>> { + private static final Logger LOG = LoggerFactory.getLogger(AvroIOWriteTranslatorBatch.class); + + @Override + public void translateNode(AvroIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) { + DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform)); + String filenamePrefix = transform.getFilenamePrefix(); + String filenameSuffix = transform.getFilenameSuffix(); + int numShards = transform.getNumShards(); + String shardNameTemplate = transform.getShardNameTemplate(); + + // TODO: Implement these. We need Flink support for this. + LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", + filenameSuffix); + LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate); + + // This is super hacky, but unfortunately we cannot get the type otherwise + Class<T> extractedAvroType; + try { + Field typeField = transform.getClass().getDeclaredField("type"); + typeField.setAccessible(true); + @SuppressWarnings("unchecked") + Class<T> avroType = (Class<T>) typeField.get(transform); + extractedAvroType = avroType; + } catch (NoSuchFieldException | IllegalAccessException e) { + // we know that the field is there and it is accessible + throw new RuntimeException("Could not access type from AvroIO.Bound", e); + } + + DataSink<T> dataSink = inputDataSet.output(new AvroOutputFormat<>(new Path + (filenamePrefix), extractedAvroType)); + + if (numShards > 0) { + dataSink.setParallelism(numShards); + } + } + } + + private static class TextIOReadTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Read.Bound<String>> { + private static final Logger LOG = LoggerFactory.getLogger(TextIOReadTranslatorBatch.class); + + @Override + public void translateNode(TextIO.Read.Bound<String> transform, FlinkBatchTranslationContext context) { + String path = transform.getFilepattern(); + String name = transform.getName(); + + TextIO.CompressionType compressionType = transform.getCompressionType(); + boolean needsValidation = transform.needsValidation(); + + // TODO: Implement these. We need Flink support for this. + LOG.warn("Translation of TextIO.CompressionType not yet supported. Is: {}.", compressionType); + LOG.warn("Translation of TextIO.Read.needsValidation not yet supported. Is: {}.", needsValidation); + + PValue output = context.getOutput(transform); + + TypeInformation<String> typeInformation = context.getTypeInfo(output); + DataSource<String> source = new DataSource<>(context.getExecutionEnvironment(), new TextInputFormat(new Path(path)), typeInformation, name); + + context.setOutputDataSet(output, source); + } + } + + private static class TextIOWriteTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<TextIO.Write.Bound<T>> { + private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteTranslatorBatch.class); + + @Override + public void translateNode(TextIO.Write.Bound<T> transform, FlinkBatchTranslationContext context) { + PValue input = context.getInput(transform); + DataSet<T> inputDataSet = context.getInputDataSet(input); + + String filenamePrefix = transform.getFilenamePrefix(); + String filenameSuffix = transform.getFilenameSuffix(); + boolean needsValidation = transform.needsValidation(); + int numShards = transform.getNumShards(); + String shardNameTemplate = transform.getShardNameTemplate(); + + // TODO: Implement these. We need Flink support for this. + LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation); + LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix); + LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate); + + //inputDataSet.print(); + DataSink<T> dataSink = inputDataSet.writeAsText(filenamePrefix); + + if (numShards > 0) { + dataSink.setParallelism(numShards); + } + } + } + + private static class ConsoleIOWriteTranslatorBatch implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ConsoleIO.Write.Bound> { + @Override + public void translateNode(ConsoleIO.Write.Bound transform, FlinkBatchTranslationContext context) { + PValue input = context.getInput(transform); + DataSet<?> inputDataSet = context.getInputDataSet(input); + inputDataSet.printOnTaskManager(transform.getName()); + } + } - private static class WriteSinkTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> { - - @Override - public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) { - String name = transform.getName(); - PValue input = context.getInput(transform); - DataSet<T> inputDataSet = context.getInputDataSet(input); + private static class WriteSinkTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Write.Bound<T>> { + + @Override + public void translateNode(Write.Bound<T> transform, FlinkBatchTranslationContext context) { + String name = transform.getName(); + PValue input = context.getInput(transform); + DataSet<T> inputDataSet = context.getInputDataSet(input); - inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions())).name(name); - } - } + inputDataSet.output(new SinkOutputFormat<>(transform, context.getPipelineOptions())).name(name); + } + } - private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K, V>> { + private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K, V>> { - @Override - public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) { - DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>(); + @Override + public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) { + DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform)); + GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>(); - TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform)); + TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform)); - Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType())); + Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType())); - GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet = - new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName()); - context.setOutputDataSet(context.getOutput(transform), outputDataSet); - } - } + GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet = + new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName()); + context.setOutputDataSet(context.getOutput(transform), outputDataSet); + } + } - /** - * Translates a GroupByKey while ignoring window assignments. This is identical to the {@link GroupByKeyOnlyTranslatorBatch} - */ - private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> { + /** + * Translates a GroupByKey while ignoring window assignments. This is identical to the {@link GroupByKeyOnlyTranslatorBatch} + */ + private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> { - @Override - public void translateNode(GroupByKey<K, V> transform, FlinkBatchTranslationContext context) { - DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>(); + @Override + public void translateNode(GroupByKey<K, V> transform, FlinkBatchTranslationContext context) { + DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform)); + GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>(); - TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform)); + TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform)); - Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType())); + Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType())); - GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet = - new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName()); + GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet = + new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName()); - context.setOutputDataSet(context.getOutput(transform), outputDataSet); - } - } + context.setOutputDataSet(context.getOutput(transform), outputDataSet); + } + } - private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, VO>> { + private static class CombinePerKeyTranslatorBatch<K, VI, VA, VO> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Combine.PerKey<K, VI, VO>> { - @Override - public void translateNode(Combine.PerKey<K, VI, VO> transform, FlinkBatchTranslationContext context) { - DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(context.getInput(transform)); + @Override + public void translateNode(Combine.PerKey<K, VI, VO> transform, FlinkBatchTranslationContext context) { + DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(context.getInput(transform)); - @SuppressWarnings("unchecked") - Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = (Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn(); + @SuppressWarnings("unchecked") + Combine.KeyedCombineFn<K, VI, VA, VO> keyedCombineFn = (Combine.KeyedCombineFn<K, VI, VA, VO>) transform.getFn(); - KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder(); + KvCoder<K, VI> inputCoder = (KvCoder<K, VI>) context.getInput(transform).getCoder(); - Coder<VA> accumulatorCoder = - null; - try { - accumulatorCoder = keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getKeyCoder(), inputCoder.getValueCoder()); - } catch (CannotProvideCoderException e) { - e.printStackTrace(); - // TODO - } + Coder<VA> accumulatorCoder = + null; + try { + accumulatorCoder = keyedCombineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getKeyCoder(), inputCoder.getValueCoder()); + } catch (CannotProvideCoderException e) { + e.printStackTrace(); + // TODO + } - TypeInformation<KV<K, VI>> kvCoderTypeInformation = new KvCoderTypeInformation<>(inputCoder); - TypeInformation<KV<K, VA>> partialReduceTypeInfo = new KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder)); + TypeInformation<KV<K, VI>> kvCoderTypeInformation = new KvCoderTypeInformation<>(inputCoder); + TypeInformation<KV<K, VA>> partialReduceTypeInfo = new KvCoderTypeInformation<>(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder)); - Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation)); + Grouping<KV<K, VI>> inputGrouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, kvCoderTypeInformation)); - FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new FlinkPartialReduceFunction<>(keyedCombineFn); + FlinkPartialReduceFunction<K, VI, VA> partialReduceFunction = new FlinkPartialReduceFunction<>(keyedCombineFn); - // Partially GroupReduce the values into the intermediate format VA (combine) - GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine = - new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, partialReduceFunction, - "GroupCombine: " + transform.getName()); + // Partially GroupReduce the values into the intermediate format VA (combine) + GroupCombineOperator<KV<K, VI>, KV<K, VA>> groupCombine = + new GroupCombineOperator<>(inputGrouping, partialReduceTypeInfo, partialReduceFunction, + "GroupCombine: " + transform.getName()); - // Reduce fully to VO - GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new FlinkReduceFunction<>(keyedCombineFn); + // Reduce fully to VO + GroupReduceFunction<KV<K, VA>, KV<K, VO>> reduceFunction = new FlinkReduceFunction<>(keyedCombineFn); - TypeInformation<KV<K, VO>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform)); + TypeInformation<KV<K, VO>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform)); - Grouping<KV<K, VA>> intermediateGrouping = new UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType())); + Grouping<KV<K, VA>> intermediateGrouping = new UnsortedGrouping<>(groupCombine, new Keys.ExpressionKeys<>(new String[]{"key"}, groupCombine.getType())); - // Fully reduce the values and create output format VO - GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet = - new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName()); + // Fully reduce the values and create output format VO + GroupReduceOperator<KV<K, VA>, KV<K, VO>> outputDataSet = + new GroupReduceOperator<>(intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName()); - context.setOutputDataSet(context.getOutput(transform), outputDataSet); - } - } + context.setOutputDataSet(context.getOutput(transform), outputDataSet); + } + } -// private static class CombineGroupedValuesTranslator<K, VI, VO> implements FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> { +// private static class CombineGroupedValuesTranslator<K, VI, VO> implements FlinkPipelineTranslator.TransformTranslator<Combine.GroupedValues<K, VI, VO>> { // -// @Override -// public void translateNode(Combine.GroupedValues<K, VI, VO> transform, TranslationContext context) { -// DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(transform.getInput()); +// @Override +// public void translateNode(Combine.GroupedValues<K, VI, VO> transform, TranslationContext context) { +// DataSet<KV<K, VI>> inputDataSet = context.getInputDataSet(transform.getInput()); // -// Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = transform.getFn(); +// Combine.KeyedCombineFn<? super K, ? super VI, ?, VO> keyedCombineFn = transform.getFn(); // -// GroupReduceFunction<KV<K, VI>, KV<K, VO>> groupReduceFunction = new FlinkCombineFunction<>(keyedCombineFn); +// GroupReduceFunction<KV<K, VI>, KV<K, VO>> groupReduceFunction = new FlinkCombineFunction<>(keyedCombineFn); // -// TypeInformation<KV<K, VO>> typeInformation = context.getTypeInfo(transform.getOutput()); +// TypeInformation<KV<K, VO>> typeInformation = context.getTypeInfo(transform.getOutput()); // -// Grouping<KV<K, VI>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType())); +// Grouping<KV<K, VI>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{""}, inputDataSet.getType())); // -// GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet = -// new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName()); -// context.setOutputDataSet(transform.getOutput(), outputDataSet); -// } -// } - - private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> { - private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class); - - @Override - public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkBatchTranslationContext context) { - DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform)); - - final DoFn<IN, OUT> doFn = transform.getFn(); - - TypeInformation<OUT> typeInformation = context.getTypeInfo(context.getOutput(transform)); - - FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, context.getPipelineOptions()); - MapPartitionOperator<IN, OUT> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName()); - - transformSideInputs(transform.getSideInputs(), outputDataSet, context); - - context.setOutputDataSet(context.getOutput(transform), outputDataSet); - } - } - - private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, OUT>> { - private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class); - - @Override - public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkBatchTranslationContext context) { - DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform)); - - final DoFn<IN, OUT> doFn = transform.getFn(); - - Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll(); - - Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap(); - // put the main output at index 0, FlinkMultiOutputDoFnFunction also expects this - outputMap.put(transform.getMainOutputTag(), 0); - int count = 1; - for (TupleTag<?> tag: outputs.keySet()) { - if (!outputMap.containsKey(tag)) { - outputMap.put(tag, count++); - } - } - - // collect all output Coders and create a UnionCoder for our tagged outputs - List<Coder<?>> outputCoders = Lists.newArrayList(); - for (PCollection<?> coll: outputs.values()) { - outputCoders.add(coll.getCoder()); - } - - UnionCoder unionCoder = UnionCoder.of(outputCoders); - - @SuppressWarnings("unchecked") - TypeInformation<RawUnionValue> typeInformation = new CoderTypeInformation<>(unionCoder); - - @SuppressWarnings("unchecked") - FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap); - MapPartitionOperator<IN, RawUnionValue> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName()); - - transformSideInputs(transform.getSideInputs(), outputDataSet, context); - - for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) { - TypeInformation<Object> outputType = context.getTypeInfo(output.getValue()); - int outputTag = outputMap.get(output.getKey()); - FlinkMultiOutputPruningFunction<Object> pruningFunction = new FlinkMultiOutputPruningFunction<>(outputTag); - FlatMapOperator<RawUnionValue, Object> pruningOperator = new - FlatMapOperator<>(outputDataSet, outputType, - pruningFunction, output.getValue().getName()); - context.setOutputDataSet(output.getValue(), pruningOperator); - - } - } - } - - private static class FlattenPCollectionTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>> { - - @Override - public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkBatchTranslationContext context) { - List<PCollection<T>> allInputs = context.getInput(transform).getAll(); - DataSet<T> result = null; - for(PCollection<T> collection : allInputs) { - DataSet<T> current = context.getInputDataSet(collection); - if (result == null) { - result = current; - } else { - result = result.union(current); - } - } - context.setOutputDataSet(context.getOutput(transform), result); - } - } - - private static class CreatePCollectionViewTranslatorBatch<R, T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R, T>> { - @Override - public void translateNode(View.CreatePCollectionView<R, T> transform, FlinkBatchTranslationContext context) { - DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform)); - PCollectionView<T> input = transform.apply(null); - context.setSideInputDataSet(input, inputDataSet); - } - } - - private static class CreateTranslatorBatch<OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> { - - @Override - public void translateNode(Create.Values<OUT> transform, FlinkBatchTranslationContext context) { - TypeInformation<OUT> typeInformation = context.getOutputTypeInfo(); - Iterable<OUT> elements = transform.getElements(); - - // we need to serialize the elements to byte arrays, since they might contain - // elements that are not serializable by Java serialization. We deserialize them - // in the FlatMap function using the Coder. - - List<byte[]> serializedElements = Lists.newArrayList(); - Coder<OUT> coder = context.getOutput(transform).getCoder(); - for (OUT element: elements) { - ByteArrayOutputStream bao = new ByteArrayOutputStream(); - try { - coder.encode(element, bao, Coder.Context.OUTER); - serializedElements.add(bao.toByteArray()); - } catch (IOException e) { - throw new RuntimeException("Could not serialize Create elements using Coder: " + e); - } - } - - DataSet<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1); - FlinkCreateFunction<Integer, OUT> flatMapFunction = new FlinkCreateFunction<>(serializedElements, coder); - FlatMapOperator<Integer, OUT> outputDataSet = new FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, transform.getName()); - - context.setOutputDataSet(context.getOutput(transform), outputDataSet); - } - } - - private static void transformSideInputs(List<PCollectionView<?>> sideInputs, - MapPartitionOperator<?, ?> outputDataSet, - FlinkBatchTranslationContext context) { - // get corresponding Flink broadcast DataSets - for(PCollectionView<?> input : sideInputs) { - DataSet<?> broadcastSet = context.getSideInputDataSet(input); - outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId()); - } - } +// GroupReduceOperator<KV<K, VI>, KV<K, VO>> outputDataSet = +// new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName()); +// context.setOutputDataSet(transform.getOutput(), outputDataSet); +// } +// } + + private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> { + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class); + + @Override + public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkBatchTranslationContext context) { + DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform)); + + final DoFn<IN, OUT> doFn = transform.getFn(); + + TypeInformation<OUT> typeInformation = context.getTypeInfo(context.getOutput(transform)); + + FlinkDoFnFunction<IN, OUT> doFnWrapper = new FlinkDoFnFunction<>(doFn, context.getPipelineOptions()); + MapPartitionOperator<IN, OUT> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName()); + + transformSideInputs(transform.getSideInputs(), outputDataSet, context); + + context.setOutputDataSet(context.getOutput(transform), outputDataSet); + } + } + + private static class ParDoBoundMultiTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.BoundMulti<IN, OUT>> { + private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundMultiTranslatorBatch.class); + + @Override + public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkBatchTranslationContext context) { + DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform)); + + final DoFn<IN, OUT> doFn = transform.getFn(); + + Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll(); + + Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap(); + // put the main output at index 0, FlinkMultiOutputDoFnFunction also expects this + outputMap.put(transform.getMainOutputTag(), 0); + int count = 1; + for (TupleTag<?> tag: outputs.keySet()) { + if (!outputMap.containsKey(tag)) { + outputMap.put(tag, count++); + } + } + + // collect all output Coders and create a UnionCoder for our tagged outputs + List<Coder<?>> outputCoders = Lists.newArrayList(); + for (PCollection<?> coll: outputs.values()) { + outputCoders.add(coll.getCoder()); + } + + UnionCoder unionCoder = UnionCoder.of(outputCoders); + + @SuppressWarnings("unchecked") + TypeInformation<RawUnionValue> typeInformation = new CoderTypeInformation<>(unionCoder); + + @SuppressWarnings("unchecked") + FlinkMultiOutputDoFnFunction<IN, OUT> doFnWrapper = new FlinkMultiOutputDoFnFunction(doFn, context.getPipelineOptions(), outputMap); + MapPartitionOperator<IN, RawUnionValue> outputDataSet = new MapPartitionOperator<>(inputDataSet, typeInformation, doFnWrapper, transform.getName()); + + transformSideInputs(transform.getSideInputs(), outputDataSet, context); + + for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) { + TypeInformation<Object> outputType = context.getTypeInfo(output.getValue()); + int outputTag = outputMap.get(output.getKey()); + FlinkMultiOutputPruningFunction<Object> pruningFunction = new FlinkMultiOutputPruningFunction<>(outputTag); + FlatMapOperator<RawUnionValue, Object> pruningOperator = new + FlatMapOperator<>(outputDataSet, outputType, + pruningFunction, output.getValue().getName()); + context.setOutputDataSet(output.getValue(), pruningOperator); + + } + } + } + + private static class FlattenPCollectionTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>> { + + @Override + public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkBatchTranslationContext context) { + List<PCollection<T>> allInputs = context.getInput(transform).getAll(); + DataSet<T> result = null; + for(PCollection<T> collection : allInputs) { + DataSet<T> current = context.getInputDataSet(collection); + if (result == null) { + result = current; + } else { + result = result.union(current); + } + } + context.setOutputDataSet(context.getOutput(transform), result); + } + } + + private static class CreatePCollectionViewTranslatorBatch<R, T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<View.CreatePCollectionView<R, T>> { + @Override + public void translateNode(View.CreatePCollectionView<R, T> transform, FlinkBatchTranslationContext context) { + DataSet<T> inputDataSet = context.getInputDataSet(context.getInput(transform)); + PCollectionView<T> input = transform.apply(null); + context.setSideInputDataSet(input, inputDataSet); + } + } + + private static class CreateTranslatorBatch<OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Create.Values<OUT>> { + + @Override + public void translateNode(Create.Values<OUT> transform, FlinkBatchTranslationContext context) { + TypeInformation<OUT> typeInformation = context.getOutputTypeInfo(); + Iterable<OUT> elements = transform.getElements(); + + // we need to serialize the elements to byte arrays, since they might contain + // elements that are not serializable by Java serialization. We deserialize them + // in the FlatMap function using the Coder. + + List<byte[]> serializedElements = Lists.newArrayList(); + Coder<OUT> coder = context.getOutput(transform).getCoder(); + for (OUT element: elements) { + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + try { + coder.encode(element, bao, Coder.Context.OUTER); + serializedElements.add(bao.toByteArray()); + } catch (IOException e) { + throw new RuntimeException("Could not serialize Create elements using Coder: " + e); + } + } + + DataSet<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1); + FlinkCreateFunction<Integer, OUT> flatMapFunction = new FlinkCreateFunction<>(serializedElements, coder); + FlatMapOperator<Integer, OUT> outputDataSet = new FlatMapOperator<>(initDataSet, typeInformation, flatMapFunction, transform.getName()); + + context.setOutputDataSet(context.getOutput(transform), outputDataSet); + } + } + + private static void transformSideInputs(List<PCollectionView<?>> sideInputs, + MapPartitionOperator<?, ?> outputDataSet, + FlinkBatchTranslationContext context) { + // get corresponding Flink broadcast DataSets + for(PCollectionView<?> input : sideInputs) { + DataSet<?> broadcastSet = context.getSideInputDataSet(input); + outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId()); + } + } // Disabled because it depends on a pending pull request to the DataFlowSDK - /** - * Special composite transform translator. Only called if the CoGroup is two dimensional. - * @param <K> - */ - private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> { + /** + * Special composite transform translator. Only called if the CoGroup is two dimensional. + * @param <K> + */ + private static class CoGroupByKeyTranslatorBatch<K, V1, V2> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<CoGroupByKey<K>> { - @Override - public void translateNode(CoGroupByKey<K> transform, FlinkBatchTranslationContext context) { - KeyedPCollectionTuple<K> input = context.getInput(transform); + @Override + public void translateNode(CoGroupByKey<K> transform, FlinkBatchTranslationContext context) { + KeyedPCollectionTuple<K> input = context.getInput(transform); - CoGbkResultSchema schema = input.getCoGbkResultSchema(); - List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?>> keyedCollections = input.getKeyedCollections(); + CoGbkResultSchema schema = input.getCoGbkResultSchema(); + List<KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?>> keyedCollections = input.getKeyedCollections(); - KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection1 = keyedCollections.get(0); - KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection2 = keyedCollections.get(1); + KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection1 = keyedCollections.get(0); + KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> taggedCollection2 = keyedCollections.get(1); - TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag(); - TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag(); + TupleTag<?> tupleTag1 = taggedCollection1.getTupleTag(); + TupleTag<?> tupleTag2 = taggedCollection2.getTupleTag(); - PCollection<? extends KV<K, ?>> collection1 = taggedCollection1.getCollection(); - PCollection<? extends KV<K, ?>> collection2 = taggedCollection2.getCollection(); + PCollection<? extends KV<K, ?>> collection1 = taggedCollection1.getCollection(); + PCollection<? extends KV<K, ?>> collection2 = taggedCollection2.getCollection(); - DataSet<KV<K,V1>> inputDataSet1 = context.getInputDataSet(collection1); - DataSet<KV<K,V2>> inputDataSet2 = context.getInputDataSet(collection2); + DataSet<KV<K,V1>> inputDataSet1 = context.getInputDataSet(collection1); + DataSet<KV<K,V2>> inputDataSet2 = context.getInputDataSet(collection2); - TypeInformation<KV<K,CoGbkResult>> typeInfo = context.getOutputTypeInfo(); + TypeInformation<KV<K,CoGbkResult>> typeInfo = context.getOutputTypeInfo(); - FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = new FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2); + FlinkCoGroupKeyedListAggregator<K,V1,V2> aggregator = new FlinkCoGroupKeyedListAggregator<>(schema, tupleTag1, tupleTag2); - Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType()); - Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType()); + Keys.ExpressionKeys<KV<K,V1>> keySelector1 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet1.getType()); + Keys.ExpressionKeys<KV<K,V2>> keySelector2 = new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet2.getType()); - DataSet<KV<K, CoGbkResult>> out = new CoGroupOperator<>(inputDataSet1, inputDataSet2, - keySelector1, keySelector2, - aggregator, typeInfo, null, transform.getName()); - context.setOutputDataSet(context.getOutput(transform), out); - } - } + DataSet<KV<K, CoGbkResult>> out = new CoGroupOperator<>(inputDataSet1, inputDataSet2, + keySelector1, keySelector2, + aggregator, typeInfo, null, transform.getName()); + context.setOutputDataSet(context.getOutput(transform), out); + } + } - // -------------------------------------------------------------------------------------------- - // Miscellaneous - // -------------------------------------------------------------------------------------------- - - private FlinkBatchTransformTranslators() {} + // -------------------------------------------------------------------------------------------- + // Miscellaneous + // -------------------------------------------------------------------------------------------- + + private FlinkBatchTransformTranslators() {} }
