http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 1d06b1a..443378f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.flink.translation.wrappers; +import java.io.IOException; +import java.util.List; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; @@ -24,7 +26,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; - import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; @@ -34,9 +35,6 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; - /** * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java index c3672c0..e4a7386 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.flink.translation.wrappers; import org.apache.beam.sdk.io.Source; - import org.apache.flink.core.io.InputSplit; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 092a226..000d69f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -18,7 +18,15 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import avro.shaded.com.google.common.base.Preconditions; - +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; @@ -44,9 +52,6 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.collect.Iterables; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ListState; @@ -69,15 +74,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; -import java.io.IOException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * Flink operator for executing {@link DoFn DoFns}. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java index a3cf2e2..2e10400 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java @@ -17,6 +17,11 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -38,20 +43,12 @@ import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.runtime.state.AbstractStateBackend; import org.joda.time.Instant; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - /** * {@link StateInternals} that uses a Flink {@link AbstractStateBackend} to * manage state. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java index 94bf3af..5751aac 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java @@ -17,12 +17,11 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming; +import java.util.Collections; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; -import java.util.Collections; - public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> { final K key; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java index 323f572..5e583e9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java @@ -19,6 +19,13 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming; import static com.google.common.base.Preconditions.checkArgument; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StandardCoder; @@ -28,16 +35,6 @@ import org.apache.beam.sdk.util.KeyedWorkItemCoder; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.WindowedValue; -import com.google.common.collect.ImmutableList; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; - public class SingletonKeyedWorkItemCoder<K, ElemT> extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> { /** * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 73c1eed..c6dde51 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -17,6 +17,22 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import javax.annotation.Nullable; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.sdk.coders.Coder; @@ -37,7 +53,6 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; - import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; @@ -47,23 +62,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.joda.time.Instant; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.Set; -import javax.annotation.Nullable; - /** * Flink operator for executing window {@link DoFn DoFns}. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java index 2bbed58..51d9e0c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java @@ -17,19 +17,16 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming; +import java.nio.ByteBuffer; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.WindowedValue; - import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import java.nio.ByteBuffer; - /** * {@link KeySelector} that retrieves a key from a {@link KeyedWorkItem}. This will return * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java index 9d983b0..3cb93c0 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java @@ -17,15 +17,15 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; - -import com.google.common.annotations.VisibleForTesting; - import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; @@ -33,9 +33,6 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; - /** * Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink Source. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java index 098473d..2117e9d 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java @@ -18,23 +18,19 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; +import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collection; -import java.util.List; /** * A wrapper translating Flink sinks implementing the {@link SinkFunction} interface, into http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java index 716ca30..c6e0825 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -19,18 +19,15 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; - import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.IngestionTimeExtractor; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import java.util.List; - -import javax.annotation.Nullable; - /** * A wrapper translating Flink Sources implementing the {@link SourceFunction} interface, into * unbounded Beam sources (see {@link UnboundedSource}). http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java index 08bdb50..8d37fe7 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java @@ -19,15 +19,6 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; import static com.google.common.base.Preconditions.checkArgument; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; - -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -37,8 +28,14 @@ import java.net.Socket; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; - import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 7f26a65..8647322 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -17,6 +17,12 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming.io; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -29,11 +35,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptor; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.Lists; - import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.Checkpointed; @@ -46,10 +47,6 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.util.ArrayList; -import java.util.List; - /** * Wrapper for executing {@link UnboundedSource UnboundedSources} as a Flink Source. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java index 68ede89..10d6d9d 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java @@ -22,7 +22,6 @@ import org.apache.beam.runners.flink.translation.types.EncodedValueTypeInformati import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.util.CoderUtils; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.ComparatorTestBase; import org.apache.flink.api.common.typeutils.TypeComparator; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java index ff1025f..d9d174c 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; - import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 77e8a47..32339dc 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.util.Collections; +import java.util.HashMap; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; import org.apache.beam.sdk.options.Default; @@ -34,7 +36,6 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; - import org.apache.commons.lang.SerializationUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeHint; @@ -46,9 +47,6 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import java.util.Collections; -import java.util.HashMap; - /** * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java index 516c7ba..44c9017 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java @@ -17,20 +17,17 @@ */ package org.apache.beam.runners.flink; +import com.google.common.base.Joiner; +import java.io.File; +import java.net.URI; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.base.Joiner; - import org.apache.flink.test.util.JavaProgramTestBase; -import java.io.File; -import java.net.URI; - /** * Reads from a bounded source in batch execution. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java index ea58d0d..79b7882 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java @@ -17,14 +17,12 @@ */ package org.apache.beam.runners.flink; +import com.google.common.base.Joiner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; - -import com.google.common.base.Joiner; - import org.apache.flink.streaming.util.StreamingProgramTestBase; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java index f1d9097..0988146 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -20,6 +20,11 @@ package org.apache.beam.runners.flink; import static org.junit.Assert.assertNotNull; +import com.google.common.base.Joiner; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -27,18 +32,10 @@ import org.apache.beam.sdk.io.Sink; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; - -import com.google.common.base.Joiner; - import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.test.util.JavaProgramTestBase; -import java.io.File; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.URI; - /** * Tests the translation of custom Write.Bound sinks. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index fb1b1e8..5f1b066 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -20,6 +20,10 @@ package org.apache.beam.runners.flink.streaming; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertThat; +import com.google.common.collect.ImmutableList; +import java.util.Collections; +import java.util.HashMap; +import javax.annotation.Nullable; import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; @@ -36,9 +40,6 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.collect.ImmutableList; - import org.apache.flink.shaded.com.google.common.base.Function; import org.apache.flink.shaded.com.google.common.base.Predicate; import org.apache.flink.shaded.com.google.common.collect.FluentIterable; @@ -53,10 +54,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.Collections; -import java.util.HashMap; -import javax.annotation.Nullable; - /** * Tests for {@link DoFnOperator}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index 627f545..711ae00 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; +import java.nio.ByteBuffer; +import java.util.Arrays; import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkStateInternals; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -42,7 +44,6 @@ import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -54,9 +55,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.nio.ByteBuffer; -import java.util.Arrays; - /** * Tests for {@link FlinkStateInternals}. This is based on the tests for {@code InMemoryStateInternals}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java index ca183a8..ab98c27 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java @@ -17,28 +17,25 @@ */ package org.apache.beam.runners.flink.streaming; +import com.google.common.base.Joiner; +import java.io.Serializable; +import java.util.Arrays; import org.apache.beam.runners.flink.FlinkTestPipeline; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; - -import com.google.common.base.Joiner; - import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.joda.time.Duration; import org.joda.time.Instant; -import java.io.Serializable; -import java.util.Arrays; - public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java index 3ced02e..9251d42 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java @@ -19,6 +19,11 @@ package org.apache.beam.runners.flink.streaming; import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.DelegateCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -26,17 +31,10 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.values.KV; - import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; - /** * An unbounded source for testing the unbounded sources framework code. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java index 7912aee..64f978f 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java @@ -17,6 +17,10 @@ */ package org.apache.beam.runners.flink.streaming; +import com.google.api.services.bigquery.model.TableRow; +import com.google.common.base.Joiner; +import java.io.Serializable; +import java.util.Arrays; import org.apache.beam.runners.flink.FlinkTestPipeline; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; @@ -28,17 +32,10 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.common.base.Joiner; - import org.apache.flink.streaming.util.StreamingProgramTestBase; import org.joda.time.Duration; import org.joda.time.Instant; -import java.io.Serializable; -import java.util.Arrays; - /** * Session window test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 1122179..a70ad49 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -23,12 +23,14 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.Configuration; @@ -43,10 +45,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.junit.Test; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - /** * Tests for {@link UnboundedSourceWrapper}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java index e7cd67e..d265361 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BlockingDataflowRunner.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow; +import java.io.IOException; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.Pipeline; @@ -28,15 +30,10 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; - import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - -import javax.annotation.Nullable; - /** * A {@link PipelineRunner} that's like {@link DataflowRunner} * but that waits for the launched job to finish. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobException.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobException.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobException.java index 74c0f80..6e32fde 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobException.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowJobException.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow; import java.util.Objects; - import javax.annotation.Nullable; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 3d0f145..9a515fa 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -19,17 +19,6 @@ package org.apache.beam.runners.dataflow; import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime; -import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; -import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.util.MonitoringUtil; -import org.apache.beam.sdk.AggregatorRetrievalException; -import org.apache.beam.sdk.AggregatorValues; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; - import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; @@ -41,18 +30,25 @@ import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; - -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.net.SocketTimeoutException; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; - import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; +import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff; +import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A DataflowPipelineJob represents a job submitted to Dataflow using http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java index 4c88c4e..5090a8a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRegistrar.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow; +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; import org.apache.beam.runners.dataflow.options.BlockingDataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -24,9 +26,6 @@ import org.apache.beam.sdk.options.PipelineOptionsRegistrar; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; - /** * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for the * {@link DataflowRunner}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index b8d9445..0d72881 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -17,6 +17,10 @@ */ package org.apache.beam.runners.dataflow; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.isNullOrEmpty; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; @@ -28,11 +32,24 @@ import static org.apache.beam.sdk.util.Structs.addObject; import static org.apache.beam.sdk.util.Structs.addString; import static org.apache.beam.sdk.util.Structs.getString; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Strings.isNullOrEmpty; - +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.services.dataflow.model.AutoscalingSettings; +import com.google.api.services.dataflow.model.DataflowPackage; +import com.google.api.services.dataflow.model.Disk; +import com.google.api.services.dataflow.model.Environment; +import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.Step; +import com.google.api.services.dataflow.model.WorkerPool; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly; import org.apache.beam.runners.dataflow.internal.ReadTranslator; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; @@ -71,30 +88,9 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypedPValue; - -import com.google.api.services.dataflow.model.AutoscalingSettings; -import com.google.api.services.dataflow.model.DataflowPackage; -import com.google.api.services.dataflow.model.Disk; -import com.google.api.services.dataflow.model.Environment; -import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.Step; -import com.google.api.services.dataflow.model.WorkerPool; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; - /** * {@link DataflowPipelineTranslator} knows how to translate {@link Pipeline} objects * into Cloud Dataflow Service API {@link Job}s. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 7f632a1..a0e24b1 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -17,13 +17,61 @@ */ package org.apache.beam.runners.dataflow; -import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName; -import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; -import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; +import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName; +import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; +import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.services.clouddebugger.v2.Clouddebugger; +import com.google.api.services.clouddebugger.v2.model.Debuggee; +import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest; +import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse; +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.DataflowPackage; +import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.ListJobsResponse; +import com.google.api.services.dataflow.model.WorkerPool; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Utf8; +import com.google.common.collect.ForwardingMap; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; @@ -114,65 +162,12 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; - -import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.services.clouddebugger.v2.Clouddebugger; -import com.google.api.services.clouddebugger.v2.model.Debuggee; -import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest; -import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.model.DataflowPackage; -import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.ListJobsResponse; -import com.google.api.services.dataflow.model.WorkerPool; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Utf8; -import com.google.common.collect.ForwardingMap; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import javax.annotation.Nullable; - /** * A {@link PipelineRunner} that executes the operations in the pipeline by first translating them * to the Dataflow representation using the {@link DataflowPipelineTranslator} and then submitting http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java index 05297ec..b67421d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunnerHooks.java @@ -17,9 +17,8 @@ */ package org.apache.beam.runners.dataflow; -import org.apache.beam.sdk.annotations.Experimental; - import com.google.api.services.dataflow.model.Environment; +import org.apache.beam.sdk.annotations.Experimental; /** * An instance of this class can be passed to the http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java index 73e5da0..aeb8103 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java @@ -17,30 +17,26 @@ */ package org.apache.beam.runners.dataflow.internal; +import static com.google.api.client.util.Base64.encodeBase64String; +import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.Structs.addString; import static org.apache.beam.sdk.util.Structs.addStringList; -import static com.google.api.client.util.Base64.encodeBase64String; -import static com.google.common.base.Preconditions.checkArgument; - +import com.google.api.services.dataflow.model.SourceMetadata; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.CloudObject; - -import com.google.api.services.dataflow.model.SourceMetadata; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; - /** * A helper class for supporting sources defined as {@code Source}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java index 7a08fde..fb78973 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java @@ -17,19 +17,17 @@ */ package org.apache.beam.runners.dataflow.internal; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; - import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; - import java.util.Collection; import java.util.HashSet; import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; /** * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java index 8ab59fc..d715437 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java @@ -17,18 +17,16 @@ */ package org.apache.beam.runners.dataflow.internal; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.PTransform; - import com.google.api.services.dataflow.model.MetricStructuredName; import com.google.api.services.dataflow.model.MetricUpdate; - import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.PTransform; /** * Methods for extracting the values of an {@link Aggregator} from a collection of {@link http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java index 283f012..85f5e73 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java @@ -17,10 +17,25 @@ */ package org.apache.beam.runners.dataflow.internal; -import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; @@ -39,30 +54,10 @@ import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.TimestampedValue; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - /** * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index d8bfe42..6f4a18b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -17,12 +17,25 @@ */ package org.apache.beam.runners.dataflow.internal; -import static org.apache.beam.sdk.util.Structs.addLong; - import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.util.Structs.addLong; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.util.RandomAccessData; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; @@ -38,24 +51,6 @@ import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.VarInt; import org.apache.beam.sdk.values.PCollection; -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; - -import javax.annotation.Nullable; - /** * An Ism file is a prefix encoded composite key value file broken into shards. Each composite * key is composed of a fixed number of component keys. A fixed number of those sub keys represent http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java index 373738a..094f405 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java @@ -21,6 +21,9 @@ import static org.apache.beam.sdk.util.Structs.addBoolean; import static org.apache.beam.sdk.util.Structs.addDictionary; import static org.apache.beam.sdk.util.Structs.addLong; +import com.google.api.services.dataflow.model.SourceMetadata; +import java.util.HashMap; +import java.util.Map; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTranslator; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; @@ -31,11 +34,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.values.PValue; -import com.google.api.services.dataflow.model.SourceMetadata; - -import java.util.HashMap; -import java.util.Map; - /** * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java index dbfbb16..d1c8e7a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/CloudDebuggerOptions.java @@ -17,15 +17,13 @@ */ package org.apache.beam.runners.dataflow.options; +import com.google.api.services.clouddebugger.v2.model.Debuggee; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.Hidden; -import com.google.api.services.clouddebugger.v2.model.Debuggee; - -import javax.annotation.Nullable; - /** * Options for controlling Cloud Debugger. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index ac2e0b7..dfe538d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -17,6 +17,10 @@ */ package org.apache.beam.runners.dataflow.options; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.api.services.dataflow.Dataflow; +import java.util.List; +import java.util.Map; import org.apache.beam.runners.dataflow.util.DataflowTransport; import org.apache.beam.runners.dataflow.util.GcsStager; import org.apache.beam.runners.dataflow.util.Stager; @@ -28,13 +32,6 @@ import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.InstanceBuilder; -import com.google.api.services.dataflow.Dataflow; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -import java.util.List; -import java.util.Map; - /** * Internal. Options used to control execution of the Dataflow SDK for * debugging and testing purposes. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 841741f..8ef43c5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -20,6 +20,8 @@ package org.apache.beam.runners.dataflow.options; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; +import com.google.common.base.MoreObjects; +import java.io.IOException; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.BigQueryOptions; @@ -33,16 +35,11 @@ import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.util.IOChannelUtils; - -import com.google.common.base.MoreObjects; - import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; -import java.io.IOException; - /** * Options that can be used to configure the {@link DataflowRunner}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java index 9b7a9ce..6c59f38 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.dataflow.options; +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.Default; @@ -25,12 +28,6 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; -import com.fasterxml.jackson.annotation.JsonIgnore; - -import java.util.List; - -import javax.annotation.Nullable; - /** * Options that are used to configure the Dataflow pipeline worker pool. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java index c7b4c91..ae8801b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptions.java @@ -17,12 +17,11 @@ */ package org.apache.beam.runners.dataflow.options; +import java.util.HashMap; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.Hidden; -import java.util.HashMap; - /** * Options for controlling profiling of pipeline execution. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java index 91ac62a..38c139c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java @@ -19,15 +19,13 @@ package org.apache.beam.runners.dataflow.options; import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; - import com.fasterxml.jackson.annotation.JsonCreator; - import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; /** * Options that are used to control logging configuration on the Dataflow worker. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index f74f4dd..9be773b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -19,6 +19,18 @@ package org.apache.beam.runners.dataflow.testing; import static org.hamcrest.MatcherAssert.assertThat; +import com.google.api.services.dataflow.model.JobMessage; +import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.beam.runners.dataflow.DataflowJobExecutionException; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -34,25 +46,10 @@ import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; - -import com.google.api.services.dataflow.model.JobMessage; -import com.google.api.services.dataflow.model.JobMetrics; -import com.google.api.services.dataflow.model.MetricUpdate; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Throwables; - import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.math.BigDecimal; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - /** * {@link TestDataflowRunner} is a pipeline runner that wraps a * {@link DataflowRunner} when running tests against the {@link TestPipeline}. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java index f988749..0391594 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java @@ -20,18 +20,16 @@ package org.apache.beam.runners.dataflow.util; import static org.apache.beam.sdk.util.Transport.getJsonFactory; import static org.apache.beam.sdk.util.Transport.getTransport; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.util.RetryHttpRequestInitializer; - import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.clouddebugger.v2.Clouddebugger; import com.google.api.services.dataflow.Dataflow; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.common.collect.ImmutableList; - import java.net.MalformedURLException; import java.net.URL; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.util.RetryHttpRequestInitializer; /** * Helpers for cloud communication. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 2017313..139db9d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -17,13 +17,12 @@ */ package org.apache.beam.runners.dataflow.util; +import java.io.Serializable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; -import java.io.Serializable; - /** * Wrapper class holding the necessary information to serialize a OldDoFn. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c623a271/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java index bf25ce4..6ca4c3f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java @@ -19,14 +19,12 @@ package org.apache.beam.runners.dataflow.util; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.services.dataflow.model.DataflowPackage; +import java.util.List; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; -import com.google.api.services.dataflow.model.DataflowPackage; - -import java.util.List; - /** * Utility class for staging files to GCS. */