http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java deleted file mode 100644 index 8825ed3..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator; -import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator; -import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.CollectionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * The class that instantiates and manages the execution of a given job. - * Depending on if the job is a Streaming or Batch processing one, it creates - * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}), - * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or - * {@link FlinkStreamingPipelineTranslator})to transform the Beam job into a Flink one, and - * executes the (translated) job. - */ -public class FlinkPipelineExecutionEnvironment { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); - - private final FlinkPipelineOptions options; - - /** - * The Flink Batch execution environment. This is instantiated to either a - * {@link org.apache.flink.api.java.CollectionEnvironment}, - * a {@link org.apache.flink.api.java.LocalEnvironment} or - * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration - * options. - */ - private ExecutionEnvironment flinkBatchEnv; - - - /** - * The Flink Streaming execution environment. 
This is instantiated to either a - * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or - * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending - * on the configuration options, and more specifically, the url of the master. - */ - private StreamExecutionEnvironment flinkStreamEnv; - - /** - * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to - * their Flink counterparts. Based on the options provided by the user, if we have a streaming job, - * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In other case, i.e. a batch job, - * a {@link FlinkBatchPipelineTranslator} is created. - */ - private FlinkPipelineTranslator flinkPipelineTranslator; - - /** - * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the - * provided {@link FlinkPipelineOptions}. - * - * @param options the user-defined pipeline options. - * */ - public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) { - this.options = Preconditions.checkNotNull(options); - this.createPipelineExecutionEnvironment(); - this.createPipelineTranslator(); - } - - /** - * Depending on the type of job (Streaming or Batch) and the user-specified options, - * this method creates the adequate ExecutionEnvironment. - */ - private void createPipelineExecutionEnvironment() { - if (options.isStreaming()) { - createStreamExecutionEnvironment(); - } else { - createBatchExecutionEnvironment(); - } - } - - /** - * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph - * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet}, - * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}. - */ - private void createPipelineTranslator() { - checkInitializationState(); - if (this.flinkPipelineTranslator != null) { - throw new IllegalStateException("FlinkPipelineTranslator already initialized."); - } - - this.flinkPipelineTranslator = options.isStreaming() ? - new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) : - new FlinkBatchPipelineTranslator(flinkBatchEnv, options); - } - - /** - * Depending on if the job is a Streaming or a Batch one, this method creates - * the necessary execution environment and pipeline translator, and translates - * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into - * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream} - * one. - * */ - public void translate(Pipeline pipeline) { - checkInitializationState(); - if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) { - createPipelineExecutionEnvironment(); - } - if (this.flinkPipelineTranslator == null) { - createPipelineTranslator(); - } - this.flinkPipelineTranslator.translate(pipeline); - } - - /** - * Launches the program execution. 
- * */ - public JobExecutionResult executePipeline() throws Exception { - if (options.isStreaming()) { - if (this.flinkStreamEnv == null) { - throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); - } - if (this.flinkPipelineTranslator == null) { - throw new RuntimeException("FlinkPipelineTranslator not initialized."); - } - return this.flinkStreamEnv.execute(); - } else { - if (this.flinkBatchEnv == null) { - throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); - } - if (this.flinkPipelineTranslator == null) { - throw new RuntimeException("FlinkPipelineTranslator not initialized."); - } - return this.flinkBatchEnv.execute(); - } - } - - /** - * If the submitted job is a batch processing job, this method creates the adequate - * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending - * on the user-specified options. - */ - private void createBatchExecutionEnvironment() { - if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { - throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); - } - - LOG.info("Creating the required Batch Execution Environment."); - - String masterUrl = options.getFlinkMaster(); - this.flinkStreamEnv = null; - - // depending on the master, create the right environment. - if (masterUrl.equals("[local]")) { - this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[collection]")) { - this.flinkBatchEnv = new CollectionEnvironment(); - } else if (masterUrl.equals("[auto]")) { - this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List<String> stagingFiles = options.getFilesToStage(); - this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), - stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } - - // set the correct parallelism. - if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) { - this.flinkBatchEnv.setParallelism(options.getParallelism()); - } - - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkBatchEnv.getParallelism()); - } - - /** - * If the submitted job is a stream processing job, this method creates the adequate - * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending - * on the user-specified options. - */ - private void createStreamExecutionEnvironment() { - if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { - throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); - } - - LOG.info("Creating the required Streaming Environment."); - - String masterUrl = options.getFlinkMaster(); - this.flinkBatchEnv = null; - - // depending on the master, create the right environment. 
- if (masterUrl.equals("[local]")) { - this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[auto]")) { - this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List<String> stagingFiles = options.getFilesToStage(); - this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } - - // set the correct parallelism. - if (options.getParallelism() != -1) { - this.flinkStreamEnv.setParallelism(options.getParallelism()); - } - - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkStreamEnv.getParallelism()); - - // default to event time - this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - // for the following 2 parameters, a value of -1 means that Flink will use - // the default values as specified in the configuration. - int numRetries = options.getNumberOfExecutionRetries(); - if (numRetries != -1) { - this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries); - } - long retryDelay = options.getExecutionRetryDelay(); - if (retryDelay != -1) { - this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay); - } - - // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). - // If the value is not -1, then the validity checks are applied. - // By default, checkpointing is disabled. - long checkpointInterval = options.getCheckpointingInterval(); - if(checkpointInterval != -1) { - if (checkpointInterval < 1) { - throw new IllegalArgumentException("The checkpoint interval must be positive"); - } - this.flinkStreamEnv.enableCheckpointing(checkpointInterval); - } - } - - private void checkInitializationState() { - if (options.isStreaming() && this.flinkBatchEnv != null) { - throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment."); - } else if (!options.isStreaming() && this.flinkStreamEnv != null) { - throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment."); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
deleted file mode 100644
index 2f4b3ea..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.StreamingOptions;
-
-import java.util.List;
-
-/**
- * Options which can be used to configure a Flink PipelineRunner.
- */
-public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
-
-  /**
-   * List of local files to make available to workers.
-   * <p>
-   * Jars are placed on the worker's classpath.
-   * <p>
-   * The default value is the list of jars from the main program's classpath.
-   */
-  @Description("Jar-Files to send to all workers and put on the classpath. " +
-      "The default value is all files from the classpath.")
-  @JsonIgnore
-  List<String> getFilesToStage();
-  void setFilesToStage(List<String> value);
-
-  /**
-   * The job name is used to identify jobs running on a Flink cluster.
-   */
-  @Description("Dataflow job name, to uniquely identify active jobs. "
-      + "Defaults to using the ApplicationName-UserName-Date.")
-  @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
-  String getJobName();
-  void setJobName(String value);
-
-  /**
-   * The URL of the Flink JobManager on which to execute pipelines. This can either be
-   * the address of a cluster JobManager, in the form "host:port", or one of the special
-   * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
-   * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
-   * "[auto]" will let the system decide where to execute the pipeline based on the environment.
-   */
-  @Description("Address of the Flink Master where the Pipeline should be executed. 
Can" + - " either be of the form \"host:port\" or one of the special values [local], " + - "[collection] or [auto].") - String getFlinkMaster(); - void setFlinkMaster(String value); - - @Description("The degree of parallelism to be used when distributing operations onto workers.") - @Default.Integer(-1) - Integer getParallelism(); - void setParallelism(Integer value); - - @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " + - "fault tolerance).") - @Default.Long(-1L) - Long getCheckpointingInterval(); - void setCheckpointingInterval(Long interval); - - @Description("Sets the number of times that failed tasks are re-executed. " + - "A value of zero effectively disables fault tolerance. A value of -1 indicates " + - "that the system default value (as defined in the configuration) should be used.") - @Default.Integer(-1) - Integer getNumberOfExecutionRetries(); - void setNumberOfExecutionRetries(Integer retries); - - @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.") - @Default.Long(-1L) - Long getExecutionRetryDelay(); - void setExecutionRetryDelay(Long delay); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java deleted file mode 100644 index fe773d9..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.POutput; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.JobExecutionResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * A {@link PipelineRunner} that executes the operations in the - * pipeline by first translating them to a Flink Plan and then executing them either locally - * or on a Flink cluster, depending on the configuration. - * <p> - * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}. - */ -public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class); - - /** - * Provided options. - */ - private final FlinkPipelineOptions options; - - private final FlinkPipelineExecutionEnvironment flinkJobEnv; - - /** - * Construct a runner from the provided options. - * - * @param options Properties which configure the runner. - * @return The newly created runner. - */ - public static FlinkPipelineRunner fromOptions(PipelineOptions options) { - FlinkPipelineOptions flinkOptions = - PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); - ArrayList<String> missing = new ArrayList<>(); - - if (flinkOptions.getAppName() == null) { - missing.add("appName"); - } - if (missing.size() > 0) { - throw new IllegalArgumentException( - "Missing required values: " + Joiner.on(',').join(missing)); - } - - if (flinkOptions.getFilesToStage() == null) { - flinkOptions.setFilesToStage(detectClassPathResourcesToStage( - DataflowPipelineRunner.class.getClassLoader())); - LOG.info("PipelineOptions.filesToStage was not specified. " - + "Defaulting to files from the classpath: will stage {} files. " - + "Enable logging at DEBUG level to see which files will be staged.", - flinkOptions.getFilesToStage().size()); - LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage()); - } - - // Verify jobName according to service requirements. - String jobName = flinkOptions.getJobName().toLowerCase(); - Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; " + - "the name must consist of only the characters " + "[-a-z0-9], starting with a letter " + - "and ending with a letter " + "or number"); - Preconditions.checkArgument(jobName.length() <= 40, - "JobName too long; must be no more than 40 characters in length"); - - // Set Flink Master to [auto] if no option was specified. 
- if (flinkOptions.getFlinkMaster() == null) { - flinkOptions.setFlinkMaster("[auto]"); - } - - return new FlinkPipelineRunner(flinkOptions); - } - - private FlinkPipelineRunner(FlinkPipelineOptions options) { - this.options = options; - this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options); - } - - @Override - public FlinkRunnerResult run(Pipeline pipeline) { - LOG.info("Executing pipeline using FlinkPipelineRunner."); - - LOG.info("Translating pipeline to Flink program."); - - this.flinkJobEnv.translate(pipeline); - - LOG.info("Starting execution of Flink program."); - - JobExecutionResult result; - try { - result = this.flinkJobEnv.executePipeline(); - } catch (Exception e) { - LOG.error("Pipeline execution failed", e); - throw new RuntimeException("Pipeline execution failed", e); - } - - LOG.info("Execution finished in {} msecs", result.getNetRuntime()); - - Map<String, Object> accumulators = result.getAllAccumulatorResults(); - if (accumulators != null && !accumulators.isEmpty()) { - LOG.info("Final aggregator values:"); - - for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) { - LOG.info("{} : {}", entry.getKey(), entry.getValue()); - } - } - - return new FlinkRunnerResult(accumulators, result.getNetRuntime()); - } - - /** - * For testing. - */ - public FlinkPipelineOptions getPipelineOptions() { - return options; - } - - /** - * Constructs a runner with default properties for testing. - * - * @return The newly created runner. - */ - public static FlinkPipelineRunner createForTest(boolean streaming) { - FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); - // we use [auto] for testing since this will make it pick up the Testing - // ExecutionEnvironment - options.setFlinkMaster("[auto]"); - options.setStreaming(streaming); - return new FlinkPipelineRunner(options); - } - - @Override - public <Output extends POutput, Input extends PInput> Output apply( - PTransform<Input, Output> transform, Input input) { - return super.apply(transform, input); - } - - ///////////////////////////////////////////////////////////////////////////// - - @Override - public String toString() { - return "DataflowPipelineRunner#" + hashCode(); - } - - /** - * Attempts to detect all the resources the class loader has access to. This does not recurse - * to class loader parents stopping it from pulling in resources from the system class loader. - * - * @param classLoader The URLClassLoader to use to detect resources to stage. - * @return A list of absolute paths to the resources the class loader uses. - * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one - * of the resources the class loader exposes is not a file resource. - */ - protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) { - if (!(classLoader instanceof URLClassLoader)) { - String message = String.format("Unable to use ClassLoader to detect classpath elements. 
" - + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader); - LOG.error(message); - throw new IllegalArgumentException(message); - } - - List<String> files = new ArrayList<>(); - for (URL url : ((URLClassLoader) classLoader).getURLs()) { - try { - files.add(new File(url.toURI()).getAbsolutePath()); - } catch (IllegalArgumentException | URISyntaxException e) { - String message = String.format("Unable to convert url (%s) to file.", url); - LOG.error(message); - throw new IllegalArgumentException(message, e); - } - } - return files; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java deleted file mode 100644 index 8fd08ec..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException; -import com.google.cloud.dataflow.sdk.runners.AggregatorValues; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; - -import java.util.Collections; -import java.util.Map; - -/** - * Result of executing a {@link com.google.cloud.dataflow.sdk.Pipeline} with Flink. This - * has methods to query to job runtime and the final values of - * {@link com.google.cloud.dataflow.sdk.transforms.Aggregator}s. - */ -public class FlinkRunnerResult implements PipelineResult { - - private final Map<String, Object> aggregators; - - private final long runtime; - - public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) { - this.aggregators = (aggregators == null || aggregators.isEmpty()) ? 
- Collections.<String, Object>emptyMap() : - Collections.unmodifiableMap(aggregators); - - this.runtime = runtime; - } - - @Override - public State getState() { - return null; - } - - @Override - public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException { - // TODO provide a list of all accumulator step values - Object value = aggregators.get(aggregator.getName()); - if (value != null) { - return new AggregatorValues<T>() { - @Override - public Map<String, T> getValuesAtSteps() { - return (Map<String, T>) aggregators; - } - }; - } else { - throw new AggregatorRetrievalException("Accumulator results not found.", - new RuntimeException("Accumulator does not exist.")); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java deleted file mode 100644 index ab23b92..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ /dev/null @@ -1,452 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.beam.runners.flink.examples; - -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.GcsOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.Validation; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.Keys; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.transforms.Values; -import com.google.cloud.dataflow.sdk.transforms.View; -import com.google.cloud.dataflow.sdk.transforms.WithKeys; -import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; -import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; -import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; -import com.google.cloud.dataflow.sdk.util.GcsUtil; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionList; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.PDone; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Set; - -/** - * An example that computes a basic TF-IDF search table for a directory or GCS prefix. - * - * <p> Concepts: joining data; side inputs; logging - * - * <p> To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * }</pre> - * and a local output file or output prefix on GCS: - * <pre>{@code - * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] - * }</pre> - * - * <p> To execute this pipeline using the Dataflow service, specify pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * and an output prefix on GCS: - * --output=gs://YOUR_OUTPUT_PREFIX - * }</pre> - * - * <p> The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with - * {@code --input}. - */ -public class TFIDF { - /** - * Options supported by {@link TFIDF}. - * <p> - * Inherits standard configuration options. 
- */ - private interface Options extends PipelineOptions, FlinkPipelineOptions { - @Description("Path to the directory or GCS prefix containing files to read from") - @Default.String("gs://dataflow-samples/shakespeare/") - String getInput(); - void setInput(String value); - - @Description("Prefix of output URI to write to") - @Validation.Required - String getOutput(); - void setOutput(String value); - } - - /** - * Lists documents contained beneath the {@code options.input} prefix/directory. - */ - public static Set<URI> listInputDocuments(Options options) - throws URISyntaxException, IOException { - URI baseUri = new URI(options.getInput()); - - // List all documents in the directory or GCS prefix. - URI absoluteUri; - if (baseUri.getScheme() != null) { - absoluteUri = baseUri; - } else { - absoluteUri = new URI( - "file", - baseUri.getAuthority(), - baseUri.getPath(), - baseUri.getQuery(), - baseUri.getFragment()); - } - - Set<URI> uris = new HashSet<>(); - if (absoluteUri.getScheme().equals("file")) { - File directory = new File(absoluteUri); - for (String entry : directory.list()) { - File path = new File(directory, entry); - uris.add(path.toURI()); - } - } else if (absoluteUri.getScheme().equals("gs")) { - GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); - URI gcsUriGlob = new URI( - absoluteUri.getScheme(), - absoluteUri.getAuthority(), - absoluteUri.getPath() + "*", - absoluteUri.getQuery(), - absoluteUri.getFragment()); - for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) { - uris.add(entry.toUri()); - } - } - - return uris; - } - - /** - * Reads the documents at the provided uris and returns all lines - * from the documents tagged with which document they are from. - */ - public static class ReadDocuments - extends PTransform<PInput, PCollection<KV<URI, String>>> { - private static final long serialVersionUID = 0; - - private Iterable<URI> uris; - - public ReadDocuments(Iterable<URI> uris) { - this.uris = uris; - } - - @Override - public Coder<?> getDefaultOutputCoder() { - return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()); - } - - @Override - public PCollection<KV<URI, String>> apply(PInput input) { - Pipeline pipeline = input.getPipeline(); - - // Create one TextIO.Read transform for each document - // and add its output to a PCollectionList - PCollectionList<KV<URI, String>> urisToLines = - PCollectionList.empty(pipeline); - - // TextIO.Read supports: - // - file: URIs and paths locally - // - gs: URIs on the service - for (final URI uri : uris) { - String uriString; - if (uri.getScheme().equals("file")) { - uriString = new File(uri).getPath(); - } else { - uriString = uri.toString(); - } - - PCollection<KV<URI, String>> oneUriToLines = pipeline - .apply(TextIO.Read.from(uriString) - .named("TextIO.Read(" + uriString + ")")) - .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri)); - - urisToLines = urisToLines.and(oneUriToLines); - } - - return urisToLines.apply(Flatten.<KV<URI, String>>pCollections()); - } - } - - /** - * A transform containing a basic TF-IDF pipeline. The input consists of KV objects - * where the key is the document's URI and the value is a piece - * of the document's content. The output is mapping from terms to - * scores for each document URI. 
- */
-  public static class ComputeTfIdf
-      extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> {
-    private static final long serialVersionUID = 0;
-
-    public ComputeTfIdf() { }
-
-    @Override
-    public PCollection<KV<String, KV<URI, Double>>> apply(
-        PCollection<KV<URI, String>> uriToContent) {
-
-      // Compute the total number of documents, and
-      // prepare this singleton PCollectionView for
-      // use as a side input.
-      final PCollectionView<Long> totalDocuments =
-          uriToContent
-              .apply("GetURIs", Keys.<URI>create())
-              .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create())
-              .apply(Count.<URI>globally())
-              .apply(View.<Long>asSingleton());
-
-      // Create a collection of pairs mapping a URI to each
-      // of the words in the document associated with that URI.
-      PCollection<KV<URI, String>> uriToWords = uriToContent
-          .apply(ParDo.named("SplitWords").of(
-              new DoFn<KV<URI, String>, KV<URI, String>>() {
-                private static final long serialVersionUID = 0;
-
-                @Override
-                public void processElement(ProcessContext c) {
-                  URI uri = c.element().getKey();
-                  String line = c.element().getValue();
-                  for (String word : line.split("\\W+")) {
-                    // Log INFO messages when the word "love" is found.
-                    if (word.toLowerCase().equals("love")) {
-                      LOG.info("Found {}", word.toLowerCase());
-                    }
-
-                    if (!word.isEmpty()) {
-                      c.output(KV.of(uri, word.toLowerCase()));
-                    }
-                  }
-                }
-              }));
-
-      // Compute a mapping from each word to the total
-      // number of documents in which it appears.
-      PCollection<KV<String, Long>> wordToDocCount = uriToWords
-          .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create())
-          .apply(Values.<String>create())
-          .apply("CountDocs", Count.<String>perElement());
-
-      // Compute a mapping from each URI to the total
-      // number of words in the document associated with that URI.
-      PCollection<KV<URI, Long>> uriToWordTotal = uriToWords
-          .apply("GetURIs2", Keys.<URI>create())
-          .apply("CountWords", Count.<URI>perElement());
-
-      // Count, for each (URI, word) pair, the number of
-      // occurrences of that word in the document associated
-      // with the URI.
-      PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords
-          .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement());
-
-      // Adjust the above mapping from (URI, word) pairs to counts
-      // into an isomorphic mapping from URI to (word, count) pairs,
-      // to prepare for a join by the URI key.
-      PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
-          .apply(ParDo.named("ShiftKeys").of(
-              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
-                private static final long serialVersionUID = 0;
-
-                @Override
-                public void processElement(ProcessContext c) {
-                  URI uri = c.element().getKey().getKey();
-                  String word = c.element().getKey().getValue();
-                  Long occurrences = c.element().getValue();
-                  c.output(KV.of(uri, KV.of(word, occurrences)));
-                }
-              }));
-
-      // Prepare to join the mapping of URI to (word, count) pairs with
-      // the mapping of URI to total word counts, by associating
-      // each of the input PCollection<KV<URI, ...>> with
-      // a tuple tag. Each input must have the same key type, URI
-      // in this case. The type parameter of the tuple tag matches
-      // the types of the values for each collection.
- final TupleTag<Long> wordTotalsTag = new TupleTag<>(); - final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>(); - KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple - .of(wordTotalsTag, uriToWordTotal) - .and(wordCountsTag, uriToWordAndCount); - - // Perform a CoGroupByKey (a sort of pre-join) on the prepared - // inputs. This yields a mapping from URI to a CoGbkResult - // (CoGroupByKey Result). The CoGbkResult is a mapping - // from the above tuple tags to the values in each input - // associated with a particular URI. In this case, each - // KV<URI, CoGbkResult> group a URI with the total number of - // words in that document as well as all the (word, count) - // pairs for particular words. - PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput - .apply("CoGroupByUri", CoGroupByKey.<URI>create()); - - // Compute a mapping from each word to a (URI, term frequency) - // pair for each URI. A word's term frequency for a document - // is simply the number of times that word occurs in the document - // divided by the total number of words in the document. - PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal - .apply(ParDo.named("ComputeTermFrequencies").of( - new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); - - for (KV<String, Long> wordAndCount - : c.element().getValue().getAll(wordCountsTag)) { - String word = wordAndCount.getKey(); - Long wordCount = wordAndCount.getValue(); - Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue(); - c.output(KV.of(word, KV.of(uri, termFrequency))); - } - } - })); - - // Compute a mapping from each word to its document frequency. - // A word's document frequency in a corpus is the number of - // documents in which the word appears divided by the total - // number of documents in the corpus. Note how the total number of - // documents is passed as a side input; the same value is - // presented to each invocation of the DoFn. - PCollection<KV<String, Double>> wordToDf = wordToDocCount - .apply(ParDo - .named("ComputeDocFrequencies") - .withSideInputs(totalDocuments) - .of(new DoFn<KV<String, Long>, KV<String, Double>>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Long documentCount = c.element().getValue(); - Long documentTotal = c.sideInput(totalDocuments); - Double documentFrequency = documentCount.doubleValue() - / documentTotal.doubleValue(); - - c.output(KV.of(word, documentFrequency)); - } - })); - - // Join the term frequency and document frequency - // collections, each keyed on the word. - final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>(); - final TupleTag<Double> dfTag = new TupleTag<>(); - PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple - .of(tfTag, wordToUriAndTf) - .and(dfTag, wordToDf) - .apply(CoGroupByKey.<String>create()); - - // Compute a mapping from each word to a (URI, TF-IDF) score - // for each URI. There are a variety of definitions of TF-IDF - // ("term frequency - inverse document frequency") score; - // here we use a basic version that is the term frequency - // divided by the log of the document frequency. 
- - return wordToUriAndTfAndDf - .apply(ParDo.named("ComputeTfIdf").of( - new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { - private static final long serialVersionUID1 = 0; - - @Override - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Double df = c.element().getValue().getOnly(dfTag); - - for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) { - URI uri = uriAndTf.getKey(); - Double tf = uriAndTf.getValue(); - Double tfIdf = tf * Math.log(1 / df); - c.output(KV.of(word, KV.of(uri, tfIdf))); - } - } - })); - } - - // Instantiate Logger. - // It is suggested that the user specify the class name of the containing class - // (in this case ComputeTfIdf). - private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class); - } - - /** - * A {@link PTransform} to write, in CSV format, a mapping from term and URI - * to score. - */ - public static class WriteTfIdf - extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> { - private static final long serialVersionUID = 0; - - private String output; - - public WriteTfIdf(String output) { - this.output = output; - } - - @Override - public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { - return wordToUriAndTfIdf - .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - c.output(String.format("%s,\t%s,\t%f", - c.element().getKey(), - c.element().getValue().getKey(), - c.element().getValue().getValue())); - } - })) - .apply(TextIO.Write - .to(output) - .withSuffix(".csv")); - } - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - - options.setRunner(FlinkPipelineRunner.class); - - Pipeline pipeline = Pipeline.create(options); - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - pipeline - .apply(new ReadDocuments(listInputDocuments(options))) - .apply(new ComputeTfIdf()) - .apply(new WriteTfIdf(options.getOutput())); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java deleted file mode 100644 index 7d12fed..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples; - -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.*; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -public class WordCount { - - public static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - public static class CountWords extends PTransform<PCollection<String>, - PCollection<KV<String, Long>>> { - @Override - public PCollection<KV<String, Long>> apply(PCollection<String> lines) { - - // Convert lines of text into individual words. - PCollection<String> words = lines.apply( - ParDo.of(new ExtractWordsFn())); - - // Count the number of times each word occurs. - PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - return wordCounts; - } - } - - /** A SimpleFunction that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { - @Override - public String apply(KV<String, Long> input) { - return input.getKey() + ": " + input.getValue(); - } - } - - /** - * Options supported by {@link WordCount}. - * <p> - * Inherits standard configuration options. 
- */ - public interface Options extends PipelineOptions, FlinkPipelineOptions { - @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") - String getInput(); - void setInput(String value); - - @Description("Path of the file to write to") - String getOutput(); - void setOutput(String value); - } - - public static void main(String[] args) { - - Options options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(Options.class); - options.setRunner(FlinkPipelineRunner.class); - - Pipeline p = Pipeline.create(options); - - p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) - .apply(new CountWords()) - .apply(MapElements.via(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); - - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java deleted file mode 100644 index 8168122..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java +++ /dev/null @@ -1,387 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.beam.runners.flink.examples.streaming; - -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.AvroCoder; -import com.google.cloud.dataflow.sdk.coders.DefaultCoder; -import com.google.cloud.dataflow.sdk.io.*; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.*; -import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.*; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PBegin; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionList; -import org.joda.time.Duration; - -import java.io.IOException; -import java.util.List; - -/** - * To run the example, first open a socket on a terminal by executing the command: - * <li> - * <li> - * <code>nc -lk 9999</code> - * </li> - * </li> - * and then launch the example. Now whatever you type in the terminal is going to be - * the input to the program. 
- * */ -public class AutoComplete { - - /** - * A PTransform that takes as input a list of tokens and returns - * the most common tokens per prefix. - */ - public static class ComputeTopCompletions - extends PTransform<PCollection<String>, PCollection<KV<String, List<CompletionCandidate>>>> { - private static final long serialVersionUID = 0; - - private final int candidatesPerPrefix; - private final boolean recursive; - - protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.recursive = recursive; - } - - public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) { - return new ComputeTopCompletions(candidatesPerPrefix, recursive); - } - - @Override - public PCollection<KV<String, List<CompletionCandidate>>> apply(PCollection<String> input) { - PCollection<CompletionCandidate> candidates = input - // First count how often each token appears. - .apply(new Count.PerElement<String>()) - - // Map the KV outputs of Count into our own CompletionCandiate class. - .apply(ParDo.named("CreateCompletionCandidates").of( - new DoFn<KV<String, Long>, CompletionCandidate>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue()); - c.output(cand); - } - })); - - // Compute the top via either a flat or recursive algorithm. - if (recursive) { - return candidates - .apply(new ComputeTopRecursive(candidatesPerPrefix, 1)) - .apply(Flatten.<KV<String, List<CompletionCandidate>>>pCollections()); - } else { - return candidates - .apply(new ComputeTopFlat(candidatesPerPrefix, 1)); - } - } - } - - /** - * Lower latency, but more expensive. - */ - private static class ComputeTopFlat - extends PTransform<PCollection<CompletionCandidate>, - PCollection<KV<String, List<CompletionCandidate>>>> { - private static final long serialVersionUID = 0; - - private final int candidatesPerPrefix; - private final int minPrefix; - - public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.minPrefix = minPrefix; - } - - @Override - public PCollection<KV<String, List<CompletionCandidate>>> apply( - PCollection<CompletionCandidate> input) { - return input - // For each completion candidate, map it to all prefixes. - .apply(ParDo.of(new AllPrefixes(minPrefix))) - - // Find and return the top candiates for each prefix. - .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix) - .withHotKeyFanout(new HotKeyFanout())); - } - - private static class HotKeyFanout implements SerializableFunction<String, Integer> { - private static final long serialVersionUID = 0; - - @Override - public Integer apply(String input) { - return (int) Math.pow(4, 5 - input.length()); - } - } - } - - /** - * Cheaper but higher latency. - * - * <p> Returns two PCollections, the first is top prefixes of size greater - * than minPrefix, and the second is top prefixes of size exactly - * minPrefix. 
- */
-  private static class ComputeTopRecursive
-      extends PTransform<PCollection<CompletionCandidate>,
-                         PCollectionList<KV<String, List<CompletionCandidate>>>> {
-    private static final long serialVersionUID = 0;
-
-    private final int candidatesPerPrefix;
-    private final int minPrefix;
-
-    public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) {
-      this.candidatesPerPrefix = candidatesPerPrefix;
-      this.minPrefix = minPrefix;
-    }
-
-    private class KeySizePartitionFn implements PartitionFn<KV<String, List<CompletionCandidate>>> {
-      private static final long serialVersionUID = 0;
-
-      @Override
-      public int partitionFor(KV<String, List<CompletionCandidate>> elem, int numPartitions) {
-        return elem.getKey().length() > minPrefix ? 0 : 1;
-      }
-    }
-
-    private static class FlattenTops
-        extends DoFn<KV<String, List<CompletionCandidate>>, CompletionCandidate> {
-      private static final long serialVersionUID = 0;
-
-      @Override
-      public void processElement(ProcessContext c) {
-        for (CompletionCandidate cc : c.element().getValue()) {
-          c.output(cc);
-        }
-      }
-    }
-
-    @Override
-    public PCollectionList<KV<String, List<CompletionCandidate>>> apply(
-        PCollection<CompletionCandidate> input) {
-      if (minPrefix > 10) {
-        // Base case, partitioning to return the output in the expected format.
-        return input
-            .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix))
-            .apply(Partition.of(2, new KeySizePartitionFn()));
-      } else {
-        // If a candidate is in the top N for prefix a...b, it must also be in the top
-        // N for a...bX for every X, which is typically a much smaller set to consider.
-        // First, compute the top candidate for prefixes of size at least minPrefix + 1.
-        PCollectionList<KV<String, List<CompletionCandidate>>> larger = input
-            .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1));
-        // Consider the top candidates for each prefix of length minPrefix + 1...
-        PCollection<KV<String, List<CompletionCandidate>>> small =
-            PCollectionList
-                .of(larger.get(1).apply(ParDo.of(new FlattenTops())))
-                // ...together with those (previously excluded) candidates of length
-                // exactly minPrefix...
-                .and(input.apply(Filter.by(new SerializableFunction<CompletionCandidate, Boolean>() {
-                  private static final long serialVersionUID = 0;
-
-                  @Override
-                  public Boolean apply(CompletionCandidate c) {
-                    return c.getValue().length() == minPrefix;
-                  }
-                })))
-                .apply("FlattenSmall", Flatten.<CompletionCandidate>pCollections())
-                // ...set the key to be the minPrefix-length prefix...
-                .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix)))
-                // ...and (re)apply the Top operator to all of them together.
-                .apply(Top.<String, CompletionCandidate>largestPerKey(candidatesPerPrefix));
-
-        PCollection<KV<String, List<CompletionCandidate>>> flattenLarger = larger
-            .apply("FlattenLarge", Flatten.<KV<String, List<CompletionCandidate>>>pCollections());
-
-        return PCollectionList.of(flattenLarger).and(small);
-      }
-    }
-  }
-
-  /**
-   * A DoFn that keys each candidate by all its prefixes.
-   */
-  private static class AllPrefixes
-      extends DoFn<CompletionCandidate, KV<String, CompletionCandidate>> {
-    private static final long serialVersionUID = 0;
-
-    private final int minPrefix;
-    private final int maxPrefix;
-
-    public AllPrefixes(int minPrefix) {
-      this(minPrefix, Integer.MAX_VALUE);
-    }
-
-    public AllPrefixes(int minPrefix, int maxPrefix) {
-      this.minPrefix = minPrefix;
-      this.maxPrefix = maxPrefix;
-    }
-
-    @Override
-    public void processElement(ProcessContext c) {
-      String word = c.element().value;
-      for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) {
-        KV<String, CompletionCandidate> kv = KV.of(word.substring(0, i), c.element());
-        c.output(kv);
-      }
-    }
-  }
-
-  /**
-   * Class used to store tag-count pairs.
-   */
-  @DefaultCoder(AvroCoder.class)
-  static class CompletionCandidate implements Comparable<CompletionCandidate> {
-    private long count;
-    private String value;
-
-    public CompletionCandidate(String value, long count) {
-      this.value = value;
-      this.count = count;
-    }
-
-    public String getValue() {
-      return value;
-    }
-
-    // Empty constructor required for Avro decoding.
-    @SuppressWarnings("unused")
-    public CompletionCandidate() {}
-
-    @Override
-    public int compareTo(CompletionCandidate o) {
-      if (this.count < o.count) {
-        return -1;
-      } else if (this.count == o.count) {
-        return this.value.compareTo(o.value);
-      } else {
-        return 1;
-      }
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other instanceof CompletionCandidate) {
-        CompletionCandidate that = (CompletionCandidate) other;
-        return this.count == that.count && this.value.equals(that.value);
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return Long.valueOf(count).hashCode() ^ value.hashCode();
-    }
-
-    @Override
-    public String toString() {
-      return "CompletionCandidate[" + value + ", " + count + "]";
-    }
-  }
-
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
-
-    @Override
-    public void processElement(ProcessContext c) {
-      if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
-      }
-
-      // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  /**
-   * Takes as input the top candidates per prefix, and emits a formatted
-   * string suitable for writing to a per-task local file.
-   */
-  static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String>
-      implements DoFn.RequiresWindowAccess {
-
-    private static final long serialVersionUID = 0;
-
-    @Override
-    public void processElement(ProcessContext c) {
-      StringBuilder str = new StringBuilder();
-      KV<String, List<CompletionCandidate>> elem = c.element();
-
-      str.append(elem.getKey() + " @ " + c.window() + " -> ");
-      for (CompletionCandidate cand : elem.getValue()) {
-        str.append(cand.toString() + " ");
-      }
-      System.out.println(str.toString());
-      c.output(str.toString());
-    }
-  }
-
-  /**
-   * Options supported by this class.
-   *
-   * <p> Inherits standard Dataflow configuration options.
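-   *
-   * <p> Passing --recursive=false on the command line selects the flat
-   * (ComputeTopFlat) algorithm instead of the recursive one.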
- */ - private interface Options extends WindowedWordCount.StreamingWordCountOptions { - @Description("Whether to use the recursive algorithm") - @Default.Boolean(true) - Boolean getRecursive(); - void setRecursive(Boolean value); - } - - public static void main(String[] args) throws IOException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - PTransform<? super PBegin, PCollection<String>> readSource = - Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream"); - WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); - - // Create the pipeline. - Pipeline p = Pipeline.create(options); - PCollection<KV<String, List<CompletionCandidate>>> toWrite = p - .apply(readSource) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()) - .apply(ComputeTopCompletions.top(10, options.getRecursive())); - - toWrite - .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile())) - .apply(TextIO.Write.to("./outputAutoComplete.txt")); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java deleted file mode 100644 index 3a8bdb0..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */
-
-package org.apache.beam.runners.flink.examples.streaming;
-
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
-import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
-import com.google.cloud.dataflow.sdk.transforms.windowing.*;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-import org.joda.time.Duration;
-
-/**
- * To run the example, first open two sockets on two terminals by executing the commands:
- * <ul>
- *   <li>
- *     <code>nc -lk 9999</code>, and
- *   </li>
- *   <li>
- *     <code>nc -lk 9998</code>
- *   </li>
- * </ul>
- * and then launch the example. Now whatever you type in the two terminals becomes
- * the input to the program.
- * */
-public class JoinExamples {
-
-  static PCollection<String> joinEvents(PCollection<String> streamA,
-                                        PCollection<String> streamB) throws Exception {
-
-    final TupleTag<String> firstInfoTag = new TupleTag<>();
-    final TupleTag<String> secondInfoTag = new TupleTag<>();
-
-    // Transform both input collections to tuple collections, where the keys are country
-    // codes in both cases.
-    PCollection<KV<String, String>> firstInfo = streamA.apply(
-        ParDo.of(new ExtractEventDataFn()));
-    PCollection<KV<String, String>> secondInfo = streamB.apply(
-        ParDo.of(new ExtractEventDataFn()));
-
-    // country code 'key' -> CoGbkResult (<event info>, <country name>)
-    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
-        .of(firstInfoTag, firstInfo)
-        .and(secondInfoTag, secondInfo)
-        .apply(CoGroupByKey.<String>create());
-
-    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
-    // country code 'key' -> string of <event info>, <country name>
-    PCollection<KV<String, String>> finalResultCollection =
-        kvpCollection.apply(ParDo.named("Process").of(
-            new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
-              private static final long serialVersionUID = 0;
-
-              @Override
-              public void processElement(ProcessContext c) {
-                KV<String, CoGbkResult> e = c.element();
-                String key = e.getKey();
-
-                String defaultA = "NO_VALUE";
-
-                // The following getOnly is a bit tricky because it expects to have
-                // EXACTLY ONE value in the corresponding stream and for the corresponding key.
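-                // If no value is present for the key, the supplied default ("NO_VALUE")
-                // is used instead; getOnly fails if more than one value is present.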
- - String lineA = e.getValue().getOnly(firstInfoTag, defaultA); - for (String lineB : c.element().getValue().getAll(secondInfoTag)) { - // Generate a string that combines information from both collection values - c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB)); - } - } - })); - - return finalResultCollection - .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - String result = c.element().getKey() + " -> " + c.element().getValue(); - System.out.println(result); - c.output(result); - } - })); - } - - static class ExtractEventDataFn extends DoFn<String, KV<String, String>> { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - String line = c.element().toLowerCase(); - String key = line.split("\\s")[0]; - c.output(KV.of(key, line)); - } - } - - private interface Options extends WindowedWordCount.StreamingWordCountOptions { - - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - PTransform<? super PBegin, PCollection<String>> readSourceA = - Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream"); - PTransform<? super PBegin, PCollection<String>> readSourceB = - Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream"); - - WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); - - Pipeline p = Pipeline.create(options); - - // the following two 'applys' create multiple inputs to our pipeline, one for each - // of our two input sources. - PCollection<String> streamA = p.apply(readSourceA) - .apply(Window.<String>into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - PCollection<String> streamB = p.apply(readSourceB) - .apply(Window.<String>into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection<String> formattedResults = joinEvents(streamA, streamB); - formattedResults.apply(TextIO.Write.to("./outputJoin.txt")); - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java deleted file mode 100644 index 55cdc22..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.io.UnboundedSource; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.*; -import com.google.cloud.dataflow.sdk.transforms.windowing.*; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.joda.time.Duration; - -import java.util.Properties; - -public class KafkaWindowedWordCountExample { - - static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from - static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact - static final String GROUP_ID = "myGroup"; // Default groupId - static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka - - public static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. 
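-      // (split can produce an empty first token when a line starts with a
-      // delimiter, hence the isEmpty check below.)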
- for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - public static class FormatAsStringFn extends DoFn<KV<String, Long>, String> { - @Override - public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); - System.out.println(row); - c.output(row); - } - } - - public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { - @Description("The Kafka topic to read from") - @Default.String(KAFKA_TOPIC) - String getKafkaTopic(); - - void setKafkaTopic(String value); - - @Description("The Kafka Broker to read from") - @Default.String(KAFKA_BROKER) - String getBroker(); - - void setBroker(String value); - - @Description("The Zookeeper server to connect to") - @Default.String(ZOOKEEPER) - String getZookeeper(); - - void setZookeeper(String value); - - @Description("The groupId") - @Default.String(GROUP_ID) - String getGroup(); - - void setGroup(String value); - - } - - public static void main(String[] args) { - PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); - KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); - options.setJobName("KafkaExample"); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() ); - Pipeline pipeline = Pipeline.create(options); - - Properties p = new Properties(); - p.setProperty("zookeeper.connect", options.getZookeeper()); - p.setProperty("bootstrap.servers", options.getBroker()); - p.setProperty("group.id", options.getGroup()); - - // this is the Flink consumer that reads the input to - // the program from a kafka topic. - FlinkKafkaConsumer082 kafkaConsumer = new FlinkKafkaConsumer082<>( - options.getKafkaTopic(), - new SimpleStringSchema(), p); - - PCollection<String> words = pipeline - .apply(Read.from(new UnboundedFlinkSource<String, UnboundedSource.CheckpointMark>(options, kafkaConsumer)).named("StreamingWordCount")) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - wordCounts.apply(ParDo.of(new FormatAsStringFn())) - .apply(TextIO.Write.to("./outputKafka.txt")); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/071e4dd6/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java deleted file mode 100644 index 7eb69ba..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.examples.streaming;
-
-import org.apache.beam.runners.flink.FlinkPipelineRunner;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.*;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.*;
-import com.google.cloud.dataflow.sdk.transforms.windowing.*;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * To run the example, first open a socket on a terminal by executing the command:
- * <ul>
- *   <li>
- *     <code>nc -lk 9999</code>
- *   </li>
- * </ul>
- * and then launch the example. Now whatever you type in the terminal becomes
- * the input to the program.
- * */
-public class WindowedWordCount {
-
-  private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
-
-  static final long WINDOW_SIZE = 10;  // Default window duration in seconds
-  static final long SLIDE_SIZE = 5;    // Default window slide in seconds
-
-  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
-    @Override
-    public void processElement(ProcessContext c) {
-      String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString();
-      c.output(row);
-    }
-  }
-
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
-
-    @Override
-    public void processElement(ProcessContext c) {
-      if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
-      }
-
-      // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options {
-    @Description("Sliding window duration, in seconds")
-    @Default.Long(WINDOW_SIZE)
-    Long getWindowSize();
-
-    void setWindowSize(Long value);
-
-    @Description("Window slide, in seconds")
-    @Default.Long(SLIDE_SIZE)
-    Long getSlide();
-
-    void setSlide(Long value);
-  }
-
-  public static void main(String[] args) throws IOException {
-    StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class);
-    options.setStreaming(true);
-    options.setWindowSize(10L);
-    options.setSlide(5L);
-    options.setCheckpointingInterval(1000L);
-    options.setNumberOfExecutionRetries(5);
-    options.setExecutionRetryDelay(3000L);
-    options.setRunner(FlinkPipelineRunner.class);
-
-    LOG.info("Windowed WordCount with sliding windows of " + options.getWindowSize() +
-        " sec. and a slide of " + options.getSlide());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    PCollection<String> words = pipeline
-        .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
-        .apply(ParDo.of(new ExtractWordsFn()))
-        .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
-            .every(Duration.standardSeconds(options.getSlide())))
-            .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
-            .discardingFiredPanes());
-
-    PCollection<KV<String, Long>> wordCounts =
-        words.apply(Count.<String>perElement());
-
-    wordCounts.apply(ParDo.of(new FormatAsStringFn()))
-        .apply(TextIO.Write.to("./outputWordCount.txt"));
-
-    pipeline.run();
-  }
-}
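Editor's note: the four deleted streaming examples above all instantiate the same skeleton: read an unbounded source, assign event-time windows triggered at the end of the window with zero allowed lateness and discarding panes, aggregate, format, and write with TextIO. The following is a minimal sketch of that shared shape, not a drop-in replacement for any one file; it uses only the pre-rename Dataflow SDK and Flink-runner classes imported above, assumes ExtractWordsFn and FormatAsStringFn as defined in WindowedWordCount, assumes an `options` object configured as in the main() methods (FlinkPipelineRunner, streaming mode), and the output path is illustrative:

    // Sketch of the pattern shared by the removed examples.
    Pipeline p = Pipeline.create(options);
    p.apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
     .apply(ParDo.of(new ExtractWordsFn()))
     // Event-time windows; emit once when the watermark passes the window end,
     // drop late data, and discard each pane after it fires.
     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
         .triggering(AfterWatermark.pastEndOfWindow())
         .withAllowedLateness(Duration.ZERO)
         .discardingFiredPanes())
     .apply(Count.<String>perElement())
     .apply(ParDo.of(new FormatAsStringFn()))
     .apply(TextIO.Write.to("./output.txt"));  // illustrative path
    p.run();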