/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink;

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The class that instantiates and manages the execution of a given job.
 * Depending on if the job is a Streaming or Batch processing one, it creates
 * the adequate execution environment ({@link ExecutionEnvironment}
 * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator}
 * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to
 * transform the Beam job into a Flink one, and executes the (translated) job.
 */
class FlinkPipelineExecutionEnvironment {

  private static final Logger LOG =
      LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);

  private final FlinkPipelineOptions options;

  /**
   * The Flink Batch execution environment. This is instantiated to either a
   * {@link org.apache.flink.api.java.CollectionEnvironment},
   * a {@link org.apache.flink.api.java.LocalEnvironment} or
   * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
   * options.
   */
  private ExecutionEnvironment flinkBatchEnv;

  /**
   * The Flink Streaming execution environment. This is instantiated to either a
   * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
   * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
   * on the configuration options, and more specifically, the url of the master.
   */
  private StreamExecutionEnvironment flinkStreamEnv;

  /**
   * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
   * provided {@link FlinkPipelineOptions}.
   *
   * @param options the user-defined pipeline options.
   */
  FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
    this.options = checkNotNull(options);
  }

  /**
   * Depending on if the job is a Streaming or a Batch one, this method creates
   * the necessary execution environment and pipeline translator, and translates
   * the {@link org.apache.beam.sdk.values.PCollection} program into
   * a {@link org.apache.flink.api.java.DataSet}
   * or {@link org.apache.flink.streaming.api.datastream.DataStream} one.
   */
  public void translate(FlinkRunner flinkRunner, Pipeline pipeline) {
    // Reset any state left over from a previous translation so that exactly one
    // of the two environments is non-null afterwards.
    this.flinkBatchEnv = null;
    this.flinkStreamEnv = null;

    // The optimizer walks the pipeline and decides whether it can run in batch
    // mode or has to run in streaming mode (e.g. because of unbounded sources).
    PipelineTranslationOptimizer optimizer =
        new PipelineTranslationOptimizer(TranslationMode.BATCH, options);

    optimizer.translate(pipeline);
    TranslationMode translationMode = optimizer.getTranslationMode();

    FlinkPipelineTranslator translator;
    if (translationMode == TranslationMode.STREAMING) {
      this.flinkStreamEnv = createStreamExecutionEnvironment();
      translator = new FlinkStreamingPipelineTranslator(flinkRunner, flinkStreamEnv, options);
    } else {
      this.flinkBatchEnv = createBatchExecutionEnvironment();
      translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
    }

    translator.translate(pipeline);
  }

  /**
   * Launches the program execution.
   *
   * @return the result of the executed Flink job.
   * @throws IllegalStateException if {@link #translate} has not been called first.
   * @throws Exception if the underlying Flink execution fails.
   */
  public JobExecutionResult executePipeline() throws Exception {
    final String jobName = options.getJobName();

    if (flinkBatchEnv != null) {
      return flinkBatchEnv.execute(jobName);
    } else if (flinkStreamEnv != null) {
      return flinkStreamEnv.execute(jobName);
    } else {
      throw new IllegalStateException("The Pipeline has not yet been translated.");
    }
  }

  /**
   * If the submitted job is a batch processing job, this method creates the adequate
   * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
   * on the user-specified options.
   */
  private ExecutionEnvironment createBatchExecutionEnvironment() {

    LOG.info("Creating the required Batch Execution Environment.");

    String masterUrl = options.getFlinkMaster();
    ExecutionEnvironment flinkBatchEnv;

    // depending on the master, create the right environment.
    if (masterUrl.equals("[local]")) {
      flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
    } else if (masterUrl.equals("[collection]")) {
      flinkBatchEnv = new CollectionEnvironment();
    } else if (masterUrl.equals("[auto]")) {
      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
    } else if (masterUrl.matches(".*:\\d+")) {
      // The port must contain at least one digit: ".*:\\d*" would also match
      // "host:" and then fail with an obscure exception when parsing the port.
      String host = hostFromMasterUrl(masterUrl);
      int port = portFromMasterUrl(masterUrl);
      List<String> stagingFiles = options.getFilesToStage();
      flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(host,
          port,
          stagingFiles.toArray(new String[stagingFiles.size()]));
    } else {
      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
    }

    // set the correct parallelism. The CollectionEnvironment does not support
    // an explicit parallelism, hence the exclusion.
    if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
      flinkBatchEnv.setParallelism(options.getParallelism());
    }

    // set parallelism in the options (required by some execution code)
    options.setParallelism(flinkBatchEnv.getParallelism());

    if (options.getObjectReuse()) {
      flinkBatchEnv.getConfig().enableObjectReuse();
    } else {
      flinkBatchEnv.getConfig().disableObjectReuse();
    }

    return flinkBatchEnv;
  }

  /**
   * If the submitted job is a stream processing job, this method creates the adequate
   * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
   * on the user-specified options.
   */
  private StreamExecutionEnvironment createStreamExecutionEnvironment() {

    LOG.info("Creating the required Streaming Environment.");

    String masterUrl = options.getFlinkMaster();
    StreamExecutionEnvironment flinkStreamEnv = null;

    // depending on the master, create the right environment.
    if (masterUrl.equals("[local]")) {
      flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
    } else if (masterUrl.equals("[auto]")) {
      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    } else if (masterUrl.matches(".*:\\d+")) {
      // See the batch case above: require at least one digit for the port.
      String host = hostFromMasterUrl(masterUrl);
      int port = portFromMasterUrl(masterUrl);
      List<String> stagingFiles = options.getFilesToStage();
      flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(host,
          port, stagingFiles.toArray(new String[stagingFiles.size()]));
    } else {
      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    }

    // set the correct parallelism.
    if (options.getParallelism() != -1) {
      flinkStreamEnv.setParallelism(options.getParallelism());
    }

    // set parallelism in the options (required by some execution code)
    options.setParallelism(flinkStreamEnv.getParallelism());

    if (options.getObjectReuse()) {
      flinkStreamEnv.getConfig().enableObjectReuse();
    } else {
      flinkStreamEnv.getConfig().disableObjectReuse();
    }

    // default to event time
    flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // for the following 2 parameters, a value of -1 means that Flink will use
    // the default values as specified in the configuration.
    int numRetries = options.getNumberOfExecutionRetries();
    if (numRetries != -1) {
      flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
    }
    long retryDelay = options.getExecutionRetryDelay();
    if (retryDelay != -1) {
      flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
    }

    // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
    // If the value is not -1, then the validity checks are applied.
    // By default, checkpointing is disabled.
    long checkpointInterval = options.getCheckpointingInterval();
    if (checkpointInterval != -1) {
      if (checkpointInterval < 1) {
        throw new IllegalArgumentException("The checkpoint interval must be positive");
      }
      flinkStreamEnv.enableCheckpointing(checkpointInterval);
    }

    // State backend
    final AbstractStateBackend stateBackend = options.getStateBackend();
    if (stateBackend != null) {
      flinkStreamEnv.setStateBackend(stateBackend);
    }

    return flinkStreamEnv;
  }

  /**
   * Extracts the host part of a {@code "host:port"} master URL. The last colon is
   * used as the separator so that hosts that themselves contain colons are not
   * silently truncated (plain {@code split(":")} would only return the first part).
   */
  private static String hostFromMasterUrl(String masterUrl) {
    return masterUrl.substring(0, masterUrl.lastIndexOf(':'));
  }

  /**
   * Extracts the port part of a {@code "host:port"} master URL.
   *
   * @throws NumberFormatException if the text after the last colon is not a number.
   */
  private static int portFromMasterUrl(String masterUrl) {
    return Integer.parseInt(masterUrl.substring(masterUrl.lastIndexOf(':') + 1));
  }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink;


import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.flink.runtime.state.AbstractStateBackend;

/**
 * Options which can be used to configure a Flink PipelineRunner.
 */
public interface FlinkPipelineOptions
    extends PipelineOptions, ApplicationNameOptions, StreamingOptions {

  /**
   * List of local files to make available to workers.
   *
   * <p>Jars are placed on the worker's classpath.
   *
   * <p>The default value is the list of jars from the main program's classpath.
   */
  @Description("Jar-Files to send to all workers and put on the classpath. "
      + "The default value is all files from the classpath.")
  @JsonIgnore
  List<String> getFilesToStage();
  void setFilesToStage(List<String> value);

  /**
   * The url of the Flink JobManager on which to execute pipelines. This can either be
   * the address of a cluster JobManager, in the form "host:port" or one of the special
   * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
   * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
   * "[auto]" will let the system decide where to execute the pipeline based on the environment.
   */
  @Description("Address of the Flink Master where the Pipeline should be executed. Can"
      + " either be of the form \"host:port\" or one of the special values [local], "
      + "[collection] or [auto].")
  String getFlinkMaster();
  void setFlinkMaster(String value);

  @Description("The degree of parallelism to be used when distributing operations onto workers.")
  @Default.InstanceFactory(DefaultParallelismFactory.class)
  Integer getParallelism();
  void setParallelism(Integer value);

  // Note the trailing space after "current": without it the rendered description
  // would read "currentpipeline".
  @Description("The interval between consecutive checkpoints (i.e. snapshots of the current "
      + "pipeline state used for fault tolerance).")
  @Default.Long(-1L)
  Long getCheckpointingInterval();
  void setCheckpointingInterval(Long interval);

  @Description("Sets the number of times that failed tasks are re-executed. "
      + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
      + "that the system default value (as defined in the configuration) should be used.")
  @Default.Integer(-1)
  Integer getNumberOfExecutionRetries();
  void setNumberOfExecutionRetries(Integer retries);

  @Description("Sets the delay between executions. A value of {@code -1} "
      + "indicates that the default value should be used.")
  @Default.Long(-1L)
  Long getExecutionRetryDelay();
  void setExecutionRetryDelay(Long delay);

  @Description("Sets the behavior of reusing objects.")
  @Default.Boolean(false)
  Boolean getObjectReuse();
  void setObjectReuse(Boolean reuse);

  /**
   * State backend to store Beam's state during computation.
   * Note: Only applicable when executing in streaming mode.
   */
  @Description("Sets the state backend to use in streaming mode. "
      + "Otherwise the default is read from the Flink config.")
  @JsonIgnore
  AbstractStateBackend getStateBackend();
  void setStateBackend(AbstractStateBackend stateBackend);

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.sdk.Pipeline; + +/** + * The role of this class is to translate the Beam operators to + * their Flink counterparts. If we have a streaming job, this is instantiated as a + * {@link FlinkStreamingPipelineTranslator}. In other case, i.e. for a batch job, + * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the + * {@link org.apache.beam.sdk.values.PCollection}-based user-provided job is translated into + * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a + * {@link org.apache.flink.api.java.DataSet} (for batch) one. + */ +abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults { + + /** + * Translates the pipeline by passing this class as a visitor. + * @param pipeline The pipeline to be translated + */ + public void translate(Pipeline pipeline) { + pipeline.traverseTopologically(this); + } + + /** + * Utility formatting method. 
+ * @param n number of spaces to generate + * @return String with "|" followed by n spaces + */ + protected static String genSpaces(int n) { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < n; i++) { + builder.append("| "); + } + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java new file mode 100644 index 0000000..096f030 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink;

import com.google.common.base.Joiner;
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PValue;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.DetachedEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PipelineRunner} that executes the operations in the
 * pipeline by first translating them to a Flink Plan and then executing them either locally
 * or on a Flink cluster, depending on the configuration.
 */
public class FlinkRunner extends PipelineRunner<PipelineResult> {

  private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class);

  /**
   * Provided options.
   */
  private final FlinkPipelineOptions options;

  /**
   * A set of {@link View}s with non-deterministic key coders.
   * Package-private so translators can record offending transforms via
   * {@link #recordViewUsesNonDeterministicKeyCoder(PTransform)}.
   */
  Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;

  /**
   * Construct a runner from the provided options.
   *
   * @param options Properties which configure the runner.
   * @return The newly created runner.
   * @throws IllegalArgumentException if required options (e.g. appName) are missing.
   */
  public static FlinkRunner fromOptions(PipelineOptions options) {
    FlinkPipelineOptions flinkOptions =
        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
    ArrayList<String> missing = new ArrayList<>();

    if (flinkOptions.getAppName() == null) {
      missing.add("appName");
    }
    if (!missing.isEmpty()) {
      throw new IllegalArgumentException(
          "Missing required values: " + Joiner.on(',').join(missing));
    }

    if (flinkOptions.getFilesToStage() == null) {
      // No explicit staging list: default to everything on this runner's classpath.
      flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
          FlinkRunner.class.getClassLoader()));
      LOG.info("PipelineOptions.filesToStage was not specified. "
              + "Defaulting to files from the classpath: will stage {} files. "
              + "Enable logging at DEBUG level to see which files will be staged.",
          flinkOptions.getFilesToStage().size());
      LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
    }

    // Set Flink Master to [auto] if no option was specified.
    if (flinkOptions.getFlinkMaster() == null) {
      flinkOptions.setFlinkMaster("[auto]");
    }

    return new FlinkRunner(flinkOptions);
  }

  private FlinkRunner(FlinkPipelineOptions options) {
    this.options = options;
    this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
  }

  /**
   * Translates the given pipeline to a Flink program and executes it, either
   * locally or on a cluster depending on the configured master.
   *
   * @throws RuntimeException wrapping any failure of the Flink execution.
   */
  @Override
  public PipelineResult run(Pipeline pipeline) {
    logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);

    LOG.info("Executing pipeline using FlinkRunner.");

    FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);

    LOG.info("Translating pipeline to Flink program.");
    env.translate(this, pipeline);

    JobExecutionResult result;
    try {
      LOG.info("Starting execution of Flink program.");
      result = env.executePipeline();
    } catch (Exception e) {
      LOG.error("Pipeline execution failed", e);
      throw new RuntimeException("Pipeline execution failed", e);
    }

    if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
      // Detached submission has no runtime or accumulators to report.
      LOG.info("Pipeline submitted in Detached mode");
      return new FlinkDetachedRunnerResult();
    } else {
      LOG.info("Execution finished in {} msecs", result.getNetRuntime());
      Map<String, Object> accumulators = result.getAllAccumulatorResults();
      if (accumulators != null && !accumulators.isEmpty()) {
        LOG.info("Final aggregator values:");

        // Reuse the map fetched above rather than querying the result again.
        for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
          LOG.info("{} : {}", entry.getKey(), entry.getValue());
        }
      }

      return new FlinkRunnerResult(accumulators, result.getNetRuntime());
    }
  }

  /**
   * For testing.
   */
  public FlinkPipelineOptions getPipelineOptions() {
    return options;
  }

  @Override
  public String toString() {
    return "FlinkRunner#" + hashCode();
  }

  /**
   * Attempts to detect all the resources the class loader has access to. This does not recurse
   * to class loader parents stopping it from pulling in resources from the system class loader.
   *
   * @param classLoader The URLClassLoader to use to detect resources to stage.
   * @return A list of absolute paths to the resources the class loader uses.
   * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
   *     of the resources the class loader exposes is not a file resource.
   */
  protected static List<String> detectClassPathResourcesToStage(
      ClassLoader classLoader) {
    if (!(classLoader instanceof URLClassLoader)) {
      String message = String.format("Unable to use ClassLoader to detect classpath elements. "
          + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
      LOG.error(message);
      throw new IllegalArgumentException(message);
    }

    List<String> files = new ArrayList<>();
    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
      try {
        files.add(new File(url.toURI()).getAbsolutePath());
      } catch (IllegalArgumentException | URISyntaxException e) {
        String message = String.format("Unable to convert url (%s) to file.", url);
        LOG.error(message);
        // Preserve the original cause so the offending URL conversion is traceable.
        throw new IllegalArgumentException(message, e);
      }
    }
    return files;
  }

  /**
   * Records that the {@link PTransform} requires a deterministic key coder.
   */
  void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
    ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
  }

  /** Outputs a warning about PCollection views without deterministic key coders. */
  private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pipeline) {
    // We need to wait till this point to determine the names of the transforms since only
    // at this time do we know the hierarchy of the transforms otherwise we could
    // have just recorded the full names during apply time.
    if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
      final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
      pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
        @Override
        public void visitValue(PValue value, TransformHierarchy.Node producer) {
        }

        @Override
        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
          }
        }

        @Override
        public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
          if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
            ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
          }
          return CompositeBehavior.ENTER_TRANSFORM;
        }

        @Override
        public void leaveCompositeTransform(TransformHierarchy.Node node) {
        }
      });

      LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
              + "because the key coder is not deterministic. Falling back to singleton "
              + "implementation which may cause memory and/or performance problems. Future major "
              + "versions of the Flink runner will require deterministic key coders.",
          ptransformViewNamesWithNonDeterministicKeyCoders);
    }
  }
}
+ */ + +package org.apache.beam.runners.flink; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + + +/** + * AutoService registrar - will register FlinkRunner and FlinkOptions + * as possible pipeline runner services. + * + * <p>It ends up in META-INF/services and gets picked up by Beam. + * + */ +public class FlinkRunnerRegistrar { + private FlinkRunnerRegistrar() { } + + /** + * Pipeline runner registrar. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { + return ImmutableList.<Class<? extends PipelineRunner<?>>>of( + FlinkRunner.class, + TestFlinkRunner.class); + } + } + + /** + * Pipeline options registrar. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? 
extends PipelineOptions>>of(FlinkPipelineOptions.class); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java new file mode 100644 index 0000000..0682b56 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.transforms.Aggregator; +import org.joda.time.Duration; + +/** + * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. 
This + * has methods to query to job runtime and the final values of + * {@link org.apache.beam.sdk.transforms.Aggregator}s. + */ +public class FlinkRunnerResult implements PipelineResult { + + private final Map<String, Object> aggregators; + + private final long runtime; + + FlinkRunnerResult(Map<String, Object> aggregators, long runtime) { + this.aggregators = (aggregators == null || aggregators.isEmpty()) + ? Collections.<String, Object>emptyMap() + : Collections.unmodifiableMap(aggregators); + this.runtime = runtime; + } + + @Override + public State getState() { + return State.DONE; + } + + @Override + public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) + throws AggregatorRetrievalException { + // TODO provide a list of all accumulator step values + Object value = aggregators.get(aggregator.getName()); + if (value != null) { + return new AggregatorValues<T>() { + @Override + public Map<String, T> getValuesAtSteps() { + return (Map<String, T>) aggregators; + } + }; + } else { + throw new AggregatorRetrievalException("Accumulator results not found.", + new RuntimeException("Accumulator does not exist.")); + } + } + + @Override + public String toString() { + return "FlinkRunnerResult{" + + "aggregators=" + aggregators + + ", runtime=" + runtime + + '}'; + } + + @Override + public State cancel() throws IOException { + throw new UnsupportedOperationException("FlinkRunnerResult does not support cancel."); + } + + @Override + public State waitUntilFinish() { + return State.DONE; + } + + @Override + public State waitUntilFinish(Duration duration) { + return State.DONE; + } + + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics."); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java 
---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java new file mode 100644 index 0000000..0459ef7 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.beam.runners.flink;

import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate
 * the user-provided {@link org.apache.beam.sdk.values.PCollection}-based job into a
 * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
 *
 * <p>Translation happens in two phases: first {@link #translate(Pipeline)} replaces
 * transforms that have no direct Flink streaming equivalent (the {@link View} family,
 * splittable {@code ParDo}) with runner-specific substitutes, then the pipeline is
 * visited node-by-node and each node is handed to a matching
 * {@link StreamTransformTranslator}.
 */
class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {

  private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);

  /** The necessary context in the case of a streaming job. */
  private final FlinkStreamingTranslationContext streamingContext;

  // Current nesting depth in the transform hierarchy; used only to indent log output
  // via genSpaces (presumably defined on the superclass — not visible here).
  private int depth = 0;

  // Needed so the reflective View override factories can pass the runner to their
  // replacement transforms.
  private FlinkRunner flinkRunner;

  public FlinkStreamingPipelineTranslator(
      FlinkRunner flinkRunner,
      StreamExecutionEnvironment env,
      PipelineOptions options) {
    this.streamingContext = new FlinkStreamingTranslationContext(env, options);
    this.flinkRunner = flinkRunner;
  }

  /**
   * Applies the runner-specific transform overrides to {@code pipeline} and then
   * delegates to the superclass to visit (and thereby translate) the rewritten pipeline.
   */
  @Override
  public void translate(Pipeline pipeline) {
    List<PTransformOverride> transformOverrides =
        ImmutableList.<PTransformOverride>builder()
            .add(
                PTransformOverride.of(
                    PTransformMatchers.splittableParDoMulti(),
                    new SplittableParDoOverrideFactory()))
            .add(
                PTransformOverride.of(
                    PTransformMatchers.classEqualTo(View.AsIterable.class),
                    new ReflectiveOneToOneOverrideFactory(
                        FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
            .add(
                PTransformOverride.of(
                    PTransformMatchers.classEqualTo(View.AsList.class),
                    new ReflectiveOneToOneOverrideFactory(
                        FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
            .add(
                PTransformOverride.of(
                    PTransformMatchers.classEqualTo(View.AsMap.class),
                    new ReflectiveOneToOneOverrideFactory(
                        FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
            .add(
                PTransformOverride.of(
                    PTransformMatchers.classEqualTo(View.AsMultimap.class),
                    new ReflectiveOneToOneOverrideFactory(
                        FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
            .add(
                PTransformOverride.of(
                    PTransformMatchers.classEqualTo(View.AsSingleton.class),
                    new ReflectiveOneToOneOverrideFactory(
                        FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
            // this has to be last since the ViewAsSingleton override
            // can expand to a Combine.GloballyAsSingletonView
            .add(
                PTransformOverride.of(
                    PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
                    new ReflectiveOneToOneOverrideFactory(
                        FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
                        flinkRunner)))
            .build();

    pipeline.replaceAll(transformOverrides);
    super.translate(pipeline);
  }

  // --------------------------------------------------------------------------------------------
  //  Pipeline Visitor Methods
  // --------------------------------------------------------------------------------------------

  /**
   * Translates a composite transform as a unit when a translator exists for it;
   * otherwise descends into its sub-transforms.
   */
  @Override
  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
    LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
    this.depth++;

    PTransform<?, ?> transform = node.getTransform();
    if (transform != null) {
      StreamTransformTranslator<?> translator =
          FlinkStreamingTransformTranslators.getTranslator(transform);

      if (translator != null && applyCanTranslate(transform, node, translator)) {
        // The whole composite is translated at once, so there is no need to
        // visit (and separately translate) its children.
        applyStreamingTransform(transform, node, translator);
        LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
      }
    }
    return CompositeBehavior.ENTER_TRANSFORM;
  }

  @Override
  public void leaveCompositeTransform(TransformHierarchy.Node node) {
    this.depth--;
    LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
  }

  /**
   * Translates a primitive (leaf) transform.
   *
   * @throws UnsupportedOperationException if no translator can handle the transform,
   *     since a primitive cannot be decomposed any further.
   */
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
    // get the transformation corresponding to the node we are
    // currently visiting and translate it into its Flink alternative.
    PTransform<?, ?> transform = node.getTransform();
    StreamTransformTranslator<?> translator =
        FlinkStreamingTransformTranslators.getTranslator(transform);

    if (translator == null || !applyCanTranslate(transform, node, translator)) {
      LOG.info(node.getTransform().getClass().toString());
      throw new UnsupportedOperationException(
          "The transform " + transform + " is currently not supported.");
    }
    applyStreamingTransform(transform, node, translator);
  }

  @Override
  public void visitValue(PValue value, TransformHierarchy.Node producer) {
    // do nothing here
  }

  /**
   * Invokes {@code translator} on {@code transform}. The unchecked casts pair the
   * transform with its translator; the pairing is established by
   * {@code FlinkStreamingTransformTranslators.getTranslator}, which is trusted here.
   */
  private <T extends PTransform<?, ?>> void applyStreamingTransform(
      PTransform<?, ?> transform,
      TransformHierarchy.Node node,
      StreamTransformTranslator<?> translator) {

    @SuppressWarnings("unchecked")
    T typedTransform = (T) transform;

    @SuppressWarnings("unchecked")
    StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;

    // create the applied PTransform on the streamingContext
    streamingContext.setCurrentTransform(node.toAppliedPTransform());
    typedTranslator.translateNode(typedTransform, streamingContext);
  }

  /**
   * Asks {@code translator} whether it can translate {@code transform}. Also sets the
   * current transform on the context, since canTranslate may need to inspect it.
   */
  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
      PTransform<?, ?> transform,
      TransformHierarchy.Node node,
      StreamTransformTranslator<?> translator) {

    @SuppressWarnings("unchecked")
    T typedTransform = (T) transform;

    @SuppressWarnings("unchecked")
    StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;

    streamingContext.setCurrentTransform(node.toAppliedPTransform());

    return typedTranslator.canTranslate(typedTransform, streamingContext);
  }

  /**
   * The interface that every Flink translator of a Beam operator should implement.
   * This interface is for <b>streaming</b> jobs. For examples of such translators see
   * {@link FlinkStreamingTransformTranslators}.
   */
  abstract static class StreamTransformTranslator<T extends PTransform> {

    /**
     * Translate the given transform.
     */
    abstract void translateNode(T transform, FlinkStreamingTranslationContext context);

    /**
     * Returns true iff this translator can translate the given transform.
     * Defaults to true; subclasses override when translatability is conditional.
     */
    boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
      return true;
    }
  }

  /**
   * An override factory that replaces a one-input/one-output transform with an
   * instance of {@code replacement}, constructed reflectively with the runner and
   * the original transform as constructor arguments.
   */
  private static class ReflectiveOneToOneOverrideFactory<
          InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
      extends SingleInputOutputOverrideFactory<
          PCollection<InputT>, PCollection<OutputT>, TransformT> {
    private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
    private final FlinkRunner runner;

    private ReflectiveOneToOneOverrideFactory(
        Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement,
        FlinkRunner runner) {
      this.replacement = replacement;
      this.runner = runner;
    }

    @Override
    public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
        AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
      return PTransformReplacement.of(
          PTransformReplacements.getSingletonMainInput(transform),
          InstanceBuilder.ofType(replacement)
              .withArg(FlinkRunner.class, runner)
              // The replacement's constructor takes the original transform, typed by
              // its concrete class.
              .withArg(
                  (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
                      transform.getTransform().getClass(),
                  transform.getTransform())
              .build());
    }
  }

  /**
   * A {@link PTransformOverrideFactory} that overrides a <a
   * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}.
   */
  static class SplittableParDoOverrideFactory<InputT, OutputT>
      implements PTransformOverrideFactory<
          PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
    @Override
    public PTransformReplacement<PCollection<InputT>, PCollectionTuple>
        getReplacementTransform(
            AppliedPTransform<
                    PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>>
                transform) {
      return PTransformReplacement.of(
          PTransformReplacements.getSingletonMainInput(transform),
          new SplittableParDo<>(transform.getTransform()));
    }

    @Override
    public Map<PValue, ReplacementOutput> mapOutputs(
        Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) {
      // Match each replacement output to the original output it stands in for, by tag.
      return ReplacementOutputs.tagged(outputs, newOutput);
    }
  }
}
