[FLINK-6364] [checkpoint] Incremental checkpointing in RocksDBKeyedStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e94cf19 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e94cf19 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e94cf19 Branch: refs/heads/master Commit: 6e94cf19736b3b3751abe55cf0f3ce4aa740ef96 Parents: 5795ebe Author: xiaogang.sxg <[email protected]> Authored: Sat Apr 29 23:44:36 2017 +0800 Committer: Stefan Richter <[email protected]> Committed: Fri May 5 16:30:06 2017 +0200 ---------------------------------------------------------------------- .../RocksDBIncrementalKeyedStateHandle.java | 248 +++++++ .../state/RocksDBKeyedStateBackend.java | 711 ++++++++++++++++++- .../streaming/state/RocksDBStateBackend.java | 60 +- .../state/RocksDBAggregatingStateTest.java | 6 +- .../state/RocksDBAsyncSnapshotTest.java | 19 +- .../streaming/state/RocksDBListStateTest.java | 6 +- .../state/RocksDBReducingStateTest.java | 6 +- .../state/RocksDBStateBackendTest.java | 41 +- .../flink/runtime/checkpoint/SubtaskState.java | 16 +- .../state/AbstractKeyedStateBackend.java | 10 + .../runtime/state/KeyGroupsStateHandle.java | 10 + .../flink/runtime/state/KeyedStateHandle.java | 2 +- .../apache/flink/runtime/state/StateUtil.java | 9 + .../state/heap/HeapKeyedStateBackend.java | 9 + .../runtime/state/StateBackendTestBase.java | 15 +- .../api/operators/AbstractStreamOperator.java | 6 +- .../streaming/runtime/tasks/StreamTask.java | 7 +- .../KeyedOneInputStreamOperatorTestHarness.java | 4 +- .../PartitionedStateCheckpointingITCase.java | 45 ++ .../KVStateRequestSerializerRocksDBTest.java | 8 +- 20 files changed, 1163 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java new file mode 100644 index 0000000..5ac9e46 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}. + * + * The states contained in an incremental snapshot include + * <ul> + * <li> New SST state which includes the sst files produced since the last completed + * checkpoint. These files can be referenced by succeeding checkpoints if the + * checkpoint succeeds to complete. </li> + * <li> Old SST state which includes the sst files materialized in previous + * checkpoints. </li> + * <li> MISC state which include the other files in the RocksDB instance, e.g. the + * LOG and MANIFEST files. These files are mutable, hence cannot be shared by + * other checkpoints. </li> + * <li> Meta state which includes the information of existing states. </li> + * </ul> + */ +public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(RocksDBIncrementalKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + private final JobID jobId; + + private final String operatorIdentifier; + + private final KeyGroupRange keyGroupRange; + + private final long checkpointId; + + private final Map<String, StreamStateHandle> newSstFiles; + + private final Map<String, StreamStateHandle> oldSstFiles; + + private final Map<String, StreamStateHandle> miscFiles; + + private final StreamStateHandle metaStateHandle; + + /** + * True if the state handle has already registered shared states. + * + * Once the shared states are registered, it's the {@link SharedStateRegistry}'s + * responsibility to maintain the shared states. But in the cases where the + * state handle is discarded before performing the registration, the handle + * should delete all the shared states created by it. + */ + private boolean registered; + + RocksDBIncrementalKeyedStateHandle( + JobID jobId, + String operatorIdentifier, + KeyGroupRange keyGroupRange, + long checkpointId, + Map<String, StreamStateHandle> newSstFiles, + Map<String, StreamStateHandle> oldSstFiles, + Map<String, StreamStateHandle> miscFiles, + StreamStateHandle metaStateHandle) { + + this.jobId = Preconditions.checkNotNull(jobId); + this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); + this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); + this.checkpointId = checkpointId; + this.newSstFiles = Preconditions.checkNotNull(newSstFiles); + this.oldSstFiles = Preconditions.checkNotNull(oldSstFiles); + this.miscFiles = Preconditions.checkNotNull(miscFiles); + this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle); + this.registered = false; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + long getCheckpointId() { + return checkpointId; + } + + Map<String, StreamStateHandle> getNewSstFiles() { + return newSstFiles; + } + + Map<String, StreamStateHandle> getOldSstFiles() { + return oldSstFiles; + } + + Map<String, StreamStateHandle> getMiscFiles() { + return miscFiles; + } + + StreamStateHandle getMetaStateHandle() { + return metaStateHandle; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + return this; + } else { + return null; + } + } + + @Override + public void discardState() throws Exception { + + try { + metaStateHandle.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard meta data.", e); + } + + try { + StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard misc file states.", e); + } + + if (!registered) { + try { + StateUtil.bestEffortDiscardAllStateObjects(newSstFiles.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard new sst file states.", e); + } + } + } + + @Override + public long getStateSize() { + long size = StateUtil.getStateSize(metaStateHandle); + + for (StreamStateHandle newSstFileHandle : newSstFiles.values()) { + size += newSstFileHandle.getStateSize(); + } + + for (StreamStateHandle oldSstFileHandle : oldSstFiles.values()) { + size += oldSstFileHandle.getStateSize(); + } + + for (StreamStateHandle miscFileHandle : miscFiles.values()) { + size += miscFileHandle.getStateSize(); + } + + return size; + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry) { + Preconditions.checkState(!registered, "The state handle has already registered its shared states."); + + for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { + SstFileStateHandle stateHandle = new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue()); + + int referenceCount = stateRegistry.register(stateHandle); + Preconditions.checkState(referenceCount == 1); + } + + for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) { + SstFileStateHandle stateHandle = new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue()); + + int referenceCount = stateRegistry.register(stateHandle); + Preconditions.checkState(referenceCount > 1); + } + + registered = true; + } + + @Override + public void unregisterSharedStates(SharedStateRegistry stateRegistry) { + Preconditions.checkState(registered, "The state handle has not registered its shared states yet."); + + for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { + stateRegistry.unregister(new SstFileStateHandle(newSstFileEntry.getKey(), newSstFileEntry.getValue())); + } + + for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) { + stateRegistry.unregister(new SstFileStateHandle(oldSstFileEntry.getKey(), oldSstFileEntry.getValue())); + } + + registered = false; + } + + private class SstFileStateHandle implements SharedStateHandle { + + private static final long serialVersionUID = 9092049285789170669L; + + private final String fileName; + + private final StreamStateHandle delegateStateHandle; + + private SstFileStateHandle( + String fileName, + StreamStateHandle delegateStateHandle) { + this.fileName = fileName; + this.delegateStateHandle = delegateStateHandle; + } + + @Override + public String getRegistrationKey() { + return jobId + "-" + operatorIdentifier + "-" + keyGroupRange + "-" + fileName; + } + + @Override + public void discardState() throws Exception { + delegateStateHandle.discardState(); + } + + @Override + public long getStateSize() { + return delegateStateHandle.getStateSize(); + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 199a5a4..ee5f956 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -31,7 +31,12 @@ import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerial import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -55,6 +60,8 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; @@ -67,6 +74,7 @@ import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; +import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -83,13 +91,20 @@ import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.ObjectInputStream; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; import java.util.concurrent.RunnableFuture; /** @@ -102,6 +117,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class); + private final JobID jobId; + + private final String operatorIdentifier; + /** The column family options from the options factory */ private final ColumnFamilyOptions columnOptions; @@ -137,6 +156,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** Number of bytes required to prefix the key groups. */ private final int keyGroupPrefixBytes; + /** True if incremental checkpointing is enabled */ + private final boolean enableIncrementalCheckpointing; + + /** The sst files materialized in pending checkpoints */ + private final SortedMap<Long, Map<String, StreamStateHandle>> materializedSstFiles = new TreeMap<>(); + + /** The identifier of the last completed checkpoint */ + private long lastCompletedCheckpointId = -1; + + private static final String SST_FILE_SUFFIX = ".sst"; + public RocksDBKeyedStateBackend( JobID jobId, String operatorIdentifier, @@ -148,10 +178,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - ExecutionConfig executionConfig + ExecutionConfig executionConfig, + boolean enableIncrementalCheckpointing ) throws IOException { super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); + + this.jobId = Preconditions.checkNotNull(jobId); + this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); + + this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; + this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions); this.dbOptions = Preconditions.checkNotNull(dbOptions); @@ -174,21 +211,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throw new IOException("Error cleaning RocksDB data directory.", e); } - List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1); - // RocksDB seems to need this... - columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET))); - List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); - try { - - db = RocksDB.open( - Preconditions.checkNotNull(dbOptions), - instanceRocksDBPath.getAbsolutePath(), - columnFamilyDescriptors, - columnFamilyHandles); - - } catch (RocksDBException e) { - throw new IOException("Error while opening RocksDB instance.", e); - } keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; kvStateInformation = new HashMap<>(); } @@ -265,9 +287,71 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception { + if (checkpointOptions.getCheckpointType() != CheckpointOptions.CheckpointType.SAVEPOINT && + enableIncrementalCheckpointing) { + return snapshotIncrementally(checkpointId, timestamp, streamFactory); + } else { + return snapshotFully(checkpointId, timestamp, streamFactory); + } + } + + private RunnableFuture<KeyedStateHandle> snapshotIncrementally( + final long checkpointId, + final long checkpointTimestamp, + final CheckpointStreamFactory checkpointStreamFactory) throws Exception { + + final RocksDBIncrementalSnapshotOperation snapshotOperation = + new RocksDBIncrementalSnapshotOperation( + this, + checkpointStreamFactory, + checkpointId, + checkpointTimestamp); + + synchronized (asyncSnapshotLock) { + if (db == null) { + throw new IOException("RocksDB closed."); + } + + if (!hasRegisteredState()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + + checkpointTimestamp + " . Returning null."); + } + return DoneFuture.nullValue(); + } + + snapshotOperation.takeSnapshot(); + } + + return new FutureTask<KeyedStateHandle>( + new Callable<KeyedStateHandle>() { + @Override + public KeyedStateHandle call() throws Exception { + return snapshotOperation.materializeSnapshot(); + } + } + ) { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + snapshotOperation.stop(); + return super.cancel(mayInterruptIfRunning); + } + + @Override + protected void done() { + snapshotOperation.releaseResources(isCancelled()); + } + }; + } + + private RunnableFuture<KeyedStateHandle> snapshotFully( + final long checkpointId, + final long timestamp, + final CheckpointStreamFactory streamFactory) throws Exception { + long startTime = System.currentTimeMillis(); - final RocksDBSnapshotOperation snapshotOperation = new RocksDBSnapshotOperation(this, streamFactory); + final RocksDBFullSnapshotOperation snapshotOperation = new RocksDBFullSnapshotOperation(this, streamFactory); // hold the db lock while operation on the db to guard us against async db disposal synchronized (asyncSnapshotLock) { @@ -342,7 +426,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend. */ - static final class RocksDBSnapshotOperation { + static final class RocksDBFullSnapshotOperation { static final int FIRST_BIT_IN_BYTE_MASK = 0x80; static final int END_OF_KEY_GROUP_MARK = 0xFFFF; @@ -362,7 +446,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private DataOutputView outputView; private KeyGroupsStateHandle snapshotResultStateHandle; - RocksDBSnapshotOperation( + RocksDBFullSnapshotOperation( RocksDBKeyedStateBackend<?> stateBackend, CheckpointStreamFactory checkpointStreamFactory) { @@ -607,11 +691,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } static void clearMetaDataFollowsFlag(byte[] key) { - key[0] &= (~RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); } static boolean hasMetaDataFollowsFlag(byte[] key) { - return 0 != (key[0] & RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); } private static void checkInterrupted() throws InterruptedException { @@ -621,6 +705,239 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } + private static class RocksDBIncrementalSnapshotOperation { + + private final RocksDBKeyedStateBackend<?> stateBackend; + + private final CheckpointStreamFactory checkpointStreamFactory; + + private final long checkpointId; + + private final long checkpointTimestamp; + + private Map<String, StreamStateHandle> baseSstFiles; + + private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>(); + + private FileSystem backupFileSystem; + private Path backupPath; + + // Registry for all opened i/o streams + private CloseableRegistry closeableRegistry = new CloseableRegistry(); + + // new sst files since the last completed checkpoint + private Map<String, StreamStateHandle> newSstFiles = new HashMap<>(); + + // old sst files which have been materialized in previous completed checkpoints + private Map<String, StreamStateHandle> oldSstFiles = new HashMap<>(); + + // handles to the misc files in the current snapshot + private Map<String, StreamStateHandle> miscFiles = new HashMap<>(); + + private StreamStateHandle metaStateHandle = null; + + private RocksDBIncrementalSnapshotOperation( + RocksDBKeyedStateBackend<?> stateBackend, + CheckpointStreamFactory checkpointStreamFactory, + long checkpointId, + long checkpointTimestamp) { + + this.stateBackend = stateBackend; + this.checkpointStreamFactory = checkpointStreamFactory; + this.checkpointId = checkpointId; + this.checkpointTimestamp = checkpointTimestamp; + } + + private StreamStateHandle materializeStateData(Path filePath) throws Exception { + FSDataInputStream inputStream = null; + CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + try { + final byte[] buffer = new byte[1024]; + + FileSystem backupFileSystem = backupPath.getFileSystem(); + inputStream = backupFileSystem.open(filePath); + closeableRegistry.registerClosable(inputStream); + + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + closeableRegistry.registerClosable(outputStream); + + while (true) { + int numBytes = inputStream.read(buffer); + + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + + closeableRegistry.unregisterClosable(outputStream); + StreamStateHandle result = outputStream.closeAndGetHandle(); + outputStream = null; + + return result; + } finally { + if (inputStream != null) { + closeableRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + + if (outputStream != null) { + closeableRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + private StreamStateHandle materializeMetaData() throws Exception { + CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; + + try { + outputStream = checkpointStreamFactory + .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); + + serializationProxy.write(out); + + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + StreamStateHandle result = outputStream.closeAndGetHandle(); + outputStream = null; + + return result; + } finally { + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + void takeSnapshot() throws Exception { + // use the last completed checkpoint as the comparison base. + baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId); + + // save meta data + for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) { + + RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1; + + KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = + new KeyedBackendSerializationProxy.StateMetaInfo<>( + metaInfo.getStateType(), + metaInfo.getName(), + metaInfo.getNamespaceSerializer(), + metaInfo.getStateSerializer()); + + stateMetaInfos.add(metaInfoProxy); + } + + // save state data + backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId); + backupFileSystem = backupPath.getFileSystem(); + if (backupFileSystem.exists(backupPath)) { + throw new IllegalStateException("Unexpected existence of the backup directory."); + } + + // create hard links of living files in the checkpoint path + Checkpoint checkpoint = Checkpoint.create(stateBackend.db); + checkpoint.createCheckpoint(backupPath.getPath()); + } + + KeyedStateHandle materializeSnapshot() throws Exception { + + synchronized (stateBackend.asyncSnapshotLock) { + + if (stateBackend.db == null) { + throw new IOException("RocksDB closed."); + } + + stateBackend.cancelStreamRegistry.registerClosable(closeableRegistry); + + // write meta data + metaStateHandle = materializeMetaData(); + + // write state data + Preconditions.checkState(backupFileSystem.exists(backupPath)); + + FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); + if (fileStatuses != null) { + for (FileStatus fileStatus : fileStatuses) { + Path filePath = fileStatus.getPath(); + String fileName = filePath.getName(); + + if (fileName.endsWith(SST_FILE_SUFFIX)) { + StreamStateHandle fileHandle = + baseSstFiles == null ? null : baseSstFiles.get(fileName); + + if (fileHandle == null) { + fileHandle = materializeStateData(filePath); + + newSstFiles.put(fileName, fileHandle); + } else { + oldSstFiles.put(fileName, fileHandle); + } + } else { + StreamStateHandle fileHandle = materializeStateData(filePath); + miscFiles.put(fileName, fileHandle); + } + } + } + + Map<String, StreamStateHandle> sstFiles = new HashMap<>(newSstFiles.size() + oldSstFiles.size()); + sstFiles.putAll(newSstFiles); + sstFiles.putAll(oldSstFiles); + + stateBackend.materializedSstFiles.put(checkpointId, sstFiles); + + return new RocksDBIncrementalKeyedStateHandle(stateBackend.jobId, + stateBackend.operatorIdentifier, stateBackend.keyGroupRange, + checkpointId, newSstFiles, oldSstFiles, miscFiles, metaStateHandle); + } + } + + void stop() { + try { + closeableRegistry.close(); + } catch (IOException e) { + LOG.warn("Could not properly close io streams.", e); + } + } + + void releaseResources(boolean canceled) { + stateBackend.cancelStreamRegistry.unregisterClosable(closeableRegistry); + + if (backupPath != null) { + try { + if (backupFileSystem.exists(backupPath)) { + backupFileSystem.delete(backupPath, true); + } + } catch (Exception e) { + LOG.warn("Could not properly delete the checkpoint directory.", e); + } + } + + if (canceled) { + List<StateObject> statesToDiscard = new ArrayList<>(); + + statesToDiscard.add(metaStateHandle); + statesToDiscard.addAll(miscFiles.values()); + statesToDiscard.addAll(newSstFiles.values()); + + try { + StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard); + } catch (Exception e) { + LOG.warn("Could not properly discard states.", e); + } + } + } + } + @Override public void restore(Collection<KeyedStateHandle> restoreState) throws Exception { LOG.info("Initializing RocksDB keyed state backend from snapshot."); @@ -630,11 +947,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } try { - if (MigrationUtil.isOldSavepointKeyedState(restoreState)) { + if (restoreState == null || restoreState.isEmpty()) { + createDB(); + } else if (MigrationUtil.isOldSavepointKeyedState(restoreState)) { LOG.info("Converting RocksDB state from old savepoint."); restoreOldSavepointKeyedState(restoreState); + } else if (restoreState.iterator().next() instanceof RocksDBIncrementalKeyedStateHandle) { + RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this); + restoreOperation.restore(restoreState); } else { - RocksDBRestoreOperation restoreOperation = new RocksDBRestoreOperation(this); + RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation(this); restoreOperation.doRestore(restoreState); } } catch (Exception ex) { @@ -643,10 +965,68 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } + @Override + public void notifyOfCompletedCheckpoint(long completedCheckpointId) { + synchronized (asyncSnapshotLock) { + if (completedCheckpointId < lastCompletedCheckpointId) { + return; + } + + Iterator<Long> materializedCheckpointIterator = materializedSstFiles.keySet().iterator(); + while (materializedCheckpointIterator.hasNext()) { + long materializedCheckpointId = materializedCheckpointIterator.next(); + + if (materializedCheckpointId < completedCheckpointId) { + materializedCheckpointIterator.remove(); + } + } + + lastCompletedCheckpointId = completedCheckpointId; + } + } + + private void createDB() throws IOException { + db = openDB(instanceRocksDBPath.getAbsolutePath(), + new ArrayList<ColumnFamilyDescriptor>(), + null); + } + + private RocksDB openDB( + String path, + List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors, + List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException { + + List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(stateColumnFamilyDescriptors); + columnFamilyDescriptors.add( + new ColumnFamilyDescriptor( + "default".getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions)); + + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size()); + + RocksDB db; + + try { + db = RocksDB.open( + Preconditions.checkNotNull(dbOptions), + Preconditions.checkNotNull(path), + columnFamilyDescriptors, + columnFamilyHandles); + } catch (RocksDBException e) { + throw new IOException("Error while opening RocksDB instance.", e); + } + + if (stateColumnFamilyHandles != null) { + stateColumnFamilyHandles.addAll( + columnFamilyHandles.subList(0, columnFamilyHandles.size() - 1)); + } + + return db; + } + /** * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot. */ - static final class RocksDBRestoreOperation { + static final class RocksDBFullRestoreOperation { private final RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend; @@ -664,7 +1044,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * * @param rocksDBKeyedStateBackend the state backend into which we restore */ - public RocksDBRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend) { + public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend) { this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); } @@ -679,6 +1059,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { public void doRestore(Collection<KeyedStateHandle> keyedStateHandles) throws IOException, ClassNotFoundException, RocksDBException { + rocksDBKeyedStateBackend.createDB(); + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { if (keyedStateHandle != null) { @@ -787,14 +1169,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { while (keyGroupHasMoreKeys) { byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView); byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView); - if (RocksDBSnapshotOperation.hasMetaDataFollowsFlag(key)) { + if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { //clear the signal bit in the key to make it ready for insertion again - RocksDBSnapshotOperation.clearMetaDataFollowsFlag(key); + RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); rocksDBKeyedStateBackend.db.put(handle, key, value); //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kvStateId = RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK + kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK & currentStateHandleInView.readShort(); - if (RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { + if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { keyGroupHasMoreKeys = false; } else { handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); @@ -808,6 +1190,272 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } + private static class RocksDBIncrementalRestoreOperation { + + private final RocksDBKeyedStateBackend<?> stateBackend; + + private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) { + this.stateBackend = stateBackend; + } + + private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData( + StreamStateHandle metaStateHandle) throws Exception { + + FSDataInputStream inputStream = null; + + try { + inputStream = metaStateHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader); + DataInputView in = new DataInputViewStreamWrapper(inputStream); + serializationProxy.read(in); + + return serializationProxy.getNamedStateSerializationProxies(); + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + } + } + + private void readStateData( + Path restoreFilePath, + StreamStateHandle remoteFileHandle) throws IOException { + + FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); + + FSDataInputStream inputStream = null; + FSDataOutputStream outputStream = null; + + try { + inputStream = remoteFileHandle.openInputStream(); + stateBackend.cancelStreamRegistry.registerClosable(inputStream); + + outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); + stateBackend.cancelStreamRegistry.registerClosable(outputStream); + + byte[] buffer = new byte[1024]; + while (true) { + int numBytes = inputStream.read(buffer); + if (numBytes == -1) { + break; + } + + outputStream.write(buffer, 0, numBytes); + } + } finally { + if (inputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(inputStream); + inputStream.close(); + } + + if (outputStream != null) { + stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + outputStream.close(); + } + } + } + + private void restoreInstance( + RocksDBIncrementalKeyedStateHandle restoreStateHandle, + boolean hasExtraKeys) throws Exception { + + // read state data + Path restoreInstancePath = new Path( + stateBackend.instanceBasePath.getAbsolutePath(), + UUID.randomUUID().toString()); + + try { + Map<String, StreamStateHandle> newSstFiles = restoreStateHandle.getNewSstFiles(); + for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { + String fileName = newSstFileEntry.getKey(); + StreamStateHandle remoteFileHandle = newSstFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + Map<String, StreamStateHandle> oldSstFiles = restoreStateHandle.getOldSstFiles(); + for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) { + String fileName = oldSstFileEntry.getKey(); + StreamStateHandle remoteFileHandle = oldSstFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles(); + for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) { + String fileName = miscFileEntry.getKey(); + StreamStateHandle remoteFileHandle = miscFileEntry.getValue(); + + readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); + } + + // read meta data + List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies = + readMetaData(restoreStateHandle.getMetaStateHandle()); + + List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(); + + for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) { + + ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( + stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET), + stateBackend.columnOptions); + + columnFamilyDescriptors.add(columnFamilyDescriptor); + } + + if (hasExtraKeys) { + + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); + + try (RocksDB restoreDb = stateBackend.openDB( + restoreInstancePath.getPath(), + columnFamilyDescriptors, + columnFamilyHandles)) { + + for (int i = 0; i < columnFamilyHandles.size(); ++i) { + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i); + KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy = stateMetaInfoProxies.get(i); + + Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry = + stateBackend.kvStateInformation.get(stateMetaInfoProxy.getStateName()); + + if (null == registeredStateMetaInfoEntry) { + + RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredBackendStateMetaInfo<>(stateMetaInfoProxy); + + registeredStateMetaInfoEntry = + new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>( + stateBackend.db.createColumnFamily(columnFamilyDescriptor), + stateMetaInfo); + + stateBackend.kvStateInformation.put( + stateMetaInfoProxy.getStateName(), + registeredStateMetaInfoEntry); + } + + ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0; + + try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) { + + int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + startKeyGroupPrefixBytes[j] = (byte)(startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } + + iterator.seek(startKeyGroupPrefixBytes); + + while (iterator.isValid()) { + + int keyGroup = 0; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; + } + + if (stateBackend.keyGroupRange.contains(keyGroup)) { + stateBackend.db.put(targetColumnFamilyHandle, + iterator.key(), iterator.value()); + } + + iterator.next(); + } + } + } + } + } else { + + // create hard links in the instance directory + if (!stateBackend.instanceRocksDBPath.mkdirs()) { + throw new IOException("Could not create RocksDB data directory."); + } + + for (String newSstFileName : newSstFiles.keySet()) { + File restoreFile = new File(restoreInstancePath.getPath(), newSstFileName); + File targetFile = new File(stateBackend.instanceRocksDBPath, newSstFileName); + + Files.createLink(targetFile.toPath(), restoreFile.toPath()); + } + + for (String oldSstFileName : oldSstFiles.keySet()) { + File restoreFile = new File(restoreInstancePath.getPath(), oldSstFileName); + File targetFile = new File(stateBackend.instanceRocksDBPath, oldSstFileName); + + Files.createLink(targetFile.toPath(), restoreFile.toPath()); + } + + for (String miscFileName : miscFiles.keySet()) { + File restoreFile = new File(restoreInstancePath.getPath(), miscFileName); + File targetFile = new File(stateBackend.instanceRocksDBPath, miscFileName); + + Files.createLink(targetFile.toPath(), restoreFile.toPath()); + } + + List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); + stateBackend.db = stateBackend.openDB( + stateBackend.instanceRocksDBPath.getAbsolutePath(), + columnFamilyDescriptors, columnFamilyHandles); + + for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { + KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy = stateMetaInfoProxies.get(i); + + ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); + RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo = + new RegisteredBackendStateMetaInfo<>(stateMetaInfoProxy); + + stateBackend.kvStateInformation.put( + stateMetaInfoProxy.getStateName(), + new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>( + columnFamilyHandle, stateMetaInfo)); + } + + + // use the restore sst files as the base for succeeding checkpoints + Map<String, StreamStateHandle> sstFiles = new HashMap<>(); + sstFiles.putAll(newSstFiles); + sstFiles.putAll(oldSstFiles); + stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles); + + stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); + } + } finally { + FileSystem restoreFileSystem = restoreInstancePath.getFileSystem(); + if (restoreFileSystem.exists(restoreInstancePath)) { + restoreFileSystem.delete(restoreInstancePath, true); + } + } + } + + void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { + + boolean hasExtraKeys = (restoreStateHandles.size() > 1 || + !restoreStateHandles.iterator().next().getKeyGroupRange().equals(stateBackend.keyGroupRange)); + + if (hasExtraKeys) { + stateBackend.createDB(); + } + + for (KeyedStateHandle rawStateHandle : restoreStateHandles) { + + if (! (rawStateHandle instanceof RocksDBIncrementalKeyedStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected " + RocksDBIncrementalKeyedStateHandle.class + + ", but found " + rawStateHandle.getClass()); + } + + RocksDBIncrementalKeyedStateHandle keyedStateHandle = (RocksDBIncrementalKeyedStateHandle) rawStateHandle; + + restoreInstance(keyedStateHandle, hasExtraKeys); + } + } + } + // ------------------------------------------------------------------------ // State factories // ------------------------------------------------------------------------ @@ -1160,10 +1808,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ @Deprecated private void restoreOldSavepointKeyedState(Collection<KeyedStateHandle> restoreState) throws Exception { - - if (restoreState.isEmpty()) { - return; - } + createDB(); Preconditions.checkState(1 == restoreState.size(), "Only one element expected here."); http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 80c9a29..55b8be2 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -109,6 +109,9 @@ public class RocksDBStateBackend extends AbstractStateBackend { /** Whether we already lazily initialized our local storage directories. */ private transient boolean isInitialized = false; + /** True if incremental checkpointing is enabled */ + private boolean enableIncrementalCheckpointing; + /** * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the @@ -123,7 +126,24 @@ public class RocksDBStateBackend extends AbstractStateBackend { * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public RocksDBStateBackend(String checkpointDataUri) throws IOException { - this(new Path(checkpointDataUri).toUri()); + this(new Path(checkpointDataUri).toUri(), false); + } + + /** + * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the + * file system and location defined by the given URI. + * + * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system + * host and port in the URI, or have the Hadoop configuration that describes the file system + * (host / high-availability group / possibly credentials) either referenced from the Flink + * config, or included in the classpath. + * + * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. + * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException { + this(new Path(checkpointDataUri).toUri(), enableIncrementalCheckpointing); } /** @@ -139,7 +159,24 @@ public class RocksDBStateBackend extends AbstractStateBackend { * @throws IOException Thrown, if no file system can be found for the scheme in the URI. */ public RocksDBStateBackend(URI checkpointDataUri) throws IOException { - this(new FsStateBackend(checkpointDataUri)); + this(new FsStateBackend(checkpointDataUri), false); + } + + /** + * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the + * file system and location defined by the given URI. + * + * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system + * host and port in the URI, or have the Hadoop configuration that describes the file system + * (host / high-availability group / possibly credentials) either referenced from the Flink + * config, or included in the classpath. + * + * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. + * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException { + this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing); } /** @@ -156,6 +193,22 @@ public class RocksDBStateBackend extends AbstractStateBackend { this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend); } + /** + * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its + * checkpoint data streams. Typically, one would supply a filesystem or database state backend + * here where the snapshots from RocksDB would be stored. + * + * <p>The snapshots of the RocksDB state will be stored using the given backend's + * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}. + * + * @param checkpointStreamBackend The backend to store the + * @param enableIncrementalCheckpointing True if incremental checkponting is enabled + */ + public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) { + this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend); + this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; + } + // ------------------------------------------------------------------------ // State backend methods // ------------------------------------------------------------------------ @@ -260,7 +313,8 @@ public class RocksDBStateBackend extends AbstractStateBackend { keySerializer, numberOfKeyGroups, keyGroupRange, - env.getExecutionConfig()); + env.getExecutionConfig(), + enableIncrementalCheckpointing); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java index 983e569..1b65466 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingStateTest.java @@ -204,7 +204,7 @@ public class RocksDBAggregatingStateTest { } private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception { - return (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend( + RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend( new DummyEnvironment("TestTask", 1, 0), new JobID(), "test-op", @@ -212,6 +212,10 @@ public class RocksDBAggregatingStateTest { 16, new KeyGroupRange(2, 3), mock(TaskKvStateRegistry.class)); + + keyedBackend.restore(null); + + return keyedBackend; } // test functions http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index ffe2ce2..812babb 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -41,7 +41,6 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; @@ -338,6 +337,8 @@ public class RocksDBAsyncSnapshotTest { new KeyGroupRange(0, 0), null); + keyedStateBackend.restore(null); + // register a state so that the state backend has to checkpoint something keyedStateBackend.getPartitionedState( "namespace", @@ -360,19 +361,21 @@ public class RocksDBAsyncSnapshotTest { @Test public void testConsistentSnapshotSerializationFlagsAndMasks() { - Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBSnapshotOperation.END_OF_KEY_GROUP_MARK); - Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); + Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK); + Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); byte[] expectedKey = new byte[] {42, 42}; byte[] modKey = expectedKey.clone(); - Assert.assertFalse(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey)); + Assert.assertFalse( + RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey)); - RocksDBKeyedStateBackend.RocksDBSnapshotOperation.setMetaDataFollowsFlagInKey(modKey); - Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey)); + RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(modKey); + Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey)); - RocksDBKeyedStateBackend.RocksDBSnapshotOperation.clearMetaDataFollowsFlag(modKey); - Assert.assertFalse(RocksDBKeyedStateBackend.RocksDBSnapshotOperation.hasMetaDataFollowsFlag(modKey)); + RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(modKey); + Assert.assertFalse( + RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey)); Assert.assertTrue(Arrays.equals(expectedKey, modKey)); } http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java index d8d0308..e7efcfa 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBListStateTest.java @@ -210,7 +210,7 @@ public class RocksDBListStateTest { // ------------------------------------------------------------------------ private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception { - return (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend( + RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend( new DummyEnvironment("TestTask", 1, 0), new JobID(), "test-op", @@ -218,6 +218,10 @@ public class RocksDBListStateTest { 16, new KeyGroupRange(2, 3), mock(TaskKvStateRegistry.class)); + + keyedBackend.restore(null); + + return keyedBackend; } private static <T> void validateResult(Iterable<T> values, Set<T> expected) { http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java index fb854f2..a8b4535 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBReducingStateTest.java @@ -210,7 +210,7 @@ public class RocksDBReducingStateTest { // ------------------------------------------------------------------------ private static RocksDBKeyedStateBackend<String> createKeyedBackend(RocksDBStateBackend backend) throws Exception { - return (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend( + RocksDBKeyedStateBackend<String> keyedBackend = (RocksDBKeyedStateBackend<String>) backend.createKeyedStateBackend( new DummyEnvironment("TestTask", 1, 0), new JobID(), "test-op", @@ -218,6 +218,10 @@ public class RocksDBReducingStateTest { 16, new KeyGroupRange(2, 3), mock(TaskKvStateRegistry.class)); + + keyedBackend.restore(null); + + return keyedBackend; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index b5f18a4..fad1559 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.StateBackendTestBase; import org.apache.flink.runtime.state.VoidNamespace; @@ -42,8 +41,11 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.rocksdb.Checkpoint; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ReadOptions; @@ -55,6 +57,7 @@ import org.rocksdb.Snapshot; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.RunnableFuture; @@ -73,6 +76,7 @@ import static org.powermock.api.mockito.PowerMockito.spy; /** * Tests for the partitioned state part of {@link RocksDBStateBackend}. */ +@RunWith(Parameterized.class) public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> { private OneShotLatch blocker; @@ -83,17 +87,25 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa private ValueState<Integer> testState1; private ValueState<String> testState2; + @Parameterized.Parameters + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); + } + + @Parameterized.Parameter + public boolean enableIncrementalCheckpointing; + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); // Store it because we need it for the cleanup test. - private String dbPath; + String dbPath; @Override protected RocksDBStateBackend getStateBackend() throws IOException { dbPath = tempFolder.newFolder().getAbsolutePath(); String checkpointPath = tempFolder.newFolder().toURI().toString(); - RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath)); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), enableIncrementalCheckpointing); backend.setDbStoragePath(dbPath); return backend; } @@ -105,7 +117,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa testStreamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); testStreamFactory.setBlockerLatch(blocker); testStreamFactory.setWaiterLatch(waiter); - testStreamFactory.setAfterNumberInvocations(100); + testStreamFactory.setAfterNumberInvocations(10); RocksDBStateBackend backend = getStateBackend(); Environment env = new DummyEnvironment("TestTask", 1, 0); @@ -119,6 +131,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa new KeyGroupRange(0, 1), mock(TaskKvStateRegistry.class)); + keyedStateBackend.restore(null); + testState1 = keyedStateBackend.getPartitionedState( VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, @@ -178,8 +192,10 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa RocksDB spyDB = keyedStateBackend.db; - verify(spyDB, times(1)).getSnapshot(); - verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); + if (!enableIncrementalCheckpointing) { + verify(spyDB, times(1)).getSnapshot(); + verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); + } this.keyedStateBackend.dispose(); verify(spyDB, times(1)).close(); @@ -216,8 +232,10 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa RocksDB spyDB = keyedStateBackend.db; - verify(spyDB, times(1)).getSnapshot(); - verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); + if (!enableIncrementalCheckpointing) { + verify(spyDB, times(1)).getSnapshot(); + verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); + } this.keyedStateBackend.dispose(); verify(spyDB, times(1)).close(); @@ -319,7 +337,6 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa backend.setCurrentKey(1); state.update("Hello"); - Collection<File> allFilesInDbDir = FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter()); @@ -356,8 +373,10 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa assertNotNull(null, keyedStateBackend.db); RocksDB spyDB = keyedStateBackend.db; - verify(spyDB, times(1)).getSnapshot(); - verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class)); + if (!enableIncrementalCheckpointing) { + verify(spyDB, times(1)).getSnapshot(); + verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class)); + } keyedStateBackend.dispose(); verify(spyDB, times(1)).close(); http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java index 121ac57..a77baf3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java @@ -152,12 +152,24 @@ public class SubtaskState implements CompositeStateHandle { @Override public void registerSharedStates(SharedStateRegistry sharedStateRegistry) { - // No shared states + if (managedKeyedState != null) { + managedKeyedState.registerSharedStates(sharedStateRegistry); + } + + if (rawKeyedState != null) { + rawKeyedState.registerSharedStates(sharedStateRegistry); + } } @Override public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) { - // No shared states + if (managedKeyedState != null) { + managedKeyedState.unregisterSharedStates(sharedStateRegistry); + } + + if (rawKeyedState != null) { + rawKeyedState.unregisterSharedStates(sharedStateRegistry); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index e86f1f8..61f397c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -212,6 +212,16 @@ public abstract class AbstractKeyedStateBackend<K> MapStateDescriptor<UK, UV> stateDesc) throws Exception; /** + * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager. + * + * @param checkpointId The ID of the checkpoint that has been completed. + * + * @throws Exception Exceptions during checkpoint acknowledgement may be forwarded and will cause + * the program to fail and enter recovery. + */ + public abstract void notifyOfCompletedCheckpoint(long checkpointId) throws Exception; + + /** * @see KeyedStateBackend */ @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index bad7fd4..8280460 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -93,6 +93,16 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle } @Override + public void registerSharedStates(SharedStateRegistry stateRegistry) { + // No shared states + } + + @Override + public void unregisterSharedStates(SharedStateRegistry stateRegistry) { + // No shared states + } + + @Override public void discardState() throws Exception { stateHandle.discardState(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java index dc9c97d..704ec14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java @@ -23,7 +23,7 @@ package org.apache.flink.runtime.state; * recovering from failures, the handle will be passed to all tasks whose key * group ranges overlap with it. */ -public interface KeyedStateHandle extends StateObject { +public interface KeyedStateHandle extends CompositeStateHandle { /** * Returns the range of the key groups contained in the state. http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java index b250831..6f231e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java @@ -33,6 +33,15 @@ public class StateUtil { } /** + * Returns the size of a state object + * + * @param handle The handle to the retrieved state + */ + public static long getStateSize(StateObject handle) { + return handle == null ? 0 : handle.getStateSize(); + } + + /** * Iterates through the passed state handles and calls discardState() on each handle that is not null. All * occurring exceptions are suppressed and collected until the iteration is over and emitted as a single exception. * http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 38817cd..ead89b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -328,6 +328,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @SuppressWarnings("deprecation") @Override public void restore(Collection<KeyedStateHandle> restoredState) throws Exception { + if (restoredState == null || restoredState.isEmpty()) { + return; + } + LOG.info("Initializing heap keyed state backend from snapshot."); if (LOG.isDebugEnabled()) { @@ -426,6 +430,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @Override + public void notifyOfCompletedCheckpoint(long checkpointId) { + //Nothing to do + } + + @Override public String toString() { return "HeapKeyedStateBackend"; } http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index ccc1eae..60f9c81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -133,7 +133,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment env) throws Exception { - return getStateBackend().createKeyedStateBackend( + + AbstractKeyedStateBackend<K> backend = getStateBackend().createKeyedStateBackend( env, new JobID(), "test_op", @@ -141,6 +142,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten numberOfKeyGroups, keyGroupRange, env.getTaskKvStateRegistry()); + + backend.restore(null); + + return backend; } protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception { @@ -2197,9 +2202,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten Assert.assertNotNull(stateHandle); - backend = createKeyedBackend(IntSerializer.INSTANCE); + backend = null; + try { - backend.restore(Collections.singleton(stateHandle)); + backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle); + InternalValueState<VoidNamespace, Integer> valueState = backend.createValueState( VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); @@ -2297,7 +2304,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten * Returns the value by getting the serialized value and deserializing it * if it is not null. */ - private static <V, K, N> V getSerializedValue( + protected static <V, K, N> V getSerializedValue( InternalKvState<N> kvState, K key, TypeSerializer<K> keySerializer, http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 1850007..d45ad42 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -504,7 +504,11 @@ public abstract class AbstractStreamOperator<OUT> } @Override - public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {} + public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { + if (keyedStateBackend != null) { + keyedStateBackend.notifyOfCompletedCheckpoint(checkpointId); + } + } /** * Returns a checkpoint stream factory for the provided options. http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 57e43de..bc66751 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -765,9 +765,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> cancelables.registerClosable(keyedStateBackend); // restore if we have some old state - if (null != restoreStateHandles && null != restoreStateHandles.getManagedKeyedState()) { - keyedStateBackend.restore(restoreStateHandles.getManagedKeyedState()); - } + Collection<KeyedStateHandle> restoreKeyedStateHandles = + restoreStateHandles == null ? null : restoreStateHandles.getManagedKeyedState(); + + keyedStateBackend.restore(restoreKeyedStateHandles); @SuppressWarnings("unchecked") AbstractKeyedStateBackend<K> typedBackend = (AbstractKeyedStateBackend<K>) keyedStateBackend; http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index d9c7387..c6d0bce 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -116,9 +116,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> keyGroupRange, mockTask.getEnvironment().getTaskKvStateRegistry()); - if (restoredKeyedState != null) { - keyedStateBackend.restore(restoredKeyedState); - } + keyedStateBackend.restore(restoredKeyedState); return keyedStateBackend; } http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java index 4761d70..517c82b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/PartitionedStateCheckpointingITCase.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,11 +37,18 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * A simple test that runs a streaming topology with checkpointing enabled. @@ -50,15 +59,49 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio * It is designed to check partitioned states. */ @SuppressWarnings("serial") +@RunWith(Parameterized.class) public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTestBase { + private static final int MAX_MEM_STATE_SIZE = 10 * 1024 * 1024; + final long NUM_STRINGS = 10_000_000L; final static int NUM_KEYS = 40; + @Parameterized.Parameters + public static Collection<AbstractStateBackend> parameters() throws IOException { + TemporaryFolder tempFolder = new TemporaryFolder(); + tempFolder.create(); + + MemoryStateBackend syncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false); + MemoryStateBackend asyncMemBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true); + + FsStateBackend syncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), false); + FsStateBackend asyncFsBackend = new FsStateBackend("file://" + tempFolder.newFolder().getAbsolutePath(), true); + + RocksDBStateBackend fullRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), false); + fullRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath()); + + RocksDBStateBackend incRocksDbBackend = new RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true); + incRocksDbBackend.setDbStoragePath(tempFolder.newFolder().getAbsolutePath()); + + return Arrays.asList( + syncMemBackend, + asyncMemBackend, + syncFsBackend, + asyncFsBackend, + fullRocksDbBackend, + incRocksDbBackend); + } + + @Parameterized.Parameter + public AbstractStateBackend stateBackend; + @Override public void testProgram(StreamExecutionEnvironment env) { assertTrue("Broken test setup", (NUM_STRINGS/2) % NUM_KEYS == 0); + env.setStateBackend(stateBackend); + DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2)); DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2)); @@ -163,6 +206,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes OnceFailingPartitionedSum(long numElements) { this.numElements = numElements; + this.hasFailed = false; } @Override @@ -181,6 +225,7 @@ public class PartitionedStateCheckpointingITCase extends StreamFaultToleranceTes @Override public Tuple2<Integer, Long> map(Integer value) throws Exception { count++; + if (!hasFailed && count >= failurePos) { hasFailed = true; throw new Exception("Test Failure"); http://git-wip-us.apache.org/repos/asf/flink/blob/6e94cf19/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java index 3c86f90..05f72c2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java @@ -81,7 +81,7 @@ public final class KVStateRequestSerializerRocksDBTest { super(jobId, operatorIdentifier, userCodeClassLoader, instanceBasePath, dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, - numberOfKeyGroups, keyGroupRange, executionConfig); + numberOfKeyGroups, keyGroupRange, executionConfig, false); } @Override @@ -120,6 +120,7 @@ public final class KVStateRequestSerializerRocksDBTest { 1, new KeyGroupRange(0, 0), new ExecutionConfig() ); + longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend @@ -154,8 +155,9 @@ public final class KVStateRequestSerializerRocksDBTest { mock(TaskKvStateRegistry.class), LongSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), - new ExecutionConfig() - ); + new ExecutionConfig(), + false); + longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
