[FLINK-4379] [checkpoints] Introduce rescalable operator state. This introduces the Operator State Backend, which stores state that is not partitioned by a key. It replaces the 'Checkpointed' interface.
Additionally, this introduces CheckpointStateHandles as container for all checkpoint related state handles This closes #2512 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53ed6ada Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53ed6ada Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53ed6ada Branch: refs/heads/master Commit: 53ed6adac8cbe6b5dcb692dc9b94970f3ec5887c Parents: 2afc092 Author: Stefan Richter <[email protected]> Authored: Wed Aug 31 23:59:27 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Sep 30 12:38:46 2016 +0200 ---------------------------------------------------------------------- .../streaming/state/AbstractRocksDBState.java | 6 +- .../state/RocksDBKeyedStateBackend.java | 75 ++-- .../streaming/state/RocksDBStateBackend.java | 8 +- .../state/RocksDBAsyncSnapshotTest.java | 12 +- .../state/RocksDBStateBackendConfigTest.java | 48 ++- .../api/common/functions/RuntimeContext.java | 125 +----- .../util/AbstractRuntimeUDFContext.java | 28 +- .../flink/api/common/state/OperatorState.java | 70 --- .../flink/api/common/state/ValueState.java | 2 +- .../java/typeutils/runtime/JavaSerializer.java | 116 +++++ .../flink/hdfstests/FileStateBackendTest.java | 26 +- .../AbstractCEPBasePatternOperator.java | 3 +- .../operator/AbstractCEPPatternOperator.java | 2 - .../AbstractKeyedCEPPatternOperator.java | 2 - .../checkpoint/CheckpointCoordinator.java | 127 +++++- .../runtime/checkpoint/CompletedCheckpoint.java | 5 - .../checkpoint/OperatorStateRepartitioner.java | 42 ++ .../runtime/checkpoint/PendingCheckpoint.java | 95 +++-- .../RoundRobinOperatorStateRepartitioner.java | 190 +++++++++ .../flink/runtime/checkpoint/SubtaskState.java | 9 - .../flink/runtime/checkpoint/TaskState.java | 79 +++- .../savepoint/SavepointV1Serializer.java | 97 ++++- .../deployment/TaskDeploymentDescriptor.java | 50 ++- .../flink/runtime/execution/Environment.java | 16 +- 
.../flink/runtime/executiongraph/Execution.java | 25 +- .../runtime/executiongraph/ExecutionGraph.java | 2 - .../runtime/executiongraph/ExecutionVertex.java | 6 +- .../runtime/jobgraph/tasks/StatefulTask.java | 11 +- .../checkpoint/AcknowledgeCheckpoint.java | 67 ++- .../runtime/state/AbstractCloseableHandle.java | 126 ------ .../state/AbstractKeyedStateBackend.java | 342 +++++++++++++++ .../runtime/state/AbstractStateBackend.java | 43 +- .../flink/runtime/state/ChainedStateHandle.java | 7 +- .../runtime/state/CheckpointStateHandles.java | 103 +++++ .../flink/runtime/state/ClosableRegistry.java | 84 ++++ .../state/DefaultOperatorStateBackend.java | 215 ++++++++++ .../runtime/state/KeyGroupRangeOffsets.java | 2 + .../runtime/state/KeyGroupsStateHandle.java | 6 - .../flink/runtime/state/KeyedStateBackend.java | 301 ++----------- .../runtime/state/OperatorStateBackend.java | 35 ++ .../runtime/state/OperatorStateHandle.java | 109 +++++ .../flink/runtime/state/OperatorStateStore.java | 47 +++ ...artitionableCheckpointStateOutputStream.java | 96 +++++ .../state/RetrievableStreamStateHandle.java | 2 +- .../flink/runtime/state/SnapshotProvider.java | 45 ++ .../apache/flink/runtime/state/StateObject.java | 6 +- .../apache/flink/runtime/state/StateUtil.java | 37 -- .../state/filesystem/FileStateHandle.java | 8 +- .../state/filesystem/FsStateBackend.java | 6 +- .../state/heap/HeapKeyedStateBackend.java | 210 ++++----- .../state/memory/ByteStreamStateHandle.java | 13 +- .../state/memory/MemoryStateBackend.java | 9 +- .../ActorGatewayCheckpointResponder.java | 11 +- .../taskmanager/CheckpointResponder.java | 15 +- .../runtime/taskmanager/RuntimeEnvironment.java | 12 +- .../apache/flink/runtime/taskmanager/Task.java | 11 +- .../checkpoint/CheckpointCoordinatorTest.java | 421 +++++++++++++++---- .../checkpoint/CheckpointStateRestoreTest.java | 46 +- .../CompletedCheckpointStoreTest.java | 2 +- .../checkpoint/PendingCheckpointTest.java | 2 +- 
.../checkpoint/PendingSavepointTest.java | 2 +- ...ZooKeeperCompletedCheckpointStoreITCase.java | 5 - .../checkpoint/savepoint/SavepointV1Test.java | 20 +- .../stats/SimpleCheckpointStatsTrackerTest.java | 2 +- .../jobmanager/JobManagerHARecoveryTest.java | 20 +- .../messages/CheckpointMessagesTest.java | 17 +- .../operators/testutils/DummyEnvironment.java | 3 +- .../operators/testutils/MockEnvironment.java | 3 +- .../runtime/query/QueryableStateClientTest.java | 4 +- .../runtime/query/netty/KvStateClientTest.java | 5 +- .../query/netty/KvStateServerHandlerTest.java | 7 +- .../runtime/query/netty/KvStateServerTest.java | 4 +- .../state/AbstractCloseableHandleTest.java | 97 ----- .../runtime/state/FileStateBackendTest.java | 35 +- .../runtime/state/MemoryStateBackendTest.java | 15 +- .../runtime/state/OperatorStateBackendTest.java | 155 +++++++ .../runtime/state/StateBackendTestBase.java | 115 ++--- .../FsCheckpointStateOutputStreamTest.java | 16 +- .../runtime/taskmanager/TaskAsyncCallTest.java | 5 +- .../ZooKeeperStateHandleStoreITCase.java | 4 - .../connectors/kafka/FlinkKafkaConsumer08.java | 37 +- .../connectors/kafka/KafkaConsumer08Test.java | 4 +- .../connectors/kafka/FlinkKafkaConsumer09.java | 43 +- .../kafka/FlinkKafkaConsumerBase.java | 182 +++++--- .../kafka/FlinkKafkaProducerBase.java | 27 +- .../kafka/internals/AbstractFetcher.java | 4 +- .../kafka/AtLeastOnceProducerTest.java | 13 +- .../kafka/FlinkKafkaConsumerBaseTest.java | 149 ++++++- .../connectors/kafka/KafkaConsumerTestBase.java | 30 +- .../kafka/testutils/MockRuntimeContext.java | 10 - .../streaming/api/checkpoint/Checkpointed.java | 1 + .../api/checkpoint/CheckpointedFunction.java | 65 +++ .../api/checkpoint/ListCheckpointed.java | 65 +++ .../source/ContinuousFileReaderOperator.java | 5 +- .../api/operators/AbstractStreamOperator.java | 96 ++++- .../operators/AbstractUdfStreamOperator.java | 65 ++- .../operators/StreamCheckpointedOperator.java | 58 +++ 
.../streaming/api/operators/StreamOperator.java | 43 +- .../api/operators/StreamingRuntimeContext.java | 32 -- .../operators/GenericWriteAheadSink.java | 25 +- .../windowing/EvictingWindowOperator.java | 4 +- .../operators/windowing/WindowOperator.java | 14 +- .../streaming/runtime/tasks/OperatorChain.java | 50 ++- .../streaming/runtime/tasks/StreamTask.java | 314 ++++++++------ .../operators/StreamingRuntimeContextTest.java | 8 +- .../streaming/runtime/io/BarrierBufferTest.java | 8 +- .../runtime/io/BarrierTrackerTest.java | 13 +- .../operators/StreamOperatorChainingTest.java | 15 +- .../tasks/InterruptSensitiveRestoreTest.java | 68 +-- .../runtime/tasks/OneInputStreamTaskTest.java | 82 +++- .../runtime/tasks/StreamMockEnvironment.java | 3 +- .../KeyedOneInputStreamOperatorTestHarness.java | 37 +- .../util/OneInputStreamOperatorTestHarness.java | 24 +- .../UdfStreamOperatorCheckpointingITCase.java | 16 +- .../streaming/runtime/StateBackendITCase.java | 11 +- 115 files changed, 3981 insertions(+), 1890 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index e878ad5..9da33ef 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -156,7 +156,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta 
private void writeKey(K key) throws IOException { //write key - int beforeWrite = (int) keySerializationStream.getPosition(); + int beforeWrite = keySerializationStream.getPosition(); backend.getKeySerializer().serialize(key, keySerializationDateDataOutputView); if (ambiguousKeyPossible) { @@ -166,7 +166,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta } private void writeNameSpace(N namespace) throws IOException { - int beforeWrite = (int) keySerializationStream.getPosition(); + int beforeWrite = keySerializationStream.getPosition(); namespaceSerializer.serialize(namespace, keySerializationDateDataOutputView); if (ambiguousKeyPossible) { @@ -176,7 +176,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta } private void writeLengthFrom(int fromPosition) throws IOException { - int length = (int) (keySerializationStream.getPosition() - fromPosition); + int length = keySerializationStream.getPosition() - fromPosition; writeVariableIntBytes(length); } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index d5a96af..126ebd2 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -39,12 +39,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; 
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DoneFuture; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.InstantiationUtil; @@ -73,12 +73,12 @@ import java.util.PriorityQueue; import java.util.concurrent.RunnableFuture; /** - * A {@link KeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to + * A {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon * checkpointing. This state backend can store very large state that exceeds memory and spills - * to disk. + * to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe. */ -public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { +public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class); @@ -98,9 +98,9 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { private final File instanceRocksDBPath; /** - * Lock for protecting cleanup of the RocksDB db. We acquire this when doing asynchronous - * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try - * iterating over a disposed db. + * Lock for protecting cleanup of the RocksDB against the checkpointing thread. 
We acquire this when doing + * asynchronous checkpoints and when disposing the DB. Otherwise, the asynchronous snapshot might try + * iterating over a disposed DB. After aquriring the lock, always first check if (db == null). */ private final SerializableObject dbDisposeLock = new SerializableObject(); @@ -110,13 +110,13 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { * instance. They all write to this instance but to their own column family. */ @GuardedBy("dbDisposeLock") - protected volatile RocksDB db; + protected RocksDB db; /** * Information about the k/v states as we create them. This is used to retrieve the * column family that is used for a state and also for sanity checks when restoring. */ - private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> kvStateInformation; + private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> kvStateInformation; /** Number of bytes required to prefix the key groups. */ private final int keyGroupPrefixBytes; @@ -187,8 +187,8 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> restoreState ) throws Exception { - this( - jobId, + + this(jobId, operatorIdentifier, userCodeClassLoader, instanceBasePath, @@ -210,15 +210,11 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { } /** - * @see java.io.Closeable - * - * Should only be called by one thread. - * - * @throws Exception + * Should only be called by one thread, and only after all accesses to the DB happened. 
*/ @Override - public void close() throws Exception { - super.close(); + public void dispose() { + super.dispose(); final RocksDB cleanupRockDBReference; @@ -233,13 +229,17 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { // Dispose decoupled db if (cleanupRockDBReference != null) { - for (Tuple2<ColumnFamilyHandle, StateDescriptor> column : kvStateInformation.values()) { + for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) { column.f0.dispose(); } cleanupRockDBReference.dispose(); } - FileUtils.deleteDirectory(instanceBasePath); + try { + FileUtils.deleteDirectory(instanceBasePath); + } catch (IOException ioex) { + LOG.info("Could not delete instace base path for RocksDB: " + instanceBasePath); + } } public int getKeyGroupPrefixBytes() { @@ -248,7 +248,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { /** * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and - * is also stopped when the backend is closed through {@link #close()}. For each backend, this method must always + * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always * be called by the same thread. * * @param checkpointId The Id of the checkpoint. @@ -386,13 +386,13 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); outStream = checkpointStreamFactory. createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp); + stateBackend.cancelStreamRegistry.registerClosable(outStream); outputView = new DataOutputViewStreamWrapper(outStream); } /** * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1). 
* - * @return * @throws IOException */ public void writeDBSnapshot() throws IOException, InterruptedException { @@ -408,7 +408,8 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { * @throws IOException */ public void closeCheckpointStream() throws IOException { - if(outStream != null) { + if (outStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outStream); snapshotResultStateHandle = closeSnapshotStreamAndGetHandle(); } } @@ -451,7 +452,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { int kvStateId = 0; //iterate all column families, where each column family holds one k/v state, to write the metadata - for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor>> column : stateBackend.kvStateInformation.entrySet()) { + for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> column : stateBackend.kvStateInformation.entrySet()) { //be cooperative and check for interruption from time to time in the hot loop checkInterrupted(); @@ -463,7 +464,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { ReadOptions readOptions = new ReadOptions(); readOptions.setSnapshot(snapshot); RocksIterator iterator = stateBackend.db.newIterator(column.getValue().f0, readOptions); - kvStateIterators.add(new Tuple2<RocksIterator, Integer>(iterator, kvStateId)); + kvStateIterators.add(new Tuple2<>(iterator, kvStateId)); ++kvStateId; } } @@ -624,15 +625,16 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { throws IOException, RocksDBException, ClassNotFoundException { try { currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream(); + rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream); currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); restoreKVStateMetaData(); restoreKVStateData(); } finally { - if(currentStateHandleInStream != null) { + if 
(currentStateHandleInStream != null) { + rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterClosable(currentStateHandleInStream); currentStateHandleInStream.close(); } } - } /** @@ -652,19 +654,20 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { //restore the empty columns for the k/v states through the metadata for (int i = 0; i < numColumns; i++) { - StateDescriptor stateDescriptor = InstantiationUtil.deserializeObject( + StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) InstantiationUtil.deserializeObject( currentStateHandleInStream, rocksDBKeyedStateBackend.userCodeClassLoader); - Tuple2<ColumnFamilyHandle, StateDescriptor> columnFamily = rocksDBKeyedStateBackend. + Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> columnFamily = rocksDBKeyedStateBackend. kvStateInformation.get(stateDescriptor.getName()); - if(null == columnFamily) { + if (null == columnFamily) { ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions); - columnFamily = new Tuple2<>(rocksDBKeyedStateBackend.db. - createColumnFamily(columnFamilyDescriptor), stateDescriptor); + columnFamily = new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>( + rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor), stateDescriptor); + rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), columnFamily); } @@ -727,9 +730,9 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { * <p>This also checks whether the {@link StateDescriptor} for a state matches the one * that we checkpointed, i.e. is already in the map of column families. 
*/ - protected ColumnFamilyHandle getColumnFamily(StateDescriptor descriptor) { + protected ColumnFamilyHandle getColumnFamily(StateDescriptor<?, ?> descriptor) { - Tuple2<ColumnFamilyHandle, StateDescriptor> stateInfo = kvStateInformation.get(descriptor.getName()); + Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> stateInfo = kvStateInformation.get(descriptor.getName()); if (stateInfo != null) { if (!stateInfo.f1.equals(descriptor)) { @@ -744,7 +747,9 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> { try { ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor); - kvStateInformation.put(descriptor.getName(), new Tuple2<>(columnFamily, descriptor)); + Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> tuple = + new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(columnFamily, descriptor); + kvStateInformation.put(descriptor.getName(), tuple); return columnFamily; } catch (RocksDBException e) { throw new RuntimeException("Error creating ColumnFamilyHandle.", e); http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index b6ce224..a0c980b 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -23,11 +23,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import 
org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; @@ -224,7 +224,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { } @Override - public <K> KeyedStateBackend<K> createKeyedStateBackend( + public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( Environment env, JobID jobID, String operatorIdentifier, @@ -251,7 +251,9 @@ public class RocksDBStateBackend extends AbstractStateBackend { } @Override - public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID jobID, + public <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend( + Environment env, + JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index c0c9ca1..bccbabc 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -29,10 +29,8 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; @@ -66,7 +64,6 @@ import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; import java.util.Arrays; -import java.util.List; import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutorService; @@ -138,12 +135,11 @@ public class RocksDBAsyncSnapshotTest { @Override public void acknowledgeCheckpoint( long checkpointId, - ChainedStateHandle<StreamStateHandle> chainedStateHandle, - List<KeyGroupsStateHandle> keyGroupStateHandles, + CheckpointStateHandles checkpointStateHandles, long synchronousDurationMillis, long asynchronousDurationMillis, long bytesBufferedInAlignment, long alignmentDurationNanos) { - super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles, + super.acknowledgeCheckpoint(checkpointId, checkpointStateHandles, synchronousDurationMillis, asynchronousDurationMillis, bytesBufferedInAlignment, alignmentDurationNanos); @@ -156,7 +152,7 @@ public class RocksDBAsyncSnapshotTest { } // should be only one k/v state - assertEquals(1, keyGroupStateHandles.size()); + 
assertEquals(1, checkpointStateHandles.getKeyGroupsStateHandle().size()); // we now know that the checkpoint went through ensureCheckpointLatch.trigger(); http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 3b851be..07fc27c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.KvStateRegistry; - import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.util.OperatingSystem; @@ -34,7 +33,6 @@ import org.junit.Assume; import org.junit.Before; import org.junit.Rule; import org.junit.Test; - import org.junit.rules.TemporaryFolder; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionStyle; @@ -45,8 +43,18 @@ import java.io.File; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.startsWith; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import 
static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** @@ -88,13 +96,15 @@ public class RocksDBStateBackendConfigTest { assertArrayEquals(new String[] { testDir1.getAbsolutePath(), testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths()); Environment env = getMockEnvironment(new File[] {}); - RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env, - env.getJobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - env.getTaskKvStateRegistry()); + RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend. + createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry()); File instanceBasePath = keyedBackend.getInstanceBasePath(); @@ -142,13 +152,15 @@ public class RocksDBStateBackendConfigTest { assertNull(rocksDbBackend.getDbStoragePaths()); Environment env = getMockEnvironment(tempDirs); - RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env, - env.getJobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - env.getTaskKvStateRegistry()); + RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend. 
+ createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry()); File instanceBasePath = keyedBackend.getInstanceBasePath(); http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index a9e8da9..ce513cb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -18,12 +18,8 @@ package org.apache.flink.api.common.functions; -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.DoubleCounter; @@ -33,14 +29,16 @@ import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.metrics.MetricGroup; +import java.io.Serializable; +import java.util.List; 
+import java.util.Map; + /** * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance * of the function will have a context through which it can access static contextual information (such as @@ -347,117 +345,4 @@ public interface RuntimeContext { */ @PublicEvolving <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties); - - /** - * Gets the key/value state, which is only accessible if the function is executed on - * a KeyedStream. Upon calling {@link ValueState#value()}, the key/value state will - * return the value bound to the key of the element currently processed by the function. - * Each operator may maintain multiple key/value states, addressed with different names. - * - * <p>Because the scope of each value is the key of the currently processed element, - * and the elements are distributed by the Flink runtime, the system can transparently - * scale out and redistribute the state and KeyedStream. - * - * <p>The following code example shows how to implement a continuous counter that counts - * how many times elements of a certain key occur, and emits an updated count for that - * element on each occurrence. - * - * <pre>{@code - * DataStream<MyType> stream = ...; - * KeyedStream<MyType> keyedStream = stream.keyBy("id"); - * - * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { - * - * private State<Long> state; - * - * public void open(Configuration cfg) { - * state = getRuntimeContext().getKeyValueState(Long.class, 0L); - * } - * - * public Tuple2<MyType, Long> map(MyType value) { - * long count = state.value(); - * state.update(value + 1); - * return new Tuple2<>(value, count); - * } - * }); - * - * }</pre> - * - * <p>This method attempts to deduce the type information from the given type class. 
If the - * full type cannot be determined from the class (for example because of generic parameters), - * the TypeInformation object must be manually passed via - * {@link #getKeyValueState(String, TypeInformation, Object)}. - * - * - * @param name The name of the key/value state. - * @param stateType The class of the type that is stored in the state. Used to generate - * serializers for managed memory and checkpointing. - * @param defaultState The default state value, returned when the state is accessed and - * no value has yet been set for the key. May be null. - * - * @param <S> The type of the state. - * - * @return The key/value state access. - * - * @throws UnsupportedOperationException Thrown, if no key/value state is available for the - * function (function is not part os a KeyedStream). - * - * @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead. - */ - @Deprecated - @PublicEvolving - <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); - - /** - * Gets the key/value state, which is only accessible if the function is executed on - * a KeyedStream. Upon calling {@link ValueState#value()}, the key/value state will - * return the value bound to the key of the element currently processed by the function. - * Each operator may maintain multiple key/value states, addressed with different names. - * - * <p>Because the scope of each value is the key of the currently processed element, - * and the elements are distributed by the Flink runtime, the system can transparently - * scale out and redistribute the state and KeyedStream. - * - * <p>The following code example shows how to implement a continuous counter that counts - * how many times elements of a certain key occur, and emits an updated count for that - * element on each occurrence. 
- * - * <pre>{@code - * DataStream<MyType> stream = ...; - * KeyedStream<MyType> keyedStream = stream.keyBy("id"); - * - * keyedStream.map(new RichMapFunction<MyType, Tuple2<MyType, Long>>() { - * - * private State<Long> state; - * - * public void open(Configuration cfg) { - * state = getRuntimeContext().getKeyValueState(Long.class, 0L); - * } - * - * public Tuple2<MyType, Long> map(MyType value) { - * long count = state.value(); - * state.update(value + 1); - * return new Tuple2<>(value, count); - * } - * }); - * - * }</pre> - * - * @param name The name of the key/value state. - * @param stateType The type information for the type that is stored in the state. - * Used to create serializers for managed memory and checkpoints. - * @param defaultState The default state value, returned when the state is accessed and - * no value has yet been set for the key. May be null. - * @param <S> The type of the state. - * - * @return The key/value state access. - * - * @throws UnsupportedOperationException Thrown, if no key/value state is available for the - * function (function is not part os a KeyedStream). - * - * @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead. 
- */ - @Deprecated - @PublicEvolving - <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState); } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 6645964..4f559bf 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -18,11 +18,6 @@ package org.apache.flink.api.common.functions.util; -import java.io.Serializable; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.Future; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; @@ -36,15 +31,18 @@ import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; +import java.io.Serializable; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Future; + import static 
org.apache.flink.util.Preconditions.checkNotNull; /** @@ -207,20 +205,4 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { throw new UnsupportedOperationException( "This state is only accessible by functions executed on a KeyedStream"); } - - @Override - @Deprecated - @PublicEvolving - public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { - throw new UnsupportedOperationException( - "This state is only accessible by functions executed on a KeyedStream"); - } - - @Override - @Deprecated - @PublicEvolving - public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { - throw new UnsupportedOperationException( - "This state is only accessible by functions executed on a KeyedStream"); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java deleted file mode 100644 index ac4ed07..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.state; - -import org.apache.flink.annotation.PublicEvolving; - -import java.io.IOException; - -/** - * This state interface abstracts persistent key/value state in streaming programs. - * The state is accessed and modified by user functions, and checkpointed consistently - * by the system as part of the distributed snapshots. - * - * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is - * automatically supplied by the system, so the function always sees the value mapped to the - * key of the current element. That way, the system can handle stream and state partitioning - * consistently together. - * - * @param <T> Type of the value in the operator state - * - * @deprecated OperatorState has been replaced by {@link ValueState}. - */ -@Deprecated -@PublicEvolving -public interface OperatorState<T> extends State { - - /** - * Returns the current value for the state. When the state is not - * partitioned the returned value is the same for all inputs in a given - * operator instance. If state partitioning is applied, the value returned - * depends on the current operator input, as the operator maintains an - * independent state for each partition. - * - * @return The operator state value corresponding to the current input. - * - * @throws IOException Thrown if the system cannot access the state. - */ - T value() throws IOException; - - /** - * Updates the operator state accessible by {@link #value()} to the given - * value. 
The next time {@link #value()} is called (for the same state - * partition) the returned state will represent the updated value. When a - * partitioned state is updated with null, the state for the current key - * will be removed and the default value is returned on the next access. - * - * @param value - * The new value for the state. - * - * @throws IOException Thrown if the system cannot access the state. - */ - void update(T value) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java index 607cb32..de3250a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java @@ -37,7 +37,7 @@ import java.io.IOException; * @param <T> Type of the value in the state. */ @PublicEvolving -public interface ValueState<T> extends State, OperatorState<T> { +public interface ValueState<T> extends State { /** * Returns the current value for the state. 
When the state is not http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java new file mode 100644 index 0000000..4ae00d1 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/JavaSerializer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +public class JavaSerializer<T extends Serializable> extends TypeSerializer<T> { + + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<T> duplicate() { + return this; + } + + @Override + public T createInstance() { + return null; + } + + @Override + public T copy(T from) { + + try { + return InstantiationUtil.clone(from); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException("Could not copy instance of " + from + '.', e); + } + } + + @Override + public T copy(T from, T reuse) { + return copy(from); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target)); + oos.writeObject(record); + oos.flush(); + } + + @Override + public T deserialize(DataInputView source) throws IOException { + ObjectInputStream ois = new ObjectInputStream(new DataInputViewStream(source)); + + try { + @SuppressWarnings("unchecked") + T nfa = (T) ois.readObject(); + return nfa; + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not deserialize NFA.", e); + } + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + int size = source.readInt(); + target.writeInt(size); + target.write(source, 
size); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof JavaSerializer && ((JavaSerializer<T>) obj).canEqual(this); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof JavaSerializer; + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java index df40998..080485e 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java @@ -19,7 +19,6 @@ package org.apache.flink.hdfstests; import org.apache.commons.io.FileUtils; - import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.fs.FileStatus; @@ -31,10 +30,8 @@ import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -243,16 +240,21 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> { } private static void validateBytesInStream(InputStream is, byte[] data) throws IOException { - byte[] holder = new byte[data.length]; - int pos = 0; - int read; - while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) { - pos += read; - } + try 
{ + byte[] holder = new byte[data.length]; + + int pos = 0; + int read; + while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) { + pos += read; + } - assertEquals("not enough data", holder.length, pos); - assertEquals("too much data", -1, is.read()); - assertArrayEquals("wrong data", data, holder); + assertEquals("not enough data", holder.length, pos); + assertEquals("too much data", -1, is.read()); + assertArrayEquals("wrong data", data, holder); + } finally { + is.close(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java index aad408c..2f21346 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java @@ -20,6 +20,7 @@ package org.apache.flink.cep.operator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -36,7 +37,7 @@ import java.util.PriorityQueue; */ public abstract class AbstractCEPBasePatternOperator<IN, OUT> extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<IN, OUT> { + implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator { private static final long 
serialVersionUID = -4166778210774160757L; http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java index 64ffa2a..10bb6ff 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java @@ -104,7 +104,6 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas @Override public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { - super.snapshotState(out, checkpointId, timestamp); final ObjectOutputStream oos = new ObjectOutputStream(out); oos.writeObject(nfa); @@ -119,7 +118,6 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas @Override @SuppressWarnings("unchecked") public void restoreState(FSDataInputStream state) throws Exception { - super.restoreState(state); final ObjectInputStream ois = new ObjectInputStream(state); nfa = (NFA<IN>)ois.readObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 09773a2..07e2662 100644 --- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -187,7 +187,6 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst @Override public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { - super.snapshotState(out, checkpointId, timestamp); DataOutputView ov = new DataOutputViewStreamWrapper(out); ov.writeInt(keys.size()); @@ -199,7 +198,6 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst @Override public void restoreState(FSDataInputStream state) throws Exception { - super.restoreState(state); DataInputView inputView = new DataInputViewStreamWrapper(state); http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 6a43ddf..4428427 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -34,9 +34,11 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import 
org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -45,7 +47,9 @@ import scala.concurrent.Future; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -398,9 +402,9 @@ public class CheckpointCoordinator { return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } - final PendingCheckpoint checkpoint = props.isSavepoint() ? - new PendingSavepoint(job, checkpointID, timestamp, ackTasks, savepointStore) : - new PendingCheckpoint(job, checkpointID, timestamp, ackTasks); + final PendingCheckpoint checkpoint = props.isSavepoint() ? + new PendingSavepoint(job, checkpointID, timestamp, ackTasks, savepointStore) : + new PendingCheckpoint(job, checkpointID, timestamp, ackTasks); // schedule the timer that will clean up the expired checkpoints TimerTask canceller = new TimerTask() { @@ -627,9 +631,8 @@ public class CheckpointCoordinator { isPendingCheckpoint = true; if (checkpoint.acknowledgeTask( - message.getTaskExecutionId(), - message.getStateHandle(), - message.getKeyGroupsStateHandle())) { + message.getTaskExecutionId(), + message.getCheckpointStateHandles())) { if (checkpoint.isFullyAcknowledged()) { completed = checkpoint.finalizeCheckpoint(); @@ -640,7 +643,7 @@ public class CheckpointCoordinator { if (LOG.isDebugEnabled()) { StringBuilder builder = new StringBuilder(); - for (Map.Entry<JobVertexID, TaskState> entry: completed.getTaskStates().entrySet()) { + for (Map.Entry<JobVertexID, TaskState> entry : completed.getTaskStates().entrySet()) { builder.append("JobVertexID: ").append(entry.getKey()).append(" {").append(entry.getValue()).append("}"); } @@ -654,8 +657,7 @@ public class 
CheckpointCoordinator { triggerQueuedRequests(); } - } - else { + } else { // checkpoint did not accept message LOG.error("Received duplicate or invalid acknowledge message for checkpoint " + checkpointId + " , task " + message.getTaskExecutionId()); @@ -790,22 +792,80 @@ public class CheckpointCoordinator { } + int oldParallelism = taskState.getParallelism(); + int newParallelism = executionJobVertex.getParallelism(); + boolean parallelismChanged = oldParallelism != newParallelism; boolean hasNonPartitionedState = taskState.hasNonPartitionedState(); - if (hasNonPartitionedState && taskState.getParallelism() != executionJobVertex.getParallelism()) { + if (hasNonPartitionedState && parallelismChanged) { throw new IllegalStateException("Cannot restore the latest checkpoint because " + "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " + "state and its parallelism changed. The operator" + executionJobVertex.getJobVertexId() + - " has parallelism " + executionJobVertex.getParallelism() + " whereas the corresponding" + - "state object has a parallelism of " + taskState.getParallelism()); + " has parallelism " + newParallelism + " whereas the corresponding" + + "state object has a parallelism of " + oldParallelism); } List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions( - executionJobVertex.getMaxParallelism(), - executionJobVertex.getParallelism()); + executionJobVertex.getMaxParallelism(), + newParallelism); + + // operator chain index -> list of the stored partitionables states from all parallel instances + @SuppressWarnings("unchecked") + List<OperatorStateHandle>[] chainParallelStates = + new List[taskState.getChainLength()]; + + for (int i = 0; i < oldParallelism; ++i) { + + ChainedStateHandle<OperatorStateHandle> partitionableState = + taskState.getPartitionableState(i); + + if (partitionableState != null) { + for (int j = 0; j < partitionableState.getLength(); ++j) { + OperatorStateHandle opParalleState = 
partitionableState.get(j); + if (opParalleState != null) { + List<OperatorStateHandle> opParallelStates = + chainParallelStates[j]; + if (opParallelStates == null) { + opParallelStates = new ArrayList<>(); + chainParallelStates[j] = opParallelStates; + } + opParallelStates.add(opParalleState); + } + } + } + } + + // operator chain index -> lists with collected states (one collection for each parallel subtasks) + @SuppressWarnings("unchecked") + List<Collection<OperatorStateHandle>>[] redistributedParallelStates = + new List[taskState.getChainLength()]; + + //TODO here we can employ different redistribution strategies for state, e.g. union state. For now we only offer round robin as the default. + OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE; + + for (int i = 0; i < chainParallelStates.length; ++i) { + List<OperatorStateHandle> chainOpParallelStates = chainParallelStates[i]; + if (chainOpParallelStates != null) { + //We only redistribute if the parallelism of the operator changed from previous executions + if (parallelismChanged) { + redistributedParallelStates[i] = repartitioner.repartitionState( + chainOpParallelStates, + newParallelism); + } else { + List<Collection<OperatorStateHandle>> repacking = new ArrayList<>(newParallelism); + for (OperatorStateHandle operatorStateHandle : chainOpParallelStates) { + repacking.add(Collections.singletonList(operatorStateHandle)); + } + redistributedParallelStates[i] = repacking; + } + } + } int counter = 0; - for (int i = 0; i < executionJobVertex.getParallelism(); i++) { + + for (int i = 0; i < newParallelism; ++i) { + + // non-partitioned state ChainedStateHandle<StreamStateHandle> state = null; if (hasNonPartitionedState) { @@ -813,25 +873,46 @@ public class CheckpointCoordinator { if (subtaskState != null) { // count the number of executions for which we set a state - counter++; + ++counter; state = subtaskState.getChainedStateHandle(); } } + // partitionable state + 
@SuppressWarnings("unchecked") + Collection<OperatorStateHandle>[] ia = new Collection[taskState.getChainLength()]; + List<Collection<OperatorStateHandle>> subTaskPartitionableState = Arrays.asList(ia); + + for (int j = 0; j < redistributedParallelStates.length; ++j) { + List<Collection<OperatorStateHandle>> redistributedParallelState = + redistributedParallelStates[j]; + + if (redistributedParallelState != null) { + subTaskPartitionableState.set(j, redistributedParallelState.get(i)); + } + } + + // key-partitioned state KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(i); - List<KeyGroupsStateHandle> subtaskKeyGroupStates = getKeyGroupsStateHandles( - taskState.getKeyGroupStates(), - subtaskKeyGroupIds); + // Again, we only repartition if the parallelism changed + List<KeyGroupsStateHandle> subtaskKeyGroupStates = parallelismChanged ? + getKeyGroupsStateHandles(taskState.getKeyGroupStates(), subtaskKeyGroupIds) + : Collections.singletonList(taskState.getKeyGroupState(i)); Execution currentExecutionAttempt = executionJobVertex .getTaskVertices()[i] .getCurrentExecutionAttempt(); - currentExecutionAttempt.setInitialState(state, subtaskKeyGroupStates); + CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles( + state, + null/*subTaskPartionableState*/, //TODO chose right structure and put redistributed states here + subtaskKeyGroupStates); + + currentExecutionAttempt.setInitialState(checkpointStateHandles, subTaskPartitionableState); } - if (allOrNothingState && counter > 0 && counter < executionJobVertex.getParallelism()) { + if (allOrNothingState && counter > 0 && counter < newParallelism) { throw new IllegalStateException("The checkpoint contained state only for " + "a subset of tasks for vertex " + executionJobVertex); } @@ -859,7 +940,7 @@ public class CheckpointCoordinator { for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) { KeyGroupsStateHandle intersection = 
storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds); - if(intersection.getNumberOfKeyGroups() > 0) { + if (intersection.getNumberOfKeyGroups() > 0) { subtaskKeyGroupStates.add(intersection); } } @@ -881,7 +962,7 @@ public class CheckpointCoordinator { public static List<KeyGroupRange> createKeyGroupPartitions(int numberKeyGroups, int parallelism) { Preconditions.checkArgument(numberKeyGroups >= parallelism); List<KeyGroupRange> result = new ArrayList<>(parallelism); - int start = 0; + for (int i = 0; i < parallelism; ++i) { result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i)); } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 7cb3916..0d279f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -153,9 +153,4 @@ public class CompletedCheckpoint implements StateObject { public String toString() { return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job); } - - @Override - public void close() throws IOException { - StateUtil.bestEffortCloseAllStateObjects(taskStates.values()); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java new file mode 100644 index 0000000..98810f1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorStateRepartitioner.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.OperatorStateHandle; + +import java.util.Collection; +import java.util.List; + +/** + * Interface that allows to implement different strategies for repartitioning of operator state as parallelism changes. + */ +public interface OperatorStateRepartitioner { + + /** + * @param previousParallelSubtaskStates List of state handles to the parallel subtask states of an operator, as they + * have been checkpointed. + * @param parallelism The parallelism that we consider for the state redistribution. Determines the size of the + * returned list. + * @return List with one entry per parallel subtask. Each subtask receives now one collection of states that build + * of the new total state for this subtask. 
+ */ + List<Collection<OperatorStateHandle>> repartitionState( + List<OperatorStateHandle> previousParallelSubtaskStates, + int parallelism); +} http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index d499a5a..2ca9d69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -23,7 +23,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; @@ -167,49 +169,80 @@ public class PendingCheckpoint { } public boolean acknowledgeTask( - ExecutionAttemptID attemptID, - ChainedStateHandle<StreamStateHandle> state, - List<KeyGroupsStateHandle> keyGroupsState) { + ExecutionAttemptID attemptID, + CheckpointStateHandles checkpointStateHandles) { synchronized (lock) { if (discarded) { return false; } - - ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID); - if (vertex != null) { - if (state != null || keyGroupsState != null) { - JobVertexID jobVertexID = vertex.getJobvertexId(); + ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID); - TaskState taskState; + if (vertex != null) { - if 
(taskStates.containsKey(jobVertexID)) { - taskState = taskStates.get(jobVertexID); - } else { - taskState = new TaskState(jobVertexID, vertex.getTotalNumberOfParallelSubtasks(), vertex.getMaxParallelism()); - taskStates.put(jobVertexID, taskState); + if (checkpointStateHandles != null) { + List<KeyGroupsStateHandle> keyGroupsState = checkpointStateHandles.getKeyGroupsStateHandle(); + ChainedStateHandle<StreamStateHandle> nonPartitionedState = + checkpointStateHandles.getNonPartitionedStateHandles(); + ChainedStateHandle<OperatorStateHandle> partitioneableState = + checkpointStateHandles.getPartitioneableStateHandles(); + + if (nonPartitionedState != null || partitioneableState != null || keyGroupsState != null) { + + JobVertexID jobVertexID = vertex.getJobvertexId(); + + int subtaskIndex = vertex.getParallelSubtaskIndex(); + + TaskState taskState; + + if (taskStates.containsKey(jobVertexID)) { + taskState = taskStates.get(jobVertexID); + } else { + //TODO this should go away when we remove chained state, assigning state to operators directly instead + int chainLength; + if (nonPartitionedState != null) { + chainLength = nonPartitionedState.getLength(); + } else if (partitioneableState != null) { + chainLength = partitioneableState.getLength(); + } else { + chainLength = 1; + } + + taskState = new TaskState( + jobVertexID, + vertex.getTotalNumberOfParallelSubtasks(), + vertex.getMaxParallelism(), + chainLength); + + taskStates.put(jobVertexID, taskState); + } + + long duration = System.currentTimeMillis() - checkpointTimestamp; + + if (nonPartitionedState != null) { + taskState.putState( + subtaskIndex, + new SubtaskState(nonPartitionedState, duration)); + } + + if(partitioneableState != null && !partitioneableState.isEmpty()) { + taskState.putPartitionableState(subtaskIndex, partitioneableState); + } + + // currently a checkpoint can only contain keyed state + // for the head operator + if (keyGroupsState != null && !keyGroupsState.isEmpty()) { + 
KeyGroupsStateHandle keyGroupsStateHandle = keyGroupsState.get(0); + taskState.putKeyedState(subtaskIndex, keyGroupsStateHandle); + } } + } - long duration = System.currentTimeMillis() - checkpointTimestamp; + ++numAcknowledgedTasks; - if (state != null) { - taskState.putState( - vertex.getParallelSubtaskIndex(), - new SubtaskState(state, duration)); - } - - // currently a checkpoint can only contain keyed state - // for the head operator - if (keyGroupsState != null && !keyGroupsState.isEmpty()) { - KeyGroupsStateHandle keyGroupsStateHandle = keyGroupsState.get(0); - taskState.putKeyedState(vertex.getParallelSubtaskIndex(), keyGroupsStateHandle); - } - } - numAcknowledgedTasks++; return true; - } - else { + } else { return false; } } http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java new file mode 100644 index 0000000..09a35f6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Current default implementation of {@link OperatorStateRepartitioner} that redistributes state in round robin fashion. + */ +public class RoundRobinOperatorStateRepartitioner implements OperatorStateRepartitioner { + + public static final OperatorStateRepartitioner INSTANCE = new RoundRobinOperatorStateRepartitioner(); + private static final boolean OPTIMIZE_MEMORY_USE = false; + + @Override + public List<Collection<OperatorStateHandle>> repartitionState( + List<OperatorStateHandle> previousParallelSubtaskStates, + int parallelism) { + + Preconditions.checkNotNull(previousParallelSubtaskStates); + Preconditions.checkArgument(parallelism > 0); + + // Reorganize: group by (State Name -> StreamStateHandle + Offsets) + Map<String, List<Tuple2<StreamStateHandle, long[]>>> nameToState = + groupByStateName(previousParallelSubtaskStates); + + if (OPTIMIZE_MEMORY_USE) { + previousParallelSubtaskStates.clear(); // free for GC at to cost that old handles are no longer available + } + + // Assemble result from all merge maps + List<Collection<OperatorStateHandle>> result = new ArrayList<>(parallelism); + + // Do the actual 
repartitioning for all named states + List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList = + repartition(nameToState, parallelism); + + for (int i = 0; i < mergeMapList.size(); ++i) { + result.add(i, new ArrayList<>(mergeMapList.get(i).values())); + } + + return result; + } + + /** + * Group by the different named states. + */ + private Map<String, List<Tuple2<StreamStateHandle, long[]>>> groupByStateName( + List<OperatorStateHandle> previousParallelSubtaskStates) { + + //Reorganize: group by (State Name -> StreamStateHandle + Offsets) + Map<String, List<Tuple2<StreamStateHandle, long[]>>> nameToState = new HashMap<>(); + for (OperatorStateHandle psh : previousParallelSubtaskStates) { + + for (Map.Entry<String, long[]> e : psh.getStateNameToPartitionOffsets().entrySet()) { + + List<Tuple2<StreamStateHandle, long[]>> stateLocations = nameToState.get(e.getKey()); + + if (stateLocations == null) { + stateLocations = new ArrayList<>(); + nameToState.put(e.getKey(), stateLocations); + } + + stateLocations.add(new Tuple2<>(psh.getDelegateStateHandle(), e.getValue())); + } + } + return nameToState; + } + + /** + * Repartition all named states. + */ + private List<Map<StreamStateHandle, OperatorStateHandle>> repartition( + Map<String, List<Tuple2<StreamStateHandle, long[]>>> nameToState, int parallelism) { + + // We will use this to merge w.r.t. 
StreamStateHandles for each parallel subtask inside the maps + List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList = new ArrayList<>(parallelism); + // Initialize + for (int i = 0; i < parallelism; ++i) { + mergeMapList.add(new HashMap<StreamStateHandle, OperatorStateHandle>()); + } + + int startParallelOP = 0; + // Iterate all named states and repartition one named state at a time per iteration + for (Map.Entry<String, List<Tuple2<StreamStateHandle, long[]>>> e : nameToState.entrySet()) { + + List<Tuple2<StreamStateHandle, long[]>> current = e.getValue(); + + // Determine actual number of partitions for this named state + int totalPartitions = 0; + for (Tuple2<StreamStateHandle, long[]> offsets : current) { + totalPartitions += offsets.f1.length; + } + + // Repartition the state across the parallel operator instances + int lstIdx = 0; + int offsetIdx = 0; + int baseFraction = totalPartitions / parallelism; + int remainder = totalPartitions % parallelism; + + int newStartParallelOp = startParallelOP; + + for (int i = 0; i < parallelism; ++i) { + + // Preparation: calculate the actual index considering wrap around + int parallelOpIdx = (i + startParallelOP) % parallelism; + + // Now calculate the number of partitions we will assign to the parallel instance in this round ... + int numberOfPartitionsToAssign = baseFraction; + + // ... 
and distribute odd partitions while we still have some, one at a time + if (remainder > 0) { + ++numberOfPartitionsToAssign; + --remainder; + } else if (remainder == 0) { + // We are out of odd partitions now and begin our next redistribution round with the current + // parallel operator to ensure fair load balance + newStartParallelOp = parallelOpIdx; + --remainder; + } + + // Now start collection the partitions for the parallel instance into this list + List<Tuple2<StreamStateHandle, long[]>> parallelOperatorState = new ArrayList<>(); + + while (numberOfPartitionsToAssign > 0) { + Tuple2<StreamStateHandle, long[]> handleWithOffsets = current.get(lstIdx); + long[] offsets = handleWithOffsets.f1; + int remaining = offsets.length - offsetIdx; + // Repartition offsets + long[] offs; + if (remaining > numberOfPartitionsToAssign) { + offs = Arrays.copyOfRange(offsets, offsetIdx, offsetIdx + numberOfPartitionsToAssign); + offsetIdx += numberOfPartitionsToAssign; + } else { + if (OPTIMIZE_MEMORY_USE) { + handleWithOffsets.f1 = null; // GC + } + offs = Arrays.copyOfRange(offsets, offsetIdx, offsets.length); + offsetIdx = 0; + ++lstIdx; + } + + parallelOperatorState.add( + new Tuple2<>(handleWithOffsets.f0, offs)); + + numberOfPartitionsToAssign -= remaining; + + // As a last step we merge partitions that use the same StreamStateHandle in a single + // OperatorStateHandle + Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx); + OperatorStateHandle psh = mergeMap.get(handleWithOffsets.f0); + if (psh == null) { + psh = new OperatorStateHandle(handleWithOffsets.f0, new HashMap<String, long[]>()); + mergeMap.put(handleWithOffsets.f0, psh); + } + psh.getStateNameToPartitionOffsets().put(e.getKey(), offs); + } + } + startParallelOP = newStartParallelOp; + e.setValue(null); + } + return mergeMapList; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java index 9beb233..2aa0491 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java @@ -24,8 +24,6 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -112,11 +110,4 @@ public class SubtaskState implements StateObject { public String toString() { return String.format("SubtaskState(Size: %d, Duration: %d, State: %s)", stateSize, duration, chainedStateHandle); } - - @Override - public void close() throws IOException { - chainedStateHandle.close(); - } - - }
