This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4340811e9ecb53a7c451f321775939c9ae42c48c Author: Andrey Zagrebin <[email protected]> AuthorDate: Tue Feb 26 15:49:42 2019 +0100 [FLINK-9003][hotfix] Code cleanup. --- .../flink/streaming/tests/DataStreamAllroundTestJobFactory.java | 9 +++++---- .../flink/streaming/tests/DataStreamAllroundTestProgram.java | 5 +++-- .../org/apache/flink/streaming/tests/SemanticsCheckMapper.java | 2 +- .../apache/flink/streaming/tests/SequenceGeneratorSource.java | 7 +------ .../tests/artificialstate/StatefulComplexPayloadSerializer.java | 8 -------- .../tests/artificialstate/builder/ArtificialStateBuilder.java | 4 ++-- 6 files changed, 12 insertions(+), 23 deletions(-) diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index 4f69cdb..06325ad 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -268,13 +269,13 @@ public class DataStreamAllroundTestJobFactory { STATE_BACKEND_FILE_ASYNC.key(), STATE_BACKEND_FILE_ASYNC.defaultValue()); - env.setStateBackend(new FsStateBackend(checkpointDir, asyncCheckpoints)); + env.setStateBackend((StateBackend) new FsStateBackend(checkpointDir, asyncCheckpoints)); } else if ("rocks".equalsIgnoreCase(stateBackend)) { boolean incrementalCheckpoints = pt.getBoolean( STATE_BACKEND_ROCKS_INCREMENTAL.key(), STATE_BACKEND_ROCKS_INCREMENTAL.defaultValue()); - env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints)); + env.setStateBackend((StateBackend) new RocksDBStateBackend(checkpointDir, incrementalCheckpoints)); } else { throw new IllegalArgumentException("Unknown backend requested: " + stateBackend); } @@ -441,7 +442,7 @@ public class DataStreamAllroundTestJobFactory { return new ArtificalOperatorStateMapper<>(mapFunction); } - static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder( + private static <IN, STATE> ArtificialStateBuilder<IN> createValueStateBuilder( JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState, ValueStateDescriptor<STATE> valueStateDescriptor) { @@ -451,7 +452,7 @@ public class DataStreamAllroundTestJobFactory { valueStateDescriptor); } - static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder( + private static <IN, STATE> ArtificialStateBuilder<IN> createListStateBuilder( JoinFunction<IN, STATE, STATE> inputAndOldStateToNewState, ListStateDescriptor<STATE> listStateDescriptor) { diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java index bb23310..e282a1e 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java @@ -141,10 +141,11 @@ public class DataStreamAllroundTestProgram { // apply a tumbling window that simply passes forward window elements; // this allows the job to cover timers state + @SuppressWarnings("Convert2Lambda") DataStream<Event> eventStream3 = applyTumblingWindows(eventStream2.keyBy(Event::getKey), pt) .apply(new WindowFunction<Event, Event, Integer, TimeWindow>() { @Override - public void apply(Integer integer, TimeWindow window, Iterable<Event> input, Collector<Event> out) throws Exception { + public void apply(Integer integer, TimeWindow window, Iterable<Event> input, Collector<Event> out) { for (Event e : input) { out.collect(e); } @@ -175,7 +176,7 @@ public class DataStreamAllroundTestProgram { @Override public void apply( Integer key, TimeWindow window, Iterable<Event> input, - Collector<Tuple2<Integer, List<Event>>> out) throws Exception { + Collector<Tuple2<Integer, List<Event>>> out) { out.collect(Tuple2.of(key, StreamSupport.stream(input.spliterator(), false).collect(Collectors.toList()))); } diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java index 780e2ae..1fe5f61 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java @@ -60,7 +60,7 @@ public class SemanticsCheckMapper extends RichFlatMapFunction<Event, String> { } @Override - public void open(Configuration parameters) throws Exception { + public void open(Configuration parameters) { ValueStateDescriptor<Long> sequenceStateDescriptor = new ValueStateDescriptor<>("sequenceState", Long.class); diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java index c6ecb3f..5dd09a9 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java @@ -28,9 +28,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -43,8 +40,6 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i private static final long serialVersionUID = -3986989644799442178L; - private static final Logger LOG = LoggerFactory.getLogger(SequenceGeneratorSource.class); - /** Length of the artificial payload string generated for each event. */ private final int payloadLength; @@ -145,7 +140,7 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i } } - private void runIdle(SourceContext<Event> ctx) throws Exception { + private void runIdle(SourceContext<Event> ctx) { ctx.markAsTemporarilyIdle(); // just wait until this source is canceled diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java index 8b04486..1ad7ae0 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/StatefulComplexPayloadSerializer.java @@ -140,14 +140,6 @@ public class StatefulComplexPayloadSerializer extends TypeSerializer<ComplexPayl return new Snapshot(); } - private boolean isCompatibleSerializationFormatIdentifier(String identifier) { - return identifier.equals(getSerializationFormatIdentifier()); - } - - private String getSerializationFormatIdentifier() { - return getClass().getCanonicalName(); - } - // ---------------------------------------------------------------------------------------- /** diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java index aed94ba..ac24f39 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/builder/ArtificialStateBuilder.java @@ -30,9 +30,9 @@ public abstract class ArtificialStateBuilder<T> implements Serializable { private static final long serialVersionUID = -5887676929924485788L; - protected final String stateName; + final String stateName; - public ArtificialStateBuilder(String stateName) { + ArtificialStateBuilder(String stateName) { this.stateName = stateName; }
