[hotfix] Remove outdated class OperatorStateHandles and replace it with OperatorSubtaskState
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/617e67c2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/617e67c2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/617e67c2 Branch: refs/heads/master Commit: 617e67c2f540b62b97ca5198b6a5c42b89b6f392 Parents: 7b5c53f Author: Stefan Richter <[email protected]> Authored: Fri Feb 23 10:36:09 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Sun Feb 25 15:10:28 2018 +0100 ---------------------------------------------------------------------- .../connectors/fs/RollingSinkITCase.java | 8 +- .../bucketing/BucketingSinkMigrationTest.java | 4 +- .../fs/bucketing/BucketingSinkTest.java | 10 +-- .../kafka/FlinkKafkaProducer011ITCase.java | 21 +++-- .../FlinkKafkaConsumerBaseMigrationTest.java | 4 +- .../kafka/FlinkKafkaConsumerBaseTest.java | 6 +- .../FlinkKinesisConsumerMigrationTest.java | 4 +- .../connectors/rabbitmq/RMQSourceTest.java | 4 +- .../org/apache/flink/util/FileUtilsTest.java | 2 + .../ContinuousFileProcessingMigrationTest.java | 6 +- .../hdfstests/ContinuousFileProcessingTest.java | 6 +- .../flink/cep/operator/CEPMigrationTest.java | 14 ++-- .../flink/cep/operator/CEPOperatorTest.java | 26 +++---- .../flink/cep/operator/CEPRescalingTest.java | 6 +- .../runtime/tasks/OperatorStateHandles.java | 81 -------------------- .../api/checkpoint/ListCheckpointedTest.java | 8 +- .../api/functions/FromElementsFunctionTest.java | 4 +- .../functions/StatefulSequenceSourceTest.java | 4 +- .../sink/TwoPhaseCommitSinkFunctionTest.java | 8 +- .../operators/AbstractStreamOperatorTest.java | 8 +- .../api/operators/KeyedProcessOperatorTest.java | 4 +- .../StreamOperatorSnapshotRestoreTest.java | 4 +- .../WrappingFunctionSnapshotRestoreTest.java | 6 +- .../operators/async/AsyncWaitOperatorTest.java | 4 +- .../co/CoBroadcastWithKeyedOperatorTest.java | 8 +- .../co/CoBroadcastWithNonKeyedOperatorTest.java | 10 +-- .../co/KeyedCoProcessOperatorTest.java | 4 +- .../ContinuousFileProcessingRescalingTest.java | 6 +- .../operators/WriteAheadSinkTestBase.java | 12 +-- .../windowing/WindowOperatorContractTest.java | 4 +- .../windowing/WindowOperatorMigrationTest.java | 16 ++-- .../operators/windowing/WindowOperatorTest.java | 22 +++--- .../util/AbstractStreamOperatorTestHarness.java | 35 ++++----- .../streaming/util/OperatorSnapshotUtil.java | 36 ++++----- .../PojoSerializerUpgradeTest.java | 8 +- 35 files changed, 164 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index e1124e4..1875792 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -23,13 +23,13 @@ import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.test.util.MiniClusterResource; @@ -673,7 +673,7 @@ public class RollingSinkITCase extends TestLogger { testHarness.notifyOfCompletedCheckpoint(0); checkFs(outDir, 1, 0, 2, 0); - OperatorStateHandles snapshot = testHarness.snapshot(1, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(1, 0); testHarness.close(); checkFs(outDir, 0, 1, 2, 0); @@ -735,7 +735,7 @@ public class RollingSinkITCase extends TestLogger { checkFs(outDir, 3, 5, 0, 0); // intentionally we snapshot them in a not ascending order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + OperatorSubtaskState mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( testHarness3.snapshot(0, 0), testHarness1.snapshot(0, 0), testHarness2.snapshot(0, 0) @@ -786,7 +786,7 @@ public class RollingSinkITCase extends TestLogger { checkFs(outDir, 2, 3, 0, 0); // intentionally we snapshot them in the reverse order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + OperatorSubtaskState mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( testHarness2.snapshot(0, 0), testHarness1.snapshot(0, 0) ); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java index 3b7c7d4..39e8d7d 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkMigrationTest.java @@ -21,11 +21,11 @@ package org.apache.flink.streaming.connectors.fs.bucketing; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.apache.flink.streaming.util.migration.MigrationTestUtil; @@ -135,7 +135,7 @@ public class BucketingSinkMigrationTest { checkFs(outDir, 1, 4, 0, 0); - OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/bucketing-sink-migration-test-flink" + flinkGenerateSavepointVersion + "-snapshot"); testHarness.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index 6fa0713..39af139 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter; import org.apache.flink.streaming.connectors.fs.Clock; @@ -29,7 +30,6 @@ import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.connectors.fs.Writer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.NetUtils; @@ -237,7 +237,7 @@ public class BucketingSinkTest extends TestLogger { testHarness.notifyOfCompletedCheckpoint(0); checkFs(outDir, 1, 0, 2, 0); - OperatorStateHandles snapshot = testHarness.snapshot(1, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(1, 0); testHarness.close(); checkFs(outDir, 0, 1, 2, 0); @@ -287,7 +287,7 @@ public class BucketingSinkTest extends TestLogger { checkFs(outDir, 2, 0, 0, 0); // intentionally we snapshot them in the reverse order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + OperatorSubtaskState mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( testHarness2.snapshot(0, 0), testHarness1.snapshot(0, 0) ); @@ -348,7 +348,7 @@ public class BucketingSinkTest extends TestLogger { checkFs(outDir, 4, 0, 0, 0); // intentionally we snapshot them in the reverse order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + OperatorSubtaskState mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( testHarness3.snapshot(0, 0), testHarness1.snapshot(0, 0), testHarness2.snapshot(0, 0) @@ -393,7 +393,7 @@ public class BucketingSinkTest extends TestLogger { checkFs(outDir, 5, 0, 0, 0); // intentionally we snapshot them in the reverse order so that the states are shuffled - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + OperatorSubtaskState mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( testHarness2.snapshot(0, 0), testHarness1.snapshot(0, 0) ); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java index ab3a2e2..e76cf13 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java @@ -22,9 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; @@ -87,7 +87,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { testHarness1.setup(); testHarness1.open(); testHarness1.processElement(42, 0); - OperatorStateHandles snapshot = testHarness1.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness1.snapshot(0, 0); testHarness1.processElement(43, 0); testHarness1.notifyOfCompletedCheckpoint(0); try { @@ -134,7 +134,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { testHarness.processElement(42, 0); testHarness.snapshot(0, 1); testHarness.processElement(43, 2); - OperatorStateHandles snapshot = testHarness.snapshot(1, 3); + OperatorSubtaskState snapshot = testHarness.snapshot(1, 3); int leaderId = kafkaServer.getLeaderToShutDown(topic); failBroker(leaderId); @@ -188,7 +188,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { testHarness1.snapshot(0, 1); testHarness1.processElement(43, 2); int transactionCoordinatorId = kafkaProducer.getTransactionCoordinatorId(); - OperatorStateHandles snapshot = testHarness1.snapshot(1, 3); + OperatorSubtaskState snapshot = testHarness1.snapshot(1, 3); failBroker(transactionCoordinatorId); @@ -231,7 +231,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { testHarness.processElement(42, 0); testHarness.snapshot(0, 1); testHarness.processElement(43, 2); - OperatorStateHandles snapshot1 = testHarness.snapshot(1, 3); + OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3); testHarness.processElement(44, 4); testHarness.snapshot(2, 5); @@ -266,7 +266,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { public void testFailAndRecoverSameCheckpointTwice() throws Exception { String topic = "flink-kafka-producer-fail-and-recover-same-checkpoint-twice"; - OperatorStateHandles snapshot1; + OperatorSubtaskState snapshot1; try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic)) { testHarness.setup(); testHarness.open(); @@ -447,18 +447,17 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { testHarness.setup(); - testHarness.initializeState(new OperatorStateHandles( - 0, + testHarness.initializeState(new OperatorSubtaskState( + inputStates, Collections.emptyList(), Collections.emptyList(), - inputStates, Collections.emptyList())); testHarness.open(); if (inputData.hasNext()) { int nextValue = inputData.next(); testHarness.processElement(nextValue, 0); - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); outputStates.addAll(snapshot.getManagedOperatorState()); checkState(snapshot.getRawOperatorState() == null, "Unexpected raw operator state"); @@ -488,7 +487,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase { testHarness.setup(); testHarness.open(); // producerA - start transaction (txn) 0 testHarness.processElement(42, 0); // producerA - write 42 in txn 0 - OperatorStateHandles checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1 + OperatorSubtaskState checkpoint0 = testHarness.snapshot(0, 1); // producerA - pre commit txn 0, producerB - start txn 1 testHarness.processElement(43, 2); // producerB - write 43 in txn 1 testHarness.notifyOfCompletedCheckpoint(0); // producerA - commit txn 0 and return to the pool testHarness.snapshot(1, 3); // producerB - pre txn 1, producerA - start txn 2 http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index 94097e9..d9a9e41 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; @@ -33,7 +34,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDi import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.apache.flink.streaming.util.migration.MigrationTestUtil; @@ -169,7 +169,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { latch.await(); } - final OperatorStateHandles snapshot; + final OperatorSubtaskState snapshot; synchronized (testHarness.getCheckpointLock()) { snapshot = testHarness.snapshot(0L, 0L); } http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 403e627..263ed8e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -31,6 +31,7 @@ import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -51,7 +52,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; import org.apache.flink.streaming.connectors.kafka.testutils.TestPartitionDiscoverer; import org.apache.flink.streaming.connectors.kafka.testutils.TestSourceContext; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; @@ -538,13 +538,13 @@ public class FlinkKafkaConsumerBaseTest { assertThat(globalSubscribedPartitions.values(), hasSize(numPartitions)); assertThat(mockFetchedPartitionsOnStartup, everyItem(isIn(globalSubscribedPartitions.keySet()))); - OperatorStateHandles[] state = new OperatorStateHandles[initialParallelism]; + OperatorSubtaskState[] state = new OperatorSubtaskState[initialParallelism]; for (int i = 0; i < initialParallelism; i++) { state[i] = testHarnesses[i].snapshot(0, 0); } - OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness.repackageState(state); + OperatorSubtaskState mergedState = AbstractStreamOperatorTestHarness.repackageState(state); // ----------------------------------------------------------------------------------------- // restore http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java index 75f584e..a99e845 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerMigrationTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; @@ -35,7 +36,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGen import org.apache.flink.streaming.connectors.kinesis.testutils.TestRuntimeContext; import org.apache.flink.streaming.connectors.kinesis.testutils.TestSourceContext; import org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.apache.flink.streaming.util.migration.MigrationTestUtil; @@ -367,7 +367,7 @@ public class FlinkKinesisConsumerMigrationTest { fetcher.waitUntilRun(); - final OperatorStateHandles snapshot; + final OperatorSubtaskState snapshot; synchronized (testHarness.getCheckpointLock()) { snapshot = testHarness.snapshot(0L, 0L); } http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java index bbf893f..01fea29 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -25,13 +25,13 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import com.rabbitmq.client.AMQP; @@ -159,7 +159,7 @@ public class RMQSourceTest { for (int i = 0; i < numSnapshots; i++) { long snapshotId = random.nextLong(); - OperatorStateHandles data; + OperatorSubtaskState data; synchronized (DummySourceContext.lock) { data = testHarness.snapshot(snapshotId, System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java index 67f831a..4cd4814 100644 --- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java @@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CheckedThread; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -143,6 +144,7 @@ public class FileUtilsTest { } } + @Ignore @Test public void testDeleteDirectoryConcurrently() throws Exception { final File parent = tmp.newFolder(); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java index 30a6119..d29ded5 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java @@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; @@ -36,7 +37,6 @@ import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; @@ -146,7 +146,7 @@ public class ContinuousFileProcessingMigrationTest { // to initialize another reader and compare the results of the // two operators. - final OperatorStateHandles snapshot; + final OperatorSubtaskState snapshot; synchronized (testHarness.getCheckpointLock()) { snapshot = testHarness.snapshot(0L, 0L); } @@ -270,7 +270,7 @@ public class ContinuousFileProcessingMigrationTest { latch.await(); } - final OperatorStateHandles snapshot; + final OperatorSubtaskState snapshot; synchronized (testHarness.getCheckpointLock()) { snapshot = testHarness.snapshot(0L, 0L); } http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java index 5d5a1c3..d880f4f 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; @@ -39,7 +40,6 @@ import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.OperatingSystem; @@ -438,7 +438,7 @@ public class ContinuousFileProcessingTest { // to initialize another reader and compare the results of the // two operators. - final OperatorStateHandles snapshot; + final OperatorSubtaskState snapshot; synchronized (initTestInstance.getCheckpointLock()) { snapshot = initTestInstance.snapshot(0L, 0L); } @@ -816,7 +816,7 @@ public class ContinuousFileProcessingTest { // this means it has processed all the splits and updated its state. synchronized (sourceContext.getCheckpointLock()) {} - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); monitoringFunction.cancel(); runner.join(); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java index d45c611..2fda47f 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java @@ -26,10 +26,10 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; @@ -118,7 +118,7 @@ public class CEPMigrationTest { harness.processWatermark(new Watermark(5)); // do snapshot and save to file - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-after-branching-flink" + flinkGenerateSavepointVersion + "-snapshot"); } finally { @@ -205,7 +205,7 @@ public class CEPMigrationTest { harness.processElement(new StreamRecord<Event>(middleEvent3, 23)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(1L, 1L); + OperatorSubtaskState snapshot = harness.snapshot(1L, 1L); harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( @@ -279,7 +279,7 @@ public class CEPMigrationTest { harness.processWatermark(new Watermark(5)); // do snapshot and save to file - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-starting-new-pattern-flink" + flinkGenerateSavepointVersion + "-snapshot"); } finally { @@ -381,7 +381,7 @@ public class CEPMigrationTest { harness.processElement(new StreamRecord<Event>(middleEvent3, 23)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(1L, 1L); + OperatorSubtaskState snapshot = harness.snapshot(1L, 1L); harness.close(); harness = new KeyedOneInputStreamOperatorTestHarness<>( @@ -449,7 +449,7 @@ public class CEPMigrationTest { harness.processWatermark(new Watermark(5)); // do snapshot and save to file - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-single-pattern-afterwards-flink" + flinkGenerateSavepointVersion + "-snapshot"); } finally { @@ -542,7 +542,7 @@ public class CEPMigrationTest { harness.processWatermark(new Watermark(6)); // do snapshot and save to file - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/cep-migration-conditions-flink" + flinkGenerateSavepointVersion + "-snapshot"); } finally { http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java index 4fa6d09..322ee99 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java @@ -34,11 +34,11 @@ import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.IterativeCondition; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.OutputTag; @@ -113,7 +113,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); harness.close(); harness = getCepTestHarness(false); @@ -136,7 +136,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(endEvent, 5L)); // simulate snapshot/restore with empty element queue but NFA state - OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L); + OperatorSubtaskState snapshot2 = harness.snapshot(1L, 1L); harness.close(); harness = getCepTestHarness(false); @@ -182,7 +182,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); harness.close(); harness = getCepTestHarness(false); @@ -205,7 +205,7 @@ public class CEPOperatorTest extends TestLogger { harness.processWatermark(new Watermark(2L)); // simulate snapshot/restore with empty element queue but NFA state - OperatorStateHandles snapshot2 = harness.snapshot(1L, 1L); + OperatorSubtaskState snapshot2 = harness.snapshot(1L, 1L); harness.close(); harness = getCepTestHarness(false); @@ -348,7 +348,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(startEvent, 1L)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); harness.close(); operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()); @@ -359,7 +359,7 @@ public class CEPOperatorTest extends TestLogger { harness.open(); harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L)); - OperatorStateHandles snapshot2 = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot2 = harness.snapshot(0L, 0L); harness.close(); operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()); @@ -409,7 +409,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(startEvent, 1L)); // simulate snapshot/restore with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); harness.close(); operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()); @@ -423,7 +423,7 @@ public class CEPOperatorTest extends TestLogger { harness.open(); harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L)); - OperatorStateHandles snapshot2 = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot2 = harness.snapshot(0L, 0L); harness.close(); operator = CepOperatorTestUtilities.getKeyedCepOpearator(true, new SimpleNFAFactory()); @@ -588,7 +588,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(startEvent2, 4L)); harness.processElement(new StreamRecord<Event>(middleEvent2, 5L)); - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); harness.close(); SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(false); @@ -764,7 +764,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(startEvent2, 3L)); harness.processElement(new StreamRecord<Event>(middleEvent2, 4L)); - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); harness.close(); SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(true); @@ -955,7 +955,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<>(middleEvent1, 3L)); harness.processElement(new StreamRecord<>(startEvent2, 3L)); - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); harness.close(); SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(true); @@ -1016,7 +1016,7 @@ public class CEPOperatorTest extends TestLogger { harness.processElement(new StreamRecord<Event>(middleEvent2, 5L)); harness.processElement(new StreamRecord<Event>(middleEvent1, 5L)); - OperatorStateHandles snapshot = harness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = harness.snapshot(0L, 0L); harness.close(); SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(false); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java index f5236c1..8a73556 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java @@ -26,11 +26,11 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -101,7 +101,7 @@ public class CEPRescalingTest { new StreamRecord<Event>(middleEvent2, 4)); // valid element // take a snapshot with some elements in internal sorting queue - OperatorStateHandles snapshot = harness.snapshot(0, 0); + OperatorSubtaskState snapshot = harness.snapshot(0, 0); harness.close(); // initialize two sub-tasks with the previously snapshotted state to simulate scaling up @@ -273,7 +273,7 @@ public class CEPRescalingTest { // we take a snapshot and make it look as a single operator // this will be the initial state of all downstream tasks. - OperatorStateHandles snapshot = AbstractStreamOperatorTestHarness.repackageState( + OperatorSubtaskState snapshot = AbstractStreamOperatorTestHarness.repackageState( harness2.snapshot(0, 0), harness1.snapshot(0, 0), harness3.snapshot(0, 0) http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java deleted file mode 100644 index 0b03b79..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorStateHandles.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.tasks; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.util.CollectionUtil; - -import java.util.Collection; -import java.util.List; - -/** - * This class holds all state handles for one operator. - */ -@Internal -@VisibleForTesting -public class OperatorStateHandles { - - private final int operatorChainIndex; - - private final Collection<KeyedStateHandle> managedKeyedState; - private final Collection<KeyedStateHandle> rawKeyedState; - private final Collection<OperatorStateHandle> managedOperatorState; - private final Collection<OperatorStateHandle> rawOperatorState; - - public OperatorStateHandles( - int operatorChainIndex, - Collection<KeyedStateHandle> managedKeyedState, - Collection<KeyedStateHandle> rawKeyedState, - Collection<OperatorStateHandle> managedOperatorState, - Collection<OperatorStateHandle> rawOperatorState) { - - this.operatorChainIndex = operatorChainIndex; - this.managedKeyedState = managedKeyedState; - this.rawKeyedState = rawKeyedState; - this.managedOperatorState = managedOperatorState; - this.rawOperatorState = rawOperatorState; - } - - public Collection<KeyedStateHandle> getManagedKeyedState() { - return managedKeyedState; - } - - public Collection<KeyedStateHandle> getRawKeyedState() { - return rawKeyedState; - } - - public Collection<OperatorStateHandle> getManagedOperatorState() { - return managedOperatorState; - } - - public Collection<OperatorStateHandle> getRawOperatorState() { - return rawOperatorState; - } - - public int getOperatorChainIndex() { - return operatorChainIndex; - } - - private static <T> T getSafeItemAtIndexOrNull(List<T> list, int idx) { - return CollectionUtil.isNullOrEmpty(list) ? null : list.get(idx); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java index 4d201f4..ff0b0fc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointedTest.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.checkpoint; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.operators.StreamMap; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.junit.Assert; @@ -41,7 +41,7 @@ public class ListCheckpointedTest { AbstractStreamOperatorTestHarness<Integer> testHarness = new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0); testHarness.open(); - OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); testHarness.initializeState(snapshot); Assert.assertTrue(userFunction.isRestored()); } @@ -52,7 +52,7 @@ public class ListCheckpointedTest { AbstractStreamOperatorTestHarness<Integer> testHarness = new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0); testHarness.open(); - OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); testHarness.initializeState(snapshot); Assert.assertTrue(userFunction.isRestored()); } @@ -63,7 +63,7 @@ public class ListCheckpointedTest { AbstractStreamOperatorTestHarness<Integer> testHarness = new AbstractStreamOperatorTestHarness<>(new StreamMap<>(userFunction), 1, 1, 0); testHarness.open(); - OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); testHarness.initializeState(snapshot); Assert.assertTrue(userFunction.isRestored()); } http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java index 9268ef7..d92a009 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java @@ -26,10 +26,10 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.types.Value; import org.apache.flink.util.ExceptionUtils; @@ -174,7 +174,7 @@ public class FromElementsFunctionTest { // make a checkpoint List<Integer> checkpointData = new ArrayList<>(numElements); - OperatorStateHandles handles = null; + OperatorSubtaskState handles = null; synchronized (ctx.getCheckpointLock()) { handles = testHarness.snapshot(566, System.currentTimeMillis()); checkpointData.addAll(result); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java index de9f1c7..df70651 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/StatefulSequenceSourceTest.java @@ -19,11 +19,11 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.junit.Assert; @@ -112,7 +112,7 @@ public class StatefulSequenceSourceTest { latchToTrigger2.await(); } - OperatorStateHandles snapshot = AbstractStreamOperatorTestHarness.repackageState( + OperatorSubtaskState snapshot = AbstractStreamOperatorTestHarness.repackageState( testHarness1.snapshot(0L, 0L), testHarness2.snapshot(0L, 0L) ); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java index 3123675..166dc5a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java @@ -21,8 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.ContentDump; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -159,7 +159,7 @@ public class TwoPhaseCommitSinkFunctionTest { harness.processElement("42", 0); harness.snapshot(0, 1); harness.processElement("43", 2); - OperatorStateHandles snapshot = harness.snapshot(1, 3); + OperatorSubtaskState snapshot = harness.snapshot(1, 3); tmpDirectory.setWritable(false); try { @@ -192,7 +192,7 @@ public class TwoPhaseCommitSinkFunctionTest { harness.open(); harness.processElement("42", 0); - final OperatorStateHandles snapshot = harness.snapshot(0, 1); + final OperatorSubtaskState snapshot = harness.snapshot(0, 1); harness.notifyOfCompletedCheckpoint(1); final long transactionTimeout = 1000; @@ -248,7 +248,7 @@ public class TwoPhaseCommitSinkFunctionTest { harness.open(); - final OperatorStateHandles snapshot = harness.snapshot(0, 1); + final OperatorSubtaskState snapshot = harness.snapshot(0, 1); final long elapsedTime = (long) ((double) transactionTimeout * warningRatio + 2); clock.setEpochMilli(elapsedTime); harness.initializeState(snapshot); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 97a0182..46cae27 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; @@ -37,7 +38,6 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; @@ -194,7 +194,7 @@ public class AbstractStreamOperatorTest { testHarness.processElement(new Tuple2<>(0, "SET_PROC_TIME_TIMER:10"), 0); - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); TestOperator testOperator1 = new TestOperator(); @@ -303,7 +303,7 @@ public class AbstractStreamOperatorTest { assertTrue(extractResult(testHarness).isEmpty()); - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); // now, restore in two operators, first operator 1 @@ -445,7 +445,7 @@ public class AbstractStreamOperatorTest { // take a snapshot from each one of the "parallel" instances of the operator // and combine them into one so that we can scale down - OperatorStateHandles repackagedState = + OperatorSubtaskState repackagedState = AbstractStreamOperatorTestHarness.repackageState( testHarness1.snapshot(0, 0), testHarness2.snapshot(0, 0) http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java index f043fa8..e1986f3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java @@ -22,11 +22,11 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -255,7 +255,7 @@ public class KeyedProcessOperatorTest extends TestLogger { testHarness.processElement(new StreamRecord<>(5, 12L)); // snapshot and restore from scratch - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); testHarness.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index c4b6894..9d0b9e2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -29,6 +29,7 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; @@ -40,7 +41,6 @@ import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; @@ -83,7 +83,7 @@ public class StreamOperatorSnapshotRestoreTest { testHarness.processElement(new StreamRecord<>(i)); } - OperatorStateHandles handles = testHarness.snapshot(1L, 1L); + OperatorSubtaskState handles = testHarness.snapshot(1L, 1L); testHarness.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java index 6c93894..41bb00c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/WrappingFunctionSnapshotRestoreTest.java @@ -24,12 +24,12 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.Assert; @@ -59,7 +59,7 @@ public class WrappingFunctionSnapshotRestoreTest { testHarness.processElement(new StreamRecord<>(5, 12L)); // snapshot and restore from scratch - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); testHarness.close(); @@ -91,7 +91,7 @@ public class WrappingFunctionSnapshotRestoreTest { testHarness.processElement(new StreamRecord<>(5, 12L)); // snapshot and restore from scratch - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); testHarness.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 507ff0b..8e80f44 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -27,6 +27,7 @@ import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -52,7 +53,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -969,7 +969,7 @@ public class AsyncWaitOperatorTest extends TestLogger { snapshotHarness.open(); - final OperatorStateHandles snapshot; + final OperatorSubtaskState snapshot; final ArrayList<Integer> expectedOutput = new ArrayList<>(capacity + 1); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java index 3fa439f..96607d4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java @@ -25,11 +25,11 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.KeyedStateFunction; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; @@ -413,7 +413,7 @@ public class CoBroadcastWithKeyedOperatorTest { keysToRegister.add("test2"); keysToRegister.add("test3"); - final OperatorStateHandles mergedSnapshot; + final OperatorSubtaskState mergedSnapshot; try ( TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = getInitializedTestHarness( @@ -512,7 +512,7 @@ public class CoBroadcastWithKeyedOperatorTest { keysToRegister.add("test2"); keysToRegister.add("test3"); - final OperatorStateHandles mergedSnapshot; + final OperatorSubtaskState mergedSnapshot; try ( TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = getInitializedTestHarness( @@ -710,7 +710,7 @@ public class CoBroadcastWithKeyedOperatorTest { final int maxParallelism, final int numTasks, final int taskIdx, - final OperatorStateHandles initState) throws Exception { + final OperatorSubtaskState initState) throws Exception { final TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = new KeyedTwoInputStreamOperatorTestHarness<>( http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java index 96e1c3e..fa72e45 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperatorTest.java @@ -21,10 +21,10 @@ package org.apache.flink.streaming.api.operators.co; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; @@ -243,7 +243,7 @@ public class CoBroadcastWithNonKeyedOperatorTest { keysToRegister.add("test2"); keysToRegister.add("test3"); - final OperatorStateHandles mergedSnapshot; + final OperatorSubtaskState mergedSnapshot; try ( TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = getInitializedTestHarness( @@ -335,7 +335,7 @@ public class CoBroadcastWithNonKeyedOperatorTest { keysToRegister.add("test2"); keysToRegister.add("test3"); - final OperatorStateHandles mergedSnapshot; + final OperatorSubtaskState mergedSnapshot; try ( TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness1 = getInitializedTestHarness( @@ -547,7 +547,7 @@ public class CoBroadcastWithNonKeyedOperatorTest { // final int maxParallelism, // final int numTasks, // final int taskIdx, -// final OperatorStateHandles initState) throws Exception { +// final OperatorSubtaskState initState) throws Exception { // // return getInitializedTestHarness(function, maxParallelism, numTasks, taskIdx, initState, STATE_DESCRIPTOR); // } @@ -557,7 +557,7 @@ public class CoBroadcastWithNonKeyedOperatorTest { final int maxParallelism, final int numTasks, final int taskIdx, - final OperatorStateHandles initState, + final OperatorSubtaskState initState, final MapStateDescriptor<?, ?>... descriptors) throws Exception { TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = new TwoInputStreamOperatorTestHarness<>( http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java index 13c6a19..157053a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java @@ -22,11 +22,11 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; @@ -294,7 +294,7 @@ public class KeyedCoProcessOperatorTest extends TestLogger { testHarness.processElement2(new StreamRecord<>("5", 12L)); // snapshot and restore from scratch - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); testHarness.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java index ca49a2a..bf83ab4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java @@ -25,12 +25,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Preconditions; @@ -89,7 +89,7 @@ public class ContinuousFileProcessingRescalingTest { // 2) and take the snapshots from the previous instances and merge them // into a new one which will be then used to initialize a third instance - OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness. + OperatorSubtaskState mergedState = AbstractStreamOperatorTestHarness. repackageState( testHarness2.snapshot(0, 0), testHarness1.snapshot(0, 0) @@ -157,7 +157,7 @@ public class ContinuousFileProcessingRescalingTest { } // this will be the state shared by the 2 new instances. - OperatorStateHandles snapshot = testHarness1.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness1.snapshot(0, 0); // 1) clear the output of instance so that we can compare it with one created by the new instances, and // 2) let the operator process the rest of its state http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java index d3fd585..5fbce36 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.TestLogger; @@ -140,7 +140,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink elementCounter++; } - OperatorStateHandles latestSnapshot = testHarness.snapshot(snapshotCount++, 0); + OperatorSubtaskState latestSnapshot = testHarness.snapshot(snapshotCount++, 0); testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1); for (int x = 0; x < 20; x++) { @@ -195,11 +195,11 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink } // snapshot at checkpoint 0 for testHarness1 and testHarness 2 - OperatorStateHandles snapshot1 = testHarness1.snapshot(snapshotCount, 0); - OperatorStateHandles snapshot2 = testHarness2.snapshot(snapshotCount, 0); + OperatorSubtaskState snapshot1 = testHarness1.snapshot(snapshotCount, 0); + OperatorSubtaskState snapshot2 = testHarness2.snapshot(snapshotCount, 0); // merge the two partial states - OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness + OperatorSubtaskState mergedSnapshot = AbstractStreamOperatorTestHarness .repackageState(snapshot1, snapshot2); testHarness1.close(); @@ -255,7 +255,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink } // this will be the state that will be split between the two new operators - OperatorStateHandles snapshot = testHarness1.snapshot(++snapshotCount, 0); + OperatorSubtaskState snapshot = testHarness1.snapshot(++snapshotCount, 0); testHarness1.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index 30b8da9..30382ed 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; @@ -31,7 +32,6 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -2356,7 +2356,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { assertEquals(5, testHarness.numKeyedStateEntries()); assertEquals(4, testHarness.numEventTimeTimers()); // timers/gc timers for two windows - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); // restore mockAssigner = mockMergingAssigner(); http://git-wip-us.apache.org/repos/asf/flink/blob/617e67c2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java index ec537c6..e41b9cc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java @@ -32,6 +32,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; @@ -49,7 +50,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; @@ -142,7 +142,7 @@ public class WindowOperatorMigrationTest { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000)); // do snapshot and save to file - OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L); + OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L); OperatorSnapshotUtil.writeStateHandle( snapshot, @@ -237,7 +237,7 @@ public class WindowOperatorMigrationTest { testHarness.open(); // do snapshot and save to file - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); OperatorSnapshotUtil.writeStateHandle( snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink" + flinkGenerateSavepointVersion + "-snapshot"); @@ -369,7 +369,7 @@ public class WindowOperatorMigrationTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator<>()); // do snapshot and save to file - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); OperatorSnapshotUtil.writeStateHandle( snapshot, "src/test/resources/win-op-migration-test-reduce-event-time-flink" + flinkGenerateSavepointVersion + "-snapshot"); @@ -485,7 +485,7 @@ public class WindowOperatorMigrationTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator<>()); // do snapshot and save to file - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); OperatorSnapshotUtil.writeStateHandle( snapshot, "src/test/resources/win-op-migration-test-apply-event-time-flink" + flinkGenerateSavepointVersion + "-snapshot"); @@ -594,7 +594,7 @@ public class WindowOperatorMigrationTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator<>()); // do snapshot and save to file - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); OperatorSnapshotUtil.writeStateHandle( snapshot, "src/test/resources/win-op-migration-test-reduce-processing-time-flink" + flinkGenerateSavepointVersion + "-snapshot"); @@ -699,7 +699,7 @@ public class WindowOperatorMigrationTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator<>()); // do snapshot and save to file - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); OperatorSnapshotUtil.writeStateHandle( snapshot, "src/test/resources/win-op-migration-test-apply-processing-time-flink" + flinkGenerateSavepointVersion + "-snapshot"); @@ -813,7 +813,7 @@ public class WindowOperatorMigrationTest { TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator<>()); // do snapshot and save to file - OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + OperatorSubtaskState snapshot = testHarness.snapshot(0, 0); OperatorSnapshotUtil.writeStateHandle( snapshot, "src/test/resources/win-op-migration-test-kryo-serialized-key-flink" + flinkGenerateSavepointVersion + "-snapshot");
