http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java index f9d4dcd..895ecef 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java @@ -17,18 +17,20 @@ */ package org.apache.beam.runners.flink.translation.types; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.VoidCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; + +import com.google.common.base.Preconditions; + import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import com.google.common.base.Preconditions; /** * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for - * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s. + * Dataflow {@link org.apache.beam.sdk.coders.Coder}s. */ public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index 4e81054..c6f3921 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -19,8 +19,9 @@ package org.apache.beam.runners.flink.translation.types; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; + import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -32,7 +33,7 @@ import java.io.ObjectInputStream; /** * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for - * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s + * Dataflow {@link org.apache.beam.sdk.coders.Coder}s */ public class CoderTypeSerializer<T> extends TypeSerializer<T> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java index 3912295..6f0c651 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java @@ -18,9 +18,10 @@ package org.apache.beam.runners.flink.translation.types; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.values.KV; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.values.KV; + import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -31,7 +32,7 @@ import java.io.ObjectInputStream; /** * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for - * {@link com.google.cloud.dataflow.sdk.coders.KvCoder}. We have a special comparator + * {@link org.apache.beam.sdk.coders.KvCoder}. We have a special comparator * for {@link KV} that always compares on the key only. */ public class KvCoderComperator <K, V> extends TypeComparator<KV<K, V>> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java index 8862d48..74f3821 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java @@ -17,20 +17,22 @@ */ package org.apache.beam.runners.flink.translation.types; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.values.KV; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.values.KV; + +import com.google.common.base.Preconditions; + import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import com.google.common.base.Preconditions; import java.util.List; /** * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for - * Dataflow {@link com.google.cloud.dataflow.sdk.coders.KvCoder}. + * Dataflow {@link org.apache.beam.sdk.coders.KvCoder}. */ public class KvCoderTypeInformation<K, V> extends CompositeType<KV<K, V>> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java index 8bc3620..7b48208 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java @@ -25,7 +25,7 @@ import java.io.IOException; /** * Special Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for - * {@link com.google.cloud.dataflow.sdk.coders.VoidCoder}. We need this because Flink does not + * {@link org.apache.beam.sdk.coders.VoidCoder}. We need this because Flink does not * allow returning {@code null} from an input reader. We return a {@link VoidValue} instead * that behaves like a {@code null}, hopefully. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java index 445d411..e5567d3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java @@ -17,17 +17,19 @@ */ package org.apache.beam.runners.flink.translation.wrappers; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; + import com.google.common.collect.Lists; + import org.apache.flink.api.common.accumulators.Accumulator; import java.io.Serializable; /** - * Wrapper that wraps a {@link com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn} + * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn} * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using - * the combine function as an aggregator in a {@link com.google.cloud.dataflow.sdk.transforms.ParDo} + * the combine function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo} * operation. */ public class CombineFnAggregatorWrapper<AI, AA, AR> implements Aggregator<AI, AR>, Accumulator<AI, Serializable> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java index 6a3cf50..f1b8c73 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java @@ -26,7 +26,7 @@ import java.io.InputStream; /** * Wrapper for {@link DataInputView}. We need this because Flink reads data using a * {@link org.apache.flink.core.memory.DataInputView} while - * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s expect an + * Dataflow {@link org.apache.beam.sdk.coders.Coder}s expect an * {@link java.io.InputStream}. */ public class DataInputViewWrapper extends InputStream { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java index 6bd2240..148f960 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java @@ -25,7 +25,7 @@ import java.io.OutputStream; /** * Wrapper for {@link org.apache.flink.core.memory.DataOutputView}. We need this because * Flink writes data using a {@link org.apache.flink.core.memory.DataInputView} while - * Dataflow {@link com.google.cloud.dataflow.sdk.coders.Coder}s expect an + * Dataflow {@link org.apache.beam.sdk.coders.Coder}s expect an * {@link java.io.OutputStream}. */ public class DataOutputViewWrapper extends OutputStream { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java index 4409586..eb32fa2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java @@ -17,18 +17,20 @@ */ package org.apache.beam.runners.flink.translation.wrappers; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; + import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; + import org.apache.flink.api.common.accumulators.Accumulator; import java.io.Serializable; /** - * Wrapper that wraps a {@link com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn} + * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn} * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using - * the function as an aggregator in a {@link com.google.cloud.dataflow.sdk.transforms.ParDo} + * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo} * operation. */ public class SerializableFnAggregatorWrapper<AI, AO> implements Aggregator<AI, AO>, Accumulator<AI, Serializable> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java index 4c2475d..45267b8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java @@ -18,11 +18,14 @@ package org.apache.beam.runners.flink.translation.wrappers; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.cloud.dataflow.sdk.io.Sink; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import org.apache.beam.sdk.io.Sink; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Write; + import com.google.common.base.Preconditions; -import com.google.cloud.dataflow.sdk.transforms.Write; + +import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.AbstractID; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index cd5cd40..26e6297 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -17,11 +17,12 @@ */ package org.apache.beam.runners.flink.translation.wrappers; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.options.PipelineOptions; + import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.io.BoundedSource; -import com.google.cloud.dataflow.sdk.io.Source; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; + import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; @@ -38,7 +39,7 @@ import java.util.List; /** * A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a - * Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}. + * Dataflow {@link org.apache.beam.sdk.io.Source}. */ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> { private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java index cde2b35..c3672c0 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java @@ -17,7 +17,8 @@ */ package org.apache.beam.runners.flink.translation.wrappers; -import com.google.cloud.dataflow.sdk.io.Source; +import org.apache.beam.sdk.io.Source; + import org.apache.flink.core.io.InputSplit; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index 10c8bbf..bb6ed67 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -18,18 +18,22 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + import com.google.common.base.Preconditions; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; -import com.google.cloud.dataflow.sdk.util.*; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.common.base.Throwables; + import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.functions.RichFlatMapFunction; @@ -40,8 +44,8 @@ import org.joda.time.format.PeriodFormat; import java.util.Collection; /** - * An abstract class that encapsulates the common code of the the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} - * and {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} wrappers. See the {@link FlinkParDoBoundWrapper} and + * An abstract class that encapsulates the common code of the the {@link org.apache.beam.sdk.transforms.ParDo.Bound} + * and {@link org.apache.beam.sdk.transforms.ParDo.BoundMulti} wrappers. See the {@link FlinkParDoBoundWrapper} and * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the aforementioned transformations. * */ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index 3dc5a79..8e7493e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -19,21 +19,41 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import org.apache.beam.runners.flink.translation.wrappers.streaming.state.*; -import com.google.cloud.dataflow.sdk.coders.*; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.*; -import com.google.cloud.dataflow.sdk.values.*; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.AbstractFlinkTimerInternals; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointReader; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointUtils; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.StateCheckpointWriter; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.AppliedCombineFn; +import org.apache.beam.sdk.util.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.KeyedWorkItems; +import org.apache.beam.sdk.util.SystemReduceFn; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; + import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.core.memory.DataInputView; @@ -41,20 +61,29 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; -import org.apache.flink.streaming.api.operators.*; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.joda.time.Instant; import java.io.IOException; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; /** * This class is the key class implementing all the windowing/triggering logic of Apache Beam. * To provide full compatibility and support for all the windowing/triggering combinations offered by * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in - * ({@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}. + * ({@link org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn}. * <p/> * In a nutshell, when the execution arrives to this operator, we expect to have a stream <b>already * grouped by key</b>. Each of the elements that enter here, registers a timer @@ -66,7 +95,7 @@ import java.util.*; * When a watermark arrives, all the registered timers are checked to see which ones are ready to * fire (see {@link FlinkGroupAlsoByWindowWrapper#processWatermark(Watermark)}). These are deregistered from * the {@link FlinkGroupAlsoByWindowWrapper#activeTimers} - * list, and are fed into the {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn} + * list, and are fed into the {@link org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn} * for furhter processing. */ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> @@ -108,11 +137,11 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> * <p/> * The difference with {@link #createForIterable(PipelineOptions, PCollection, KeyedStream)} * is that this method assumes that a combiner function is provided - * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). + * (see {@link org.apache.beam.sdk.transforms.Combine.KeyedCombineFn}). * A combiner helps at increasing the speed and, in most of the cases, reduce the per-window state. * * @param options the general job configuration options. - * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * @param input the input Dataflow {@link org.apache.beam.sdk.values.PCollection}. * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key. * @param combiner the combiner to be used. * @param outputKvCoder the type of the output values. @@ -151,10 +180,10 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> * <p/> * The difference with {@link #create(PipelineOptions, PCollection, KeyedStream, Combine.KeyedCombineFn, KvCoder)} * is that this method assumes no combiner function - * (see {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn}). + * (see {@link org.apache.beam.sdk.transforms.Combine.KeyedCombineFn}). * * @param options the general job configuration options. - * @param input the input Dataflow {@link com.google.cloud.dataflow.sdk.values.PCollection}. + * @param input the input Dataflow {@link org.apache.beam.sdk.values.PCollection}. * @param groupedStreamByKey the input stream, it is assumed to already be grouped by key. */ public static <K, VIN> DataStream<WindowedValue<KV<K, Iterable<VIN>>>> createForIterable( @@ -224,9 +253,9 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> } /** - * Create the adequate {@link com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn}, + * Create the adequate {@link org.apache.beam.sdk.util.GroupAlsoByWindowsDoFn}, * <b> if not already created</b>. - * If a {@link com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn} was provided, then + * If a {@link org.apache.beam.sdk.transforms.Combine.KeyedCombineFn} was provided, then * a function with that combiner is created, so that elements are combined as they arrive. This is * done for speed and (in most of the cases) for reduction of the per-window state. */ @@ -524,7 +553,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> return new WindowingInternals<KeyedWorkItem<K, VIN>, KV<K, VOUT>>() { @Override - public com.google.cloud.dataflow.sdk.util.state.StateInternals stateInternals() { + public org.apache.beam.sdk.util.state.StateInternals stateInternals() { return stateInternals; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java index 1a6a665..3bf566b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java @@ -19,11 +19,12 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.KV; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; + import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java index df7f953..a30a544 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java @@ -17,21 +17,23 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowingInternals; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.TupleTag; + import com.google.common.base.Preconditions; + import org.apache.flink.util.Collector; import org.joda.time.Instant; import java.util.Map; /** - * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} Beam transformation. + * A wrapper for the {@link org.apache.beam.sdk.transforms.ParDo.BoundMulti} Beam transformation. * */ public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java index 2ed5620..4def0c6 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java @@ -17,23 +17,27 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.*; -import com.google.cloud.dataflow.sdk.util.state.StateInternals; -import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + import org.apache.flink.util.Collector; import org.joda.time.Instant; import java.io.IOException; -import java.util.*; +import java.util.Collection; /** - * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} Beam transformation. + * A wrapper for the {@link org.apache.beam.sdk.transforms.ParDo.Bound} Beam transformation. * */ public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java index f6c243f..d6aff7d 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java @@ -18,12 +18,12 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; import org.apache.beam.runners.flink.translation.types.VoidCoderTypeSerializer; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.WindowedValue; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; + import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.util.Collector; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java index 82984cb..05a8c7a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -17,16 +17,18 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.io.UnboundedSource; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; + import com.google.common.base.Preconditions; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + import org.apache.flink.streaming.api.functions.source.SourceFunction; -import javax.annotation.Nullable; import java.util.List; +import javax.annotation.Nullable; + /** * A wrapper translating Flink Sources implementing the {@link SourceFunction} interface, into * unbounded Beam sources (see {@link UnboundedSource}). http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java index 1389e9d..08bdb50 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java @@ -17,15 +17,17 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.UnboundedSource; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; + import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -36,7 +38,7 @@ import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; -import static com.google.common.base.Preconditions.checkArgument; +import javax.annotation.Nullable; /** * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 43cf4b2..dcc7967 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -17,14 +17,16 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; + import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.io.UnboundedSource; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; -import com.google.cloud.dataflow.sdk.util.WindowedValue; + import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -38,7 +40,7 @@ import java.io.ObjectOutputStream; /** * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the - * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} interface. + * {@link org.apache.beam.sdk.io.Read.Unbounded} interface. * * For now we support non-parallel sources, checkpointing is WIP. * */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java index fc75948..9e55002 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java @@ -17,20 +17,19 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.state; -import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.util.TimerInternals; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.values.KV; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimerInternals; + import org.joda.time.Instant; -import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; +import javax.annotation.Nullable; + /** * An implementation of Beam's {@link TimerInternals}, that also provides serialization functionality. * The latter is used when snapshots of the current state are taken, for fault-tolerance. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index 6cf46e5..352c550 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -17,22 +17,39 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.state; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.CombineWithContext; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; -import com.google.cloud.dataflow.sdk.util.state.*; -import com.google.cloud.dataflow.sdk.values.PCollectionView; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateContext; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.util.state.StateTable; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.apache.beam.sdk.values.PCollectionView; + import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; + import org.apache.flink.util.InstantiationUtil; import org.joda.time.Instant; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; /** * An implementation of the Beam {@link StateInternals}. This implementation simply keeps elements in memory. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java index 5aadccd..5a843ab 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java @@ -18,7 +18,9 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.state; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; + import com.google.protobuf.ByteString; + import org.apache.flink.core.memory.DataInputView; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java index b2dc33c..4fbd6f0 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java @@ -18,14 +18,14 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.state; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; -import com.google.cloud.dataflow.sdk.util.TimeDomain; -import com.google.cloud.dataflow.sdk.util.TimerInternals; -import com.google.cloud.dataflow.sdk.util.state.StateNamespace; -import com.google.cloud.dataflow.sdk.util.state.StateNamespaces; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaces; + import org.joda.time.Instant; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java index 18e118a..d09157c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java @@ -18,7 +18,9 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.state; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; + import com.google.protobuf.ByteString; + import org.apache.flink.runtime.state.AbstractStateBackend; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java index 3536f87..113fee0 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java @@ -18,14 +18,16 @@ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.AvroCoder; -import com.google.cloud.dataflow.sdk.io.AvroIO; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; + import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java index 5ae0e83..ac0a3d7 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java @@ -17,13 +17,15 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionList; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; + import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; public class FlattenizeITCase extends JavaProgramTestBase { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java index 45ca830..bd149c7 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java @@ -18,12 +18,13 @@ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import org.junit.Test; - import static org.junit.Assert.assertEquals; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +import org.junit.Test; + /** * Tests the proper registration of the Flink runner. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java index 86a0cd0..f015a66 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java @@ -17,13 +17,13 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.PipelineRunner; /** - * {@link com.google.cloud.dataflow.sdk.Pipeline} for testing Dataflow programs on the + * {@link org.apache.beam.sdk.Pipeline} for testing Dataflow programs on the * {@link org.apache.beam.runners.flink.FlinkPipelineRunner}. */ public class FlinkTestPipeline extends Pipeline { @@ -31,7 +31,7 @@ public class FlinkTestPipeline extends Pipeline { /** * Creates and returns a new test pipeline for batch execution. * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.PAssert} to add tests, then call + * <p> Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call * {@link Pipeline#run} to execute the pipeline and check the tests. */ public static FlinkTestPipeline createForBatch() { @@ -41,7 +41,7 @@ public class FlinkTestPipeline extends Pipeline { /** * Creates and returns a new test pipeline for streaming execution. * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.PAssert} to add tests, then call + * <p> Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call * {@link Pipeline#run} to execute the pipeline and check the tests. * * @return The Test Pipeline @@ -53,7 +53,7 @@ public class FlinkTestPipeline extends Pipeline { /** * Creates and returns a new test pipeline for streaming or batch execution. * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.PAssert} to add tests, then call + * <p> Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call * {@link Pipeline#run} to execute the pipeline and check the tests. * * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java index f60056d..47685b6 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java @@ -18,13 +18,14 @@ package org.apache.beam.runners.flink; import org.apache.beam.runners.flink.util.JoinExamples; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; + import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java index 199602c..4d66fa4 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java @@ -17,12 +17,13 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; + import org.apache.flink.test.util.JavaProgramTestBase; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java index 403de29..a2ef4e2 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java @@ -17,16 +17,18 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollectionTuple; -import com.google.cloud.dataflow.sdk.values.TupleTag; -import com.google.cloud.dataflow.sdk.values.TupleTagList; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java index 323c41b..bcad6f1 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java @@ -17,18 +17,20 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.io.BoundedSource; -import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.PCollection; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; + import org.apache.flink.test.util.JavaProgramTestBase; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java index 524554a..471d326 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java @@ -17,13 +17,15 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.values.PCollection; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.RemoveDuplicates; +import org.apache.beam.sdk.values.PCollection; + import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java index 54e92aa..0544f20 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java @@ -17,13 +17,15 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.values.PCollection; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.RemoveDuplicates; +import org.apache.beam.sdk.values.PCollection; + import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java index 7f73b83..2c7c65e 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java @@ -17,13 +17,14 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.View; -import com.google.cloud.dataflow.sdk.values.PCollectionView; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.PCollectionView; + import org.apache.flink.test.util.JavaProgramTestBase; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java index b0fb7b8..547f3c3 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java @@ -17,16 +17,18 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.examples.complete.TfIdf; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.Keys; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; +import org.apache.beam.examples.complete.TfIdf; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringDelegateCoder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Keys; +import org.apache.beam.sdk.transforms.RemoveDuplicates; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; import java.net.URI; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java index 2677c9e..3254e78 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java @@ -17,14 +17,16 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.examples.WordCount; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.values.PCollection; +import org.apache.beam.examples.WordCount; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; + import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java index e73c456..6570e7d 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java @@ -17,19 +17,21 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; -import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; -import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java index 6b57d77..60dc74a 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java @@ -17,19 +17,21 @@ */ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult; -import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; -import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TupleTag; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + import com.google.common.base.Joiner; + import org.apache.flink.test.util.JavaProgramTestBase; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java index dfa15ce..71514d3 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -18,14 +18,18 @@ package org.apache.beam.runners.flink; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.io.Sink; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.Write; +import static org.junit.Assert.assertNotNull; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.Sink; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Write; + import com.google.common.base.Joiner; + import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.test.util.JavaProgramTestBase; @@ -34,8 +38,6 @@ import java.io.File; import java.io.PrintWriter; import java.net.URI; -import static org.junit.Assert.*; - /** * Tests the translation of custom Write.Bound sinks. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java index 880da59..f3ceba7 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java @@ -19,18 +19,27 @@ package org.apache.beam.runners.flink.streaming; import org.apache.beam.runners.flink.FlinkTestPipeline; import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.coders.VarIntCoder; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.transforms.windowing.*; -import com.google.cloud.dataflow.sdk.util.UserCodeException; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.common.base.Throwables; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; + import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -73,9 +82,9 @@ public class GroupAlsoByWindowTest { /** * The default accumulation mode is - * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}. + * {@link org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode#DISCARDING_FIRED_PANES}. * This strategy changes it to - * {@link com.google.cloud.dataflow.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES} + * {@link org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode#ACCUMULATING_FIRED_PANES} */ private final WindowingStrategy fixedWindowWithCompoundTriggerStrategyAcc = fixedWindowWithCompoundTriggerStrategy http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9724454/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java index 63e0bcf..9f7bd90 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java @@ -18,19 +18,20 @@ package org.apache.beam.runners.flink.streaming; import org.apache.beam.runners.flink.FlinkTestPipeline; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + import com.google.common.base.Joiner; -import org.apache.flink.api.java.tuple.Tuple2; + import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.joda.time.Duration; import org.joda.time.Instant;