http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index 332d362..3c4cfbd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -14,16 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.source; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.io.FileInputFormat; @@ -41,9 +34,19 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + /** * This is the single (non-parallel) monitoring task which takes a {@link FileInputFormat} * and, depending on the {@link FileProcessingMode} and the {@link FilePathFilter}, it is responsible for:
http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 54b6a03..3a9e8e1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -14,19 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.api.functions.source; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; +package org.apache.flink.streaming.api.functions.source; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.PriorityQueue; -import java.util.Queue; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.CheckpointableInputFormat; @@ -50,9 +40,22 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Queue; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + /** * The operator that reads the {@link TimestampedFileInputSplit splits} received from the preceding * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction} http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java index 2f91227..479cdf0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java @@ -17,20 +17,22 @@ package org.apache.flink.streaming.api.functions.source; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * A {@link SourceFunction} that monitors a directory and sends events downstream when it detects * new files. Used together with {@link FileReadFunction}. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java index f8c4fba..8659a4f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.source; import org.apache.flink.annotation.PublicEvolving; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java index 172f7a3..600b32c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java @@ -17,10 +17,6 @@ package org.apache.flink.streaming.api.functions.source; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.URI; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple3; @@ -29,6 +25,10 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.URI; + /** * This is used together with {@link FileMonitoringFunction} to read from files that the * monitoring functions discovers. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java index 745a26b..691d3d6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java @@ -17,13 +17,6 @@ package org.apache.flink.streaming.api.functions.source; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -37,6 +30,14 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.util.Preconditions; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + /** * A stream source function that returns a sequence of elements. * http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java index b844a0a..a8b527f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java @@ -17,9 +17,10 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Iterator; import org.apache.flink.annotation.PublicEvolving; +import java.util.Iterator; + /** * A {@link SourceFunction} that reads elements from an {@link Iterator} and emits them. */ http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java index 68e18bb..db6c8a2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java @@ -17,11 +17,12 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Iterator; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.SplittableIterator; +import java.util.Iterator; + /** * A {@link SourceFunction} that reads elements from an {@link SplittableIterator} and emits them. */ http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java index e6e6ea3..3fdedf1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java @@ -17,8 +17,6 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.Iterator; -import java.util.NoSuchElementException; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.RichInputFormat; @@ -31,6 +29,9 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import java.util.Iterator; +import java.util.NoSuchElementException; + /** * A {@link SourceFunction} that reads data using an {@link InputFormat}. */ http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java index 3da1ec3..ab21586 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java @@ -18,13 +18,6 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; @@ -38,9 +31,17 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.SerializedCheckpointData; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + /** * Abstract base class for data sources that receive elements from a message queue and * acknowledge them back by IDs. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java index ceef4e0..e7cdb99 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java @@ -18,19 +18,21 @@ package org.apache.flink.streaming.api.functions.source; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.Iterator; -import java.util.List; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionSnapshotContext; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; + /** * Abstract base class for data sources that receive elements from a message queue and * acknowledge them back by IDs. In contrast to {@link MessageAcknowledgingSourceBase}, this source @@ -132,7 +134,6 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi // Checkpointing the data // ------------------------------------------------------------------------ - @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { sessionIdsPerSnapshot.add(new Tuple2<>(context.getCheckpointId(), sessionIds)); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java index e7d1673..1552ee2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java @@ -17,17 +17,19 @@ package org.apache.flink.streaming.api.functions.source; -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.Socket; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.util.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * A source function that reads strings from a socket. The source will read bytes from the socket @@ -64,7 +66,6 @@ public class SocketTextStreamFunction implements SourceFunction<String> { private volatile boolean isRunning = true; - public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries) { this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP); } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index 0db8984..4665cc6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.functions.source; -import java.io.Serializable; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; @@ -26,6 +25,8 @@ import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.TimestampAssigner; import org.apache.flink.streaming.api.watermark.Watermark; +import java.io.Serializable; + /** * Base interface for all stream data sources in Flink. The contract of a stream source * is the following: When the source should start emitting elements, the {@link #run} method http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java index 598ccb3..6213ce0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java @@ -15,10 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.source; -import java.util.ArrayDeque; -import java.util.Deque; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -28,6 +27,9 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.util.Preconditions; +import java.util.ArrayDeque; +import java.util.Deque; + /** * A stateful streaming source that emits each number from a given interval exactly once, * possibly in parallel. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java index ddc3559..4111d91 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java @@ -17,11 +17,12 @@ package org.apache.flink.streaming.api.functions.source; -import java.io.Serializable; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.util.Preconditions; +import java.io.Serializable; + /** * An extended {@link FileInputSplit} that also includes information about: * <ul> http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java index 500348a..8a675d0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java @@ -18,14 +18,15 @@ package org.apache.flink.streaming.api.functions.timestamps; -import static java.util.Objects.requireNonNull; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Objects.requireNonNull; + /** * A timestamp assigner and watermark generator for streams where timestamps are monotonously * ascending. In this case, the local watermarks for the streams are easy to generate, because http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java index 982c283..9936b8e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.timestamps; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java index 3d6c1e1..e20b878 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyAllWindowFunction.java @@ -18,13 +18,14 @@ package org.apache.flink.streaming.api.functions.windowing; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + /** * A {@link AllWindowFunction} that composes an {@link AggregateFunction} and {@link AllWindowFunction}. * Upon invocation, this first applies {@code AggregateFunction} to the input, and then http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java index ce5363f..6d2d7f4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.java @@ -15,15 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + /** * A {@link WindowFunction} that composes an {@link AggregateFunction} and {@link WindowFunction}. * Upon invocation, this first applies {@code AggregateFunction} to the input, and then http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java index 04817b4..c497b4a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/AllWindowFunction.java @@ -18,12 +18,13 @@ package org.apache.flink.streaming.api.functions.windowing; -import java.io.Serializable; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.functions.Function; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.io.Serializable; + /** * Base interface for functions that are evaluated over non-keyed windows. * http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java index 2069f7a..5b2abeb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java @@ -15,12 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; @@ -34,6 +31,11 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; + /** * Internal {@link AllWindowFunction} that is used for implementing a fold on a window configuration * that only allows {@link AllWindowFunction} and cannot directly execute a {@link FoldFunction}. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java index 1d39252..362956d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java @@ -15,12 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; @@ -35,6 +32,11 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; + /** * Internal {@link ProcessAllWindowFunction} that is used for implementing a fold on a window * configuration that only allows {@link ProcessAllWindowFunction} and cannot directly execute a http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java index fa4fe86..183a676 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java @@ -15,12 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; @@ -35,6 +32,11 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; + /** * Internal {@link ProcessWindowFunction} that is used for implementing a fold on a window * configuration that only allows {@link ProcessWindowFunction} and cannot directly execute a http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java index 865dbc9..2ac4b62 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java @@ -15,12 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.FoldFunction; @@ -34,6 +31,11 @@ import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; + /** * Internal {@link WindowFunction} that is used for implementing a fold on a window configuration * that only allows {@link WindowFunction} and cannot directly execute a {@link FoldFunction}. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java index e1a0a98..a27d71b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.Internal; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java index fc31b07..47a2e3a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.Internal; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java index fb1ff81..a1a338b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughAllWindowFunction.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.Internal; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java index edc495c..ab64859 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/PassThroughWindowFunction.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.Internal; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java index a8469f5..7c636ea 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyAllWindowFunction.java @@ -15,15 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + /** * Internal {@link AllWindowFunction} that is used for implementing a fold on a window configuration * that only allows {@link AllWindowFunction} and cannot directly execute a {@link ReduceFunction}. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java index e7e6609..108ba9e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + /** * Internal {@link ProcessAllWindowFunction} that is used for implementing a fold on a window * configuration that only allows {@link AllWindowFunction} and cannot directly execute a http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java index 18037b7..1e3f5a2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RuntimeContext; @@ -26,6 +26,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + /** * Internal {@link ProcessWindowFunction} that is used for implementing a fold on a window * configuration that only allows {@link AllWindowFunction} and cannot directly execute a http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java index 5890ec7..ff9d03e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyWindowFunction.java @@ -15,15 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; -import java.util.Collections; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.util.Collections; + /** * Internal {@link WindowFunction} that is used for implementing a fold on a window configuration * that only allows {@link WindowFunction} and cannot directly execute a {@link ReduceFunction}. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java index b0df6d8..b28a6c2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichAllWindowFunction.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.Public; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java index 96e665a..030e973 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/WindowFunction.java @@ -18,12 +18,13 @@ package org.apache.flink.streaming.api.functions.windowing; -import java.io.Serializable; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.functions.Function; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; +import java.io.Serializable; + /** * Base interface for functions that are evaluated over keyed (grouped) windows. * http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java index 7420a5a..ed1f830 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java @@ -17,9 +17,10 @@ package org.apache.flink.streaming.api.functions.windowing.delta; -import java.io.Serializable; import org.apache.flink.annotation.PublicEvolving; +import java.io.Serializable; + /** * This interface allows the implementation of a function which calculates the * delta between two data points. Delta functions might be used in delta http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java index 7914792..4263285 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java @@ -17,9 +17,10 @@ package org.apache.flink.streaming.api.functions.windowing.delta.extractor; -import java.io.Serializable; import org.apache.flink.annotation.Internal; +import java.io.Serializable; + /** * Extractors allow to extract/convert one type to another. They are mostly used * to extract some fields out of a more complex structure (Tuple/Array) to run http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java index f8d1be3..9fc6354 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java @@ -17,9 +17,10 @@ package org.apache.flink.streaming.api.functions.windowing.delta.extractor; -import java.lang.reflect.Array; import org.apache.flink.annotation.Internal; +import java.lang.reflect.Array; + /** * Extracts a single field out of an array. * http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java index 0d4e0f9..1fa9eb8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldsFromArray.java @@ -17,9 +17,10 @@ package org.apache.flink.streaming.api.functions.windowing.delta.extractor; -import java.lang.reflect.Array; import org.apache.flink.annotation.Internal; +import java.lang.reflect.Array; + /** * Extracts multiple fields from an array and puts them into a new array of the * specified type. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/FunctionMasterCheckpointHookFactory.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/FunctionMasterCheckpointHookFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/FunctionMasterCheckpointHookFactory.java index c256698..79d1dda 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/FunctionMasterCheckpointHookFactory.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/FunctionMasterCheckpointHookFactory.java @@ -18,11 +18,11 @@ package org.apache.flink.streaming.api.graph; -import static org.apache.flink.util.Preconditions.checkNotNull; - import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Utility class that turns a {@link WithMasterCheckpointHook} into a * {@link org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook.Factory}. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java index d4e4873..a9bb0b6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java @@ -17,6 +17,13 @@ package org.apache.flink.streaming.api.graph; +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.StreamOperator; + +import org.apache.sling.commons.json.JSONArray; +import org.apache.sling.commons.json.JSONException; +import org.apache.sling.commons.json.JSONObject; + import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -24,12 +31,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.sling.commons.json.JSONArray; -import org.apache.sling.commons.json.JSONException; -import org.apache.sling.commons.json.JSONObject; - /** * Helper class for generating a JSON representation from a {@link StreamGraph}. */ http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index d5ee13a..77caa34 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -17,13 +17,6 @@ package org.apache.flink.streaming.api.graph; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; @@ -40,6 +33,14 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * Internal configuration for a {@link StreamOperator}. This is created and populated by the * {@link StreamingJobGraphGenerator}. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java index 95c80a8..2e89932 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java @@ -17,12 +17,13 @@ package org.apache.flink.streaming.api.graph; -import java.io.Serializable; -import java.util.List; import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.util.OutputTag; +import java.io.Serializable; +import java.util.List; + /** * An edge in the streaming topology. One edge like this does not necessarily * gets converted to a connection between two job vertices (due to http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 4026817..20a361e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -17,17 +17,6 @@ package org.apache.flink.streaming.api.graph; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; @@ -60,9 +49,22 @@ import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask; import org.apache.flink.util.OutputTag; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + /** * Class representing the streaming topology. It contains all the information * necessary to build the jobgraph for the execution. @@ -93,7 +95,6 @@ public class StreamGraph extends StreamingPlan { private AbstractStateBackend stateBackend; private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs; - public StreamGraph(StreamExecutionEnvironment environment) { this.environment = environment; this.executionConfig = environment.getConfig(); @@ -118,7 +119,6 @@ public class StreamGraph extends StreamingPlan { sinks = new HashSet<>(); } - public StreamExecutionEnvironment getEnvironment() { return environment; } @@ -157,7 +157,6 @@ public class StreamGraph extends StreamingPlan { return chaining; } - public boolean isIterative() { return !vertexIDtoLoopTimeout.isEmpty(); } @@ -558,7 +557,6 @@ public class StreamGraph extends StreamingPlan { return sources; } - public Collection<Integer> getSinkIDs() { return sinks; } http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index b20f6d6..70b9fd4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -15,14 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.graph; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -41,9 +36,17 @@ import org.apache.flink.streaming.api.transformations.SplitTransformation; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.transformations.UnionTransformation; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * A generator that generates a {@link StreamGraph} from a graph of * {@link StreamTransformation StreamTransformations}. @@ -264,7 +267,6 @@ public class StreamGraphGenerator { streamGraph.addOutputSelector(inputId, split.getOutputSelector()); } - return resultIds; } @@ -279,7 +281,6 @@ public class StreamGraphGenerator { StreamTransformation<T> input = select.getInput(); Collection<Integer> resultIds = transform(input); - // the recursive transform might have already transformed this if (alreadyTransformed.containsKey(select)) { return alreadyTransformed.get(select); @@ -307,7 +308,6 @@ public class StreamGraphGenerator { StreamTransformation<?> input = sideOutput.getInput(); Collection<Integer> resultIds = transform(input); - // the recursive transform might have already transformed this if (alreadyTransformed.containsKey(sideOutput)) { return alreadyTransformed.get(sideOutput); @@ -508,7 +508,6 @@ public class StreamGraphGenerator { ); } - if (sink.getStateKeySelector() != null) { TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(env.getConfig()); streamGraph.setOneInputStateKey(sink.getId(), sink.getStateKeySelector(), keySerializer); @@ -592,7 +591,6 @@ public class StreamGraphGenerator { streamGraph.setTwoInputStateKey(transform.getId(), transform.getStateKeySelector1(), transform.getStateKeySelector2(), keySerializer); } - streamGraph.setParallelism(transform.getId(), transform.getParallelism()); streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism()); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java index dd14d50..866fd1f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasher.java @@ -18,9 +18,10 @@ package org.apache.flink.streaming.api.graph; -import java.util.Map; import org.apache.flink.runtime.jobgraph.JobVertexID; +import java.util.Map; + /** * Interface for different implementations of generating hashes over a stream graph. */ http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java index 7c2416e..bb9e47b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java @@ -18,11 +18,19 @@ package org.apache.flink.streaming.api.graph; -import static org.apache.flink.util.StringUtils.byteToHexString; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.transformations.StreamTransformation; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.nio.charset.Charset; import java.util.ArrayDeque; import java.util.ArrayList; @@ -34,14 +42,8 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.transformations.StreamTransformation; -import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static org.apache.flink.util.StringUtils.byteToHexString; /** * StreamGraphHasher from Flink 1.2. This contains duplicated code to ensure that the algorithm does not change with http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java index 8a8c8b0..f17793e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java @@ -18,9 +18,10 @@ package org.apache.flink.streaming.api.graph; +import org.apache.flink.util.StringUtils; + import java.util.HashMap; import java.util.Map; -import org.apache.flink.util.StringUtils; /** * StreamGraphHasher that works with user provided hashes. This is useful in case we want to set (alternative) hashes http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 5c421aa..78ab877 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -17,9 +17,6 @@ package org.apache.flink.streaming.api.graph; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.InputFormat; @@ -31,6 +28,10 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamOperator; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + /** * Class representing the operators in the streaming programs, with all their properties. */ http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 6d1af72..3008a43 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -17,17 +17,6 @@ package org.apache.flink.streaming.api.graph; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import org.apache.commons.lang3.StringUtils; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.Function; @@ -65,12 +54,23 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; - import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.SerializedValue; + +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + /** * The StreamingJobGraphGenerator converts a {@link StreamGraph} into a {@link JobGraph}. */ @@ -434,7 +434,6 @@ public class StreamingJobGraphGenerator { } } - config.setStreamOperator(vertex.getOperator()); config.setOutputSelectors(vertex.getOutputSelectors()); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 057df2b..d711518 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -18,15 +18,6 @@ package org.apache.flink.streaming.api.operators; -import static org.apache.flink.util.Preconditions.checkArgument; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Collection; -import java.util.ConcurrentModificationException; -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; @@ -73,9 +64,20 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.OutputTag; + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; + /** * Base class for all stream operators. Operators that contain a user function should extend the class * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class). @@ -609,7 +611,6 @@ public abstract class AbstractStreamOperator<OUT> return getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor); } - protected <N, S extends State, T> S getOrCreateKeyedState( TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception { @@ -635,7 +636,6 @@ public abstract class AbstractStreamOperator<OUT> TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception { - /* TODO: NOTE: This method does a lot of work caching / retrieving states just to update the namespace. This method should be removed for the sake of namespaces being lazily fetched from the keyed @@ -731,7 +731,6 @@ public abstract class AbstractStreamOperator<OUT> reportOrForwardLatencyMarker(latencyMarker); } - protected void reportOrForwardLatencyMarker(LatencyMarker marker) { // all operators are tracking latencies this.latencyGauge.reportLatency(marker, false); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 14857de..efbebf4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -18,9 +18,6 @@ package org.apache.flink.streaming.api.operators; -import static java.util.Objects.requireNonNull; - -import java.io.Serializable; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; @@ -43,6 +40,10 @@ import org.apache.flink.streaming.util.functions.StreamingFunctionUtils; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Migration; +import java.io.Serializable; + +import static java.util.Objects.requireNonNull; + /** * This is used as the base class for operators that have a user-defined * function. This class handles the opening and closing of the user-defined functions, @@ -85,7 +86,6 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> // operator life cycle // ------------------------------------------------------------------------ - @Override public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) { super.setup(containingTask, config, output); http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java index de3a7d2..1b531aa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java @@ -15,16 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.api.operators; -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; +package org.apache.flink.streaming.api.operators; -import java.io.IOException; -import java.util.HashSet; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.concurrent.ScheduledFuture; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -36,6 +29,15 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; +import java.io.IOException; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * {@link InternalTimerService} that stores timers on the Java heap. */ http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index ff5164d..17af3aa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -18,9 +18,6 @@ package org.apache.flink.streaming.api.operators; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -32,6 +29,10 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.Preconditions; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + /** * An entity keeping all the time-related services available to all operators extending the * {@link AbstractStreamOperator}. Right now, this is only a http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java index 1455712..4b86574 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.operators; -import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -26,6 +26,8 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import java.io.IOException; + /** * Internal class for keeping track of in-flight timers. * http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java index 805f9d4..f55cb03 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java index e0fd493..2160f1e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyContext.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.operators; /** http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java index db4b183..a46897c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java @@ -17,9 +17,6 @@ package org.apache.flink.streaming.api.operators; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; - import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -30,6 +27,9 @@ import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.OutputTag; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + /** * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing keyed * {@link ProcessFunction ProcessFunctions}. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java index e78288d..bc317a9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OnWatermarkCallback.java @@ -18,10 +18,11 @@ package org.apache.flink.streaming.api.operators; -import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.watermark.Watermark; +import java.io.IOException; + /** * A callback registered with the {@link InternalWatermarkCallbackService} service. This callback will * be invoked for all keys registered with the service, upon reception of a watermark. http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java index eb743dd..8aa76a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java @@ -18,12 +18,13 @@ package org.apache.flink.streaming.api.operators; -import java.util.concurrent.RunnableFuture; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.util.ExceptionUtils; +import java.util.concurrent.RunnableFuture; + /** * Result of {@link StreamOperator#snapshotState}. */
