[flink] convert tabs to 2 spaces
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8852eb15 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8852eb15 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8852eb15 Branch: refs/heads/master Commit: 8852eb15deec6a7354f29c894d7d2626748d5704 Parents: 39f08fa Author: Maximilian Michels <[email protected]> Authored: Wed Mar 2 23:03:37 2016 +0100 Committer: Davor Bonaci <[email protected]> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- .../FlinkPipelineExecutionEnvironment.java | 394 +++--- .../flink/dataflow/FlinkPipelineOptions.java | 102 +- .../flink/dataflow/FlinkPipelineRunner.java | 308 ++-- .../flink/dataflow/FlinkRunnerResult.java | 64 +- .../flink/dataflow/examples/TFIDF.java | 720 +++++----- .../flink/dataflow/examples/WordCount.java | 154 +- .../examples/streaming/JoinExamples.java | 208 +-- .../KafkaWindowedWordCountExample.java | 204 +-- .../examples/streaming/WindowedWordCount.java | 160 +-- .../flink/dataflow/io/ConsoleIO.java | 82 +- .../FlinkBatchPipelineTranslator.java | 230 +-- .../FlinkBatchTransformTranslators.java | 892 ++++++------ .../FlinkBatchTranslationContext.java | 154 +- .../translation/FlinkPipelineTranslator.java | 6 +- .../FlinkStreamingPipelineTranslator.java | 218 +-- .../FlinkStreamingTransformTranslators.java | 684 ++++----- .../FlinkStreamingTranslationContext.java | 88 +- .../FlinkCoGroupKeyedListAggregator.java | 48 +- .../functions/FlinkCreateFunction.java | 50 +- .../functions/FlinkDoFnFunction.java | 300 ++-- .../FlinkKeyedListAggregationFunction.java | 74 +- .../functions/FlinkMultiOutputDoFnFunction.java | 242 ++-- .../FlinkMultiOutputPruningFunction.java | 22 +- .../functions/FlinkPartialReduceFunction.java | 50 +- .../functions/FlinkReduceFunction.java | 32 +- .../translation/functions/UnionCoder.java | 218 +-- 
.../translation/types/CoderComparator.java | 362 ++--- .../translation/types/CoderTypeInformation.java | 162 +-- .../translation/types/CoderTypeSerializer.java | 228 +-- .../types/InspectableByteArrayOutputStream.java | 12 +- .../translation/types/KvCoderComperator.java | 452 +++--- .../types/KvCoderTypeInformation.java | 298 ++-- .../types/VoidCoderTypeSerializer.java | 154 +- .../wrappers/CombineFnAggregatorWrapper.java | 94 +- .../wrappers/DataInputViewWrapper.java | 50 +- .../wrappers/DataOutputViewWrapper.java | 32 +- .../SerializableFnAggregatorWrapper.java | 92 +- .../translation/wrappers/SinkOutputFormat.java | 160 +-- .../translation/wrappers/SourceInputFormat.java | 240 ++-- .../translation/wrappers/SourceInputSplit.java | 30 +- .../streaming/FlinkAbstractParDoWrapper.java | 432 +++--- .../FlinkGroupAlsoByWindowWrapper.java | 1116 +++++++-------- .../streaming/FlinkGroupByKeyWrapper.java | 46 +- .../streaming/FlinkParDoBoundMultiWrapper.java | 70 +- .../streaming/FlinkParDoBoundWrapper.java | 102 +- .../io/FlinkStreamingCreateFunction.java | 52 +- .../streaming/io/UnboundedFlinkSource.java | 76 +- .../streaming/io/UnboundedSocketSource.java | 372 ++--- .../streaming/io/UnboundedSourceWrapper.java | 182 +-- .../state/AbstractFlinkTimerInternals.java | 178 +-- .../streaming/state/FlinkStateInternals.java | 1338 +++++++++--------- .../streaming/state/StateCheckpointReader.java | 122 +- .../streaming/state/StateCheckpointUtils.java | 232 +-- .../streaming/state/StateCheckpointWriter.java | 198 +-- .../wrappers/streaming/state/StateType.java | 68 +- .../dataartisans/flink/dataflow/AvroITCase.java | 128 +- .../flink/dataflow/FlattenizeITCase.java | 60 +- .../flink/dataflow/FlinkTestPipeline.java | 74 +- .../flink/dataflow/JoinExamplesITCase.java | 122 +- .../flink/dataflow/MaybeEmptyTestITCase.java | 64 +- .../flink/dataflow/ParDoMultiOutputITCase.java | 128 +- .../flink/dataflow/ReadSourceITCase.java | 244 ++-- .../dataflow/RemoveDuplicatesEmptyITCase.java | 
48 +- .../flink/dataflow/RemoveDuplicatesITCase.java | 50 +- .../flink/dataflow/SideInputITCase.java | 52 +- .../flink/dataflow/TfIdfITCase.java | 58 +- .../flink/dataflow/WordCountITCase.java | 54 +- .../flink/dataflow/WordCountJoin2ITCase.java | 196 +-- .../flink/dataflow/WordCountJoin3ITCase.java | 234 +-- .../flink/dataflow/WriteSinkITCase.java | 218 +-- .../streaming/GroupAlsoByWindowTest.java | 920 ++++++------ .../dataflow/streaming/GroupByNullKeyTest.java | 138 +- .../streaming/StateSerializationTest.java | 506 +++---- .../streaming/TopWikipediaSessionsITCase.java | 178 +-- .../flink/dataflow/util/JoinExamples.java | 232 +-- 75 files changed, 8179 insertions(+), 8179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java index 796849d..c2139c6 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java @@ -40,228 +40,228 @@ import java.util.List; */ public class FlinkPipelineExecutionEnvironment { - private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); + private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); - private final FlinkPipelineOptions options; + private final FlinkPipelineOptions options; - /** - * The Flink Batch execution environment. 
This is instantiated to either a - * {@link org.apache.flink.api.java.CollectionEnvironment}, - * a {@link org.apache.flink.api.java.LocalEnvironment} or - * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration - * options. - */ - private ExecutionEnvironment flinkBatchEnv; + /** + * The Flink Batch execution environment. This is instantiated to either a + * {@link org.apache.flink.api.java.CollectionEnvironment}, + * a {@link org.apache.flink.api.java.LocalEnvironment} or + * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration + * options. + */ + private ExecutionEnvironment flinkBatchEnv; - /** - * The Flink Streaming execution environment. This is instantiated to either a - * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or - * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending - * on the configuration options, and more specifically, the url of the master. - */ - private StreamExecutionEnvironment flinkStreamEnv; + /** + * The Flink Streaming execution environment. This is instantiated to either a + * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or + * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending + * on the configuration options, and more specifically, the url of the master. + */ + private StreamExecutionEnvironment flinkStreamEnv; - /** - * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to - * their Flink counterparts. Based on the options provided by the user, if we have a streaming job, - * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In other case, i.e. a batch job, - * a {@link FlinkBatchPipelineTranslator} is created. - */ - private FlinkPipelineTranslator flinkPipelineTranslator; + /** + * Translator for this FlinkPipelineRunner. 
Its role is to translate the Beam operators to + * their Flink counterparts. Based on the options provided by the user, if we have a streaming job, + * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In other case, i.e. a batch job, + * a {@link FlinkBatchPipelineTranslator} is created. + */ + private FlinkPipelineTranslator flinkPipelineTranslator; - /** - * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the - * provided {@link FlinkPipelineOptions}. - * - * @param options the user-defined pipeline options. - * */ - public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) { - this.options = Preconditions.checkNotNull(options); - this.createPipelineExecutionEnvironment(); - this.createPipelineTranslator(); - } + /** + * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the + * provided {@link FlinkPipelineOptions}. + * + * @param options the user-defined pipeline options. + * */ + public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) { + this.options = Preconditions.checkNotNull(options); + this.createPipelineExecutionEnvironment(); + this.createPipelineTranslator(); + } - /** - * Depending on the type of job (Streaming or Batch) and the user-specified options, - * this method creates the adequate ExecutionEnvironment. - */ - private void createPipelineExecutionEnvironment() { - if (options.isStreaming()) { - createStreamExecutionEnvironment(); - } else { - createBatchExecutionEnvironment(); - } - } + /** + * Depending on the type of job (Streaming or Batch) and the user-specified options, + * this method creates the adequate ExecutionEnvironment. 
+ */ + private void createPipelineExecutionEnvironment() { + if (options.isStreaming()) { + createStreamExecutionEnvironment(); + } else { + createBatchExecutionEnvironment(); + } + } - /** - * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph - * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet}, - * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}. - */ - private void createPipelineTranslator() { - checkInitializationState(); - if (this.flinkPipelineTranslator != null) { - throw new IllegalStateException("FlinkPipelineTranslator already initialized."); - } + /** + * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph + * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet}, + * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}. + */ + private void createPipelineTranslator() { + checkInitializationState(); + if (this.flinkPipelineTranslator != null) { + throw new IllegalStateException("FlinkPipelineTranslator already initialized."); + } - this.flinkPipelineTranslator = options.isStreaming() ? - new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) : - new FlinkBatchPipelineTranslator(flinkBatchEnv, options); - } + this.flinkPipelineTranslator = options.isStreaming() ? + new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) : + new FlinkBatchPipelineTranslator(flinkBatchEnv, options); + } - /** - * Depending on if the job is a Streaming or a Batch one, this method creates - * the necessary execution environment and pipeline translator, and translates - * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into - * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream} - * one. 
- * */ - public void translate(Pipeline pipeline) { - checkInitializationState(); - if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) { - createPipelineExecutionEnvironment(); - } - if (this.flinkPipelineTranslator == null) { - createPipelineTranslator(); - } - this.flinkPipelineTranslator.translate(pipeline); - } + /** + * Depending on if the job is a Streaming or a Batch one, this method creates + * the necessary execution environment and pipeline translator, and translates + * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into + * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream} + * one. + * */ + public void translate(Pipeline pipeline) { + checkInitializationState(); + if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) { + createPipelineExecutionEnvironment(); + } + if (this.flinkPipelineTranslator == null) { + createPipelineTranslator(); + } + this.flinkPipelineTranslator.translate(pipeline); + } - /** - * Launches the program execution. - * */ - public JobExecutionResult executePipeline() throws Exception { - if (options.isStreaming()) { - if (this.flinkStreamEnv == null) { - throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); - } - if (this.flinkPipelineTranslator == null) { - throw new RuntimeException("FlinkPipelineTranslator not initialized."); - } - return this.flinkStreamEnv.execute(); - } else { - if (this.flinkBatchEnv == null) { - throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); - } - if (this.flinkPipelineTranslator == null) { - throw new RuntimeException("FlinkPipelineTranslator not initialized."); - } - return this.flinkBatchEnv.execute(); - } - } + /** + * Launches the program execution. 
+ * */ + public JobExecutionResult executePipeline() throws Exception { + if (options.isStreaming()) { + if (this.flinkStreamEnv == null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("FlinkPipelineTranslator not initialized."); + } + return this.flinkStreamEnv.execute(); + } else { + if (this.flinkBatchEnv == null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("FlinkPipelineTranslator not initialized."); + } + return this.flinkBatchEnv.execute(); + } + } - /** - * If the submitted job is a batch processing job, this method creates the adequate - * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending - * on the user-specified options. - */ - private void createBatchExecutionEnvironment() { - if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { - throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); - } + /** + * If the submitted job is a batch processing job, this method creates the adequate + * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending + * on the user-specified options. + */ + private void createBatchExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); + } - LOG.info("Creating the required Batch Execution Environment."); + LOG.info("Creating the required Batch Execution Environment."); - String masterUrl = options.getFlinkMaster(); - this.flinkStreamEnv = null; + String masterUrl = options.getFlinkMaster(); + this.flinkStreamEnv = null; - // depending on the master, create the right environment. 
- if (masterUrl.equals("[local]")) { - this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[collection]")) { - this.flinkBatchEnv = new CollectionEnvironment(); - } else if (masterUrl.equals("[auto]")) { - this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List<String> stagingFiles = options.getFilesToStage(); - this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), - stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } + // depending on the master, create the right environment. + if (masterUrl.equals("[local]")) { + this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[collection]")) { + this.flinkBatchEnv = new CollectionEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), + stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } - // set the correct parallelism. - if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) { - this.flinkBatchEnv.setParallelism(options.getParallelism()); - } + // set the correct parallelism. 
+ if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) { + this.flinkBatchEnv.setParallelism(options.getParallelism()); + } - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkBatchEnv.getParallelism()); - } + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkBatchEnv.getParallelism()); + } - /** - * If the submitted job is a stream processing job, this method creates the adequate - * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending - * on the user-specified options. - */ - private void createStreamExecutionEnvironment() { - if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { - throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); - } + /** + * If the submitted job is a stream processing job, this method creates the adequate + * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending + * on the user-specified options. + */ + private void createStreamExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); + } - LOG.info("Creating the required Streaming Environment."); + LOG.info("Creating the required Streaming Environment."); - String masterUrl = options.getFlinkMaster(); - this.flinkBatchEnv = null; + String masterUrl = options.getFlinkMaster(); + this.flinkBatchEnv = null; - // depending on the master, create the right environment. 
- if (masterUrl.equals("[local]")) { - this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[auto]")) { - this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List<String> stagingFiles = options.getFilesToStage(); - this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } + // depending on the master, create the right environment. + if (masterUrl.equals("[local]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } - // set the correct parallelism. - if (options.getParallelism() != -1) { - this.flinkStreamEnv.setParallelism(options.getParallelism()); - } + // set the correct parallelism. 
+ if (options.getParallelism() != -1) { + this.flinkStreamEnv.setParallelism(options.getParallelism()); + } - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkStreamEnv.getParallelism()); + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkStreamEnv.getParallelism()); - // default to event time - this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + // default to event time + this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - // for the following 2 parameters, a value of -1 means that Flink will use - // the default values as specified in the configuration. - int numRetries = options.getNumberOfExecutionRetries(); - if (numRetries != -1) { - this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries); - } - long retryDelay = options.getExecutionRetryDelay(); - if (retryDelay != -1) { - this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay); - } + // for the following 2 parameters, a value of -1 means that Flink will use + // the default values as specified in the configuration. + int numRetries = options.getNumberOfExecutionRetries(); + if (numRetries != -1) { + this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries); + } + long retryDelay = options.getExecutionRetryDelay(); + if (retryDelay != -1) { + this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay); + } - // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). - // If the value is not -1, then the validity checks are applied. - // By default, checkpointing is disabled. 
- long checkpointInterval = options.getCheckpointingInterval(); - if(checkpointInterval != -1) { - if (checkpointInterval < 1) { - throw new IllegalArgumentException("The checkpoint interval must be positive"); - } - this.flinkStreamEnv.enableCheckpointing(checkpointInterval); - } - } + // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). + // If the value is not -1, then the validity checks are applied. + // By default, checkpointing is disabled. + long checkpointInterval = options.getCheckpointingInterval(); + if(checkpointInterval != -1) { + if (checkpointInterval < 1) { + throw new IllegalArgumentException("The checkpoint interval must be positive"); + } + this.flinkStreamEnv.enableCheckpointing(checkpointInterval); + } + } - private void checkInitializationState() { - if (options.isStreaming() && this.flinkBatchEnv != null) { - throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment."); - } else if (!options.isStreaming() && this.flinkStreamEnv != null) { - throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment."); - } - } + private void checkInitializationState() { + if (options.isStreaming() && this.flinkBatchEnv != null) { + throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment."); + } else if (!options.isStreaming() && this.flinkStreamEnv != null) { + throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment."); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java index 
2429cac..fabbfad 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java @@ -31,61 +31,61 @@ import java.util.List; */ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { - /** - * List of local files to make available to workers. - * <p> - * Jars are placed on the worker's classpath. - * <p> - * The default value is the list of jars from the main program's classpath. - */ - @Description("Jar-Files to send to all workers and put on the classpath. " + - "The default value is all files from the classpath.") - @JsonIgnore - List<String> getFilesToStage(); - void setFilesToStage(List<String> value); + /** + * List of local files to make available to workers. + * <p> + * Jars are placed on the worker's classpath. + * <p> + * The default value is the list of jars from the main program's classpath. + */ + @Description("Jar-Files to send to all workers and put on the classpath. " + + "The default value is all files from the classpath.") + @JsonIgnore + List<String> getFilesToStage(); + void setFilesToStage(List<String> value); - /** - * The job name is used to identify jobs running on a Flink cluster. - */ - @Description("Dataflow job name, to uniquely identify active jobs. " - + "Defaults to using the ApplicationName-UserName-Date.") - @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class) - String getJobName(); - void setJobName(String value); + /** + * The job name is used to identify jobs running on a Flink cluster. + */ + @Description("Dataflow job name, to uniquely identify active jobs. " + + "Defaults to using the ApplicationName-UserName-Date.") + @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class) + String getJobName(); + void setJobName(String value); - /** - * The url of the Flink JobManager on which to execute pipelines. 
This can either be - * the the address of a cluster JobManager, in the form "host:port" or one of the special - * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink - * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while - * "[auto]" will let the system decide where to execute the pipeline based on the environment. - */ - @Description("Address of the Flink Master where the Pipeline should be executed. Can" + - " either be of the form \"host:port\" or one of the special values [local], " + - "[collection] or [auto].") - String getFlinkMaster(); - void setFlinkMaster(String value); + /** + * The url of the Flink JobManager on which to execute pipelines. This can either be + * the the address of a cluster JobManager, in the form "host:port" or one of the special + * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink + * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while + * "[auto]" will let the system decide where to execute the pipeline based on the environment. + */ + @Description("Address of the Flink Master where the Pipeline should be executed. Can" + + " either be of the form \"host:port\" or one of the special values [local], " + + "[collection] or [auto].") + String getFlinkMaster(); + void setFlinkMaster(String value); - @Description("The degree of parallelism to be used when distributing operations onto workers.") - @Default.Integer(-1) - Integer getParallelism(); - void setParallelism(Integer value); + @Description("The degree of parallelism to be used when distributing operations onto workers.") + @Default.Integer(-1) + Integer getParallelism(); + void setParallelism(Integer value); - @Description("The interval between consecutive checkpoints (i.e. 
snapshots of the current pipeline state used for " + - "fault tolerance).") - @Default.Long(-1L) - Long getCheckpointingInterval(); - void setCheckpointingInterval(Long interval); + @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " + + "fault tolerance).") + @Default.Long(-1L) + Long getCheckpointingInterval(); + void setCheckpointingInterval(Long interval); - @Description("Sets the number of times that failed tasks are re-executed. " + - "A value of zero effectively disables fault tolerance. A value of -1 indicates " + - "that the system default value (as defined in the configuration) should be used.") - @Default.Integer(-1) - Integer getNumberOfExecutionRetries(); - void setNumberOfExecutionRetries(Integer retries); + @Description("Sets the number of times that failed tasks are re-executed. " + + "A value of zero effectively disables fault tolerance. A value of -1 indicates " + + "that the system default value (as defined in the configuration) should be used.") + @Default.Integer(-1) + Integer getNumberOfExecutionRetries(); + void setNumberOfExecutionRetries(Integer retries); - @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.") - @Default.Long(-1L) - Long getExecutionRetryDelay(); - void setExecutionRetryDelay(Long delay); + @Description("Sets the delay between executions. 
A value of {@code -1} indicates that the default value should be used.") + @Default.Long(-1L) + Long getExecutionRetryDelay(); + void setExecutionRetryDelay(Long delay); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java index 7ea8370..742a316 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java @@ -47,158 +47,158 @@ import java.util.Map; */ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { - private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class); - - /** - * Provided options. - */ - private final FlinkPipelineOptions options; - - private final FlinkPipelineExecutionEnvironment flinkJobEnv; - - /** - * Construct a runner from the provided options. - * - * @param options Properties which configure the runner. - * @return The newly created runner. - */ - public static FlinkPipelineRunner fromOptions(PipelineOptions options) { - FlinkPipelineOptions flinkOptions = - PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); - ArrayList<String> missing = new ArrayList<>(); - - if (flinkOptions.getAppName() == null) { - missing.add("appName"); - } - if (missing.size() > 0) { - throw new IllegalArgumentException( - "Missing required values: " + Joiner.on(',').join(missing)); - } - - if (flinkOptions.getFilesToStage() == null) { - flinkOptions.setFilesToStage(detectClassPathResourcesToStage( - DataflowPipelineRunner.class.getClassLoader())); - LOG.info("PipelineOptions.filesToStage was not specified. 
" - + "Defaulting to files from the classpath: will stage {} files. " - + "Enable logging at DEBUG level to see which files will be staged.", - flinkOptions.getFilesToStage().size()); - LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage()); - } - - // Verify jobName according to service requirements. - String jobName = flinkOptions.getJobName().toLowerCase(); - Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; " + - "the name must consist of only the characters " + "[-a-z0-9], starting with a letter " + - "and ending with a letter " + "or number"); - Preconditions.checkArgument(jobName.length() <= 40, - "JobName too long; must be no more than 40 characters in length"); - - // Set Flink Master to [auto] if no option was specified. - if (flinkOptions.getFlinkMaster() == null) { - flinkOptions.setFlinkMaster("[auto]"); - } - - return new FlinkPipelineRunner(flinkOptions); - } - - private FlinkPipelineRunner(FlinkPipelineOptions options) { - this.options = options; - this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options); - } - - @Override - public FlinkRunnerResult run(Pipeline pipeline) { - LOG.info("Executing pipeline using FlinkPipelineRunner."); - - LOG.info("Translating pipeline to Flink program."); - - this.flinkJobEnv.translate(pipeline); - - LOG.info("Starting execution of Flink program."); - - JobExecutionResult result; - try { - result = this.flinkJobEnv.executePipeline(); - } catch (Exception e) { - LOG.error("Pipeline execution failed", e); - throw new RuntimeException("Pipeline execution failed", e); - } - - LOG.info("Execution finished in {} msecs", result.getNetRuntime()); - - Map<String, Object> accumulators = result.getAllAccumulatorResults(); - if (accumulators != null && !accumulators.isEmpty()) { - LOG.info("Final aggregator values:"); - - for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) { - LOG.info("{} : {}", entry.getKey(), entry.getValue()); 
- } - } - - return new FlinkRunnerResult(accumulators, result.getNetRuntime()); - } - - /** - * For testing. - */ - public FlinkPipelineOptions getPipelineOptions() { - return options; - } - - /** - * Constructs a runner with default properties for testing. - * - * @return The newly created runner. - */ - public static FlinkPipelineRunner createForTest(boolean streaming) { - FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); - // we use [auto] for testing since this will make it pick up the Testing - // ExecutionEnvironment - options.setFlinkMaster("[auto]"); - options.setStreaming(streaming); - return new FlinkPipelineRunner(options); - } - - @Override - public <Output extends POutput, Input extends PInput> Output apply( - PTransform<Input, Output> transform, Input input) { - return super.apply(transform, input); - } - - ///////////////////////////////////////////////////////////////////////////// - - @Override - public String toString() { - return "DataflowPipelineRunner#" + hashCode(); - } - - /** - * Attempts to detect all the resources the class loader has access to. This does not recurse - * to class loader parents stopping it from pulling in resources from the system class loader. - * - * @param classLoader The URLClassLoader to use to detect resources to stage. - * @return A list of absolute paths to the resources the class loader uses. - * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one - * of the resources the class loader exposes is not a file resource. - */ - protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) { - if (!(classLoader instanceof URLClassLoader)) { - String message = String.format("Unable to use ClassLoader to detect classpath elements. 
" - + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader); - LOG.error(message); - throw new IllegalArgumentException(message); - } - - List<String> files = new ArrayList<>(); - for (URL url : ((URLClassLoader) classLoader).getURLs()) { - try { - files.add(new File(url.toURI()).getAbsolutePath()); - } catch (IllegalArgumentException | URISyntaxException e) { - String message = String.format("Unable to convert url (%s) to file.", url); - LOG.error(message); - throw new IllegalArgumentException(message, e); - } - } - return files; - } + private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class); + + /** + * Provided options. + */ + private final FlinkPipelineOptions options; + + private final FlinkPipelineExecutionEnvironment flinkJobEnv; + + /** + * Construct a runner from the provided options. + * + * @param options Properties which configure the runner. + * @return The newly created runner. + */ + public static FlinkPipelineRunner fromOptions(PipelineOptions options) { + FlinkPipelineOptions flinkOptions = + PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); + ArrayList<String> missing = new ArrayList<>(); + + if (flinkOptions.getAppName() == null) { + missing.add("appName"); + } + if (missing.size() > 0) { + throw new IllegalArgumentException( + "Missing required values: " + Joiner.on(',').join(missing)); + } + + if (flinkOptions.getFilesToStage() == null) { + flinkOptions.setFilesToStage(detectClassPathResourcesToStage( + DataflowPipelineRunner.class.getClassLoader())); + LOG.info("PipelineOptions.filesToStage was not specified. " + + "Defaulting to files from the classpath: will stage {} files. " + + "Enable logging at DEBUG level to see which files will be staged.", + flinkOptions.getFilesToStage().size()); + LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage()); + } + + // Verify jobName according to service requirements. 
+ String jobName = flinkOptions.getJobName().toLowerCase(); + Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; " + + "the name must consist of only the characters " + "[-a-z0-9], starting with a letter " + + "and ending with a letter " + "or number"); + Preconditions.checkArgument(jobName.length() <= 40, + "JobName too long; must be no more than 40 characters in length"); + + // Set Flink Master to [auto] if no option was specified. + if (flinkOptions.getFlinkMaster() == null) { + flinkOptions.setFlinkMaster("[auto]"); + } + + return new FlinkPipelineRunner(flinkOptions); + } + + private FlinkPipelineRunner(FlinkPipelineOptions options) { + this.options = options; + this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options); + } + + @Override + public FlinkRunnerResult run(Pipeline pipeline) { + LOG.info("Executing pipeline using FlinkPipelineRunner."); + + LOG.info("Translating pipeline to Flink program."); + + this.flinkJobEnv.translate(pipeline); + + LOG.info("Starting execution of Flink program."); + + JobExecutionResult result; + try { + result = this.flinkJobEnv.executePipeline(); + } catch (Exception e) { + LOG.error("Pipeline execution failed", e); + throw new RuntimeException("Pipeline execution failed", e); + } + + LOG.info("Execution finished in {} msecs", result.getNetRuntime()); + + Map<String, Object> accumulators = result.getAllAccumulatorResults(); + if (accumulators != null && !accumulators.isEmpty()) { + LOG.info("Final aggregator values:"); + + for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) { + LOG.info("{} : {}", entry.getKey(), entry.getValue()); + } + } + + return new FlinkRunnerResult(accumulators, result.getNetRuntime()); + } + + /** + * For testing. + */ + public FlinkPipelineOptions getPipelineOptions() { + return options; + } + + /** + * Constructs a runner with default properties for testing. + * + * @return The newly created runner. 
+ */ + public static FlinkPipelineRunner createForTest(boolean streaming) { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + // we use [auto] for testing since this will make it pick up the Testing + // ExecutionEnvironment + options.setFlinkMaster("[auto]"); + options.setStreaming(streaming); + return new FlinkPipelineRunner(options); + } + + @Override + public <Output extends POutput, Input extends PInput> Output apply( + PTransform<Input, Output> transform, Input input) { + return super.apply(transform, input); + } + + ///////////////////////////////////////////////////////////////////////////// + + @Override + public String toString() { + return "DataflowPipelineRunner#" + hashCode(); + } + + /** + * Attempts to detect all the resources the class loader has access to. This does not recurse + * to class loader parents stopping it from pulling in resources from the system class loader. + * + * @param classLoader The URLClassLoader to use to detect resources to stage. + * @return A list of absolute paths to the resources the class loader uses. + * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one + * of the resources the class loader exposes is not a file resource. + */ + protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) { + if (!(classLoader instanceof URLClassLoader)) { + String message = String.format("Unable to use ClassLoader to detect classpath elements. 
" + + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader); + LOG.error(message); + throw new IllegalArgumentException(message); + } + + List<String> files = new ArrayList<>(); + for (URL url : ((URLClassLoader) classLoader).getURLs()) { + try { + files.add(new File(url.toURI()).getAbsolutePath()); + } catch (IllegalArgumentException | URISyntaxException e) { + String message = String.format("Unable to convert url (%s) to file.", url); + LOG.error(message); + throw new IllegalArgumentException(message, e); + } + } + return files; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java index 59b8b63..dfbaf66 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkRunnerResult.java @@ -29,38 +29,38 @@ import java.util.Map; * {@link com.google.cloud.dataflow.sdk.transforms.Aggregator}s. */ public class FlinkRunnerResult implements PipelineResult { - - private final Map<String, Object> aggregators; - - private final long runtime; - - public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) { - this.aggregators = (aggregators == null || aggregators.isEmpty()) ? - Collections.<String, Object>emptyMap() : - Collections.unmodifiableMap(aggregators); - - this.runtime = runtime; - } + + private final Map<String, Object> aggregators; + + private final long runtime; + + public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) { + this.aggregators = (aggregators == null || aggregators.isEmpty()) ? 
+ Collections.<String, Object>emptyMap() : + Collections.unmodifiableMap(aggregators); + + this.runtime = runtime; + } - @Override - public State getState() { - return null; - } + @Override + public State getState() { + return null; + } - @Override - public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException { - // TODO provide a list of all accumulator step values - Object value = aggregators.get(aggregator.getName()); - if (value != null) { - return new AggregatorValues<T>() { - @Override - public Map<String, T> getValuesAtSteps() { - return (Map<String, T>) aggregators; - } - }; - } else { - throw new AggregatorRetrievalException("Accumulator results not found.", - new RuntimeException("Accumulator does not exist.")); - } - } + @Override + public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException { + // TODO provide a list of all accumulator step values + Object value = aggregators.get(aggregator.getName()); + if (value != null) { + return new AggregatorValues<T>() { + @Override + public Map<String, T> getValuesAtSteps() { + return (Map<String, T>) aggregators; + } + }; + } else { + throw new AggregatorRetrievalException("Accumulator results not found.", + new RuntimeException("Accumulator does not exist.")); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java index 86a2695..8accae7 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/TFIDF.java @@ -89,364 +89,364 @@ import 
java.util.Set; * {@code --input}. */ public class TFIDF { - /** - * Options supported by {@link TFIDF}. - * <p> - * Inherits standard configuration options. - */ - private interface Options extends PipelineOptions, FlinkPipelineOptions { - @Description("Path to the directory or GCS prefix containing files to read from") - @Default.String("gs://dataflow-samples/shakespeare/") - String getInput(); - void setInput(String value); - - @Description("Prefix of output URI to write to") - @Validation.Required - String getOutput(); - void setOutput(String value); - } - - /** - * Lists documents contained beneath the {@code options.input} prefix/directory. - */ - public static Set<URI> listInputDocuments(Options options) - throws URISyntaxException, IOException { - URI baseUri = new URI(options.getInput()); - - // List all documents in the directory or GCS prefix. - URI absoluteUri; - if (baseUri.getScheme() != null) { - absoluteUri = baseUri; - } else { - absoluteUri = new URI( - "file", - baseUri.getAuthority(), - baseUri.getPath(), - baseUri.getQuery(), - baseUri.getFragment()); - } - - Set<URI> uris = new HashSet<>(); - if (absoluteUri.getScheme().equals("file")) { - File directory = new File(absoluteUri); - for (String entry : directory.list()) { - File path = new File(directory, entry); - uris.add(path.toURI()); - } - } else if (absoluteUri.getScheme().equals("gs")) { - GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); - URI gcsUriGlob = new URI( - absoluteUri.getScheme(), - absoluteUri.getAuthority(), - absoluteUri.getPath() + "*", - absoluteUri.getQuery(), - absoluteUri.getFragment()); - for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) { - uris.add(entry.toUri()); - } - } - - return uris; - } - - /** - * Reads the documents at the provided uris and returns all lines - * from the documents tagged with which document they are from. 
- */ - public static class ReadDocuments - extends PTransform<PInput, PCollection<KV<URI, String>>> { - private static final long serialVersionUID = 0; - - private Iterable<URI> uris; - - public ReadDocuments(Iterable<URI> uris) { - this.uris = uris; - } - - @Override - public Coder<?> getDefaultOutputCoder() { - return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()); - } - - @Override - public PCollection<KV<URI, String>> apply(PInput input) { - Pipeline pipeline = input.getPipeline(); - - // Create one TextIO.Read transform for each document - // and add its output to a PCollectionList - PCollectionList<KV<URI, String>> urisToLines = - PCollectionList.empty(pipeline); - - // TextIO.Read supports: - // - file: URIs and paths locally - // - gs: URIs on the service - for (final URI uri : uris) { - String uriString; - if (uri.getScheme().equals("file")) { - uriString = new File(uri).getPath(); - } else { - uriString = uri.toString(); - } - - PCollection<KV<URI, String>> oneUriToLines = pipeline - .apply(TextIO.Read.from(uriString) - .named("TextIO.Read(" + uriString + ")")) - .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri)); - - urisToLines = urisToLines.and(oneUriToLines); - } - - return urisToLines.apply(Flatten.<KV<URI, String>>pCollections()); - } - } - - /** - * A transform containing a basic TF-IDF pipeline. The input consists of KV objects - * where the key is the document's URI and the value is a piece - * of the document's content. The output is mapping from terms to - * scores for each document URI. 
- */ - public static class ComputeTfIdf - extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> { - private static final long serialVersionUID = 0; - - public ComputeTfIdf() { } - - @Override - public PCollection<KV<String, KV<URI, Double>>> apply( - PCollection<KV<URI, String>> uriToContent) { - - // Compute the total number of documents, and - // prepare this singleton PCollectionView for - // use as a side input. - final PCollectionView<Long> totalDocuments = - uriToContent - .apply("GetURIs", Keys.<URI>create()) - .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create()) - .apply(Count.<URI>globally()) - .apply(View.<Long>asSingleton()); - - // Create a collection of pairs mapping a URI to each - // of the words in the document associated with that that URI. - PCollection<KV<URI, String>> uriToWords = uriToContent - .apply(ParDo.named("SplitWords").of( - new DoFn<KV<URI, String>, KV<URI, String>>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - String line = c.element().getValue(); - for (String word : line.split("\\W+")) { - // Log INFO messages when the word âloveâ is found. - if (word.toLowerCase().equals("love")) { - LOG.info("Found {}", word.toLowerCase()); - } - - if (!word.isEmpty()) { - c.output(KV.of(uri, word.toLowerCase())); - } - } - } - })); - - // Compute a mapping from each word to the total - // number of documents in which it appears. - PCollection<KV<String, Long>> wordToDocCount = uriToWords - .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create()) - .apply(Values.<String>create()) - .apply("CountDocs", Count.<String>perElement()); - - // Compute a mapping from each URI to the total - // number of words in the document associated with that URI. 
- PCollection<KV<URI, Long>> uriToWordTotal = uriToWords - .apply("GetURIs2", Keys.<URI>create()) - .apply("CountWords", Count.<URI>perElement()); - - // Count, for each (URI, word) pair, the number of - // occurrences of that word in the document associated - // with the URI. - PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords - .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement()); - - // Adjust the above collection to a mapping from - // (URI, word) pairs to counts into an isomorphic mapping - // from URI to (word, count) pairs, to prepare for a join - // by the URI key. - PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount - .apply(ParDo.named("ShiftKeys").of( - new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey().getKey(); - String word = c.element().getKey().getValue(); - Long occurrences = c.element().getValue(); - c.output(KV.of(uri, KV.of(word, occurrences))); - } - })); - - // Prepare to join the mapping of URI to (word, count) pairs with - // the mapping of URI to total word counts, by associating - // each of the input PCollection<KV<URI, ...>> with - // a tuple tag. Each input must have the same key type, URI - // in this case. The type parameter of the tuple tag matches - // the types of the values for each collection. - final TupleTag<Long> wordTotalsTag = new TupleTag<>(); - final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>(); - KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple - .of(wordTotalsTag, uriToWordTotal) - .and(wordCountsTag, uriToWordAndCount); - - // Perform a CoGroupByKey (a sort of pre-join) on the prepared - // inputs. This yields a mapping from URI to a CoGbkResult - // (CoGroupByKey Result). 
The CoGbkResult is a mapping - // from the above tuple tags to the values in each input - // associated with a particular URI. In this case, each - // KV<URI, CoGbkResult> group a URI with the total number of - // words in that document as well as all the (word, count) - // pairs for particular words. - PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput - .apply("CoGroupByUri", CoGroupByKey.<URI>create()); - - // Compute a mapping from each word to a (URI, term frequency) - // pair for each URI. A word's term frequency for a document - // is simply the number of times that word occurs in the document - // divided by the total number of words in the document. - PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal - .apply(ParDo.named("ComputeTermFrequencies").of( - new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); - - for (KV<String, Long> wordAndCount - : c.element().getValue().getAll(wordCountsTag)) { - String word = wordAndCount.getKey(); - Long wordCount = wordAndCount.getValue(); - Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue(); - c.output(KV.of(word, KV.of(uri, termFrequency))); - } - } - })); - - // Compute a mapping from each word to its document frequency. - // A word's document frequency in a corpus is the number of - // documents in which the word appears divided by the total - // number of documents in the corpus. Note how the total number of - // documents is passed as a side input; the same value is - // presented to each invocation of the DoFn. 
- PCollection<KV<String, Double>> wordToDf = wordToDocCount - .apply(ParDo - .named("ComputeDocFrequencies") - .withSideInputs(totalDocuments) - .of(new DoFn<KV<String, Long>, KV<String, Double>>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Long documentCount = c.element().getValue(); - Long documentTotal = c.sideInput(totalDocuments); - Double documentFrequency = documentCount.doubleValue() - / documentTotal.doubleValue(); - - c.output(KV.of(word, documentFrequency)); - } - })); - - // Join the term frequency and document frequency - // collections, each keyed on the word. - final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>(); - final TupleTag<Double> dfTag = new TupleTag<>(); - PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple - .of(tfTag, wordToUriAndTf) - .and(dfTag, wordToDf) - .apply(CoGroupByKey.<String>create()); - - // Compute a mapping from each word to a (URI, TF-IDF) score - // for each URI. There are a variety of definitions of TF-IDF - // ("term frequency - inverse document frequency") score; - // here we use a basic version that is the term frequency - // divided by the log of the document frequency. - - return wordToUriAndTfAndDf - .apply(ParDo.named("ComputeTfIdf").of( - new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { - private static final long serialVersionUID1 = 0; - - @Override - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Double df = c.element().getValue().getOnly(dfTag); - - for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) { - URI uri = uriAndTf.getKey(); - Double tf = uriAndTf.getValue(); - Double tfIdf = tf * Math.log(1 / df); - c.output(KV.of(word, KV.of(uri, tfIdf))); - } - } - })); - } - - // Instantiate Logger. 
- // It is suggested that the user specify the class name of the containing class - // (in this case ComputeTfIdf). - private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class); - } - - /** - * A {@link PTransform} to write, in CSV format, a mapping from term and URI - * to score. - */ - public static class WriteTfIdf - extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> { - private static final long serialVersionUID = 0; - - private String output; - - public WriteTfIdf(String output) { - this.output = output; - } - - @Override - public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { - return wordToUriAndTfIdf - .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - c.output(String.format("%s,\t%s,\t%f", - c.element().getKey(), - c.element().getValue().getKey(), - c.element().getValue().getValue())); - } - })) - .apply(TextIO.Write - .to(output) - .withSuffix(".csv")); - } - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - - options.setRunner(FlinkPipelineRunner.class); - - Pipeline pipeline = Pipeline.create(options); - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - pipeline - .apply(new ReadDocuments(listInputDocuments(options))) - .apply(new ComputeTfIdf()) - .apply(new WriteTfIdf(options.getOutput())); - - pipeline.run(); - } + /** + * Options supported by {@link TFIDF}. + * <p> + * Inherits standard configuration options. 
+ */ + private interface Options extends PipelineOptions, FlinkPipelineOptions { + @Description("Path to the directory or GCS prefix containing files to read from") + @Default.String("gs://dataflow-samples/shakespeare/") + String getInput(); + void setInput(String value); + + @Description("Prefix of output URI to write to") + @Validation.Required + String getOutput(); + void setOutput(String value); + } + + /** + * Lists documents contained beneath the {@code options.input} prefix/directory. + */ + public static Set<URI> listInputDocuments(Options options) + throws URISyntaxException, IOException { + URI baseUri = new URI(options.getInput()); + + // List all documents in the directory or GCS prefix. + URI absoluteUri; + if (baseUri.getScheme() != null) { + absoluteUri = baseUri; + } else { + absoluteUri = new URI( + "file", + baseUri.getAuthority(), + baseUri.getPath(), + baseUri.getQuery(), + baseUri.getFragment()); + } + + Set<URI> uris = new HashSet<>(); + if (absoluteUri.getScheme().equals("file")) { + File directory = new File(absoluteUri); + for (String entry : directory.list()) { + File path = new File(directory, entry); + uris.add(path.toURI()); + } + } else if (absoluteUri.getScheme().equals("gs")) { + GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); + URI gcsUriGlob = new URI( + absoluteUri.getScheme(), + absoluteUri.getAuthority(), + absoluteUri.getPath() + "*", + absoluteUri.getQuery(), + absoluteUri.getFragment()); + for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) { + uris.add(entry.toUri()); + } + } + + return uris; + } + + /** + * Reads the documents at the provided uris and returns all lines + * from the documents tagged with which document they are from. 
+ */ + public static class ReadDocuments + extends PTransform<PInput, PCollection<KV<URI, String>>> { + private static final long serialVersionUID = 0; + + private Iterable<URI> uris; + + public ReadDocuments(Iterable<URI> uris) { + this.uris = uris; + } + + @Override + public Coder<?> getDefaultOutputCoder() { + return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()); + } + + @Override + public PCollection<KV<URI, String>> apply(PInput input) { + Pipeline pipeline = input.getPipeline(); + + // Create one TextIO.Read transform for each document + // and add its output to a PCollectionList + PCollectionList<KV<URI, String>> urisToLines = + PCollectionList.empty(pipeline); + + // TextIO.Read supports: + // - file: URIs and paths locally + // - gs: URIs on the service + for (final URI uri : uris) { + String uriString; + if (uri.getScheme().equals("file")) { + uriString = new File(uri).getPath(); + } else { + uriString = uri.toString(); + } + + PCollection<KV<URI, String>> oneUriToLines = pipeline + .apply(TextIO.Read.from(uriString) + .named("TextIO.Read(" + uriString + ")")) + .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri)); + + urisToLines = urisToLines.and(oneUriToLines); + } + + return urisToLines.apply(Flatten.<KV<URI, String>>pCollections()); + } + } + + /** + * A transform containing a basic TF-IDF pipeline. The input consists of KV objects + * where the key is the document's URI and the value is a piece + * of the document's content. The output is mapping from terms to + * scores for each document URI. 
+ */ + public static class ComputeTfIdf + extends PTransform<PCollection<KV<URI, String>>, PCollection<KV<String, KV<URI, Double>>>> { + private static final long serialVersionUID = 0; + + public ComputeTfIdf() { } + + @Override + public PCollection<KV<String, KV<URI, Double>>> apply( + PCollection<KV<URI, String>> uriToContent) { + + // Compute the total number of documents, and + // prepare this singleton PCollectionView for + // use as a side input. + final PCollectionView<Long> totalDocuments = + uriToContent + .apply("GetURIs", Keys.<URI>create()) + .apply("RemoveDuplicateDocs", RemoveDuplicates.<URI>create()) + .apply(Count.<URI>globally()) + .apply(View.<Long>asSingleton()); + + // Create a collection of pairs mapping a URI to each + // of the words in the document associated with that that URI. + PCollection<KV<URI, String>> uriToWords = uriToContent + .apply(ParDo.named("SplitWords").of( + new DoFn<KV<URI, String>, KV<URI, String>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey(); + String line = c.element().getValue(); + for (String word : line.split("\\W+")) { + // Log INFO messages when the word âloveâ is found. + if (word.toLowerCase().equals("love")) { + LOG.info("Found {}", word.toLowerCase()); + } + + if (!word.isEmpty()) { + c.output(KV.of(uri, word.toLowerCase())); + } + } + } + })); + + // Compute a mapping from each word to the total + // number of documents in which it appears. + PCollection<KV<String, Long>> wordToDocCount = uriToWords + .apply("RemoveDuplicateWords", RemoveDuplicates.<KV<URI, String>>create()) + .apply(Values.<String>create()) + .apply("CountDocs", Count.<String>perElement()); + + // Compute a mapping from each URI to the total + // number of words in the document associated with that URI. 
+ PCollection<KV<URI, Long>> uriToWordTotal = uriToWords + .apply("GetURIs2", Keys.<URI>create()) + .apply("CountWords", Count.<URI>perElement()); + + // Count, for each (URI, word) pair, the number of + // occurrences of that word in the document associated + // with the URI. + PCollection<KV<KV<URI, String>, Long>> uriAndWordToCount = uriToWords + .apply("CountWordDocPairs", Count.<KV<URI, String>>perElement()); + + // Adjust the above collection to a mapping from + // (URI, word) pairs to counts into an isomorphic mapping + // from URI to (word, count) pairs, to prepare for a join + // by the URI key. + PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount + .apply(ParDo.named("ShiftKeys").of( + new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey().getKey(); + String word = c.element().getKey().getValue(); + Long occurrences = c.element().getValue(); + c.output(KV.of(uri, KV.of(word, occurrences))); + } + })); + + // Prepare to join the mapping of URI to (word, count) pairs with + // the mapping of URI to total word counts, by associating + // each of the input PCollection<KV<URI, ...>> with + // a tuple tag. Each input must have the same key type, URI + // in this case. The type parameter of the tuple tag matches + // the types of the values for each collection. + final TupleTag<Long> wordTotalsTag = new TupleTag<>(); + final TupleTag<KV<String, Long>> wordCountsTag = new TupleTag<>(); + KeyedPCollectionTuple<URI> coGbkInput = KeyedPCollectionTuple + .of(wordTotalsTag, uriToWordTotal) + .and(wordCountsTag, uriToWordAndCount); + + // Perform a CoGroupByKey (a sort of pre-join) on the prepared + // inputs. This yields a mapping from URI to a CoGbkResult + // (CoGroupByKey Result). 
The CoGbkResult is a mapping + // from the above tuple tags to the values in each input + // associated with a particular URI. In this case, each + // KV<URI, CoGbkResult> group a URI with the total number of + // words in that document as well as all the (word, count) + // pairs for particular words. + PCollection<KV<URI, CoGbkResult>> uriToWordAndCountAndTotal = coGbkInput + .apply("CoGroupByUri", CoGroupByKey.<URI>create()); + + // Compute a mapping from each word to a (URI, term frequency) + // pair for each URI. A word's term frequency for a document + // is simply the number of times that word occurs in the document + // divided by the total number of words in the document. + PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal + .apply(ParDo.named("ComputeTermFrequencies").of( + new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey(); + Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); + + for (KV<String, Long> wordAndCount + : c.element().getValue().getAll(wordCountsTag)) { + String word = wordAndCount.getKey(); + Long wordCount = wordAndCount.getValue(); + Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue(); + c.output(KV.of(word, KV.of(uri, termFrequency))); + } + } + })); + + // Compute a mapping from each word to its document frequency. + // A word's document frequency in a corpus is the number of + // documents in which the word appears divided by the total + // number of documents in the corpus. Note how the total number of + // documents is passed as a side input; the same value is + // presented to each invocation of the DoFn. 
+ PCollection<KV<String, Double>> wordToDf = wordToDocCount + .apply(ParDo + .named("ComputeDocFrequencies") + .withSideInputs(totalDocuments) + .of(new DoFn<KV<String, Long>, KV<String, Double>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String word = c.element().getKey(); + Long documentCount = c.element().getValue(); + Long documentTotal = c.sideInput(totalDocuments); + Double documentFrequency = documentCount.doubleValue() + / documentTotal.doubleValue(); + + c.output(KV.of(word, documentFrequency)); + } + })); + + // Join the term frequency and document frequency + // collections, each keyed on the word. + final TupleTag<KV<URI, Double>> tfTag = new TupleTag<>(); + final TupleTag<Double> dfTag = new TupleTag<>(); + PCollection<KV<String, CoGbkResult>> wordToUriAndTfAndDf = KeyedPCollectionTuple + .of(tfTag, wordToUriAndTf) + .and(dfTag, wordToDf) + .apply(CoGroupByKey.<String>create()); + + // Compute a mapping from each word to a (URI, TF-IDF) score + // for each URI. There are a variety of definitions of TF-IDF + // ("term frequency - inverse document frequency") score; + // here we use a basic version that is the term frequency + // divided by the log of the document frequency. + + return wordToUriAndTfAndDf + .apply(ParDo.named("ComputeTfIdf").of( + new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + String word = c.element().getKey(); + Double df = c.element().getValue().getOnly(dfTag); + + for (KV<URI, Double> uriAndTf : c.element().getValue().getAll(tfTag)) { + URI uri = uriAndTf.getKey(); + Double tf = uriAndTf.getValue(); + Double tfIdf = tf * Math.log(1 / df); + c.output(KV.of(word, KV.of(uri, tfIdf))); + } + } + })); + } + + // Instantiate Logger. 
+ // It is suggested that the user specify the class name of the containing class + // (in this case ComputeTfIdf). + private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class); + } + + /** + * A {@link PTransform} to write, in CSV format, a mapping from term and URI + * to score. + */ + public static class WriteTfIdf + extends PTransform<PCollection<KV<String, KV<URI, Double>>>, PDone> { + private static final long serialVersionUID = 0; + + private String output; + + public WriteTfIdf(String output) { + this.output = output; + } + + @Override + public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { + return wordToUriAndTfIdf + .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + c.output(String.format("%s,\t%s,\t%f", + c.element().getKey(), + c.element().getValue().getKey(), + c.element().getValue().getValue())); + } + })) + .apply(TextIO.Write + .to(output) + .withSuffix(".csv")); + } + } + + public static void main(String[] args) throws Exception { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + + options.setRunner(FlinkPipelineRunner.class); + + Pipeline pipeline = Pipeline.create(options); + pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); + + pipeline + .apply(new ReadDocuments(listInputDocuments(options))) + .apply(new ComputeTfIdf()) + .apply(new WriteTfIdf(options.getOutput())); + + pipeline.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java index e737fe8..4f721b4 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/WordCount.java @@ -29,83 +29,83 @@ import com.google.cloud.dataflow.sdk.values.PCollection; public class WordCount { - public static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - public static class CountWords extends PTransform<PCollection<String>, + public static class ExtractWordsFn extends DoFn<String, String> { + private final Aggregator<Long, Long> emptyLines = + createAggregator("emptyLines", new Sum.SumLongFn()); + + @Override + public void processElement(ProcessContext c) { + if (c.element().trim().isEmpty()) { + emptyLines.addValue(1L); + } + + // Split the line into words. + String[] words = c.element().split("[^a-zA-Z']+"); + + // Output each word encountered into the output PCollection. + for (String word : words) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + } + + public static class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { - @Override - public PCollection<KV<String, Long>> apply(PCollection<String> lines) { - - // Convert lines of text into individual words. - PCollection<String> words = lines.apply( - ParDo.of(new ExtractWordsFn())); - - // Count the number of times each word occurs. 
- PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - return wordCounts; - } - } - - /** A SimpleFunction that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { - @Override - public String apply(KV<String, Long> input) { - return input.getKey() + ": " + input.getValue(); - } - } - - /** - * Options supported by {@link WordCount}. - * <p> - * Inherits standard configuration options. - */ - public interface Options extends PipelineOptions, FlinkPipelineOptions { - @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") - String getInput(); - void setInput(String value); - - @Description("Path of the file to write to") - String getOutput(); - void setOutput(String value); - } - - public static void main(String[] args) { - - Options options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(Options.class); - options.setRunner(FlinkPipelineRunner.class); - - Pipeline p = Pipeline.create(options); - - p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) - .apply(new CountWords()) - .apply(MapElements.via(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); - - p.run(); - } + @Override + public PCollection<KV<String, Long>> apply(PCollection<String> lines) { + + // Convert lines of text into individual words. + PCollection<String> words = lines.apply( + ParDo.of(new ExtractWordsFn())); + + // Count the number of times each word occurs. + PCollection<KV<String, Long>> wordCounts = + words.apply(Count.<String>perElement()); + + return wordCounts; + } + } + + /** A SimpleFunction that converts a Word and Count into a printable string. 
*/ + public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { + @Override + public String apply(KV<String, Long> input) { + return input.getKey() + ": " + input.getValue(); + } + } + + /** + * Options supported by {@link WordCount}. + * <p> + * Inherits standard configuration options. + */ + public interface Options extends PipelineOptions, FlinkPipelineOptions { + @Description("Path of the file to read from") + @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") + String getInput(); + void setInput(String value); + + @Description("Path of the file to write to") + String getOutput(); + void setOutput(String value); + } + + public static void main(String[] args) { + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation() + .as(Options.class); + options.setRunner(FlinkPipelineRunner.class); + + Pipeline p = Pipeline.create(options); + + p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + .apply(new CountWords()) + .apply(MapElements.via(new FormatAsTextFn())) + .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); + + p.run(); + } }
