prateekm commented on a change in pull request #1164: [WIP] Transactional State 
[5/5]: Added implementations for transactional state checkpoints and restore
URL: https://github.com/apache/samza/pull/1164#discussion_r332666546
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
 ##########
 @@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ListMultimap;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.ChangelogSSPIterator;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import 
org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the state restore based on state snapshots of 
checkpoints and changelog.
+ */
+public class TransactionalStateTaskRestoreManager implements 
TaskRestoreManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionalStateTaskRestoreManager.class);
+
+  private final TaskModel taskModel;
+  private final Map<String, StorageEngine> storeEngines; // store name to storage engines
+  private final Map<String, SystemStream> storeChangelogs; // store name to changelog system stream
+  private final SystemAdmins systemAdmins;
+  private final Map<String, SystemConsumer> storeConsumers; // store name to the consumer that reads its changelog in restore()
+  private final SSPMetadataCache sspMetadataCache; // source of the oldest / newest offsets of each changelog SSP
+  private final File loggedStoreBaseDirectory; // base directory of persistent logged stores
+  private final File nonLoggedStoreBaseDirectory; // base directory of persistent non-logged stores
+  private final Config config;
+  private final Clock clock;
+  private final StorageManagerUtil storageManagerUtil;
+  private final FileUtil fileUtil;
+
+  private StoreActions storeActions; // computed in init(); read by restore()
+
+  /**
+   * Creates a transactional state restore manager for the stores of a single task.
+   * Store directories and restore offsets are not computed here; that happens in {@link #init(Map)}.
+   *
+   * @param taskModel model of the task whose stores are restored
+   * @param storeEngines store name to storage engine
+   * @param storeChangelogs store name to changelog system stream
+   * @param systemAdmins admins used to resolve per-system offset comparison
+   * @param storeConsumers store name to the consumer that reads its changelog during restore
+   * @param sspMetadataCache provides the oldest and newest offsets of each changelog SSP
+   * @param loggedStoreBaseDirectory base directory for persistent logged stores
+   * @param nonLoggedStoreBaseDirectory base directory for persistent non-logged stores
+   * @param config job config
+   * @param clock clock passed along when validating store checkpoint directories
+   */
+  public TransactionalStateTaskRestoreManager(
+      TaskModel taskModel,
+      Map<String, StorageEngine> storeEngines,
+      Map<String, SystemStream> storeChangelogs,
+      SystemAdmins systemAdmins,
+      Map<String, SystemConsumer> storeConsumers,
+      SSPMetadataCache sspMetadataCache,
+      File loggedStoreBaseDirectory,
+      File nonLoggedStoreBaseDirectory,
+      Config config,
+      Clock clock) {
+    this.taskModel = taskModel;
+    this.config = config;
+    this.clock = clock;
+    this.systemAdmins = systemAdmins;
+
+    // per-store collaborators, all keyed by store name
+    this.storeEngines = storeEngines;
+    this.storeChangelogs = storeChangelogs;
+    this.storeConsumers = storeConsumers;
+
+    // The cache is safe to use here: unlike during commit, the newest changelog SSP offsets
+    // cannot change between cache initialization and the end of the restore.
+    this.sspMetadataCache = sspMetadataCache;
+
+    this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
+    this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;
+
+    this.fileUtil = new FileUtil();
+    this.storageManagerUtil = new StorageManagerUtil();
+  }
+
+  /**
+   * Computes what to do with each store of this task before restoring it.
+   *
+   * @param checkpointedChangelogOffsets changelog SSP to the changelog offset recorded in the last checkpoint
+   */
+  @Override
+  public void init(Map<SystemStreamPartition, String> checkpointedChangelogOffsets) {
+    // oldest / newest offsets currently present in each of this task's changelog SSPs
+    Map<SystemStreamPartition, SystemStreamPartitionMetadata> latestChangelogMetadata =
+        getCurrentChangelogOffsets(taskModel, storeChangelogs, sspMetadataCache);
+
+    // decide which store dirs to delete / retain and which stores to restore, and over which offsets
+    storeActions = getStoreActions(taskModel, storeEngines, storeChangelogs, checkpointedChangelogOffsets,
+        latestChangelogMetadata, systemAdmins, storageManagerUtil, loggedStoreBaseDirectory,
+        nonLoggedStoreBaseDirectory, config, clock);
+
+    // NOTE(review): setupStoreDirs / registerStartingOffsets are defined outside this view; presumably they
+    // apply the directory actions and register restore starting offsets with the consumers - confirm.
+    setupStoreDirs(taskModel, storeEngines, storeActions, storageManagerUtil, fileUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
+    registerStartingOffsets(taskModel, storeActions, storeChangelogs, systemAdmins, storeConsumers,
+        latestChangelogMetadata);
+  }
+
+  /**
+   * Restores every store selected for restore in {@link #init(Map)} by replaying its changelog partition
+   * up to that store's ending offset.
+   */
+  @Override
+  public void restore() {
+    storeActions.storesToRestore.forEach((storeName, restoreOffsets) -> {
+        SystemStream changelog = storeChangelogs.get(storeName);
+        SystemStreamPartition ssp = new SystemStreamPartition(changelog, taskModel.getChangelogPartition());
+        // NOTE(review): the trailing `true` flag is interpreted by ChangelogSSPIterator
+        // (presumably a trim / restore mode) - confirm against its constructor.
+        ChangelogSSPIterator iterator = new ChangelogSSPIterator(storeConsumers.get(storeName), ssp,
+            restoreOffsets.endingOffset, systemAdmins.getSystemAdmin(changelog.getSystem()), true);
+
+        LOG.info("Restoring store: {} for task: {}", storeName, taskModel.getTaskName());
+        storeEngines.get(storeName).restore(iterator);
+      });
+  }
+
+  /**
+   * Stops only the stores that are persisted to disk; non-persistent stores are left untouched.
+   * For certain stores and store modes (such as RocksDB), stopping can invoke compaction.
+   * Persisted stores are recreated in read-write mode in {@link ContainerStorageManager}.
+   */
+  public void stopPersistentStores() {
+    TaskName taskName = taskModel.getTaskName();
+    storeEngines.forEach((storeName, storeEngine) -> {
+        if (storeEngine.getStoreProperties().isPersistedToDisk()) {
+          storeEngine.stop();
+          // log only for stores that were actually stopped (previously logged for every store)
+          LOG.info("Stopped persistent store: {} in task: {}", storeName, taskName);
+        }
+      });
+  }
+
+  /**
+   * Looks up the current offset metadata (oldest and newest offsets) of every changelog SSP of this task.
+   * A task may have several changelog streams (e.g. one per store), but uses the same changelog
+   * partition for all of them.
+   *
+   * @param taskModel task whose changelog partition is used
+   * @param storeChangelogs store name to changelog system stream
+   * @param sspMetadataCache source of the SSP metadata
+   * @return changelog SSP to its current metadata
+   */
+  @VisibleForTesting
+  static Map<SystemStreamPartition, SystemStreamPartitionMetadata> getCurrentChangelogOffsets(
+      TaskModel taskModel, Map<String, SystemStream> storeChangelogs, SSPMetadataCache sspMetadataCache) {
+    Partition partition = taskModel.getChangelogPartition();
+    Map<SystemStreamPartition, SystemStreamPartitionMetadata> metadataBySSP = new HashMap<>();
+
+    for (SystemStream changelogStream : storeChangelogs.values()) {
+      SystemStreamPartition ssp =
+          new SystemStreamPartition(changelogStream.getSystem(), changelogStream.getStream(), partition);
+      metadataBySSP.put(ssp, sspMetadataCache.getMetadata(ssp));
+    }
+
+    LOG.info("Got current changelog offsets for taskName: {} as: {}", taskModel.getTaskName(), metadataBySSP);
+    return metadataBySSP;
+  }
+
+  /**
+   * Marks each persistent but non-logged store for deletion.
+   *
+   * For each logged store, based on the current, checkpointed and local 
changelog offsets,
+   * 1. decides which directories (current and checkpoints) to delete for 
persistent stores.
+   * 2. decides which directories (checkpoints) to retain for persistent 
stores.
+   * 3. decides which stores (persistent or not) need to be restored, and the 
beginning and end offsets for the restore.
+   *
+   * When this method returns, in StoreActions,
+   * 1. all persistent store current directories will be present in 
storeDirsToDelete
+   * 2. each persistent store checkpoint directory will be present in either 
storeDirToRetain or storeDirsToDelete.
+   * 3. there will be at most one storeDirToRetain per persistent store, which 
will be a checkpoint directory.
+   * 4. any stores (persistent or not) that need to be restored from 
changelogs will be present in
+   *    storesToRestore with appropriate offsets.
+   */
+  @VisibleForTesting
+  static StoreActions getStoreActions(
+      TaskModel taskModel,
+      Map<String, StorageEngine> storeEngines,
+      Map<String, SystemStream> storeChangelogs,
+      Map<SystemStreamPartition, String> checkpointedChangelogOffsets,
+      Map<SystemStreamPartition, SystemStreamPartitionMetadata> 
currentChangelogOffsets,
+      SystemAdmins systemAdmins,
+      StorageManagerUtil storageManagerUtil,
+      File loggedStoreBaseDirectory,
+      File nonLoggedStoreBaseDirectory,
+      Config config,
+      Clock clock) {
+    TaskName taskName = taskModel.getTaskName();
+    TaskMode taskMode = taskModel.getTaskMode();
+
+    Map<String, File> storeDirToRetain = new HashMap<>();
+    ListMultimap<String, File> storeDirsToDelete = ArrayListMultimap.create();
+    Map<String, RestoreOffsets> storesToRestore = new HashMap<>();
+
+    storeEngines.forEach((storeName, storageEngine) -> {
+        // do nothing if store is non persistent and not logged (e.g. in 
memory cache only)
+        if (!storageEngine.getStoreProperties().isPersistedToDisk() &&
+          !storageEngine.getStoreProperties().isLoggedStore()) {
+          return;
+        }
+
+        // persistent but non-logged stores are always deleted
+        if (storageEngine.getStoreProperties().isPersistedToDisk() &&
+            !storageEngine.getStoreProperties().isLoggedStore()) {
+          File currentDir = storageManagerUtil.getTaskStoreDir(
+              nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
+          storeDirsToDelete.put(storeName, currentDir);
+          // persistent but non-logged stores should not have checkpoint dirs
+          return;
+        }
+
+        // get the oldest and newest current changelog SSP offsets as well as 
the checkpointed changelog SSP offset
+        SystemStream changelog = storeChangelogs.get(storeName);
+        SystemStreamPartition changelogSSP = new 
SystemStreamPartition(changelog, taskModel.getChangelogPartition());
+        SystemAdmin admin = 
systemAdmins.getSystemAdmin(changelogSSP.getSystem());
+        SystemStreamPartitionMetadata changelogSSPMetadata = 
currentChangelogOffsets.get(changelogSSP);
+        String oldestOffset = changelogSSPMetadata.getOldestOffset();
+        String newestOffset = changelogSSPMetadata.getNewestOffset();
+        String checkpointedOffset = 
checkpointedChangelogOffsets.get(changelogSSP);
+
+
+        Optional<File> currentDirOptional;
+        Optional<List<File>> checkpointDirsOptional;
+
+        if (!storageEngine.getStoreProperties().isPersistedToDisk()) {
+          currentDirOptional = Optional.empty();
+          checkpointDirsOptional = Optional.empty();
+        } else {
+          currentDirOptional = Optional.of(storageManagerUtil.getTaskStoreDir(
+              loggedStoreBaseDirectory, storeName, taskName, taskMode));
+          checkpointDirsOptional = 
Optional.of(storageManagerUtil.getTaskStoreCheckpointDirs(
+              loggedStoreBaseDirectory, storeName, taskName, taskMode));
+        }
+
+        LOG.info("For store: {} in task: {} got current dir: {}, checkpoint 
dirs: {}, checkpointed changelog offset: {}",
+            storeName, taskName, currentDirOptional, checkpointDirsOptional, 
checkpointedOffset);
+
+        // TODO BLOCKER pmaheshw: will do full restore from changelog even if 
retain existing state == true
+        // always delete current logged store dir for persistent stores.
+        currentDirOptional.ifPresent(currentDir -> 
storeDirsToDelete.put(storeName, currentDir));
+
+        // first check if checkpointed offset is invalid (i.e., out of range 
of current offsets, or null)
+        if (checkpointedOffset == null && oldestOffset != null) {
+          // this can mean that either this is the initial migration for this 
feature and there are no previously
+          // checkpointed changelog offsets, or that this is a new store or 
changelog topic after the initial migration.
+
+          // if this is the first time migration, it might be desirable to 
retain existing data.
+          // if this is new store or topic, it's possible that the container 
previously died after writing some data to
+          // the changelog but before a commit, so it's desirable to delete 
the store, not restore anything and
+          // trim the changelog
+
+          // since we can't easily tell the difference b/w the two scenarios 
by just looking at the store and changelogs,
+          // we'll request users to indicate whether to retain existing data 
using a config flag. this flag should only
+          // be set during migrations, and turned off after the first 
successful commit of the new container (i.e. next
+          // deploy). for simplicity, we'll always delete the local store, and 
restore from changelog if necessary.
+
+          checkpointDirsOptional.ifPresent(checkpointDirs ->
+              checkpointDirs.forEach(checkpointDir -> 
storeDirsToDelete.put(storeName, checkpointDir)));
+
+          if (new 
TaskConfig(config).getTransactionalStateRetainExistingState()) {
+            // mark for restore from (oldest, newest) to recreate local state.
+            LOG.warn("Checkpointed offset for store: {} in task: {} is null. 
Since retain existing state is true, " +
+                "local state will be restored from current changelog contents. 
" +
+                "There is no transactional local state guarantee.", storeName, 
taskName);
+            storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, 
newestOffset));
+          } else {
+            LOG.warn("Checkpointed offset for store: {} in task: {} is null. 
Since retain existing state is false, " +
+                "any local state and changelog topic contents will be 
cleared", storeName, taskName);
+            // mark for restore from (oldest, null) to trim entire changelog.
+            storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, 
null));
+          }
+        } else if (// check if the checkpointed offset is in range of current 
oldest and newest offsets
+            admin.offsetComparator(oldestOffset, checkpointedOffset) > 0 ||
+            admin.offsetComparator(checkpointedOffset, newestOffset) > 0) {
+          // checkpointed offset is out of range. this could mean that this is 
a TTL topic and the checkpointed
+          // offset was TTLd, or that the changelog topic was manually deleted 
and then recreated.
+          // we cannot guarantee transactional state for TTL stores, so delete 
everything and do a full restore
+          // for local store. if the topic was deleted and recreated, this 
will have the side effect of
+          // clearing the store as well.
+          LOG.warn("Checkpointed offset: {} for store: {} in task: {} is out 
of range of oldest: {} or newest: {} offset." +
+                  "Deleting existing store and restoring from changelog topic 
from oldest to newest offset. If the topic " +
+                  "has time-based retention, there is no transactional local 
state guarantees. If the topic was changed," +
+                  "local state will be cleaned up and fully restored to match 
the new topic contents.",
+              checkpointedOffset, storeName, taskName, oldestOffset, 
newestOffset);
+          checkpointDirsOptional.ifPresent(checkpointDirs ->
+              checkpointDirs.forEach(checkpointDir -> 
storeDirsToDelete.put(storeName, checkpointDir)));
+          storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, 
newestOffset));
+        } else { // happy path. checkpointed offset is in range of current 
oldest and newest offsets
+          if (!checkpointDirsOptional.isPresent()) { // non-persistent logged 
store
+            storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, 
checkpointedOffset));
+          } else { // persistent logged store
+            // if there exists a valid store checkpoint directory with oldest 
offset <= local offset <= checkpointed offset,
+            // retain it and restore the delta. delete all other checkpoint 
directories for the store. if more than one such
+            // checkpoint directory exists, retain the one with the highest 
local offset and delete the rest.
+            boolean hasValidCheckpointDir = false;
+            for (File checkpointDir: checkpointDirsOptional.get()) {
+              // TODO BLOCKER pmaheshw: should validation check / warn for 
compact lag config staleness too?
+              if (storageManagerUtil.isLoggedStoreValid(
+                  storeName, checkpointDir, config, storeChangelogs, 
taskModel, clock, storeEngines)) {
+                String localOffset = storageManagerUtil.readOffsetFile(
+                    checkpointDir, Collections.singleton(changelogSSP), 
false).get(changelogSSP);
+                LOG.info("Read local offset: {} for store: {} checkpoint dir: 
{} in task: {}", localOffset, storeName,
+                    checkpointDir, taskName);
+
+                if (admin.offsetComparator(localOffset, oldestOffset) >= 0 &&
+                    admin.offsetComparator(localOffset, checkpointedOffset) <= 
0 &&
+                    (storesToRestore.get(storeName) == null ||
+                        admin.offsetComparator(localOffset, 
storesToRestore.get(storeName).startingOffset) > 0)) {
+                  hasValidCheckpointDir = true;
+                  storeDirToRetain.put(storeName, checkpointDir);
+                  LOG.info("Temporarily retaining checkpoint dir: {}", 
checkpointDir);
+                  // mark for restore even if local == checkpointed, so that 
the changelog gets trimmed.
+                  storesToRestore.put(storeName, new 
RestoreOffsets(localOffset, checkpointedOffset));
+                }
+              }
+            }
+
+            // delete all non-retained checkpoint directories
+            for (File checkpointDir: checkpointDirsOptional.get()) {
+              if (storeDirToRetain.get(storeName) == null ||
+                  !storeDirToRetain.get(storeName).equals(checkpointDir)) {
+                storeDirsToDelete.put(storeName, checkpointDir);
+              }
+            }
+
+            // if the store had not valid checkpoint dirs to retain, restore 
from changelog
+            if (!hasValidCheckpointDir) {
 
 Review comment:
   As discussed offline, we will make the following changes:
   1. In the next release, stores will create the checkpoint directories and 
write the changelog offset in the checkpoint regardless of whether the 
transactional state feature is turned on or not.
   2. We will recommend that users first upgrade and deploy the new version 
with the feature turned off, and then after some time (at least one commit), 
turn the feature on.
   This way, both a checkpointed changelog offset and a store checkpoint directory will be available when the feature is turned on.
   3. We will backport the change to handle null changelog offsets in the 
checkpoint (in CheckpointSerde) to version 1.2, to support rolling back to a 
previous version.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to