http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index c8c5d84..31713cd 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -1,261 +1,167 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. --> <project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" - xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>runners</artifactId> - <version>1.6.0-SNAPSHOT</version> - </parent> - - <artifactId>flink-runner</artifactId> - <version>0.3-SNAPSHOT</version> - - <name>Flink Beam Runner</name> - <packaging>jar</packaging> - - <inceptionYear>2015</inceptionYear> - - <licenses> - <license> - <name>The Apache Software License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - </license> - </licenses> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - <flink.version>1.0.0</flink.version> - <beam.version>1.6.0-SNAPSHOT</beam.version> - <scala.major.version>2.10</scala.major.version> - <!-- Default parameters for mvn exec:exec --> - <clazz>org.apache.beam.runners.flink.examples.WordCount</clazz> - <input>kinglear.txt</input> - <output>wordcounts.txt</output> - <parallelism>1</parallelism> - </properties> - - <repositories> - <repository> - <id>apache.snapshots</id> - <name>Apache Development Snapshot Repository</name> - <url>https://repository.apache.org/content/repositories/snapshots/</url> - <releases> - <enabled>false</enabled> - </releases> - <snapshots> - <enabled>true</enabled> - </snapshots> - </repository> - </repositories> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.major.version}</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.major.version}</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${scala.major.version}</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_${scala.major.version}</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka-0.8_${scala.major.version}</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-avro_${scala.major.version}</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>com.google.cloud.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> - <version>${beam.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <version>1.9.5</version> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - - <!-- JAR Packaging --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>2.6</version><!--$NO-MVN-MAN-VER$--> - <configuration> - <archive> - <manifest> - <addDefaultImplementationEntries>true</addDefaultImplementationEntries> - <addDefaultSpecificationEntries>true</addDefaultSpecificationEntries> - </manifest> - </archive> - </configuration> - </plugin> - - <!-- Java compiler --> - <plugin> - - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.1</version><!--$NO-MVN-MAN-VER$--> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> - </plugin> - - <!-- Integration Tests --> - <plugin> - <artifactId>maven-failsafe-plugin</artifactId> - <version>2.17</version> - <executions> - <execution> - <goals> - <goal>integration-test</goal> - <goal>verify</goal> - </goals> - </execution> - </executions> - <configuration> - <forkCount>1</forkCount> - <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine> - </configuration> - </plugin> - - <!-- Unit Tests --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.17</version> - <configuration> - <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - <classpathContainers> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <downloadSources>true</downloadSources> - <downloadJavadocs>true</downloadJavadocs> - </configuration> - </plugin> - - <!-- Maven minimum version check --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-enforcer-plugin</artifactId> - <version>1.3.1</version><!--$NO-MVN-MAN-VER$--> - <executions> - <execution> - <id>enforce-maven</id> - <goals> - <goal>enforce</goal> - </goals> - <configuration> - <rules> - <requireJavaVersion> - <version>[1.7,)</version> - </requireJavaVersion> - <requireMavenVersion> - <!-- enforce at least mvn version 3.0.3 --> - <version>[3.0.3,)</version> - </requireMavenVersion> - </rules> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>exec-maven-plugin</artifactId> - <version>1.2.1</version> - <executions> - <execution> - <phase>none</phase> - </execution> - </executions> - <configuration> - <executable>java</executable> - <arguments> - <argument>-classpath</argument> - <classpath/> - <argument>${clazz}</argument> - <argument>--input=${input}</argument> - <argument>--output=${output}</argument> - <argument>--parallelism=${parallelism}</argument> - </arguments> - </configuration> - </plugin> - - </plugins> - - </build> + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>runners-parent</artifactId> + <version>1.6.0-SNAPSHOT</version> + </parent> + + <artifactId>flink-runner-parent</artifactId> + <version>0.4-SNAPSHOT</version> + + <name>Flink Beam Runner</name> + <packaging>pom</packaging> + + <inceptionYear>2015</inceptionYear> + + <modules> + <module>runner</module> + <module>examples</module> + </modules> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <flink.version>1.0.0</flink.version> + <beam.version>1.6.0-SNAPSHOT</beam.version> + </properties> + + <repositories> + <repository> + <id>apache.snapshots</id> + <name>Apache Development Snapshot Repository</name> + <url>https://repository.apache.org/content/repositories/snapshots/</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </repository> + </repositories> + + <build> + + <pluginManagement> + + <plugins> + + <!-- Integration Tests --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>2.17</version> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + <configuration> + <forkCount>1</forkCount> + <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine> + </configuration> + </plugin> + + <!-- Unit Tests --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.17</version> + <configuration> + <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine> + </configuration> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> + <executions> + <execution> + <phase>none</phase> + </execution> + </executions> + </plugin> + + </plugins> + + </pluginManagement> + + + <plugins> + <!-- Java compiler --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + + <!-- Maven minimum version check --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce-maven</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireJavaVersion> + <version>[1.7,)</version> + </requireJavaVersion> + <requireMavenVersion> + <!-- enforce at least mvn version 3.0.3 --> + <version>[3.0.3,)</version> + </requireMavenVersion> + </rules> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + + + </build> </project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml new file mode 100644 index 0000000..212b973 --- /dev/null +++ b/runners/flink/runner/pom.xml @@ -0,0 +1,145 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>flink-runner-parent</artifactId> + <version>0.4-SNAPSHOT</version> + </parent> + + <artifactId>flink-runner_2.10</artifactId> + <version>0.4-SNAPSHOT</version> + + <name>Flink Beam Runner Core</name> + <packaging>jar</packaging> + + <inceptionYear>2015</inceptionYear> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + + <dependencies> + <!-- Flink dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_2.10</artifactId> + <version>${flink.version}</version> + </dependency> + <!-- Libraries not part of Flink which need to be included by the user (see flink-examples) --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka-0.8_2.10</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro_2.10</artifactId> + <version>${flink.version}</version> + </dependency> + <!--- Dataflow --> + <dependency> + <groupId>com.google.cloud.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> + <version>${beam.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.google.auto.service</groupId> + <artifactId>auto-service</artifactId> + <version>1.0-rc2</version> + </dependency> + <!-- Test scoped --> + <dependency> + <groupId>com.google.cloud.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-examples-all</artifactId> + <version>${beam.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </exclusion> + </exclusions> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.9.5</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + + <!-- Integration Tests --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + </plugin> + + <!-- Unit Tests --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + + </plugins> + + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java new file mode 100644 index 0000000..8825ed3 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator; +import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator; +import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * The class that instantiates and manages the execution of a given job. + * Depending on if the job is a Streaming or Batch processing one, it creates + * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}), + * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or + * {@link FlinkStreamingPipelineTranslator})to transform the Beam job into a Flink one, and + * executes the (translated) job. + */ +public class FlinkPipelineExecutionEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); + + private final FlinkPipelineOptions options; + + /** + * The Flink Batch execution environment. This is instantiated to either a + * {@link org.apache.flink.api.java.CollectionEnvironment}, + * a {@link org.apache.flink.api.java.LocalEnvironment} or + * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration + * options. + */ + private ExecutionEnvironment flinkBatchEnv; + + + /** + * The Flink Streaming execution environment. This is instantiated to either a + * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or + * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending + * on the configuration options, and more specifically, the url of the master. + */ + private StreamExecutionEnvironment flinkStreamEnv; + + /** + * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to + * their Flink counterparts. Based on the options provided by the user, if we have a streaming job, + * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In other case, i.e. a batch job, + * a {@link FlinkBatchPipelineTranslator} is created. + */ + private FlinkPipelineTranslator flinkPipelineTranslator; + + /** + * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the + * provided {@link FlinkPipelineOptions}. + * + * @param options the user-defined pipeline options. + * */ + public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) { + this.options = Preconditions.checkNotNull(options); + this.createPipelineExecutionEnvironment(); + this.createPipelineTranslator(); + } + + /** + * Depending on the type of job (Streaming or Batch) and the user-specified options, + * this method creates the adequate ExecutionEnvironment. + */ + private void createPipelineExecutionEnvironment() { + if (options.isStreaming()) { + createStreamExecutionEnvironment(); + } else { + createBatchExecutionEnvironment(); + } + } + + /** + * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph + * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet}, + * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}. + */ + private void createPipelineTranslator() { + checkInitializationState(); + if (this.flinkPipelineTranslator != null) { + throw new IllegalStateException("FlinkPipelineTranslator already initialized."); + } + + this.flinkPipelineTranslator = options.isStreaming() ? + new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) : + new FlinkBatchPipelineTranslator(flinkBatchEnv, options); + } + + /** + * Depending on if the job is a Streaming or a Batch one, this method creates + * the necessary execution environment and pipeline translator, and translates + * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into + * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream} + * one. + * */ + public void translate(Pipeline pipeline) { + checkInitializationState(); + if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) { + createPipelineExecutionEnvironment(); + } + if (this.flinkPipelineTranslator == null) { + createPipelineTranslator(); + } + this.flinkPipelineTranslator.translate(pipeline); + } + + /** + * Launches the program execution. + * */ + public JobExecutionResult executePipeline() throws Exception { + if (options.isStreaming()) { + if (this.flinkStreamEnv == null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("FlinkPipelineTranslator not initialized."); + } + return this.flinkStreamEnv.execute(); + } else { + if (this.flinkBatchEnv == null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("FlinkPipelineTranslator not initialized."); + } + return this.flinkBatchEnv.execute(); + } + } + + /** + * If the submitted job is a batch processing job, this method creates the adequate + * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending + * on the user-specified options. + */ + private void createBatchExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); + } + + LOG.info("Creating the required Batch Execution Environment."); + + String masterUrl = options.getFlinkMaster(); + this.flinkStreamEnv = null; + + // depending on the master, create the right environment. + if (masterUrl.equals("[local]")) { + this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[collection]")) { + this.flinkBatchEnv = new CollectionEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), + stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) { + this.flinkBatchEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkBatchEnv.getParallelism()); + } + + /** + * If the submitted job is a stream processing job, this method creates the adequate + * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending + * on the user-specified options. + */ + private void createStreamExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); + } + + LOG.info("Creating the required Streaming Environment."); + + String masterUrl = options.getFlinkMaster(); + this.flinkBatchEnv = null; + + // depending on the master, create the right environment. + if (masterUrl.equals("[local]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1) { + this.flinkStreamEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkStreamEnv.getParallelism()); + + // default to event time + this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // for the following 2 parameters, a value of -1 means that Flink will use + // the default values as specified in the configuration. + int numRetries = options.getNumberOfExecutionRetries(); + if (numRetries != -1) { + this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries); + } + long retryDelay = options.getExecutionRetryDelay(); + if (retryDelay != -1) { + this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay); + } + + // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). + // If the value is not -1, then the validity checks are applied. + // By default, checkpointing is disabled. + long checkpointInterval = options.getCheckpointingInterval(); + if(checkpointInterval != -1) { + if (checkpointInterval < 1) { + throw new IllegalArgumentException("The checkpoint interval must be positive"); + } + this.flinkStreamEnv.enableCheckpointing(checkpointInterval); + } + } + + private void checkInitializationState() { + if (options.isStreaming() && this.flinkBatchEnv != null) { + throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment."); + } else if (!options.isStreaming() && this.flinkStreamEnv != null) { + throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java new file mode 100644 index 0000000..2f4b3ea --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.StreamingOptions; + +import java.util.List; + +/** + * Options which can be used to configure a Flink PipelineRunner. + */ +public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { + + /** + * List of local files to make available to workers. + * <p> + * Jars are placed on the worker's classpath. + * <p> + * The default value is the list of jars from the main program's classpath. + */ + @Description("Jar-Files to send to all workers and put on the classpath. " + + "The default value is all files from the classpath.") + @JsonIgnore + List<String> getFilesToStage(); + void setFilesToStage(List<String> value); + + /** + * The job name is used to identify jobs running on a Flink cluster. + */ + @Description("Dataflow job name, to uniquely identify active jobs. " + + "Defaults to using the ApplicationName-UserName-Date.") + @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class) + String getJobName(); + void setJobName(String value); + + /** + * The url of the Flink JobManager on which to execute pipelines. This can either be + * the the address of a cluster JobManager, in the form "host:port" or one of the special + * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink + * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while + * "[auto]" will let the system decide where to execute the pipeline based on the environment. + */ + @Description("Address of the Flink Master where the Pipeline should be executed. Can" + + " either be of the form \"host:port\" or one of the special values [local], " + + "[collection] or [auto].") + String getFlinkMaster(); + void setFlinkMaster(String value); + + @Description("The degree of parallelism to be used when distributing operations onto workers.") + @Default.Integer(-1) + Integer getParallelism(); + void setParallelism(Integer value); + + @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " + + "fault tolerance).") + @Default.Long(-1L) + Long getCheckpointingInterval(); + void setCheckpointingInterval(Long interval); + + @Description("Sets the number of times that failed tasks are re-executed. " + + "A value of zero effectively disables fault tolerance. A value of -1 indicates " + + "that the system default value (as defined in the configuration) should be used.") + @Default.Integer(-1) + Integer getNumberOfExecutionRetries(); + void setNumberOfExecutionRetries(Integer retries); + + @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.") + @Default.Long(-1L) + Long getExecutionRetryDelay(); + void setExecutionRetryDelay(Long delay); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java new file mode 100644 index 0000000..fe773d9 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.runners.PipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.POutput; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.JobExecutionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * A {@link PipelineRunner} that executes the operations in the + * pipeline by first translating them to a Flink Plan and then executing them either locally + * or on a Flink cluster, depending on the configuration. + * <p> + * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}. + */ +public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class); + + /** + * Provided options. + */ + private final FlinkPipelineOptions options; + + private final FlinkPipelineExecutionEnvironment flinkJobEnv; + + /** + * Construct a runner from the provided options. + * + * @param options Properties which configure the runner. + * @return The newly created runner. + */ + public static FlinkPipelineRunner fromOptions(PipelineOptions options) { + FlinkPipelineOptions flinkOptions = + PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); + ArrayList<String> missing = new ArrayList<>(); + + if (flinkOptions.getAppName() == null) { + missing.add("appName"); + } + if (missing.size() > 0) { + throw new IllegalArgumentException( + "Missing required values: " + Joiner.on(',').join(missing)); + } + + if (flinkOptions.getFilesToStage() == null) { + flinkOptions.setFilesToStage(detectClassPathResourcesToStage( + DataflowPipelineRunner.class.getClassLoader())); + LOG.info("PipelineOptions.filesToStage was not specified. " + + "Defaulting to files from the classpath: will stage {} files. " + + "Enable logging at DEBUG level to see which files will be staged.", + flinkOptions.getFilesToStage().size()); + LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage()); + } + + // Verify jobName according to service requirements. + String jobName = flinkOptions.getJobName().toLowerCase(); + Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; " + + "the name must consist of only the characters " + "[-a-z0-9], starting with a letter " + + "and ending with a letter " + "or number"); + Preconditions.checkArgument(jobName.length() <= 40, + "JobName too long; must be no more than 40 characters in length"); + + // Set Flink Master to [auto] if no option was specified. + if (flinkOptions.getFlinkMaster() == null) { + flinkOptions.setFlinkMaster("[auto]"); + } + + return new FlinkPipelineRunner(flinkOptions); + } + + private FlinkPipelineRunner(FlinkPipelineOptions options) { + this.options = options; + this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options); + } + + @Override + public FlinkRunnerResult run(Pipeline pipeline) { + LOG.info("Executing pipeline using FlinkPipelineRunner."); + + LOG.info("Translating pipeline to Flink program."); + + this.flinkJobEnv.translate(pipeline); + + LOG.info("Starting execution of Flink program."); + + JobExecutionResult result; + try { + result = this.flinkJobEnv.executePipeline(); + } catch (Exception e) { + LOG.error("Pipeline execution failed", e); + throw new RuntimeException("Pipeline execution failed", e); + } + + LOG.info("Execution finished in {} msecs", result.getNetRuntime()); + + Map<String, Object> accumulators = result.getAllAccumulatorResults(); + if (accumulators != null && !accumulators.isEmpty()) { + LOG.info("Final aggregator values:"); + + for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) { + LOG.info("{} : {}", entry.getKey(), entry.getValue()); + } + } + + return new FlinkRunnerResult(accumulators, result.getNetRuntime()); + } + + /** + * For testing. + */ + public FlinkPipelineOptions getPipelineOptions() { + return options; + } + + /** + * Constructs a runner with default properties for testing. + * + * @return The newly created runner. + */ + public static FlinkPipelineRunner createForTest(boolean streaming) { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + // we use [auto] for testing since this will make it pick up the Testing + // ExecutionEnvironment + options.setFlinkMaster("[auto]"); + options.setStreaming(streaming); + return new FlinkPipelineRunner(options); + } + + @Override + public <Output extends POutput, Input extends PInput> Output apply( + PTransform<Input, Output> transform, Input input) { + return super.apply(transform, input); + } + + ///////////////////////////////////////////////////////////////////////////// + + @Override + public String toString() { + return "DataflowPipelineRunner#" + hashCode(); + } + + /** + * Attempts to detect all the resources the class loader has access to. This does not recurse + * to class loader parents stopping it from pulling in resources from the system class loader. + * + * @param classLoader The URLClassLoader to use to detect resources to stage. + * @return A list of absolute paths to the resources the class loader uses. + * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one + * of the resources the class loader exposes is not a file resource. + */ + protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) { + if (!(classLoader instanceof URLClassLoader)) { + String message = String.format("Unable to use ClassLoader to detect classpath elements. " + + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader); + LOG.error(message); + throw new IllegalArgumentException(message); + } + + List<String> files = new ArrayList<>(); + for (URL url : ((URLClassLoader) classLoader).getURLs()) { + try { + files.add(new File(url.toURI()).getAbsolutePath()); + } catch (IllegalArgumentException | URISyntaxException e) { + String message = String.format("Unable to convert url (%s) to file.", url); + LOG.error(message); + throw new IllegalArgumentException(message, e); + } + } + return files; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java new file mode 100644 index 0000000..8fd08ec --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.PipelineResult; +import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException; +import com.google.cloud.dataflow.sdk.runners.AggregatorValues; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; + +import java.util.Collections; +import java.util.Map; + +/** + * Result of executing a {@link com.google.cloud.dataflow.sdk.Pipeline} with Flink. This + * has methods to query to job runtime and the final values of + * {@link com.google.cloud.dataflow.sdk.transforms.Aggregator}s. + */ +public class FlinkRunnerResult implements PipelineResult { + + private final Map<String, Object> aggregators; + + private final long runtime; + + public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) { + this.aggregators = (aggregators == null || aggregators.isEmpty()) ? + Collections.<String, Object>emptyMap() : + Collections.unmodifiableMap(aggregators); + + this.runtime = runtime; + } + + @Override + public State getState() { + return null; + } + + @Override + public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException { + // TODO provide a list of all accumulator step values + Object value = aggregators.get(aggregator.getName()); + if (value != null) { + return new AggregatorValues<T>() { + @Override + public Map<String, T> getValuesAtSteps() { + return (Map<String, T>) aggregators; + } + }; + } else { + throw new AggregatorRetrievalException("Accumulator results not found.", + new RuntimeException("Accumulator does not exist.")); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java new file mode 100644 index 0000000..71e3b54 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.io; + +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PDone; + +/** + * Transform for printing the contents of a {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * to standard output. + * + * This is Flink-specific and will only work when executed using the + * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}. + */ +public class ConsoleIO { + + /** + * A PTransform that writes a PCollection to a standard output. + */ + public static class Write { + + /** + * Returns a ConsoleIO.Write PTransform with a default step name. + */ + public static Bound create() { + return new Bound(); + } + + /** + * Returns a ConsoleIO.Write PTransform with the given step name. + */ + public static Bound named(String name) { + return new Bound().named(name); + } + + /** + * A PTransform that writes a bounded PCollection to standard output. + */ + public static class Bound extends PTransform<PCollection<?>, PDone> { + private static final long serialVersionUID = 0; + + Bound() { + super("ConsoleIO.Write"); + } + + Bound(String name) { + super(name); + } + + /** + * Returns a new ConsoleIO.Write PTransform that's like this one but with the given + * step + * name. Does not modify this object. + */ + public Bound named(String name) { + return new Bound(name); + } + + @Override + public PDone apply(PCollection<?> input) { + return PDone.in(input.getPipeline()); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java new file mode 100644 index 0000000..28a10b7 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation; + +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.runners.TransformTreeNode; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; +import com.google.cloud.dataflow.sdk.values.PValue; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs. + * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator} + */ +public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class); + + /** + * The necessary context in the case of a batch job. + */ + private final FlinkBatchTranslationContext batchContext; + + private int depth = 0; + + /** + * Composite transform that we want to translate before proceeding with other transforms. + */ + private PTransform<?, ?> currentCompositeTransform; + + public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) { + this.batchContext = new FlinkBatchTranslationContext(env, options); + } + + // -------------------------------------------------------------------------------------------- + // Pipeline Visitor Methods + // -------------------------------------------------------------------------------------------- + + @Override + public void enterCompositeTransform(TransformTreeNode node) { + LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node)); + + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == null) { + + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator != null) { + currentCompositeTransform = transform; + if (transform instanceof CoGroupByKey && node.getInput().expand().size() != 2) { + // we can only optimize CoGroupByKey for input size 2 + currentCompositeTransform = null; + } + } + } + this.depth++; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == transform) { + + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator != null) { + LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node)); + applyBatchTransform(transform, node, translator); + currentCompositeTransform = null; + } else { + throw new IllegalStateException("Attempted to translate composite transform " + + "but no translator was found: " + currentCompositeTransform); + } + } + this.depth--; + LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node)); + } + + @Override + public void visitTransform(TransformTreeNode node) { + LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node)); + if (currentCompositeTransform != null) { + // ignore it + return; + } + + // get the transformation corresponding to hte node we are + // currently visiting and translate it into its Flink alternative. + + PTransform<?, ?> transform = node.getTransform(); + BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform); + if (translator == null) { + LOG.info(node.getTransform().getClass().toString()); + throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); + } + applyBatchTransform(transform, node, translator); + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + // do nothing here + } + + private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> transform, TransformTreeNode node, BatchTransformTranslator<?> translator) { + + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + + @SuppressWarnings("unchecked") + BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; + + // create the applied PTransform on the batchContext + batchContext.setCurrentTransform(AppliedPTransform.of( + node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); + typedTranslator.translateNode(typedTransform, batchContext); + } + + /** + * A translator of a {@link PTransform}. + */ + public interface BatchTransformTranslator<Type extends PTransform> { + void translateNode(Type transform, FlinkBatchTranslationContext context); + } + + private static String genSpaces(int n) { + String s = ""; + for (int i = 0; i < n; i++) { + s += "| "; + } + return s; + } + + private static String formatNodeName(TransformTreeNode node) { + return node.toString().split("@")[1] + node.getTransform(); + } +}
