http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java index 3fcfb46..725cbf6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.windowing.triggers; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.windows.Window; import java.io.IOException; @@ -35,8 +35,8 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> { private final long maxCount; - private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("count", 0L, - BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); + private final ValueStateDescriptor<Long> stateDesc = + new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L); private CountTrigger(long maxCount) {
http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java index 3135961..55c719a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java @@ -43,7 +43,7 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> { private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) { this.deltaFunction = deltaFunction; this.threshold = threshold; - stateDesc = new ValueStateDescriptor<>("last-element", null, stateSerializer); + stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer, null); } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index 5c71355..fb61064 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -179,7 +179,7 @@ public interface Trigger<T, W extends Window> extends Serializable { * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the * function (function is not part os a KeyedStream). */ - <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor); + <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor); /** * Retrieves a {@link ValueState} object that can be used to interact with http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index f163de1..41ec91a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -55,13 +55,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window private final Evictor<? super IN, ? super W> evictor; - private final StateDescriptor<? extends ListState<StreamRecord<IN>>> windowStateDescriptor; + private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor; public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, - StateDescriptor<? extends ListState<StreamRecord<IN>>> windowStateDescriptor, + StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor, WindowFunction<Iterable<IN>, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? super W> evictor) { @@ -161,7 +161,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window @Override @VisibleForTesting @SuppressWarnings("unchecked, rawtypes") - public StateDescriptor<? extends MergingState<IN, Iterable<IN>>> getStateDescriptor() { - return (StateDescriptor<? extends MergingState<IN, Iterable<IN>>>) windowStateDescriptor; + public StateDescriptor<? extends MergingState<IN, Iterable<IN>>, ?> getStateDescriptor() { + return (StateDescriptor<? extends MergingState<IN, Iterable<IN>>, ?>) windowStateDescriptor; } } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index 1b712d9..d7dbaf5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -448,13 +448,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> requireNonNull(name, "The name of the state must not be null"); requireNonNull(stateType, "The state type information must not be null"); - ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig())); + ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>( + name, stateType.createSerializer(getExecutionConfig()), defaultState); return getPartitionedState(stateDesc); } @Override @SuppressWarnings("rawtypes, unchecked") - public <S extends State> S getPartitionedState(final StateDescriptor<S> stateDescriptor) { + public <S extends State> S getPartitionedState(final StateDescriptor<S, ?> stateDescriptor) { if (!(stateDescriptor instanceof ValueStateDescriptor)) { throw new UnsupportedOperationException("NonKeyedWindowOperator Triggers only " + "support ValueState."); http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index d562925..eccaeee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -102,7 +102,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> protected final Trigger<? super IN, ? super W> trigger; - protected final StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor; + protected final StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor; /** * If this is true. The current processing time is set as the timestamp of incoming elements. @@ -167,7 +167,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, - StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor, + StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor, WindowFunction<ACC, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger) { @@ -374,15 +374,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> requireNonNull(name, "The name of the state must not be null"); requireNonNull(stateType, "The state type information must not be null"); - ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, defaultState, stateType.createSerializer(getExecutionConfig())); + ValueStateDescriptor<S> stateDesc = new ValueStateDescriptor<>(name, stateType.createSerializer(getExecutionConfig()), defaultState); return getPartitionedState(stateDesc); } @SuppressWarnings("unchecked") - public <S extends State> S getPartitionedState(StateDescriptor<S> stateDescriptor) { + public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) { try { - return WindowOperator.this.getPartitionedState(window, windowSerializer, - stateDescriptor); + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateDescriptor); } catch (Exception e) { throw new RuntimeException("Could not retrieve state", e); } @@ -608,7 +607,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } @VisibleForTesting - public StateDescriptor<? extends MergingState<IN, ACC>> getStateDescriptor() { + public StateDescriptor<? extends MergingState<IN, ACC>, ?> getStateDescriptor() { return windowStateDescriptor; } } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java new file mode 100644 index 0000000..72e02c2 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.execution.Environment; + +import org.junit.Test; + +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Collections; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +public class StreamingRuntimeContextTest { + + @Test + public void testValueStateInstantiation() throws Exception { + + final ExecutionConfig config = new ExecutionConfig(); + config.registerKryoType(Path.class); + + final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); + + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMockOp(descriptorCapture, config), + createMockEnvironment(), + Collections.<String, Accumulator<?, ?>>emptyMap()); + + ValueStateDescriptor<TaskInfo> descr = new ValueStateDescriptor<>("name", TaskInfo.class, null); + context.getState(descr); + + StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get(); + TypeSerializer<?> serializer = descrIntercepted.getSerializer(); + + // check that the Path class is really registered, i.e., the execution config was applied + assertTrue(serializer instanceof KryoSerializer); + assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0); + } + + @Test + public void testReduceingStateInstantiation() throws Exception { + + final ExecutionConfig config = new ExecutionConfig(); + config.registerKryoType(Path.class); + + final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); + + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMockOp(descriptorCapture, config), + createMockEnvironment(), + Collections.<String, Accumulator<?, ?>>emptyMap()); + + @SuppressWarnings("unchecked") + ReduceFunction<TaskInfo> reducer = (ReduceFunction<TaskInfo>) mock(ReduceFunction.class); + + ReducingStateDescriptor<TaskInfo> descr = + new ReducingStateDescriptor<>("name", reducer, TaskInfo.class); + + context.getReducingState(descr); + + StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get(); + TypeSerializer<?> serializer = descrIntercepted.getSerializer(); + + // check that the Path class is really registered, i.e., the execution config was applied + assertTrue(serializer instanceof KryoSerializer); + assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0); + } + + @Test + public void testListStateInstantiation() throws Exception { + + final ExecutionConfig config = new ExecutionConfig(); + config.registerKryoType(Path.class); + + final AtomicReference<Object> descriptorCapture = new AtomicReference<>(); + + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMockOp(descriptorCapture, config), + createMockEnvironment(), + Collections.<String, Accumulator<?, ?>>emptyMap()); + + ListStateDescriptor<TaskInfo> descr = new ListStateDescriptor<>("name", TaskInfo.class); + context.getListState(descr); + + StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get(); + TypeSerializer<?> serializer = descrIntercepted.getSerializer(); + + // check that the Path class is really registered, i.e., the execution config was applied + assertTrue(serializer instanceof KryoSerializer); + assertTrue(((KryoSerializer<?>) serializer).getKryo().getRegistration(Path.class).getId() > 0); + } + + // ------------------------------------------------------------------------ + // + // ------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + private static AbstractStreamOperator<?> createMockOp( + final AtomicReference<Object> ref, final ExecutionConfig config) throws Exception { + + AbstractStreamOperator<?> operatorMock = mock(AbstractStreamOperator.class); + when(operatorMock.getExecutionConfig()).thenReturn(config); + + when(operatorMock.getPartitionedState(any(StateDescriptor.class))).thenAnswer( + new Answer<Object>() { + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + ref.set(invocationOnMock.getArguments()[0]); + return null; + } + }); + + return operatorMock; + } + + private static Environment createMockEnvironment() { + Environment env = mock(Environment.class); + when(env.getUserClassLoader()).thenReturn(StreamingRuntimeContextTest.class.getClassLoader()); + when(env.getDistributedCacheEntries()).thenReturn(Collections.<String, Future<Path>>emptyMap()); + when(env.getTaskInfo()).thenReturn(new TaskInfo("test task", 0, 1, 1)); + return env; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index f2f8c5a..b6e51c6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -763,7 +763,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { public void open(Configuration parameters) { assertNotNull(getRuntimeContext()); state = getRuntimeContext().getState( - new ValueStateDescriptor<>("totalCount", 0, IntSerializer.INSTANCE)); + new ValueStateDescriptor<>("totalCount", Integer.class, 0)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index e88f6de..3429215 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -943,8 +943,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest { // start with one, so the final count is correct and we test that we do not // initialize with 0 always by default - state = getRuntimeContext().getState( - new ValueStateDescriptor<>("totalCount", 1, IntSerializer.INSTANCE)); + state = getRuntimeContext().getState(new ValueStateDescriptor<>("totalCount", Integer.class, 1)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala index 5e1d1db..ff6aca4 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/StatefulFunction.scala @@ -45,7 +45,7 @@ trait StatefulFunction[I, O, S] extends RichFunction { } override def open(c: Configuration) = { - val info = new ValueStateDescriptor[S]("state", null.asInstanceOf[S], stateSerializer) + val info = new ValueStateDescriptor[S]("state", stateSerializer, null.asInstanceOf[S]) state = getRuntimeContext().getState(info) } } http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 19ff090..2039528 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -21,7 +21,6 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; @@ -45,6 +44,7 @@ import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -224,7 +224,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks()); open = true; count = getRuntimeContext().getState( - new ValueStateDescriptor<>("count", 0, IntSerializer.INSTANCE)); + new ValueStateDescriptor<>("count", Integer.class, 0)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java index a2d6c24..0728b41 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java @@ -27,14 +27,11 @@ import java.util.Map.Entry; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -173,7 +170,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; count = 0; sum = getRuntimeContext().getState( - new ValueStateDescriptor<>("my_state", 0L, LongSerializer.INSTANCE)); + new ValueStateDescriptor<>("my_state", Long.class, 0L)); } @Override @@ -202,11 +199,10 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes public void open(Configuration parameters) throws IOException { aCounts = getRuntimeContext().getState( - new ValueStateDescriptor<>("a", NonSerializableLong.of(0L), - new KryoSerializer<>(NonSerializableLong.class, new ExecutionConfig()))); + new ValueStateDescriptor<>("a", NonSerializableLong.class, NonSerializableLong.of(0L))); bCounts = getRuntimeContext().getState( - new ValueStateDescriptor<>("b", 0L, LongSerializer.INSTANCE)); + new ValueStateDescriptor<>("b", Long.class, 0L)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/180cd3f6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index d8131c7..27c0f80 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -255,8 +254,7 @@ public class StreamCheckpointingITCase extends StreamFaultToleranceTestBase { failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; count = 0; - pCount = getRuntimeContext().getState( - new ValueStateDescriptor<>("pCount", 0L, LongSerializer.INSTANCE)); + pCount = getRuntimeContext().getState(new ValueStateDescriptor<>("pCount", Long.class, 0L)); } @Override
