http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml deleted file mode 100644 index 18343ef..0000000 --- a/runners/flink/runner/pom.xml +++ /dev/null @@ -1,330 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-flink-parent</artifactId> - <version>0.7.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>beam-runners-flink_2.10</artifactId> - - <name>Apache Beam :: Runners :: Flink :: Core</name> - - <packaging>jar</packaging> - - <profiles> - <profile> - <id>local-validates-runner-tests</id> - <activation><activeByDefault>false</activeByDefault></activation> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <executions> - - <!-- This configures the inherited validates-runner-tests - execution to execute with a local Flink instance. 
--> - <execution> - <id>validates-runner-tests</id> - <phase>integration-test</phase> - <goals> - <goal>test</goal> - </goals> - <configuration> - <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups> - <excludedGroups> - org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, - org.apache.beam.sdk.testing.UsesSplittableParDo, - org.apache.beam.sdk.testing.UsesAttemptedMetrics, - org.apache.beam.sdk.testing.UsesCommittedMetrics, - org.apache.beam.sdk.testing.UsesTestStream - </excludedGroups> - <parallel>none</parallel> - <failIfNoTests>true</failIfNoTests> - <dependenciesToScan> - <dependency>org.apache.beam:beam-sdks-java-core</dependency> - </dependenciesToScan> - <systemPropertyVariables> - <beamTestPipelineOptions> - [ - "--runner=TestFlinkRunner", - "--streaming=false" - ] - </beamTestPipelineOptions> - </systemPropertyVariables> - </configuration> - </execution> - - <!-- This second execution runs the tests in streaming mode --> - <execution> - <id>streaming-validates-runner-tests</id> - <phase>integration-test</phase> - <goals> - <goal>test</goal> - </goals> - <configuration> - <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups> - <excludedGroups> - org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, - org.apache.beam.sdk.testing.UsesSetState, - org.apache.beam.sdk.testing.UsesMapState, - org.apache.beam.sdk.testing.UsesAttemptedMetrics, - org.apache.beam.sdk.testing.UsesCommittedMetrics, - org.apache.beam.sdk.testing.UsesTestStream, - org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs - </excludedGroups> - <parallel>none</parallel> - <failIfNoTests>true</failIfNoTests> - <dependenciesToScan> - <dependency>org.apache.beam:beam-sdks-java-core</dependency> - </dependenciesToScan> - <systemPropertyVariables> - <beamTestPipelineOptions> - [ - "--runner=TestFlinkRunner", - "--streaming=true" - ] - </beamTestPipelineOptions> - </systemPropertyVariables> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - </profiles> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - </plugin> - - <!-- Integration Tests --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-failsafe-plugin</artifactId> - </plugin> - - <!-- Unit Tests --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - </plugin> - </plugins> - </build> - - <dependencies> - <!-- Flink dependencies --> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${flink.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_2.10</artifactId> - <version>${flink.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_2.10</artifactId> - <version>${flink.version}</version> - </dependency> - - <!-- For testing --> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${flink.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_2.10</artifactId> - <version>${flink.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <!-- Beam 
--> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-core</artifactId> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-core-java</artifactId> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-runners-core-construction-java</artifactId> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </dependency> - - <!-- - Force an upgrade on the version of Apache Commons from Flink to support DEFLATE compression. - --> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - <scope>runtime</scope> - </dependency> - - <!-- Test scoped --> - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-bigquery</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <scope>test</scope> - </dependency> - - <!-- Depend on test jar to scan for ValidatesRunner tests --> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-core</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-jdk14</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <artifactId>apacheds-jdbm1</artifactId> - <groupId>org.apache.directory.jdbm</groupId> - </exclusion> - </exclusions> - </dependency> - - <!-- Optional Pipeline Registration --> - <dependency> - <groupId>com.google.auto.service</groupId> - <artifactId>auto-service</artifactId> - <optional>true</optional> - </dependency> - - <!-- transitive test dependencies from beam-sdk-java-core --> - <dependency> - <groupId>com.fasterxml.jackson.dataformat</groupId> - <artifactId>jackson-dataformat-yaml</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - 
<artifactId>beam-sdks-common-fn-api</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> - </dependencies> -</project>
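For reference, the two surefire executions configured in the pom above run JUnit tests in the org.apache.beam.sdk.testing.ValidatesRunner category against TestFlinkRunner, once with "--streaming=false" and once with "--streaming=true", scanning them out of the beam-sdks-java-core tests jar via dependenciesToScan. A test of the kind these executions pick up might look like the following sketch; the class name and pipeline contents are illustrative only, not part of this commit:

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/** Hypothetical example of a test matched by the executions above. */
public class ExampleValidatesRunnerTest {

  // TestPipeline reads the -DbeamTestPipelineOptions JSON configured in the
  // pom, e.g. ["--runner=TestFlinkRunner", "--streaming=false"].
  @Rule
  public final TestPipeline pipeline = TestPipeline.create();

  @Test
  @Category(ValidatesRunner.class)
  public void testCreateProducesAllElements() {
    PCollection<Integer> output = pipeline.apply(Create.of(1, 2, 3));
    PAssert.that(output).containsInAnyOrder(1, 2, 3);
    pipeline.run();
  }
}

The excludedGroups lists in both executions then subtract categories (metrics, TestStream, splittable DoFn, and so on) that this runner did not support at the time.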
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java deleted file mode 100644 index b745f0b..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.GlobalConfiguration; - -/** - * {@link DefaultValueFactory} for getting a default value for the parallelism option - * on {@link FlinkPipelineOptions}. - * - * <p>This will return either the default value from {@link GlobalConfiguration} or {@code 1}. - * A valid {@link GlobalConfiguration} is only available if the program is executed by the Flink - * run scripts. - */ -public class DefaultParallelismFactory implements DefaultValueFactory<Integer> { - @Override - public Integer create(PipelineOptions options) { - return GlobalConfiguration.loadConfiguration() - .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java deleted file mode 100644 index 854b674..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a - * Flink batch job. - */ -class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class); - - /** - * The necessary context in the case of a batch job. - */ - private final FlinkBatchTranslationContext batchContext; - - private int depth = 0; - - public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions options) { - this.batchContext = new FlinkBatchTranslationContext(env, options); - } - - @Override - @SuppressWarnings("rawtypes, unchecked") - public void translate(Pipeline pipeline) { - super.translate(pipeline); - - // terminate dangling DataSets - for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) { - dataSet.output(new DiscardingOutputFormat()); - } - } - - // -------------------------------------------------------------------------------------------- - // Pipeline Visitor Methods - // -------------------------------------------------------------------------------------------- - - @Override - public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); - this.depth++; - - BatchTransformTranslator<?> translator = getTranslator(node); - - if (translator != null) { - applyBatchTransform(node.getTransform(), node, translator); - LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName()); - return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; - } else { - return CompositeBehavior.ENTER_TRANSFORM; - } - } - - @Override - public void leaveCompositeTransform(TransformHierarchy.Node node) { - this.depth--; - LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); - } - - @Override - public void visitPrimitiveTransform(TransformHierarchy.Node node) { - LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName()); - - // get the transformation corresponding to the node we are - // currently visiting and translate it into its Flink alternative. 
- PTransform<?, ?> transform = node.getTransform(); - BatchTransformTranslator<?> translator = - FlinkBatchTransformTranslators.getTranslator(transform); - if (translator == null) { - LOG.info(node.getTransform().getClass().toString()); - throw new UnsupportedOperationException("The transform " + transform - + " is currently not supported."); - } - applyBatchTransform(transform, node, translator); - } - - private <T extends PTransform<?, ?>> void applyBatchTransform( - PTransform<?, ?> transform, - TransformHierarchy.Node node, - BatchTransformTranslator<?> translator) { - - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - - @SuppressWarnings("unchecked") - BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator; - - // create the applied PTransform on the batchContext - batchContext.setCurrentTransform(node.toAppliedPTransform()); - typedTranslator.translateNode(typedTransform, batchContext); - } - - /** - * A translator of a {@link PTransform}. - */ - public interface BatchTransformTranslator<TransformT extends PTransform> { - void translateNode(TransformT transform, FlinkBatchTranslationContext context); - } - - /** - * Returns a translator for the given node, if it is possible, otherwise null. - */ - private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) { - PTransform<?, ?> transform = node.getTransform(); - - // Root of the graph is null - if (transform == null) { - return null; - } - - return FlinkBatchTransformTranslators.getTranslator(transform); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java deleted file mode 100644 index ff9521c..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ /dev/null @@ -1,723 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; -import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction; -import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction; -import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction; -import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction; -import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction; -import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; -import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; -import org.apache.beam.runners.flink.translation.functions.FlinkStatefulDoFnFunction; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.runners.flink.translation.types.KvKeySelector; -import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.transforms.join.UnionCoder; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.Reshuffle; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.operators.FlatMapOperator; -import org.apache.flink.api.java.operators.GroupCombineOperator; -import 
org.apache.flink.api.java.operators.GroupReduceOperator; -import org.apache.flink.api.java.operators.Grouping; -import org.apache.flink.api.java.operators.MapPartitionOperator; -import org.apache.flink.api.java.operators.SingleInputUdfOperator; -import org.apache.flink.util.Collector; - -/** - * Translators for transforming {@link PTransform PTransforms} to - * Flink {@link DataSet DataSets}. - */ -class FlinkBatchTransformTranslators { - - // -------------------------------------------------------------------------------------------- - // Transform Translator Registry - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("rawtypes") - private static final Map< - Class<? extends PTransform>, - FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>(); - - static { - TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch()); - - TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch()); - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch()); - TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch()); - - TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch()); - - TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch()); - - TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch()); - - TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch()); - } - - - static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator( - PTransform<?, ?> transform) { - return TRANSLATORS.get(transform.getClass()); - } - - private static class ReadSourceTranslatorBatch<T> - implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Read.Bounded<T>> { - - @Override - public void translateNode(Read.Bounded<T> transform, FlinkBatchTranslationContext context) { - String name = transform.getName(); - BoundedSource<T> source = transform.getSource(); - PCollection<T> output = context.getOutput(transform); - - TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output); - - DataSource<WindowedValue<T>> dataSource = new DataSource<>( - context.getExecutionEnvironment(), - new SourceInputFormat<>(source, context.getPipelineOptions()), - typeInformation, - name); - - context.setOutputDataSet(output, dataSource); - } - } - - private static class WindowAssignTranslatorBatch<T> - implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Assign<T>> { - - @Override - public void translateNode(Window.Assign<T> transform, FlinkBatchTranslationContext context) { - PValue input = context.getInput(transform); - - TypeInformation<WindowedValue<T>> resultTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - DataSet<WindowedValue<T>> inputDataSet = context.getInputDataSet(input); - - @SuppressWarnings("unchecked") - final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy = - (WindowingStrategy<T, ? extends BoundedWindow>) - context.getOutput(transform).getWindowingStrategy(); - - WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn(); - - FlinkAssignWindows<T, ? 
extends BoundedWindow> assignWindowsFunction = - new FlinkAssignWindows<>(windowFn); - - DataSet<WindowedValue<T>> resultDataSet = inputDataSet - .flatMap(assignWindowsFunction) - .name(context.getOutput(transform).getName()) - .returns(resultTypeInfo); - - context.setOutputDataSet(context.getOutput(transform), resultDataSet); - } - } - - private static class GroupByKeyTranslatorBatch<K, InputT> - implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, InputT>> { - - @Override - public void translateNode( - GroupByKey<K, InputT> transform, - FlinkBatchTranslationContext context) { - - // for now, this is copied from the Combine.PerKey translater. Once we have the new runner API - // we can replace GroupByKey by a Combine.PerKey with the Concatenate CombineFn - - DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = - context.getInputDataSet(context.getInput(transform)); - - Combine.KeyedCombineFn<K, InputT, List<InputT>, List<InputT>> combineFn = - new Concatenate<InputT>().asKeyedFn(); - - KvCoder<K, InputT> inputCoder = - (KvCoder<K, InputT>) context.getInput(transform).getCoder(); - - Coder<List<InputT>> accumulatorCoder; - - try { - accumulatorCoder = - combineFn.getAccumulatorCoder( - context.getInput(transform).getPipeline().getCoderRegistry(), - inputCoder.getKeyCoder(), - inputCoder.getValueCoder()); - } catch (CannotProvideCoderException e) { - throw new RuntimeException(e); - } - - WindowingStrategy<?, ?> windowingStrategy = - context.getInput(transform).getWindowingStrategy(); - - TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo = - new CoderTypeInformation<>( - WindowedValue.getFullCoder( - KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder), - windowingStrategy.getWindowFn().windowCoder())); - - - Grouping<WindowedValue<KV<K, InputT>>> inputGrouping = - inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder())); - - FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction; - FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction; - - if (windowingStrategy.getWindowFn().isNonMerging()) { - @SuppressWarnings("unchecked") - WindowingStrategy<?, BoundedWindow> boundedStrategy = - (WindowingStrategy<?, BoundedWindow>) windowingStrategy; - - partialReduceFunction = new FlinkPartialReduceFunction<>( - combineFn, - boundedStrategy, - Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), - context.getPipelineOptions()); - - reduceFunction = new FlinkReduceFunction<>( - combineFn, - boundedStrategy, - Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), - context.getPipelineOptions()); - - } else { - if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) { - throw new UnsupportedOperationException( - "Merging WindowFn with windows other than IntervalWindow are not supported."); - } - - @SuppressWarnings("unchecked") - WindowingStrategy<?, IntervalWindow> intervalStrategy = - (WindowingStrategy<?, IntervalWindow>) windowingStrategy; - - partialReduceFunction = new FlinkMergingPartialReduceFunction<>( - combineFn, - intervalStrategy, - Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), - context.getPipelineOptions()); - - reduceFunction = new FlinkMergingReduceFunction<>( - combineFn, - intervalStrategy, - Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), - context.getPipelineOptions()); - } - - // Partially GroupReduce the values into the intermediate format AccumT (combine) - 
GroupCombineOperator< - WindowedValue<KV<K, InputT>>, - WindowedValue<KV<K, List<InputT>>>> groupCombine = - new GroupCombineOperator<>( - inputGrouping, - partialReduceTypeInfo, - partialReduceFunction, - "GroupCombine: " + transform.getName()); - - Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping = - groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder())); - - // Fully reduce the values and create output format VO - GroupReduceOperator< - WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet = - new GroupReduceOperator<>( - intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName()); - - context.setOutputDataSet(context.getOutput(transform), outputDataSet); - - } - - } - - private static class ReshuffleTranslatorBatch<K, InputT> - implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> { - - @Override - public void translateNode( - Reshuffle<K, InputT> transform, - FlinkBatchTranslationContext context) { - - DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = - context.getInputDataSet(context.getInput(transform)); - - context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance()); - - } - - } - - /** - * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. - * - * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this - * is expected to crash! - * - * <p>This is copied from the dataflow runner code. - * - * @param <T> the type of elements to concatenate. - */ - private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { - @Override - public List<T> createAccumulator() { - return new ArrayList<>(); - } - - @Override - public List<T> addInput(List<T> accumulator, T input) { - accumulator.add(input); - return accumulator; - } - - @Override - public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { - List<T> result = createAccumulator(); - for (List<T> accumulator : accumulators) { - result.addAll(accumulator); - } - return result; - } - - @Override - public List<T> extractOutput(List<T> accumulator) { - return accumulator; - } - - @Override - public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - - @Override - public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { - return ListCoder.of(inputCoder); - } - } - - - private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT> - implements FlinkBatchPipelineTranslator.BatchTransformTranslator< - Combine.PerKey<K, InputT, OutputT>> { - - @Override - @SuppressWarnings("unchecked") - public void translateNode( - Combine.PerKey<K, InputT, OutputT> transform, - FlinkBatchTranslationContext context) { - DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = - context.getInputDataSet(context.getInput(transform)); - - CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn = - (CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT>) transform.getFn(); - - KvCoder<K, InputT> inputCoder = - (KvCoder<K, InputT>) context.getInput(transform).getCoder(); - - Coder<AccumT> accumulatorCoder; - - try { - accumulatorCoder = - combineFn.getAccumulatorCoder( - context.getInput(transform).getPipeline().getCoderRegistry(), - inputCoder.getKeyCoder(), - inputCoder.getValueCoder()); - } catch (CannotProvideCoderException e) { - throw new RuntimeException(e); - } - - 
WindowingStrategy<?, ?> windowingStrategy = - context.getInput(transform).getWindowingStrategy(); - - TypeInformation<WindowedValue<KV<K, AccumT>>> partialReduceTypeInfo = - context.getTypeInfo( - KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder), - windowingStrategy); - - Grouping<WindowedValue<KV<K, InputT>>> inputGrouping = - inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder())); - - // construct a map from side input to WindowingStrategy so that - // the DoFn runner can map main-input windows to side input windows - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); - for (PCollectionView<?> sideInput: transform.getSideInputs()) { - sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); - } - - if (windowingStrategy.getWindowFn().isNonMerging()) { - WindowingStrategy<?, BoundedWindow> boundedStrategy = - (WindowingStrategy<?, BoundedWindow>) windowingStrategy; - - FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction = - new FlinkPartialReduceFunction<>( - combineFn, - boundedStrategy, - sideInputStrategies, - context.getPipelineOptions()); - - FlinkReduceFunction<K, AccumT, OutputT, ?> reduceFunction = - new FlinkReduceFunction<>( - combineFn, - boundedStrategy, - sideInputStrategies, - context.getPipelineOptions()); - - // Partially GroupReduce the values into the intermediate format AccumT (combine) - GroupCombineOperator< - WindowedValue<KV<K, InputT>>, - WindowedValue<KV<K, AccumT>>> groupCombine = - new GroupCombineOperator<>( - inputGrouping, - partialReduceTypeInfo, - partialReduceFunction, - "GroupCombine: " + transform.getName()); - - transformSideInputs(transform.getSideInputs(), groupCombine, context); - - TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - Grouping<WindowedValue<KV<K, AccumT>>> intermediateGrouping = - groupCombine.groupBy(new KvKeySelector<AccumT, K>(inputCoder.getKeyCoder())); - - // Fully reduce the values and create output format OutputT - GroupReduceOperator< - WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> outputDataSet = - new GroupReduceOperator<>( - intermediateGrouping, reduceTypeInfo, reduceFunction, transform.getName()); - - transformSideInputs(transform.getSideInputs(), outputDataSet, context); - - context.setOutputDataSet(context.getOutput(transform), outputDataSet); - - } else { - if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) { - throw new UnsupportedOperationException( - "Merging WindowFn with windows other than IntervalWindow are not supported."); - } - - // for merging windows we can't to a pre-shuffle combine step since - // elements would not be in their correct windows for side-input access - - WindowingStrategy<?, IntervalWindow> intervalStrategy = - (WindowingStrategy<?, IntervalWindow>) windowingStrategy; - - FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction = - new FlinkMergingNonShuffleReduceFunction<>( - combineFn, - intervalStrategy, - sideInputStrategies, - context.getPipelineOptions()); - - TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - Grouping<WindowedValue<KV<K, InputT>>> grouping = - inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder())); - - // Fully reduce the values and create output format OutputT - GroupReduceOperator< - WindowedValue<KV<K, InputT>>, 
WindowedValue<KV<K, OutputT>>> outputDataSet = - new GroupReduceOperator<>( - grouping, reduceTypeInfo, reduceFunction, transform.getName()); - - transformSideInputs(transform.getSideInputs(), outputDataSet, context); - - context.setOutputDataSet(context.getOutput(transform), outputDataSet); - } - - - } - } - - private static void rejectSplittable(DoFn<?, ?> doFn) { - DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (signature.processElement().isSplittable()) { - throw new UnsupportedOperationException( - String.format( - "%s does not currently support splittable DoFn: %s", - FlinkRunner.class.getSimpleName(), doFn)); - } - } - - private static class ParDoTranslatorBatch<InputT, OutputT> - implements FlinkBatchPipelineTranslator.BatchTransformTranslator< - ParDo.MultiOutput<InputT, OutputT>> { - - @Override - @SuppressWarnings("unchecked") - public void translateNode( - ParDo.MultiOutput<InputT, OutputT> transform, - FlinkBatchTranslationContext context) { - DoFn<InputT, OutputT> doFn = transform.getFn(); - rejectSplittable(doFn); - DataSet<WindowedValue<InputT>> inputDataSet = - context.getInputDataSet(context.getInput(transform)); - - Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform); - - Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap(); - // put the main output at index 0, FlinkMultiOutputDoFnFunction expects this - outputMap.put(transform.getMainOutputTag(), 0); - int count = 1; - for (TupleTag<?> tag : outputs.keySet()) { - if (!outputMap.containsKey(tag)) { - outputMap.put(tag, count++); - } - } - - // assume that the windowing strategy is the same for all outputs - WindowingStrategy<?, ?> windowingStrategy = null; - - // collect all output Coders and create a UnionCoder for our tagged outputs - List<Coder<?>> outputCoders = Lists.newArrayList(); - for (PValue taggedValue : outputs.values()) { - checkState( - taggedValue instanceof PCollection, - "Within ParDo, got a non-PCollection output %s of type %s", - taggedValue, - taggedValue.getClass().getSimpleName()); - PCollection<?> coll = (PCollection<?>) taggedValue; - outputCoders.add(coll.getCoder()); - windowingStrategy = coll.getWindowingStrategy(); - } - - if (windowingStrategy == null) { - throw new IllegalStateException("No outputs defined."); - } - - UnionCoder unionCoder = UnionCoder.of(outputCoders); - - TypeInformation<WindowedValue<RawUnionValue>> typeInformation = - new CoderTypeInformation<>( - WindowedValue.getFullCoder( - unionCoder, - windowingStrategy.getWindowFn().windowCoder())); - - List<PCollectionView<?>> sideInputs = transform.getSideInputs(); - - // construct a map from side input to WindowingStrategy so that - // the DoFn runner can map main-input windows to side input windows - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); - for (PCollectionView<?> sideInput: sideInputs) { - sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); - } - - SingleInputUdfOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>, ?> outputDataSet; - DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); - if (signature.stateDeclarations().size() > 0 - || signature.timerDeclarations().size() > 0) { - - // Based on the fact that the signature is stateful, DoFnSignatures ensures - // that it is also keyed - KvCoder<?, InputT> inputCoder = - (KvCoder<?, InputT>) context.getInput(transform).getCoder(); - - FlinkStatefulDoFnFunction<?, ?, OutputT> doFnWrapper = new 
FlinkStatefulDoFnFunction<>( - (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(), - outputMap, transform.getMainOutputTag() - ); - - Grouping<WindowedValue<InputT>> grouping = - inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder())); - - outputDataSet = - new GroupReduceOperator(grouping, typeInformation, doFnWrapper, transform.getName()); - - } else { - FlinkDoFnFunction<InputT, RawUnionValue> doFnWrapper = - new FlinkDoFnFunction( - doFn, - windowingStrategy, - sideInputStrategies, - context.getPipelineOptions(), - outputMap, - transform.getMainOutputTag()); - - outputDataSet = new MapPartitionOperator<>( - inputDataSet, typeInformation, - doFnWrapper, transform.getName()); - - } - - transformSideInputs(sideInputs, outputDataSet, context); - - for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) { - pruneOutput( - outputDataSet, - context, - outputMap.get(output.getKey()), - (PCollection) output.getValue()); - } - - } - - private <T> void pruneOutput( - DataSet<WindowedValue<RawUnionValue>> taggedDataSet, - FlinkBatchTranslationContext context, - int integerTag, - PCollection<T> collection) { - TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection); - - FlinkMultiOutputPruningFunction<T> pruningFunction = - new FlinkMultiOutputPruningFunction<>(integerTag); - - FlatMapOperator<WindowedValue<RawUnionValue>, WindowedValue<T>> pruningOperator = - new FlatMapOperator<>( - taggedDataSet, - outputType, - pruningFunction, - collection.getName()); - - context.setOutputDataSet(collection, pruningOperator); - } - } - - private static class FlattenPCollectionTranslatorBatch<T> - implements FlinkBatchPipelineTranslator.BatchTransformTranslator< - Flatten.PCollections<T>> { - - @Override - @SuppressWarnings("unchecked") - public void translateNode( - Flatten.PCollections<T> transform, - FlinkBatchTranslationContext context) { - - Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform); - DataSet<WindowedValue<T>> result = null; - - if (allInputs.isEmpty()) { - - // create an empty dummy source to satisfy downstream operations - // we cannot create an empty source in Flink, therefore we have to - // add the flatMap that simply never forwards the single element - DataSource<String> dummySource = - context.getExecutionEnvironment().fromElements("dummy"); - result = dummySource.flatMap(new FlatMapFunction<String, WindowedValue<T>>() { - @Override - public void flatMap(String s, Collector<WindowedValue<T>> collector) throws Exception { - // never return anything - } - }).returns( - new CoderTypeInformation<>( - WindowedValue.getFullCoder( - (Coder<T>) VoidCoder.of(), - GlobalWindow.Coder.INSTANCE))); - } else { - for (PValue taggedPc : allInputs.values()) { - checkArgument( - taggedPc instanceof PCollection, - "Got non-PCollection input to flatten: %s of type %s", - taggedPc, - taggedPc.getClass().getSimpleName()); - PCollection<T> collection = (PCollection<T>) taggedPc; - DataSet<WindowedValue<T>> current = context.getInputDataSet(collection); - if (result == null) { - result = current; - } else { - result = result.union(current); - } - } - } - - // insert a dummy filter, there seems to be a bug in Flink - // that produces duplicate elements after the union in some cases - // if we don't - result = result.filter(new FilterFunction<WindowedValue<T>>() { - @Override - public boolean filter(WindowedValue<T> tWindowedValue) throws Exception { - return true; - } - }).name("UnionFixFilter"); - 
context.setOutputDataSet(context.getOutput(transform), result); - } - } - - private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT> - implements FlinkBatchPipelineTranslator.BatchTransformTranslator< - View.CreatePCollectionView<ElemT, ViewT>> { - - @Override - public void translateNode( - View.CreatePCollectionView<ElemT, ViewT> transform, - FlinkBatchTranslationContext context) { - DataSet<WindowedValue<ElemT>> inputDataSet = - context.getInputDataSet(context.getInput(transform)); - - PCollectionView<ViewT> input = transform.getView(); - - context.setSideInputDataSet(input, inputDataSet); - } - } - - private static void transformSideInputs( - List<PCollectionView<?>> sideInputs, - SingleInputUdfOperator<?, ?, ?> outputDataSet, - FlinkBatchTranslationContext context) { - // get corresponding Flink broadcast DataSets - for (PCollectionView<?> input : sideInputs) { - DataSet<?> broadcastSet = context.getSideInputDataSet(input); - outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId()); - } - } - - private FlinkBatchTransformTranslators() {} - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java deleted file mode 100644 index 98dd0fb..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.common.collect.Iterables; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; - -/** - * Helper for {@link FlinkBatchPipelineTranslator} and translators in - * {@link FlinkBatchTransformTranslators}. 
- */ -class FlinkBatchTranslationContext { - - private final Map<PValue, DataSet<?>> dataSets; - private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets; - - /** - * For keeping track about which DataSets don't have a successor. We - * need to terminate these with a discarding sink because the Beam - * model allows dangling operations. - */ - private final Map<PValue, DataSet<?>> danglingDataSets; - - private final ExecutionEnvironment env; - private final PipelineOptions options; - - private AppliedPTransform<?, ?, ?> currentTransform; - - // ------------------------------------------------------------------------ - - public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) { - this.env = env; - this.options = options; - this.dataSets = new HashMap<>(); - this.broadcastDataSets = new HashMap<>(); - - this.danglingDataSets = new HashMap<>(); - } - - // ------------------------------------------------------------------------ - - public Map<PValue, DataSet<?>> getDanglingDataSets() { - return danglingDataSets; - } - - public ExecutionEnvironment getExecutionEnvironment() { - return env; - } - - public PipelineOptions getPipelineOptions() { - return options; - } - - @SuppressWarnings("unchecked") - public <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) { - // assume that the DataSet is used as an input if retrieved here - danglingDataSets.remove(value); - return (DataSet<WindowedValue<T>>) dataSets.get(value); - } - - public <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) { - if (!dataSets.containsKey(value)) { - dataSets.put(value, set); - danglingDataSets.put(value, set); - } - } - - /** - * Sets the AppliedPTransform which carries input/output. - * @param currentTransform - */ - public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) { - this.currentTransform = currentTransform; - } - - @SuppressWarnings("unchecked") - public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) { - return (DataSet<T>) broadcastDataSets.get(value); - } - - public <ViewT, ElemT> void setSideInputDataSet( - PCollectionView<ViewT> value, - DataSet<WindowedValue<ElemT>> set) { - if (!broadcastDataSets.containsKey(value)) { - broadcastDataSets.put(value, set); - } - } - - @SuppressWarnings("unchecked") - public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) { - return getTypeInfo(collection.getCoder(), collection.getWindowingStrategy()); - } - - @SuppressWarnings("unchecked") - public <T> TypeInformation<WindowedValue<T>> getTypeInfo( - Coder<T> coder, - WindowingStrategy<?, ?> windowingStrategy) { - WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = - WindowedValue.getFullCoder( - coder, - windowingStrategy.getWindowFn().windowCoder()); - - return new CoderTypeInformation<>(windowedValueCoder); - } - - Map<TupleTag<?>, PValue> getInputs(PTransform<?, ?> transform) { - return currentTransform.getInputs(); - } - - @SuppressWarnings("unchecked") - <T extends PValue> T getInput(PTransform<T, ?> transform) { - return (T) Iterables.getOnlyElement(currentTransform.getInputs().values()); - } - - Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) { - return currentTransform.getOutputs(); - } - - @SuppressWarnings("unchecked") - <T extends PValue> T getOutput(PTransform<?, T> transform) { - return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values()); - } -} 
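Together, the classes above form the batch translation path: FlinkBatchPipelineTranslator walks the transform hierarchy, looks up a BatchTransformTranslator for each node in the TRANSLATORS registry, and hands it this FlinkBatchTranslationContext, which maintains the PValue-to-DataSet mapping (including the dangling data sets that later receive discarding sinks). A hypothetical translator, shown only to illustrate that contract and modeled on ReshuffleTranslatorBatch above, could look like this; it assumes it is nested in FlinkBatchTransformTranslators with that file's imports in scope:

// Illustrative no-op translator: forwards the input DataSet unchanged.
private static class IdentityTranslatorBatch<T>
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
        PTransform<PCollection<T>, PCollection<T>>> {

  @Override
  public void translateNode(
      PTransform<PCollection<T>, PCollection<T>> transform,
      FlinkBatchTranslationContext context) {
    // Fetch the DataSet registered for the input PCollection; per
    // getInputDataSet above, this also marks it as consumed by removing
    // it from the dangling set.
    DataSet<WindowedValue<T>> input =
        context.getInputDataSet(context.getInput(transform));
    // Re-register the same DataSet as the result for the output PCollection.
    context.setOutputDataSet(context.getOutput(transform), input);
  }
}

Such a translator would be registered in the static TRANSLATORS map in FlinkBatchTransformTranslators, keyed by the transform class, exactly as the Read, ParDo, GroupByKey, and other translators above are.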
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java deleted file mode 100644 index bf4395f..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import java.io.IOException; - -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.transforms.Aggregator; -import org.joda.time.Duration; - - -/** - * Result of a detached execution of a {@link org.apache.beam.sdk.Pipeline} with Flink. - * In detached execution, results and job execution are currently unavailable. 
- */ -public class FlinkDetachedRunnerResult implements PipelineResult { - - FlinkDetachedRunnerResult() {} - - @Override - public State getState() { - return State.UNKNOWN; - } - - @Override - public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) - throws AggregatorRetrievalException { - throw new AggregatorRetrievalException( - "Accumulators can't be retrieved for detached Job executions.", - new UnsupportedOperationException()); - } - - @Override - public MetricResults metrics() { - throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics."); - } - - @Override - public State cancel() throws IOException { - throw new UnsupportedOperationException("Cancelling is not yet supported."); - } - - @Override - public State waitUntilFinish() { - return State.UNKNOWN; - } - - @Override - public State waitUntilFinish(Duration duration) { - return State.UNKNOWN; - } - - @Override - public String toString() { - return "FlinkDetachedRunnerResult{}"; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java deleted file mode 100644 index ba00036..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.List; -import org.apache.beam.sdk.Pipeline; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.CollectionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The class that instantiates and manages the execution of a given job. 
- * Depending on if the job is a Streaming or Batch processing one, it creates - * the adequate execution environment ({@link ExecutionEnvironment} - * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator} - * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to - * transform the Beam job into a Flink one, and executes the (translated) job. - */ -class FlinkPipelineExecutionEnvironment { - - private static final Logger LOG = - LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); - - private final FlinkPipelineOptions options; - - /** - * The Flink Batch execution environment. This is instantiated to either a - * {@link org.apache.flink.api.java.CollectionEnvironment}, - * a {@link org.apache.flink.api.java.LocalEnvironment} or - * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration - * options. - */ - private ExecutionEnvironment flinkBatchEnv; - - /** - * The Flink Streaming execution environment. This is instantiated to either a - * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or - * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending - * on the configuration options, and more specifically, the url of the master. - */ - private StreamExecutionEnvironment flinkStreamEnv; - - /** - * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the - * provided {@link FlinkPipelineOptions}. - * - * @param options the user-defined pipeline options. - * */ - FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) { - this.options = checkNotNull(options); - } - - /** - * Depending on if the job is a Streaming or a Batch one, this method creates - * the necessary execution environment and pipeline translator, and translates - * the {@link org.apache.beam.sdk.values.PCollection} program into - * a {@link org.apache.flink.api.java.DataSet} - * or {@link org.apache.flink.streaming.api.datastream.DataStream} one. - * */ - public void translate(FlinkRunner flinkRunner, Pipeline pipeline) { - this.flinkBatchEnv = null; - this.flinkStreamEnv = null; - - PipelineTranslationOptimizer optimizer = - new PipelineTranslationOptimizer(TranslationMode.BATCH, options); - - optimizer.translate(pipeline); - TranslationMode translationMode = optimizer.getTranslationMode(); - - FlinkPipelineTranslator translator; - if (translationMode == TranslationMode.STREAMING) { - this.flinkStreamEnv = createStreamExecutionEnvironment(); - translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options); - } else { - this.flinkBatchEnv = createBatchExecutionEnvironment(); - translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options); - } - - translator.translate(pipeline); - } - - /** - * Launches the program execution. - * */ - public JobExecutionResult executePipeline() throws Exception { - final String jobName = options.getJobName(); - - if (flinkBatchEnv != null) { - return flinkBatchEnv.execute(jobName); - } else if (flinkStreamEnv != null) { - return flinkStreamEnv.execute(jobName); - } else { - throw new IllegalStateException("The Pipeline has not yet been translated."); - } - } - - /** - * If the submitted job is a batch processing job, this method creates the adequate - * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending - * on the user-specified options. 
- */ - private ExecutionEnvironment createBatchExecutionEnvironment() { - - LOG.info("Creating the required Batch Execution Environment."); - - String masterUrl = options.getFlinkMaster(); - ExecutionEnvironment flinkBatchEnv; - - // depending on the master, create the right environment. - if (masterUrl.equals("[local]")) { - flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[collection]")) { - flinkBatchEnv = new CollectionEnvironment(); - } else if (masterUrl.equals("[auto]")) { - flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List<String> stagingFiles = options.getFilesToStage(); - flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), - stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } - - // set the correct parallelism. - if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) { - flinkBatchEnv.setParallelism(options.getParallelism()); - } - - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkBatchEnv.getParallelism()); - - if (options.getObjectReuse()) { - flinkBatchEnv.getConfig().enableObjectReuse(); - } else { - flinkBatchEnv.getConfig().disableObjectReuse(); - } - - return flinkBatchEnv; - } - - /** - * If the submitted job is a stream processing job, this method creates the adequate - * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending - * on the user-specified options. - */ - private StreamExecutionEnvironment createStreamExecutionEnvironment() { - - LOG.info("Creating the required Streaming Environment."); - - String masterUrl = options.getFlinkMaster(); - StreamExecutionEnvironment flinkStreamEnv = null; - - // depending on the master, create the right environment. - if (masterUrl.equals("[local]")) { - flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[auto]")) { - flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List<String> stagingFiles = options.getFilesToStage(); - flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } - - // set the correct parallelism. - if (options.getParallelism() != -1) { - flinkStreamEnv.setParallelism(options.getParallelism()); - } - - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkStreamEnv.getParallelism()); - - if (options.getObjectReuse()) { - flinkStreamEnv.getConfig().enableObjectReuse(); - } else { - flinkStreamEnv.getConfig().disableObjectReuse(); - } - - // default to event time - flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - // for the following 2 parameters, a value of -1 means that Flink will use - // the default values as specified in the configuration. 
-    int numRetries = options.getNumberOfExecutionRetries();
-    if (numRetries != -1) {
-      flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
-    }
-    long retryDelay = options.getExecutionRetryDelay();
-    if (retryDelay != -1) {
-      flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
-    }
-
-    // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
-    // If the value is not -1, then the validity checks are applied.
-    // By default, checkpointing is disabled.
-    long checkpointInterval = options.getCheckpointingInterval();
-    if (checkpointInterval != -1) {
-      if (checkpointInterval < 1) {
-        throw new IllegalArgumentException("The checkpoint interval must be positive");
-      }
-      flinkStreamEnv.enableCheckpointing(checkpointInterval);
-    }
-
-    // State backend
-    final AbstractStateBackend stateBackend = options.getStateBackend();
-    if (stateBackend != null) {
-      flinkStreamEnv.setStateBackend(stateBackend);
-    }
-
-    return flinkStreamEnv;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
deleted file mode 100644
index ef9afea..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import java.util.List;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-/**
- * Options which can be used to configure a Flink PipelineRunner.
- */
-public interface FlinkPipelineOptions
-    extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
-
-  /**
-   * List of local files to make available to workers.
-   *
-   * <p>Jars are placed on the worker's classpath.
-   *
-   * <p>The default value is the list of jars from the main program's classpath.
-   */
-  @Description("JAR files to send to all workers and put on the classpath. "
      + "The default value is all files from the classpath.")
-  @JsonIgnore
-  List<String> getFilesToStage();
-  void setFilesToStage(List<String> value);
-
-  /**
-   * The URL of the Flink JobManager on which to execute pipelines. This can either be
-   * the address of a cluster JobManager, in the form "host:port", or one of the special
-   * strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
-   * cluster in the JVM, "[collection]" will execute the pipeline on Java Collections, while
-   * "[auto]" will let the system decide where to execute the pipeline based on the environment.
-   */
-  @Description("Address of the Flink Master where the Pipeline should be executed. Can"
-      + " either be of the form \"host:port\" or one of the special values [local], "
-      + "[collection] or [auto].")
-  String getFlinkMaster();
-  void setFlinkMaster(String value);
-
-  @Description("The degree of parallelism to be used when distributing operations onto workers.")
-  @Default.InstanceFactory(DefaultParallelismFactory.class)
-  Integer getParallelism();
-  void setParallelism(Integer value);
-
-  @Description("The interval between consecutive checkpoints (i.e. snapshots of the current "
-      + "pipeline state used for fault tolerance).")
-  @Default.Long(-1L)
-  Long getCheckpointingInterval();
-  void setCheckpointingInterval(Long interval);
-
-  @Description("Sets the number of times that failed tasks are re-executed. "
-      + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
-      + "that the system default value (as defined in the configuration) should be used.")
-  @Default.Integer(-1)
-  Integer getNumberOfExecutionRetries();
-  void setNumberOfExecutionRetries(Integer retries);
-
-  @Description("Sets the delay between executions. A value of -1 "
-      + "indicates that the default value should be used.")
-  @Default.Long(-1L)
-  Long getExecutionRetryDelay();
-  void setExecutionRetryDelay(Long delay);
-
-  @Description("Sets the behavior of reusing objects.")
-  @Default.Boolean(false)
-  Boolean getObjectReuse();
-  void setObjectReuse(Boolean reuse);
-
-  /**
-   * State backend to store Beam's state during computation.
-   * Note: Only applicable when executing in streaming mode.
-   */
-  @Description("Sets the state backend to use in streaming mode. "
-      + "Otherwise the default is read from the Flink config.")
-  @JsonIgnore
-  AbstractStateBackend getStateBackend();
-  void setStateBackend(AbstractStateBackend stateBackend);
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
deleted file mode 100644
index 65f416d..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineTranslator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-
-/**
- * The role of this class is to translate the Beam operators to
- * their Flink counterparts. If we have a streaming job, this is instantiated as a
- * {@link FlinkStreamingPipelineTranslator}. Otherwise, i.e. for a batch job,
- * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the
- * {@link org.apache.beam.sdk.values.PCollection}-based user-provided job is translated into
- * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a
- * {@link org.apache.flink.api.java.DataSet} (for batch).
- */
-abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-
-  /**
-   * Translates the pipeline by passing this class as a visitor.
-   * @param pipeline The pipeline to be translated
-   */
-  public void translate(Pipeline pipeline) {
-    pipeline.traverseTopologically(this);
-  }
-
-  /**
-   * Utility formatting method.
-   * @param n number of times to repeat the indentation prefix
-   * @return String consisting of n occurrences of "| "
-   */
-  protected static String genSpaces(int n) {
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < n; i++) {
-      builder.append("| ");
-    }
-    return builder.toString();
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
deleted file mode 100644
index 096f030..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.beam.runners.flink; - -import com.google.common.base.Joiner; -import java.io.File; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.PValue; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.client.program.DetachedEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@link PipelineRunner} that executes the operations in the - * pipeline by first translating them to a Flink Plan and then executing them either locally - * or on a Flink cluster, depending on the configuration. - */ -public class FlinkRunner extends PipelineRunner<PipelineResult> { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class); - - /** - * Provided options. - */ - private final FlinkPipelineOptions options; - - /** - * Construct a runner from the provided options. - * - * @param options Properties which configure the runner. - * @return The newly created runner. - */ - public static FlinkRunner fromOptions(PipelineOptions options) { - FlinkPipelineOptions flinkOptions = - PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); - ArrayList<String> missing = new ArrayList<>(); - - if (flinkOptions.getAppName() == null) { - missing.add("appName"); - } - if (missing.size() > 0) { - throw new IllegalArgumentException( - "Missing required values: " + Joiner.on(',').join(missing)); - } - - if (flinkOptions.getFilesToStage() == null) { - flinkOptions.setFilesToStage(detectClassPathResourcesToStage( - FlinkRunner.class.getClassLoader())); - LOG.info("PipelineOptions.filesToStage was not specified. " - + "Defaulting to files from the classpath: will stage {} files. " - + "Enable logging at DEBUG level to see which files will be staged.", - flinkOptions.getFilesToStage().size()); - LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage()); - } - - // Set Flink Master to [auto] if no option was specified. 
-    if (flinkOptions.getFlinkMaster() == null) {
-      flinkOptions.setFlinkMaster("[auto]");
-    }
-
-    return new FlinkRunner(flinkOptions);
-  }
-
-  private FlinkRunner(FlinkPipelineOptions options) {
-    this.options = options;
-    this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
-  }
-
-  @Override
-  public PipelineResult run(Pipeline pipeline) {
-    logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
-
-    LOG.info("Executing pipeline using FlinkRunner.");
-
-    FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
-
-    LOG.info("Translating pipeline to Flink program.");
-    env.translate(this, pipeline);
-
-    JobExecutionResult result;
-    try {
-      LOG.info("Starting execution of Flink program.");
-      result = env.executePipeline();
-    } catch (Exception e) {
-      LOG.error("Pipeline execution failed", e);
-      throw new RuntimeException("Pipeline execution failed", e);
-    }
-
-    if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
-      LOG.info("Pipeline submitted in Detached mode");
-      return new FlinkDetachedRunnerResult();
-    } else {
-      LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-      Map<String, Object> accumulators = result.getAllAccumulatorResults();
-      if (accumulators != null && !accumulators.isEmpty()) {
-        LOG.info("Final aggregator values:");
-
-        for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
-          LOG.info("{} : {}", entry.getKey(), entry.getValue());
-        }
-      }
-
-      return new FlinkRunnerResult(accumulators, result.getNetRuntime());
-    }
-  }
-
-  /**
-   * For testing.
-   */
-  public FlinkPipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @Override
-  public String toString() {
-    return "FlinkRunner#" + hashCode();
-  }
-
-  /**
-   * Attempts to detect all the resources the class loader has access to. This does not recurse
-   * to class loader parents, stopping it from pulling in resources from the system class loader.
-   *
-   * @param classLoader The URLClassLoader to use to detect resources to stage.
-   * @return A list of absolute paths to the resources the class loader uses.
-   * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
-   *     of the resources the class loader exposes is not a file resource.
-   */
-  protected static List<String> detectClassPathResourcesToStage(
-      ClassLoader classLoader) {
-    if (!(classLoader instanceof URLClassLoader)) {
-      String message = String.format("Unable to use ClassLoader to detect classpath elements. "
-          + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
-      LOG.error(message);
-      throw new IllegalArgumentException(message);
-    }
-
-    List<String> files = new ArrayList<>();
-    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
-      try {
-        files.add(new File(url.toURI()).getAbsolutePath());
-      } catch (IllegalArgumentException | URISyntaxException e) {
-        String message = String.format("Unable to convert url (%s) to file.", url);
-        LOG.error(message);
-        throw new IllegalArgumentException(message, e);
-      }
-    }
-    return files;
-  }
-
-  /** A set of {@link View}s with non-deterministic key coders. */
-  Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
-
-  /**
-   * Records that the {@link PTransform} requires a deterministic key coder.
-   */
-  void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
-    ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
-  }
-
-  /** Outputs a warning about PCollection views without deterministic key coders. */
-  private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
-    // We need to wait until this point to determine the names of the transforms, since only
-    // at this time do we know the hierarchy of the transforms; otherwise we could
-    // have just recorded the full names during apply time.
-    if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
-      final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
-      pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
-        @Override
-        public void visitValue(PValue value, TransformHierarchy.Node producer) {
-        }
-
-        @Override
-        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
-            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
-          }
-        }
-
-        @Override
-        public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
-            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
-          }
-          return CompositeBehavior.ENTER_TRANSFORM;
-        }
-
-        @Override
-        public void leaveCompositeTransform(TransformHierarchy.Node node) {
-        }
-      });
-
-      LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
-          + "because the key coder is not deterministic. Falling back to singleton implementation "
-          + "which may cause memory and/or performance problems. Future major versions of "
-          + "the Flink runner will require deterministic key coders.",
-          ptransformViewNamesWithNonDeterministicKeyCoders);
-    }
-  }
-}
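
----------------------------------------------------------------------
For orientation, here is a minimal sketch of how the runner and options deleted
above were driven from user code. The class name, pipeline contents, and option
values are illustrative assumptions (not taken from this patch); the option and
runner names match the deleted FlinkPipelineOptions and FlinkRunner, assuming a
Beam Java SDK of the same vintage.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;

    public class FlinkRunnerUsageSketch {
      public static void main(String[] args) {
        // Option names correspond to the getters/setters in FlinkPipelineOptions.
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setFlinkMaster("[local]");         // or "host:port", "[collection]", "[auto]"
        options.setParallelism(2);                 // -1 falls back to Flink's configured default
        options.setCheckpointingInterval(60000L);  // only honored in streaming mode

        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply(Create.of("hello", "flink"));

        // FlinkRunner.run() translates the pipeline and executes it. If the job is
        // submitted in detached mode, the result is a FlinkDetachedRunnerResult,
        // whose getState() and waitUntilFinish() report State.UNKNOWN.
        PipelineResult result = pipeline.run();
        result.waitUntilFinish();
      }
    }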

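The batch and streaming environment factories deleted above share one
master-URL convention. A small hypothetical helper (not part of the patch)
that makes the dispatch explicit:

    /** Hypothetical: mirrors the masterUrl branching in createBatchExecutionEnvironment. */
    static String describeMaster(String masterUrl) {
      if ("[local]".equals(masterUrl)) {
        return "local environment inside this JVM";
      }
      if ("[collection]".equals(masterUrl)) {
        return "execution on Java Collections (batch only)";
      }
      if ("[auto]".equals(masterUrl)) {
        return "environment chosen by the surrounding context";
      }
      // The deleted code matches ".*:\\d*", which would also accept "host:" with no
      // digits; "\\d+" is used here so the sketch stays well-defined.
      if (masterUrl.matches(".*:\\d+")) {
        String[] parts = masterUrl.split(":");  // "host:port" -> remote cluster
        return "remote JobManager at " + parts[0] + ", port " + Integer.parseInt(parts[1]);
      }
      return "unrecognized; the runner logs a warning and falls back to [auto]";
    }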