Rearranging the code and renaming certain classes (e.g. FlinkJobExecutionEnvironment is renamed to FlinkPipelineExecutionEnvironment).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bc4c60eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bc4c60eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bc4c60eb Branch: refs/heads/master Commit: bc4c60ebfa49ad050367533809c265375e8c0b01 Parents: 37a9b29 Author: kl0u <[email protected]> Authored: Mon Feb 29 12:38:56 2016 +0100 Committer: Davor Bonaci <[email protected]> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- .../dataflow/FlinkJobExecutionEnvironment.java | 237 ----------------- .../FlinkPipelineExecutionEnvironment.java | 255 +++++++++++++++++++ .../flink/dataflow/FlinkPipelineOptions.java | 23 +- .../flink/dataflow/FlinkPipelineRunner.java | 6 +- .../examples/streaming/AutoComplete.java | 7 +- .../examples/streaming/JoinExamples.java | 5 +- .../KafkaWindowedWordCountExample.java | 3 + .../examples/streaming/WindowedWordCount.java | 3 + .../FlinkStreamingTransformTranslators.java | 2 - .../FlinkStreamingTranslationContext.java | 9 +- .../streaming/FlinkAbstractParDoWrapper.java | 10 +- .../FlinkGroupAlsoByWindowWrapper.java | 2 - .../io/FlinkStreamingCreateFunction.java | 7 +- .../streaming/io/UnboundedSourceWrapper.java | 11 +- .../dataartisans/flink/dataflow/AvroITCase.java | 4 +- .../flink/dataflow/FlattenizeITCase.java | 2 +- .../flink/dataflow/FlinkTestPipeline.java | 14 +- .../flink/dataflow/JoinExamplesITCase.java | 2 +- .../flink/dataflow/MaybeEmptyTestITCase.java | 2 +- .../flink/dataflow/ParDoMultiOutputITCase.java | 2 +- .../flink/dataflow/ReadSourceITCase.java | 2 +- .../dataflow/RemoveDuplicatesEmptyITCase.java | 2 +- .../flink/dataflow/RemoveDuplicatesITCase.java | 2 +- .../flink/dataflow/SideInputITCase.java | 2 +- .../flink/dataflow/TfIdfITCase.java | 2 +- .../dataflow/TopWikipediaSessionsITCase.java | 144 ----------- 
.../flink/dataflow/WordCountITCase.java | 2 +- .../flink/dataflow/WordCountJoin2ITCase.java | 2 +- .../flink/dataflow/WordCountJoin3ITCase.java | 2 +- .../flink/dataflow/WriteSinkITCase.java | 2 +- .../streaming/GroupAlsoByWindowTest.java | 8 +- .../streaming/TopWikipediaSessionsITCase.java | 145 +++++++++++ 32 files changed, 481 insertions(+), 440 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java deleted file mode 100644 index 91b2f64..0000000 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkJobExecutionEnvironment.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.dataartisans.flink.dataflow; - -import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator; -import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator; -import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator; -import com.google.cloud.dataflow.sdk.Pipeline; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.java.CollectionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class FlinkJobExecutionEnvironment { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkJobExecutionEnvironment.class); - - private final FlinkPipelineOptions options; - - /** - * The Flink Batch execution environment. This is instantiated to either a - * {@link org.apache.flink.api.java.CollectionEnvironment}, - * a {@link org.apache.flink.api.java.LocalEnvironment} or - * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration - * options. - */ - private ExecutionEnvironment flinkBatchEnv; - - - /** - * The Flink Streaming execution environment. This is instantiated to either a - * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or - * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending - * on the configuration options, and more specifically, the url of the master url. - */ - private StreamExecutionEnvironment flinkStreamEnv; - - /** - * Translator for this FlinkPipelineRunner. Its role is to translate the Dataflow operators to - * their Flink based counterparts. Based on the options provided by the user, if we have a streaming job, - * this is instantiated to a FlinkStreamingPipelineTranslator. In other case, i.e. 
a batch job, - * a FlinkBatchPipelineTranslator is created. - */ - private FlinkPipelineTranslator flinkPipelineTranslator; - - public FlinkJobExecutionEnvironment(FlinkPipelineOptions options) { - if (options == null) { - throw new IllegalArgumentException("Options in the FlinkJobExecutionEnvironment cannot be NULL."); - } - this.options = options; - this.createJobEnvironment(); - this.createJobGraphTranslator(); - } - - /** - * Depending on the type of job (Streaming or Batch) and the user-specified options, - * this method creates the adequate ExecutionEnvironment. - */ - private void createJobEnvironment() { - if (options.isStreaming()) { - LOG.info("Creating the required STREAMING Environment."); - createStreamExecutionEnvironment(); - } else { - LOG.info("Creating the required BATCH Environment."); - createBatchExecutionEnvironment(); - } - } - - /** - * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph - * translator. In the case of batch, it will work with DataSets, while for streaming, it will work - * with DataStreams. - */ - private void createJobGraphTranslator() { - checkInitializationState(); - if (this.flinkPipelineTranslator != null) { - throw new IllegalStateException("JobGraphTranslator already initialized."); - } - - this.flinkPipelineTranslator = options.isStreaming() ? 
- new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) : - new FlinkBatchPipelineTranslator(flinkBatchEnv, options); - } - - public void translate(Pipeline pipeline) { - checkInitializationState(); - if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) { - createJobEnvironment(); - } - if (this.flinkPipelineTranslator == null) { - createJobGraphTranslator(); - } - this.flinkPipelineTranslator.translate(pipeline); - } - - public JobExecutionResult executeJob() throws Exception { - if (options.isStreaming()) { - - System.out.println("Plan: " + this.flinkStreamEnv.getExecutionPlan()); - - if (this.flinkStreamEnv == null) { - throw new RuntimeException("JobExecutionEnvironment not initialized."); - } - if (this.flinkPipelineTranslator == null) { - throw new RuntimeException("JobGraphTranslator not initialized."); - } - return this.flinkStreamEnv.execute(); - } else { - if (this.flinkBatchEnv == null) { - throw new RuntimeException("JobExecutionEnvironment not initialized."); - } - if (this.flinkPipelineTranslator == null) { - throw new RuntimeException("JobGraphTranslator not initialized."); - } - return this.flinkBatchEnv.execute(); - } - } - - /** - * If the submitted job is a batch processing job, this method creates the adequate - * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending - * on the user-specified options. - */ - private void createBatchExecutionEnvironment() { - if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { - throw new RuntimeException("JobExecutionEnvironment already initialized."); - } - - String masterUrl = options.getFlinkMaster(); - this.flinkStreamEnv = null; - - // depending on the master, create the right environment. 
- if (masterUrl.equals("[local]")) { - this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[collection]")) { - this.flinkBatchEnv = new CollectionEnvironment(); - } else if (masterUrl.equals("[auto]")) { - this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List<String> stagingFiles = options.getFilesToStage(); - this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), - stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); - } - - // set the correct parallelism. - if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) { - this.flinkBatchEnv.setParallelism(options.getParallelism()); - } - - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkBatchEnv.getParallelism()); - } - - /** - * If the submitted job is a stream processing job, this method creates the adequate - * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending - * on the user-specified options. - */ - private void createStreamExecutionEnvironment() { - if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { - throw new RuntimeException("JobExecutionEnvironment already initialized."); - } - - String masterUrl = options.getFlinkMaster(); - this.flinkBatchEnv = null; - - // depending on the master, create the right environment. 
- if (masterUrl.equals("[local]")) { - this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); - } else if (masterUrl.equals("[auto]")) { - this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } else if (masterUrl.matches(".*:\\d*")) { - String[] parts = masterUrl.split(":"); - List<String> stagingFiles = options.getFilesToStage(); - this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], - Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); - } else { - LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); - this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } - - // set the correct parallelism. - if (options.getParallelism() != -1) { - this.flinkStreamEnv.setParallelism(options.getParallelism()); - } - - // set parallelism in the options (required by some execution code) - options.setParallelism(flinkStreamEnv.getParallelism()); - - // although we do not use the generated timestamps, - // enabling timestamps is needed for the watermarks. 
- this.flinkStreamEnv.getConfig().enableTimestamps(); - - this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - this.flinkStreamEnv.enableCheckpointing(1000); - this.flinkStreamEnv.setNumberOfExecutionRetries(5); - - LOG.info("Setting execution retry delay to 3 sec"); - this.flinkStreamEnv.getConfig().setExecutionRetryDelay(3000); - } - - private void checkInitializationState() { - if (this.options == null) { - throw new IllegalStateException("FlinkJobExecutionEnvironment is not initialized yet."); - } - - if (options.isStreaming() && this.flinkBatchEnv != null) { - throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment."); - } else if (!options.isStreaming() && this.flinkStreamEnv != null) { - throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment."); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java new file mode 100644 index 0000000..a1372bd --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java @@ -0,0 +1,255 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow; + +import com.dataartisans.flink.dataflow.translation.FlinkPipelineTranslator; +import com.dataartisans.flink.dataflow.translation.FlinkBatchPipelineTranslator; +import com.dataartisans.flink.dataflow.translation.FlinkStreamingPipelineTranslator; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class FlinkPipelineExecutionEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); + + private final FlinkPipelineOptions options; + + /** + * The Flink Batch execution environment. This is instantiated to either a + * {@link org.apache.flink.api.java.CollectionEnvironment}, + * a {@link org.apache.flink.api.java.LocalEnvironment} or + * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration + * options. + */ + private ExecutionEnvironment flinkBatchEnv; + + + /** + * The Flink Streaming execution environment. 
This is instantiated to either a + * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or + * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending + * on the configuration options, and more specifically, the url of the master. + */ + private StreamExecutionEnvironment flinkStreamEnv; + + /** + * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to + * their Flink counterparts. Based on the options provided by the user, if we have a streaming job, + * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In other case, i.e. a batch job, + * a {@link FlinkBatchPipelineTranslator} is created. + */ + private FlinkPipelineTranslator flinkPipelineTranslator; + + /** + * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the + * provided {@link FlinkPipelineOptions}. + * + * @param options the user-defined pipeline options. + * */ + public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) { + this.options = Preconditions.checkNotNull(options); + this.createPipelineExecutionEnvironment(); + this.createPipelineTranslator(); + } + + /** + * Depending on the type of job (Streaming or Batch) and the user-specified options, + * this method creates the adequate ExecutionEnvironment. + */ + private void createPipelineExecutionEnvironment() { + if (options.isStreaming()) { + createStreamExecutionEnvironment(); + } else { + createBatchExecutionEnvironment(); + } + } + + /** + * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph + * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet}, + * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}. 
+ */ + private void createPipelineTranslator() { + checkInitializationState(); + if (this.flinkPipelineTranslator != null) { + throw new IllegalStateException("FlinkPipelineTranslator already initialized."); + } + + this.flinkPipelineTranslator = options.isStreaming() ? + new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) : + new FlinkBatchPipelineTranslator(flinkBatchEnv, options); + } + + /** + * Depending on if the job is a Streaming or a Batch one, this method creates + * the necessary execution environment and pipeline translator, and translates + * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into + * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream} + * one. + * */ + public void translate(Pipeline pipeline) { + checkInitializationState(); + if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) { + createPipelineExecutionEnvironment(); + } + if (this.flinkPipelineTranslator == null) { + createPipelineTranslator(); + } + this.flinkPipelineTranslator.translate(pipeline); + } + + /** + * Launches the program execution. 
+ * */ + public JobExecutionResult executePipeline() throws Exception { + if (options.isStreaming()) { + if (this.flinkStreamEnv == null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("FlinkPipelineTranslator not initialized."); + } + return this.flinkStreamEnv.execute(); + } else { + if (this.flinkBatchEnv == null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("FlinkPipelineTranslator not initialized."); + } + return this.flinkBatchEnv.execute(); + } + } + + /** + * If the submitted job is a batch processing job, this method creates the adequate + * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending + * on the user-specified options. + */ + private void createBatchExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); + } + + LOG.info("Creating the required Batch Execution Environment."); + + String masterUrl = options.getFlinkMaster(); + this.flinkStreamEnv = null; + + // depending on the master, create the right environment. 
+ if (masterUrl.equals("[local]")) { + this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[collection]")) { + this.flinkBatchEnv = new CollectionEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), + stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) { + this.flinkBatchEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkBatchEnv.getParallelism()); + } + + /** + * If the submitted job is a stream processing job, this method creates the adequate + * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending + * on the user-specified options. + */ + private void createStreamExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); + } + + LOG.info("Creating the required Streaming Environment."); + + String masterUrl = options.getFlinkMaster(); + this.flinkBatchEnv = null; + + // depending on the master, create the right environment. 
+ if (masterUrl.equals("[local]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1) { + this.flinkStreamEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkStreamEnv.getParallelism()); + + // although we do not use the generated timestamps, + // enabling timestamps is needed for the watermarks. + this.flinkStreamEnv.getConfig().enableTimestamps(); + this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // for the following 2 parameters, a value of -1 means that Flink will use + // the default values as specified in the configuration. + this.flinkStreamEnv.setNumberOfExecutionRetries(options.getNumberOfExecutionRetries()); + this.flinkStreamEnv.getConfig().setExecutionRetryDelay(options.getExecutionRetryDelay()); + + // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). + // If the value is not -1, then the validity checks are applied. + // By default, checkpointing is disabled. 
+ long checkpointInterval = options.getCheckpointingInterval(); + if(checkpointInterval != -1) { + if (checkpointInterval < 1) { + throw new IllegalArgumentException("The checkpoint interval must be positive"); + } + this.flinkStreamEnv.enableCheckpointing(checkpointInterval); + } + } + + private void checkInitializationState() { + if (options.isStreaming() && this.flinkBatchEnv != null) { + throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment."); + } else if (!options.isStreaming() && this.flinkStreamEnv != null) { + throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java index e746f41..2429cac 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineOptions.java @@ -66,11 +66,26 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp String getFlinkMaster(); void setFlinkMaster(String value); - /** - * The degree of parallelism to be used when parallelizing operations onto workers. - */ - @Description("The degree of parallelism to be used when parallelizing operations onto workers.") + @Description("The degree of parallelism to be used when distributing operations onto workers.") @Default.Integer(-1) Integer getParallelism(); void setParallelism(Integer value); + + @Description("The interval between consecutive checkpoints (i.e. 
snapshots of the current pipeline state used for " + + "fault tolerance).") + @Default.Long(-1L) + Long getCheckpointingInterval(); + void setCheckpointingInterval(Long interval); + + @Description("Sets the number of times that failed tasks are re-executed. " + + "A value of zero effectively disables fault tolerance. A value of -1 indicates " + + "that the system default value (as defined in the configuration) should be used.") + @Default.Integer(-1) + Integer getNumberOfExecutionRetries(); + void setNumberOfExecutionRetries(Integer retries); + + @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.") + @Default.Long(-1L) + Long getExecutionRetryDelay(); + void setExecutionRetryDelay(Long delay); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java index ebd2691..7ea8370 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineRunner.java @@ -54,7 +54,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { */ private final FlinkPipelineOptions options; - private final FlinkJobExecutionEnvironment flinkJobEnv; + private final FlinkPipelineExecutionEnvironment flinkJobEnv; /** * Construct a runner from the provided options. 
@@ -103,7 +103,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { private FlinkPipelineRunner(FlinkPipelineOptions options) { this.options = options; - this.flinkJobEnv = new FlinkJobExecutionEnvironment(options); + this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options); } @Override @@ -118,7 +118,7 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { JobExecutionResult result; try { - result = this.flinkJobEnv.executeJob(); + result = this.flinkJobEnv.executePipeline(); } catch (Exception e) { LOG.error("Pipeline execution failed", e); throw new RuntimeException("Pipeline execution failed", e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java index 711d9fb..493fb25 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/AutoComplete.java @@ -325,7 +325,9 @@ public class AutoComplete { * Takes as input a the top candidates per prefix, and emits an entity * suitable for writing to Datastore. 
*/ - static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> { + static class FormatForPerTaskLocalFile extends DoFn<KV<String, List<CompletionCandidate>>, String> + implements DoFn.RequiresWindowAccess{ + private static final long serialVersionUID = 0; @Override @@ -357,6 +359,9 @@ public class AutoComplete { public static void main(String[] args) throws IOException { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); options.setRunner(FlinkPipelineRunner.class); PTransform<? super PBegin, PCollection<String>> readSource = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java index 9a5db64..60f6788 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/JoinExamples.java @@ -124,9 +124,10 @@ public class JoinExamples { public static void main(String[] args) throws Exception { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - - // make it a streaming example. options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); options.setRunner(FlinkPipelineRunner.class); PTransform<? 
super PBegin, PCollection<String>> readSourceA = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java index 42d3d88..dba2721 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/KafkaWindowedWordCountExample.java @@ -104,6 +104,9 @@ public class KafkaWindowedWordCountExample { KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); options.setJobName("KafkaExample"); options.setStreaming(true); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); options.setRunner(FlinkPipelineRunner.class); System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() ); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java index b539245..37dc39a 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java +++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/examples/streaming/WindowedWordCount.java @@ -99,6 +99,9 @@ public class WindowedWordCount { options.setStreaming(true); options.setWindowSize(10L); options.setSlide(5L); + options.setCheckpointingInterval(1000L); + options.setNumberOfExecutionRetries(5); + options.setExecutionRetryDelay(3000L); options.setRunner(FlinkPipelineRunner.class); LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java index 46d3e36..27cc923 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java @@ -69,7 +69,6 @@ public class FlinkStreamingTransformTranslators { // here you can find all the available translators. 
static { - TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator()); TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator()); @@ -79,7 +78,6 @@ public class FlinkStreamingTransformTranslators { TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator()); - } public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java index df68e50..7c4ab93 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java @@ -18,7 +18,10 @@ package com.dataartisans.flink.dataflow.translation; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.values.*; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.POutput; +import com.google.cloud.dataflow.sdk.values.PValue; +import com.google.common.base.Preconditions; import org.apache.flink.streaming.api.datastream.DataStream; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -40,8 +43,8 @@ public class FlinkStreamingTranslationContext { private AppliedPTransform<?, ?, ?> currentTransform; public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { - this.env = env; - this.options = options; + this.env = Preconditions.checkNotNull(env); + this.options = Preconditions.checkNotNull(options); this.dataStreams = new HashMap<>(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index 71f9c7f..dfb2b7d 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -116,10 +116,10 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl @Override public BoundedWindow window() { -// if (!(fn instanceof DoFn.RequiresWindowAccess)) { -// throw new UnsupportedOperationException( -// "window() is only available in the context of a DoFn marked as RequiresWindow."); -// } + if (!(fn instanceof DoFn.RequiresWindowAccess)) { + throw new UnsupportedOperationException( + "window() is only available in the context of a DoFn marked as RequiresWindow."); + } Collection<? 
extends BoundedWindow> windows = this.element.getWindows(); if (windows.size() != 1) { @@ -211,7 +211,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl @Override public Object element() { throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); // TODO: 12/16/15 aljoscha's comment in slack + "WindowFn attempted to access input element when none was available"); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index 0f0a9d0..b78db65 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -238,9 +238,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> this.operator = StreamingGroupAlsoByWindowsDoFn.createForIterable( this.windowingStrategy, inputValueCoder); } else { - Coder<K> inputKeyCoder = inputKvCoder.getKeyCoder(); - //CoderRegistry dataflowRegistry = input.getPipeline().getCoderRegistry(); AppliedCombineFn<K, VIN, VACC, VOUT> appliedCombineFn = AppliedCombineFn .withInputCoder(combineFn, coderRegistry, inputKvCoder); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java 
---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java index b8824f5..c952d6f 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java @@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.util.WindowedValue; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; +import org.joda.time.Instant; import java.io.ByteArrayInputStream; import java.util.List; @@ -44,17 +45,15 @@ public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception { @SuppressWarnings("unchecked") - // TODO Flink doesn't allow null values in records OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE; - for (byte[] element : elements) { ByteArrayInputStream bai = new ByteArrayInputStream(element); OUT outValue = coder.decode(bai, Coder.Context.OUTER); if (outValue == null) { - out.collect(WindowedValue.of(voidValue, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + out.collect(WindowedValue.of(voidValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } else { - out.collect(WindowedValue.of(outValue, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + out.collect(WindowedValue.of(outValue, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 3e248a6..cdc2e95 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -38,7 +38,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue< private final UnboundedSource.UnboundedReader<T> reader; private StreamingRuntimeContext runtime = null; - private StreamSource.ManualWatermarkContext<T> context = null; + private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null; private volatile boolean isRunning = false; @@ -51,8 +51,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue< return this.name; } - WindowedValue<T> makeWindowedValue( - T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + WindowedValue<T> makeWindowedValue(T output, Instant timestamp) { if (timestamp == null) { timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; } @@ -66,7 +65,7 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue< "Apparently " + this.name + " is not. 
Probably you should consider writing your own Wrapper for this source."); } - context = (StreamSource.ManualWatermarkContext<T>) ctx; + context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx; runtime = (StreamingRuntimeContext) getRuntimeContext(); this.isRunning = reader.start(); @@ -78,11 +77,9 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue< T item = reader.getCurrent(); Instant timestamp = reader.getCurrentTimestamp(); - long milliseconds = timestamp.getMillis(); - // write it to the output collector synchronized (ctx.getCheckpointLock()) { - ctx.collectWithTimestamp(makeWindowedValue(item, timestamp, null, PaneInfo.NO_FIRING), milliseconds); + context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis()); } // try to go to the next record http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java index c6e3e99..2b1f091 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java @@ -56,14 +56,14 @@ public class AvroITCase extends JavaProgramTestBase { } private static void runProgram(String tmpPath, String resultPath) { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); p.apply(Create.of(new User("Joe", 3, "red"), new User("Mary", 4, "blue")).withCoder(AvroCoder.of(User.class))) .apply(AvroIO.Write.to(tmpPath).withSchema(User.class)); p.run(); - p = FlinkTestPipeline.create(); + p = FlinkTestPipeline.createForBatch(); p.apply(AvroIO.Read.from(tmpPath).withSchema(User.class)) .apply(ParDo.of(new DoFn<User, String>() { 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java index bc24514..928388c 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java @@ -51,7 +51,7 @@ public class FlattenizeITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); PCollection<String> p1 = p.apply(Create.of(words)); PCollection<String> p2 = p.apply(Create.of(words2)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java index 109b1ff..56af3f1 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java @@ -32,7 +32,7 @@ public class FlinkTestPipeline extends Pipeline { * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call * {@link Pipeline#run} to execute the pipeline and check the tests. 
*/ - public static FlinkTestPipeline create() { + public static FlinkTestPipeline createForBatch() { return create(false); } @@ -44,7 +44,7 @@ public class FlinkTestPipeline extends Pipeline { * * @return The Test Pipeline */ - public static FlinkTestPipeline createStreaming() { + public static FlinkTestPipeline createForStreaming() { return create(true); } @@ -54,18 +54,18 @@ public class FlinkTestPipeline extends Pipeline { * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call * {@link Pipeline#run} to execute the pipeline and check the tests. * - * @param streaming True for streaming mode, False for batch - * @return The Test Pipeline + * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch. + * @return The Test Pipeline. */ - public static FlinkTestPipeline create(boolean streaming) { + private static FlinkTestPipeline create(boolean streaming) { FlinkPipelineRunner flinkRunner = FlinkPipelineRunner.createForTest(streaming); FlinkPipelineOptions pipelineOptions = flinkRunner.getPipelineOptions(); pipelineOptions.setStreaming(streaming); return new FlinkTestPipeline(flinkRunner, pipelineOptions); } - private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner, PipelineOptions - options) { + private FlinkTestPipeline(PipelineRunner<? 
extends PipelineResult> runner, + PipelineOptions options) { super(runner, options); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java index ed2ecf5..af0f217 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java @@ -84,7 +84,7 @@ public class JoinExamplesITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY)); PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java index 29c34d4..35f2eaf 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java @@ -47,7 +47,7 @@ public class MaybeEmptyTestITCase extends JavaProgramTestBase implements Seriali @Override protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); p.apply(Create.of((Void) null)).setCoder(VoidCoder.of()) 
.apply(ParDo.of( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java index dbe88d2..ccdbbf9 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java @@ -47,7 +47,7 @@ public class ParDoMultiOutputITCase extends JavaProgramTestBase implements Seria @Override protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO")); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java index ba675b1..39f54e4 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java @@ -61,7 +61,7 @@ public class ReadSourceITCase extends JavaProgramTestBase { private static void runProgram(String resultPath) { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); PCollection<String> result = p .apply(Read.from(new ReadSource(1, 10))) 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java index ff59db7..db794f7 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java @@ -52,7 +52,7 @@ public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase { List<String> strings = Collections.emptyList(); - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); PCollection<String> input = p.apply(Create.of(strings)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java index a8200aa..04e06b8 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java @@ -53,7 +53,7 @@ public class RemoveDuplicatesITCase extends JavaProgramTestBase { List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", "k3"); - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); PCollection<String> input = p.apply(Create.of(strings)) 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java index d932c80..ee8843c 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java @@ -36,7 +36,7 @@ public class SideInputITCase extends JavaProgramTestBase implements Serializable protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); final PCollectionView<String> sidesInput = p http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java index e801ac4..1b4afb3 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java @@ -53,7 +53,7 @@ public class TfIdfITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { - Pipeline pipeline = FlinkTestPipeline.create(); + Pipeline pipeline = FlinkTestPipeline.createForBatch(); pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java 
---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java deleted file mode 100644 index eb020c5..0000000 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TopWikipediaSessionsITCase.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.dataartisans.flink.dataflow; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.common.base.Joiner; -import org.apache.flink.streaming.util.StreamingProgramTestBase; -import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.io.Serializable; -import java.util.Arrays; - - -/** - * Session window test - */ -public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable { - protected String resultPath; - - public TopWikipediaSessionsITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "user: user1 value:3", - "user: user1 value:1", - "user: user2 value:4", - "user: user2 value:6", - "user: user3 value:7", - "user: user3 value:2" - }; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createStreaming(); - - long now = System.currentTimeMillis() + 10000; - System.out.println((now + 5000) / 1000); - - PCollection<KV<String, Long>> output = - p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set - 
("contributor_username", "user3"), new TableRow().set("timestamp", now).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set - ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set - ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set - ("contributor_username", "user2"), new TableRow().set("timestamp", now) - .set("contributor_username", "user3")))) - - - - .apply(ParDo.of(new DoFn<TableRow, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - TableRow row = c.element(); - long timestamp = (Long) row.get("timestamp"); - String userName = 
(String) row.get("contributor_username"); - if (userName != null) { - // Sets the timestamp field to be used in windowing. - c.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); - } - } - })) - - .apply(ParDo.named("SampleUsers").of( - new DoFn<String, String>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) { - c.output(c.element()); - } - } - })) - - .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1)))) - .apply(Count.<String>perElement()); - - PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - KV<String, Long> el = c.element(); - String out = "user: " + el.getKey() + " value:" + el.getValue(); - System.out.println(out); - c.output(out); - } - })); - - format.apply(TextIO.Write.to(resultPath)); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java index 9427ab6..5ddd379 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java @@ -58,7 +58,7 @@ public class WordCountITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java index c3eed61..ccc52c4 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java @@ -70,7 +70,7 @@ public class WordCountJoin2ITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); /* Create two PCollections and join them */ PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java index 33e67cc..e6eddc0 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java @@ -80,7 +80,7 @@ public class WordCountJoin3ITCase extends JavaProgramTestBase { @Override protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); /* Create two PCollections and join them */ PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java index 205fe9b..865fc5f 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java @@ -63,7 +63,7 @@ public class WriteSinkITCase extends JavaProgramTestBase { } private static void runProgram(String resultPath) { - Pipeline p = FlinkTestPipeline.create(); + Pipeline p = FlinkTestPipeline.createForBatch(); p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of()) .apply("CustomSink", Write.to(new MyCustomSink(resultPath))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java index b667187..1f36ee7 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java @@ -86,7 +86,7 @@ public class GroupAlsoByWindowTest { .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(1000)); long initialTime = 0L; - Pipeline pipeline = FlinkTestPipeline.create(); + Pipeline pipeline = FlinkTestPipeline.createForStreaming(); KvCoder<String, Integer> inputCoder = 
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); @@ -145,7 +145,7 @@ public class GroupAlsoByWindowTest { WindowingStrategy strategy = sessionWindowingStrategy; long initialTime = 0L; - Pipeline pipeline = FlinkTestPipeline.create(); + Pipeline pipeline = FlinkTestPipeline.createForStreaming(); KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); @@ -382,7 +382,7 @@ public class GroupAlsoByWindowTest { } private OneInputStreamOperatorTestHarness createTestingOperatorAndState(WindowingStrategy strategy, long initialTime) throws Exception { - Pipeline pipeline = FlinkTestPipeline.create(); + Pipeline pipeline = FlinkTestPipeline.createForStreaming(); KvCoder<String, Integer> inputCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()); @@ -478,7 +478,7 @@ public class GroupAlsoByWindowTest { @Override public Object element() { throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); // TODO: 12/16/15 aljoscha's comment in slack + "WindowFn attempted to access input element when none was available"); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bc4c60eb/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java new file mode 100644 index 0000000..1c800fa --- /dev/null +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/TopWikipediaSessionsITCase.java @@ -0,0 +1,145 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.streaming; + +import com.dataartisans.flink.dataflow.FlinkTestPipeline; +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.TextIO; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.Joiner; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.joda.time.Duration; +import org.joda.time.Instant; + +import java.io.Serializable; +import java.util.Arrays; + + +/** + * Session window test + */ +public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable { + protected String resultPath; + + public TopWikipediaSessionsITCase(){ + } + + static final String[] EXPECTED_RESULT = new String[] { + "user: user1 value:3", + "user: user1 value:1", + "user: user2 value:4", + "user: user2 value:6", + "user: user3 value:7", + "user: user3 value:2" + }; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + 
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); + } + + @Override + protected void testProgram() throws Exception { + + Pipeline p = FlinkTestPipeline.createForStreaming(); + + long now = System.currentTimeMillis() + 10000; + System.out.println((now + 5000) / 1000); + + PCollection<KV<String, Long>> output = + p.apply(Create.of(Arrays.asList(new TableRow().set("timestamp", now).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now + 10).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now + 2).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 1).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 5).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 7).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 8).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 200).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 230).set + ("contributor_username", "user1"), new TableRow().set("timestamp", now + 230).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 240).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now + 245).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 235).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 236).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 237).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 238).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 239).set + 
("contributor_username", "user3"), new TableRow().set("timestamp", now + 240).set + ("contributor_username", "user3"), new TableRow().set("timestamp", now + 241).set + ("contributor_username", "user2"), new TableRow().set("timestamp", now) + .set("contributor_username", "user3")))) + + + + .apply(ParDo.of(new DoFn<TableRow, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + TableRow row = c.element(); + long timestamp = (Long) row.get("timestamp"); + String userName = (String) row.get("contributor_username"); + if (userName != null) { + // Sets the timestamp field to be used in windowing. + c.outputWithTimestamp(userName, new Instant(timestamp * 1000L)); + } + } + })) + + .apply(ParDo.named("SampleUsers").of( + new DoFn<String, String>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * 1.0) { + c.output(c.element()); + } + } + })) + + .apply(Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(1)))) + .apply(Count.<String>perElement()); + + PCollection<String> format = output.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + KV<String, Long> el = c.element(); + String out = "user: " + el.getKey() + " value:" + el.getValue(); + System.out.println(out); + c.output(out); + } + })); + + format.apply(TextIO.Write.to(resultPath)); + + p.run(); + } +}
