[FLINK-6545] [checkpoint] Make incremental checkpoints externalizable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/098e46f2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/098e46f2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/098e46f2 Branch: refs/heads/master Commit: 098e46f2d222e8e6c5c27bc7ded40ee642dad104 Parents: efbb41b Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Thu May 11 21:04:29 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Sun May 14 13:49:50 2017 +0200 ---------------------------------------------------------------------- .../RocksDBIncrementalKeyedStateHandle.java | 241 -------------- .../state/RocksDBKeyedStateBackend.java | 155 ++++----- .../org/apache/flink/util/StringBasedID.java | 69 ++++ .../savepoint/SavepointV2Serializer.java | 83 ++++- .../state/IncrementalKeyedStateHandle.java | 324 +++++++++++++++++++ .../state/PlaceholderStreamStateHandle.java | 88 +++++ .../flink/runtime/state/SharedStateHandle.java | 39 --- .../runtime/state/SharedStateRegistry.java | 2 +- .../runtime/state/SharedStateRegistryKey.java | 42 +-- .../apache/flink/runtime/state/StateHandle.java | 37 --- .../flink/runtime/state/StateHandleID.java | 37 +++ .../savepoint/CheckpointTestUtils.java | 87 +++-- .../savepoint/SavepointV2SerializerTest.java | 1 - .../flink/runtime/state/StateUtilTest.java | 36 --- 14 files changed, 736 insertions(+), 505 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java deleted file mode 100644 index 961182d..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalKeyedStateHandle.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.runtime.state.CompositeStateHandle; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.state.SharedStateRegistryKey; -import org.apache.flink.runtime.state.StateUtil; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * The handle to states in incremental snapshots taken by {@link RocksDBKeyedStateBackend}. 
- * - * The states contained in an incremental snapshot include - * <ul> - * <li> New SST state which includes the sst files produced since the last completed - * checkpoint. These files can be referenced by succeeding checkpoints if the - * checkpoint succeeds to complete. </li> - * <li> Old SST state which includes the sst files materialized in previous - * checkpoints. </li> - * <li> MISC state which include the other files in the RocksDB instance, e.g. the - * LOG and MANIFEST files. These files are mutable, hence cannot be shared by - * other checkpoints. </li> - * <li> Meta state which includes the information of existing states. </li> - * </ul> - */ -public class RocksDBIncrementalKeyedStateHandle implements KeyedStateHandle, CompositeStateHandle { - - private static final Logger LOG = LoggerFactory.getLogger(RocksDBIncrementalKeyedStateHandle.class); - - private static final long serialVersionUID = -8328808513197388231L; - - private final String operatorIdentifier; - - private final KeyGroupRange keyGroupRange; - - private final long checkpointId; - - private final Map<String, StreamStateHandle> unregisteredSstFiles; - - private final Map<String, StreamStateHandle> registeredSstFiles; - - private final Map<String, StreamStateHandle> miscFiles; - - private final StreamStateHandle metaStateHandle; - - /** - * True if the state handle has already registered shared states. - * - * Once the shared states are registered, it's the {@link SharedStateRegistry}'s - * responsibility to maintain the shared states. But in the cases where the - * state handle is discarded before performing the registration, the handle - * should delete all the shared states created by it. 
- */ - private boolean registered; - - RocksDBIncrementalKeyedStateHandle( - String operatorIdentifier, - KeyGroupRange keyGroupRange, - long checkpointId, - Map<String, StreamStateHandle> unregisteredSstFiles, - Map<String, StreamStateHandle> registeredSstFiles, - Map<String, StreamStateHandle> miscFiles, - StreamStateHandle metaStateHandle) { - - this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); - this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); - this.checkpointId = checkpointId; - this.unregisteredSstFiles = Preconditions.checkNotNull(unregisteredSstFiles); - this.registeredSstFiles = Preconditions.checkNotNull(registeredSstFiles); - this.miscFiles = Preconditions.checkNotNull(miscFiles); - this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle); - this.registered = false; - } - - @Override - public KeyGroupRange getKeyGroupRange() { - return keyGroupRange; - } - - long getCheckpointId() { - return checkpointId; - } - - Map<String, StreamStateHandle> getUnregisteredSstFiles() { - return unregisteredSstFiles; - } - - Map<String, StreamStateHandle> getRegisteredSstFiles() { - return registeredSstFiles; - } - - Map<String, StreamStateHandle> getMiscFiles() { - return miscFiles; - } - - StreamStateHandle getMetaStateHandle() { - return metaStateHandle; - } - - @Override - public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { - if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { - return this; - } else { - return null; - } - } - - @Override - public void discardState() throws Exception { - - Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. 
Must unregister first."); - - try { - metaStateHandle.discardState(); - } catch (Exception e) { - LOG.warn("Could not properly discard meta data.", e); - } - - try { - StateUtil.bestEffortDiscardAllStateObjects(miscFiles.values()); - } catch (Exception e) { - LOG.warn("Could not properly discard misc file states.", e); - } - - try { - StateUtil.bestEffortDiscardAllStateObjects(unregisteredSstFiles.values()); - } catch (Exception e) { - LOG.warn("Could not properly discard new sst file states.", e); - } - - } - - @Override - public long getStateSize() { - long size = StateUtil.getStateSize(metaStateHandle); - - for (StreamStateHandle newSstFileHandle : unregisteredSstFiles.values()) { - size += newSstFileHandle.getStateSize(); - } - - for (StreamStateHandle oldSstFileHandle : registeredSstFiles.values()) { - size += oldSstFileHandle.getStateSize(); - } - - for (StreamStateHandle miscFileHandle : miscFiles.values()) { - size += miscFileHandle.getStateSize(); - } - - return size; - } - - @Override - public void registerSharedStates(SharedStateRegistry stateRegistry) { - - Preconditions.checkState(!registered, "The state handle has already registered its shared states."); - - for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) { - SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey()); - - SharedStateRegistry.Result result = - stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue()); - - // We update our reference with the result from the registry, to prevent the following - // problem: - // A previous checkpoint n has already registered the state. This can happen if a - // following checkpoint (n + x) wants to reference the same state before the backend got - // notified that checkpoint n completed. In this case, the shared registry did - // deduplication and returns the previous reference. 
- newSstFileEntry.setValue(result.getReference()); - } - - for (Map.Entry<String, StreamStateHandle> oldSstFileName : registeredSstFiles.entrySet()) { - SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey()); - - SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey); - - // Again we update our state handle with the result from the registry, thus replacing - // placeholder state handles with the originals. - oldSstFileName.setValue(result.getReference()); - } - - // Migrate state from unregistered to registered, so that it will not count as private state - // for #discardState() from now. - registeredSstFiles.putAll(unregisteredSstFiles); - unregisteredSstFiles.clear(); - - registered = true; - } - - @Override - public void unregisterSharedStates(SharedStateRegistry stateRegistry) { - - Preconditions.checkState(registered, "The state handle has not registered its shared states yet."); - - for (Map.Entry<String, StreamStateHandle> newSstFileEntry : unregisteredSstFiles.entrySet()) { - SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey()); - stateRegistry.releaseReference(registryKey); - } - - for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : registeredSstFiles.entrySet()) { - SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey()); - stateRegistry.releaseReference(registryKey); - } - - registered = false; - } - - private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(String fileName) { - return new SharedStateRegistryKey(operatorIdentifier + "-" + keyGroupRange + "-" + fileName); - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index b9468f7..4bd94fd 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -55,13 +55,16 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DoneFuture; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateMigrationUtil; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; @@ -72,7 +75,6 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; -import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import 
org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; @@ -172,7 +174,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final boolean enableIncrementalCheckpointing; /** The sst files materialized in pending checkpoints */ - private final SortedMap<Long, Map<String, StreamStateHandle>> materializedSstFiles = new TreeMap<>(); + private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles = new TreeMap<>(); /** The identifier of the last completed checkpoint */ private long lastCompletedCheckpointId = -1; @@ -723,7 +725,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final long checkpointTimestamp; /** All sst files that were part of the last previously completed checkpoint */ - private Map<String, StreamStateHandle> baseSstFiles; + private Map<StateHandleID, StreamStateHandle> baseSstFiles; /** The state meta data */ private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>(); @@ -735,13 +737,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final CloseableRegistry closeableRegistry = new CloseableRegistry(); // new sst files since the last completed checkpoint - private final Map<String, StreamStateHandle> newSstFiles = new HashMap<>(); + private final Map<StateHandleID, StreamStateHandle> newSstFiles = new HashMap<>(); // old sst files which have been materialized in previous completed checkpoints - private final Map<String, StreamStateHandle> oldSstFiles = new HashMap<>(); + private final Map<StateHandleID, StreamStateHandle> oldSstFiles = new HashMap<>(); // handles to the misc files in the current snapshot - private final Map<String, StreamStateHandle> miscFiles = new HashMap<>(); + private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>(); private 
StreamStateHandle metaStateHandle = null; @@ -865,8 +867,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); if (fileStatuses != null) { for (FileStatus fileStatus : fileStatuses) { - Path filePath = fileStatus.getPath(); - String fileName = filePath.getName(); + final Path filePath = fileStatus.getPath(); + final String fileName = filePath.getName(); + final StateHandleID stateHandleID = new StateHandleID(fileName); if (fileName.endsWith(SST_FILE_SUFFIX)) { StreamStateHandle fileHandle = @@ -874,20 +877,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (fileHandle == null) { fileHandle = materializeStateData(filePath); - newSstFiles.put(fileName, fileHandle); + newSstFiles.put(stateHandleID, fileHandle); } else { // we introduce a placeholder state handle, that is replaced with the // original from the shared state registry (created from a previous checkpoint) - oldSstFiles.put(fileName, new PlaceholderStreamStateHandle(fileHandle.getStateSize())); + oldSstFiles.put( + stateHandleID, + new PlaceholderStreamStateHandle(fileHandle.getStateSize())); } } else { StreamStateHandle fileHandle = materializeStateData(filePath); - miscFiles.put(fileName, fileHandle); + miscFiles.put(stateHandleID, fileHandle); } } } - Map<String, StreamStateHandle> sstFiles = new HashMap<>(newSstFiles.size() + oldSstFiles.size()); + Map<StateHandleID, StreamStateHandle> sstFiles = + new HashMap<>(newSstFiles.size() + oldSstFiles.size()); + sstFiles.putAll(newSstFiles); sstFiles.putAll(oldSstFiles); @@ -895,7 +902,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateBackend.materializedSstFiles.put(checkpointId, sstFiles); } - return new RocksDBIncrementalKeyedStateHandle( + return new IncrementalKeyedStateHandle( stateBackend.operatorIdentifier, stateBackend.keyGroupRange, checkpointId, @@ -940,39 +947,6 @@ public 
class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } } - - /** - * A placeholder state handle for shared state that will replaced by an original that was - * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in - * case of {@link ByteStreamStateHandle}. - */ - private static final class PlaceholderStreamStateHandle implements StreamStateHandle { - - private static final long serialVersionUID = 1L; - - /** We remember the size of the original file for which this is a placeholder */ - private final long originalSize; - - public PlaceholderStreamStateHandle(long originalSize) { - this.originalSize = originalSize; - } - - @Override - public FSDataInputStream openInputStream() { - throw new UnsupportedOperationException( - "This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator."); - } - - @Override - public void discardState() throws Exception { - // nothing to do. - } - - @Override - public long getStateSize() { - return originalSize; - } - } } @Override @@ -989,7 +963,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } else if (MigrationUtil.isOldSavepointKeyedState(restoreState)) { LOG.info("Converting RocksDB state from old savepoint."); restoreOldSavepointKeyedState(restoreState); - } else if (restoreState.iterator().next() instanceof RocksDBIncrementalKeyedStateHandle) { + } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) { RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this); restoreOperation.restore(restoreState); } else { @@ -1302,7 +1276,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } private void restoreInstance( - RocksDBIncrementalKeyedStateHandle restoreStateHandle, + IncrementalKeyedStateHandle restoreStateHandle, boolean hasExtraKeys) throws Exception { // read state data @@ -1311,29 +1285,16 @@ public 
class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { UUID.randomUUID().toString()); try { - Map<String, StreamStateHandle> newSstFiles = restoreStateHandle.getUnregisteredSstFiles(); - for (Map.Entry<String, StreamStateHandle> newSstFileEntry : newSstFiles.entrySet()) { - String fileName = newSstFileEntry.getKey(); - StreamStateHandle remoteFileHandle = newSstFileEntry.getValue(); + final Map<StateHandleID, StreamStateHandle> newSstFiles = + restoreStateHandle.getCreatedSharedState(); + final Map<StateHandleID, StreamStateHandle> oldSstFiles = + restoreStateHandle.getReferencedSharedState(); + final Map<StateHandleID, StreamStateHandle> miscFiles = + restoreStateHandle.getPrivateState(); - readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); - } - - Map<String, StreamStateHandle> oldSstFiles = restoreStateHandle.getRegisteredSstFiles(); - for (Map.Entry<String, StreamStateHandle> oldSstFileEntry : oldSstFiles.entrySet()) { - String fileName = oldSstFileEntry.getKey(); - StreamStateHandle remoteFileHandle = oldSstFileEntry.getValue(); - - readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); - } - - Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles(); - for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) { - String fileName = miscFileEntry.getKey(); - StreamStateHandle remoteFileHandle = miscFileEntry.getValue(); - - readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle); - } + readAllStateData(newSstFiles, restoreInstancePath); + readAllStateData(oldSstFiles, restoreInstancePath); + readAllStateData(miscFiles, restoreInstancePath); // read meta data List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = @@ -1425,26 +1386,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throw new IOException("Could not create RocksDB data directory."); } - for (String newSstFileName : 
newSstFiles.keySet()) { - File restoreFile = new File(restoreInstancePath.getPath(), newSstFileName); - File targetFile = new File(stateBackend.instanceRocksDBPath, newSstFileName); - - Files.createLink(targetFile.toPath(), restoreFile.toPath()); - } - - for (String oldSstFileName : oldSstFiles.keySet()) { - File restoreFile = new File(restoreInstancePath.getPath(), oldSstFileName); - File targetFile = new File(stateBackend.instanceRocksDBPath, oldSstFileName); - - Files.createLink(targetFile.toPath(), restoreFile.toPath()); - } - - for (String miscFileName : miscFiles.keySet()) { - File restoreFile = new File(restoreInstancePath.getPath(), miscFileName); - File targetFile = new File(stateBackend.instanceRocksDBPath, miscFileName); - - Files.createLink(targetFile.toPath(), restoreFile.toPath()); - } + createFileHardLinksInRestorePath(newSstFiles, restoreInstancePath); + createFileHardLinksInRestorePath(oldSstFiles, restoreInstancePath); + createFileHardLinksInRestorePath(miscFiles, restoreInstancePath); List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); stateBackend.db = stateBackend.openDB( @@ -1470,7 +1414,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { // use the restore sst files as the base for succeeding checkpoints - Map<String, StreamStateHandle> sstFiles = new HashMap<>(); + Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>(); sstFiles.putAll(newSstFiles); sstFiles.putAll(oldSstFiles); stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles); @@ -1485,6 +1429,29 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } + private void readAllStateData( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath) throws IOException { + + for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = 
entry.getValue(); + readStateData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle); + } + } + + private void createFileHardLinksInRestorePath( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath) throws IOException { + + for (StateHandleID stateHandleID : stateHandleMap.keySet()) { + String newSstFileName = stateHandleID.toString(); + File restoreFile = new File(restoreInstancePath.getPath(), newSstFileName); + File targetFile = new File(stateBackend.instanceRocksDBPath, newSstFileName); + Files.createLink(targetFile.toPath(), restoreFile.toPath()); + } + } + void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { boolean hasExtraKeys = (restoreStateHandles.size() > 1 || @@ -1496,13 +1463,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { for (KeyedStateHandle rawStateHandle : restoreStateHandles) { - if (! (rawStateHandle instanceof RocksDBIncrementalKeyedStateHandle)) { + if (! 
(rawStateHandle instanceof IncrementalKeyedStateHandle)) { throw new IllegalStateException("Unexpected state handle type, " + - "expected " + RocksDBIncrementalKeyedStateHandle.class + + "expected " + IncrementalKeyedStateHandle.class + ", but found " + rawStateHandle.getClass()); } - RocksDBIncrementalKeyedStateHandle keyedStateHandle = (RocksDBIncrementalKeyedStateHandle) rawStateHandle; + IncrementalKeyedStateHandle keyedStateHandle = (IncrementalKeyedStateHandle) rawStateHandle; restoreInstance(keyedStateHandle, hasExtraKeys); } http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java b/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java new file mode 100644 index 0000000..7245e61 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/StringBasedID.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.io.Serializable; + +/** + * Base class for typed IDs that are internally represented by a string. 
This class is not intended + * for direct use, but should be subclassed for type-safety. + */ +public class StringBasedID implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * Uses a String as internal representation + */ + private final String keyString; + + /** + * Protected constructor to enforce subclassing. + */ + protected StringBasedID(String keyString) { + this.keyString = Preconditions.checkNotNull(keyString); + } + + public String getKeyString() { + return keyString; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StringBasedID that = (StringBasedID) o; + return keyString.equals(that.keyString); + } + + @Override + public int hashCode() { + return keyString.hashCode(); + } + + @Override + public String toString() { + return keyString; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index 1b5f2c6..b71418b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -20,14 +20,17 @@ package org.apache.flink.runtime.checkpoint.savepoint; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.MasterState; -import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import 
org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; +import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -71,6 +74,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { private static final byte FILE_STREAM_STATE_HANDLE = 2; private static final byte KEY_GROUPS_HANDLE = 3; private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4; + private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5; + private static final byte PLACEHOLDER_STREAM_STATE_HANDLE = 6; /** The singleton instance of the serializer */ public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer(); @@ -287,7 +292,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis); - return new OperatorSubtaskState( nonPartitionableState, operatorStateBackend, @@ -311,19 +315,63 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup)); } serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos); + } else if (stateHandle instanceof IncrementalKeyedStateHandle) { + IncrementalKeyedStateHandle incrementalKeyedStateHandle = + (IncrementalKeyedStateHandle) stateHandle; + + dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE); + + 
dos.writeLong(incrementalKeyedStateHandle.getCheckpointId()); + dos.writeUTF(incrementalKeyedStateHandle.getOperatorIdentifier()); + dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getStartKeyGroup()); + dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); + + serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos); + + serializeStreamStateHandleMap(incrementalKeyedStateHandle.getCreatedSharedState(), dos); + serializeStreamStateHandleMap(incrementalKeyedStateHandle.getReferencedSharedState(), dos); + serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos); } else { throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass()); } } + private static void serializeStreamStateHandleMap( + Map<StateHandleID, StreamStateHandle> map, + DataOutputStream dos) throws IOException { + dos.writeInt(map.size()); + for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) { + dos.writeUTF(entry.getKey().toString()); + serializeStreamStateHandle(entry.getValue(), dos); + } + } + + private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap( + DataInputStream dis) throws IOException { + + final int size = dis.readInt(); + Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size); + + for (int i = 0; i < size; ++i) { + StateHandleID stateHandleID = new StateHandleID(dis.readUTF()); + StreamStateHandle stateHandle = deserializeStreamStateHandle(dis); + result.put(stateHandleID, stateHandle); + } + + return result; + } + private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException { final int type = dis.readByte(); if (NULL_HANDLE == type) { + return null; } else if (KEY_GROUPS_HANDLE == type) { + int startKeyGroup = dis.readInt(); int numKeyGroups = dis.readInt(); - KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1); + 
KeyGroupRange keyGroupRange = + KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1); long[] offsets = new long[numKeyGroups]; for (int i = 0; i < numKeyGroups; ++i) { offsets[i] = dis.readLong(); @@ -332,6 +380,28 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { keyGroupRange, offsets); StreamStateHandle stateHandle = deserializeStreamStateHandle(dis); return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); + } else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) { + + long checkpointId = dis.readLong(); + String operatorId = dis.readUTF(); + int startKeyGroup = dis.readInt(); + int numKeyGroups = dis.readInt(); + KeyGroupRange keyGroupRange = + KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1); + + StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis); + Map<StateHandleID, StreamStateHandle> createdStates = deserializeStreamStateHandleMap(dis); + Map<StateHandleID, StreamStateHandle> referencedStates = deserializeStreamStateHandleMap(dis); + Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis); + + return new IncrementalKeyedStateHandle( + operatorId, + keyGroupRange, + checkpointId, + createdStates, + referencedStates, + privateStates, + metaDataStateHandle); } else { throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type); } @@ -415,7 +485,10 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { byte[] internalData = byteStreamStateHandle.getData(); dos.writeInt(internalData.length); dos.write(byteStreamStateHandle.getData()); - + } else if (stateHandle instanceof PlaceholderStreamStateHandle) { + PlaceholderStreamStateHandle placeholder = (PlaceholderStreamStateHandle) stateHandle; + dos.writeByte(PLACEHOLDER_STREAM_STATE_HANDLE); + dos.writeLong(placeholder.getStateSize()); } else { throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass()); } @@ -437,6 
+510,8 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { byte[] data = new byte[numBytes]; dis.readFully(data); return new ByteStreamStateHandle(handleName, data); + } else if (PLACEHOLDER_STREAM_STATE_HANDLE == type) { + return new PlaceholderStreamStateHandle(dis.readLong()); } else { throw new IOException("Unknown implementation of StreamStateHandle, code: " + type); } http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java new file mode 100644 index 0000000..706e219 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * The handle to states of an incremental snapshot. + * <p> + * The states contained in an incremental snapshot include + * <ul> + * <li> Created shared state which includes (the supposed to be) shared files produced since the last + * completed checkpoint. These files can be referenced by succeeding checkpoints if the + * checkpoint succeeds to complete. </li> + * <li> Referenced shared state which includes the shared files materialized in previous + * checkpoints. </li> + * <li> Private state which includes all other files, typically mutable, that cannot be shared by + * other checkpoints. </li> + * <li> Backend meta state which includes the information of existing states. </li> + * </ul> + * + * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They + * should not be called from production code. This means this class is also not suited to serve as + * a key, e.g. in hash maps. 
+ */ +public class IncrementalKeyedStateHandle implements KeyedStateHandle { + + private static final Logger LOG = LoggerFactory.getLogger(IncrementalKeyedStateHandle.class); + + private static final long serialVersionUID = -8328808513197388231L; + + /** + * The operator instance identifier for this handle + */ + private final String operatorIdentifier; + + /** + * The key-group range covered by this state handle + */ + private final KeyGroupRange keyGroupRange; + + /** + * The checkpoint Id + */ + private final long checkpointId; + + /** + * State that the incremental checkpoint created new + */ + private final Map<StateHandleID, StreamStateHandle> createdSharedState; + + /** + * State that the incremental checkpoint references from previous checkpoints + */ + private final Map<StateHandleID, StreamStateHandle> referencedSharedState; + + /** + * Private state in the incremental checkpoint + */ + private final Map<StateHandleID, StreamStateHandle> privateState; + + /** + * Primary meta data state of the incremental checkpoint + */ + private final StreamStateHandle metaStateHandle; + + /** + * True if the state handle has already registered shared states. + * <p> + * Once the shared states are registered, it's the {@link SharedStateRegistry}'s + * responsibility to maintain the shared states. But in the cases where the + * state handle is discarded before performing the registration, the handle + * should delete all the shared states created by it. 
+ */ + private boolean registered; + + public IncrementalKeyedStateHandle( + String operatorIdentifier, + KeyGroupRange keyGroupRange, + long checkpointId, + Map<StateHandleID, StreamStateHandle> createdSharedState, + Map<StateHandleID, StreamStateHandle> referencedSharedState, + Map<StateHandleID, StreamStateHandle> privateState, + StreamStateHandle metaStateHandle) { + + this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); + this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); + this.checkpointId = checkpointId; + this.createdSharedState = Preconditions.checkNotNull(createdSharedState); + this.referencedSharedState = Preconditions.checkNotNull(referencedSharedState); + this.privateState = Preconditions.checkNotNull(privateState); + this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle); + this.registered = false; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + public long getCheckpointId() { + return checkpointId; + } + + public Map<StateHandleID, StreamStateHandle> getCreatedSharedState() { + return createdSharedState; + } + + public Map<StateHandleID, StreamStateHandle> getReferencedSharedState() { + return referencedSharedState; + } + + public Map<StateHandleID, StreamStateHandle> getPrivateState() { + return privateState; + } + + public StreamStateHandle getMetaStateHandle() { + return metaStateHandle; + } + + public String getOperatorIdentifier() { + return operatorIdentifier; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) { + return this; + } else { + return null; + } + } + + @Override + public void discardState() throws Exception { + + Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. 
Must unregister first."); + + try { + metaStateHandle.discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard meta data.", e); + } + + try { + StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard misc file states.", e); + } + + try { + StateUtil.bestEffortDiscardAllStateObjects(createdSharedState.values()); + } catch (Exception e) { + LOG.warn("Could not properly discard new sst file states.", e); + } + + } + + @Override + public long getStateSize() { + long size = getPrivateStateSize(); + + for (StreamStateHandle oldSstFileHandle : referencedSharedState.values()) { + size += oldSstFileHandle.getStateSize(); + } + + return size; + } + + /** + * Returns the size of the state that is privately owned by this handle. + */ + public long getPrivateStateSize() { + long size = StateUtil.getStateSize(metaStateHandle); + + for (StreamStateHandle newSstFileHandle : createdSharedState.values()) { + size += newSstFileHandle.getStateSize(); + } + + for (StreamStateHandle miscFileHandle : privateState.values()) { + size += miscFileHandle.getStateSize(); + } + + return size; + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry) { + + Preconditions.checkState(!registered, "The state handle has already registered its shared states."); + + for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey()); + + SharedStateRegistry.Result result = + stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue()); + + // We update our reference with the result from the registry, to prevent the following + // problem: + // A previous checkpoint n has already registered the state. 
This can happen if a + // following checkpoint (n + x) wants to reference the same state before the backend got + // notified that checkpoint n completed. In this case, the shared registry did + // deduplication and returns the previous reference. + newSstFileEntry.setValue(result.getReference()); + } + + for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileName : referencedSharedState.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey()); + + SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey); + + // Again we update our state handle with the result from the registry, thus replacing + // placeholder state handles with the originals. + oldSstFileName.setValue(result.getReference()); + } + + // Migrate state from unregistered to registered, so that it will not count as private state + // for #discardState() from now. + referencedSharedState.putAll(createdSharedState); + createdSharedState.clear(); + + registered = true; + } + + @Override + public void unregisterSharedStates(SharedStateRegistry stateRegistry) { + + Preconditions.checkState(registered, "The state handle has not registered its shared states yet."); + + for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey()); + stateRegistry.releaseReference(registryKey); + } + + for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileEntry : referencedSharedState.entrySet()) { + SharedStateRegistryKey registryKey = + createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey()); + stateRegistry.releaseReference(registryKey); + } + + registered = false; + } + + private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { + return new SharedStateRegistryKey(operatorIdentifier + '-' + keyGroupRange, shId); + } + + /** + * This method 
should only be called in tests! This should never serve as key in a hash map. + */ + @VisibleForTesting + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + IncrementalKeyedStateHandle that = (IncrementalKeyedStateHandle) o; + + if (getCheckpointId() != that.getCheckpointId()) { + return false; + } + if (!getOperatorIdentifier().equals(that.getOperatorIdentifier())) { + return false; + } + if (!getKeyGroupRange().equals(that.getKeyGroupRange())) { + return false; + } + if (!getCreatedSharedState().equals(that.getCreatedSharedState())) { + return false; + } + if (!getReferencedSharedState().equals(that.getReferencedSharedState())) { + return false; + } + if (!getPrivateState().equals(that.getPrivateState())) { + return false; + } + return getMetaStateHandle().equals(that.getMetaStateHandle()); + } + + /** + * This method should only be called in tests! This should never serve as key in a hash map. 
+ */ + @VisibleForTesting + @Override + public int hashCode() { + int result = getOperatorIdentifier().hashCode(); + result = 31 * result + getKeyGroupRange().hashCode(); + result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32)); + result = 31 * result + getCreatedSharedState().hashCode(); + result = 31 * result + getReferencedSharedState().hashCode(); + result = 31 * result + getPrivateState().hashCode(); + result = 31 * result + getMetaStateHandle().hashCode(); + return result; + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java new file mode 100644 index 0000000..2136061 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +/** + * A placeholder state handle for shared state that will be replaced by an original that was + * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in + * case of {@link ByteStreamStateHandle}. To be used in the referenced states of + * {@link IncrementalKeyedStateHandle}. + * <p> + * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They + * should not be called from production code. This means this class is also not suited to serve as + * a key, e.g. in hash maps. + */ +public class PlaceholderStreamStateHandle implements StreamStateHandle { + + private static final long serialVersionUID = 1L; + + /** We remember the size of the original file for which this is a placeholder */ + private final long originalSize; + + public PlaceholderStreamStateHandle(long originalSize) { + this.originalSize = originalSize; + } + + @Override + public FSDataInputStream openInputStream() { + throw new UnsupportedOperationException( + "This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator."); + } + + @Override + public void discardState() throws Exception { + // nothing to do. + } + + @Override + public long getStateSize() { + return originalSize; + } + + /** + * This method should only be called in tests! This should never serve as key in a hash map. + */ + @VisibleForTesting + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + PlaceholderStreamStateHandle that = (PlaceholderStreamStateHandle) o; + + return originalSize == that.originalSize; + } + + /** + * This method should only be called in tests! 
This should never serve as key in a hash map. + */ + @VisibleForTesting + @Override + public int hashCode() { + return (int) (originalSize ^ (originalSize >>> 32)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java deleted file mode 100644 index c8c4046..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateHandle.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -/** - * A handle to those states that are referenced by different checkpoints. - * - * <p> Each shared state handle is identified by a unique key. Two shared states - * are considered equal if their keys are identical. 
- * - * <p> All shared states are registered at the {@link SharedStateRegistry} once - * they are received by the {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} - * and will be unregistered when the checkpoints are discarded. A shared state - * will be discarded once it is not referenced by any checkpoint. A shared state - * should not be referenced any more if it has been discarded. - */ -public interface SharedStateHandle extends StateObject { - - /** - * Return the identifier of the shared state. - */ - String getRegistrationKey(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java index dbf4642..9cfdec7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java @@ -31,7 +31,7 @@ import java.util.concurrent.Executor; /** * A {@code SharedStateRegistry} will be deployed in the * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to - * maintain the reference count of {@link SharedStateHandle}s which are shared + * maintain the reference count of {@link StreamStateHandle}s which are shared * among different incremental checkpoints. 
*/ public class SharedStateRegistry { http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java index 9e59359..58262ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryKey.java @@ -18,9 +18,8 @@ package org.apache.flink.runtime.state; -import org.apache.flink.util.Preconditions; - -import java.io.Serializable; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.StringBasedID; /** * This class represents a key that uniquely identifies (on a logical level) state handles for @@ -28,41 +27,16 @@ import java.io.Serializable; * be the same should have the same {@link SharedStateRegistryKey}. The meaning of logical * equivalence is up to the application. 
*/ -public class SharedStateRegistryKey implements Serializable { +public class SharedStateRegistryKey extends StringBasedID { private static final long serialVersionUID = 1L; - /** Uses a String as internal representation */ - private final String keyString; - - public SharedStateRegistryKey(String keyString) { - this.keyString = Preconditions.checkNotNull(keyString); - } - - public String getKeyString() { - return keyString; + public SharedStateRegistryKey(String prefix, StateHandleID stateHandleID) { + super(prefix + '-' + stateHandleID); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - SharedStateRegistryKey that = (SharedStateRegistryKey) o; - return keyString.equals(that.keyString); - } - - @Override - public int hashCode() { - return keyString.hashCode(); - } - - @Override - public String toString() { - return keyString; + @VisibleForTesting + public SharedStateRegistryKey(String keyString) { + super(keyString); } } http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java deleted file mode 100644 index b736252..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandle.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -/** - * StateHandle is a general handle interface meant to abstract operator state fetching. - * A StateHandle implementation can for example include the state itself in cases where the state - * is lightweight or fetching it lazily from some external storage when the state is too large. - */ -public interface StateHandle<T> extends StateObject { - - /** - * This retrieves and return the state represented by the handle. - * - * @param userCodeClassLoader Class loader for deserializing user code specific classes - * - * @return The state represented by the handle. - * @throws java.lang.Exception Thrown, if the state cannot be fetched. - */ - T getState(ClassLoader userCodeClassLoader) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java new file mode 100644 index 0000000..5e95cff --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateHandleID.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.StringBasedID; + +/** + * Unique ID that allows for logical comparison between state handles. + * <p> + * Two state handles that are considered as logically equal should always return the same ID + * (whatever logically equal means is up to the implementation). For example, this could be based + * on the string representation of the full filepath for a state that is based on a file. 
+ */ +public class StateHandleID extends StringBasedID { + + private static final long serialVersionUID = 1L; + + public StateHandleID(String keyString) { + super(keyString); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java index ba77dbc..b63782d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java @@ -27,10 +27,15 @@ import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo; +import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; +import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; import org.apache.flink.util.StringUtils; @@ -41,6 +46,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import static 
org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -78,6 +84,7 @@ public class CheckpointTestUtils { boolean hasKeyedBackend = random.nextInt(4) != 0; boolean hasKeyedStream = random.nextInt(4) != 0; + boolean isIncremental = random.nextInt(3) == 0; for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) { @@ -108,21 +115,19 @@ public class CheckpointTestUtils { operatorStateHandleStream = new OperatorStateHandle(offsetsMap, operatorStateStream); } - KeyGroupsStateHandle keyedStateBackend = null; - KeyGroupsStateHandle keyedStateStream = null; + KeyedStateHandle keyedStateBackend = null; + KeyedStateHandle keyedStateStream = null; if (hasKeyedBackend) { - keyedStateBackend = new KeyGroupsStateHandle( - new KeyGroupRangeOffsets(1, 1, new long[]{42}), - new TestByteStreamStateHandleDeepCompare("c", "Hello" - .getBytes(ConfigConstants.DEFAULT_CHARSET))); + if (isIncremental) { + keyedStateBackend = createDummyIncrementalKeyedStateHandle(random); + } else { + keyedStateBackend = createDummyKeyGroupStateHandle(random); + } } if (hasKeyedStream) { - keyedStateStream = new KeyGroupsStateHandle( - new KeyGroupRangeOffsets(1, 1, new long[]{23}), - new TestByteStreamStateHandleDeepCompare("d", "World" - .getBytes(ConfigConstants.DEFAULT_CHARSET))); + keyedStateStream = createDummyKeyGroupStateHandle(random); } taskState.putState(subtaskIdx, new OperatorSubtaskState( @@ -210,17 +215,11 @@ public class CheckpointTestUtils { KeyGroupsStateHandle keyedStateStream = null; if (hasKeyedBackend) { - keyedStateBackend = new KeyGroupsStateHandle( - new KeyGroupRangeOffsets(1, 1, new long[]{42}), - new TestByteStreamStateHandleDeepCompare("c", "Hello" - .getBytes(ConfigConstants.DEFAULT_CHARSET))); + keyedStateBackend = createDummyKeyGroupStateHandle(random); } if (hasKeyedStream) { - keyedStateStream = new KeyGroupsStateHandle( - new KeyGroupRangeOffsets(1, 1, new long[]{23}), - new TestByteStreamStateHandleDeepCompare("d", "World" - 
.getBytes(ConfigConstants.DEFAULT_CHARSET))); + keyedStateStream = createDummyKeyGroupStateHandle(random); } taskState.putState(subtaskIdx, new SubtaskState( @@ -272,4 +271,56 @@ public class CheckpointTestUtils { /** utility class, not meant to be instantiated */ private CheckpointTestUtils() {} + + + private static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) { + return new IncrementalKeyedStateHandle( + createRandomUUID(rnd).toString(), + new KeyGroupRange(1, 1), + 42L, + createRandomOwnedHandleMap(rnd), + createRandomReferencedHandleMap(rnd), + createRandomOwnedHandleMap(rnd), + createDummyStreamStateHandle(rnd)); + } + + private static Map<StateHandleID, StreamStateHandle> createRandomOwnedHandleMap(Random rnd) { + final int size = rnd.nextInt(4); + Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size); + for (int i = 0; i < size; ++i) { + StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString()); + StreamStateHandle stateHandle = createDummyStreamStateHandle(rnd); + result.put(randomId, stateHandle); + } + + return result; + } + + private static Map<StateHandleID, StreamStateHandle> createRandomReferencedHandleMap(Random rnd) { + final int size = rnd.nextInt(4); + Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size); + for (int i = 0; i < size; ++i) { + StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString()); + result.put(randomId, new PlaceholderStreamStateHandle(rnd.nextInt(1024))); + } + + return result; + } + + private static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) { + return new KeyGroupsStateHandle( + new KeyGroupRangeOffsets(1, 1, new long[]{rnd.nextInt(1024)}), + createDummyStreamStateHandle(rnd)); + } + + private static StreamStateHandle createDummyStreamStateHandle(Random rnd) { + return new TestByteStreamStateHandleDeepCompare( + String.valueOf(createRandomUUID(rnd)), + 
String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET)); + } + + private static UUID createRandomUUID(Random rnd) { + return new UUID(rnd.nextLong(), rnd.nextLong()); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java index 154d761..602390b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java @@ -24,7 +24,6 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; - import org.junit.Test; import java.io.DataInputStream; http://git-wip-us.apache.org/repos/asf/flink/blob/098e46f2/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java deleted file mode 100644 index d6966d0..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateUtilTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.util.TestLogger; -import org.junit.Test; - -import java.util.concurrent.RunnableFuture; - -public class StateUtilTest extends TestLogger { - - /** - * Tests that {@link StateUtil#discardStateFuture} can handle state futures with null value. - */ - @Test - public void testDiscardRunnableFutureWithNullValue() throws Exception { - RunnableFuture<StateHandle<?>> stateFuture = DoneFuture.nullValue(); - StateUtil.discardStateFuture(stateFuture); - } -}