dxichen commented on a change in pull request #1490:
URL: https://github.com/apache/samza/pull/1490#discussion_r625502173



##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointManager;
+import org.apache.samza.checkpoint.CheckpointV1;
+import org.apache.samza.checkpoint.CheckpointV2;
+import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles the commit of the state stores of the task.
+ */
+public class TaskStorageCommitManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TaskStorageCommitManager.class);
+
+  private final TaskName taskName;
+  private final CheckpointManager checkpointManager;
+  private final ContainerStorageManager containerStorageManager;
+  private final Map<String, TaskBackupManager> stateBackendToBackupManager;
+  private final Partition taskChangelogPartition;
+  private final StorageManagerUtil storageManagerUtil;
+  private final ExecutorService backupExecutor;
+  private final File durableStoreBaseDir;
+  private final Map<String, SystemStream> storeChangelogs;
+
+  // Available after init(), since stores are created by 
ContainerStorageManager#start()
+  private Map<String, StorageEngine> storageEngines;
+
+  public TaskStorageCommitManager(TaskName taskName, Map<String, 
TaskBackupManager> stateBackendToBackupManager,
+      ContainerStorageManager containerStorageManager, Map<String, 
SystemStream> storeChangelogs, Partition changelogPartition,
+      CheckpointManager checkpointManager, Config config, ExecutorService 
backupExecutor,
+      StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) {
+    this.taskName = taskName;
+    this.containerStorageManager = containerStorageManager;
+    this.stateBackendToBackupManager = stateBackendToBackupManager;
+    this.taskChangelogPartition = changelogPartition;
+    this.checkpointManager = checkpointManager;
+    this.backupExecutor = backupExecutor;
+    this.durableStoreBaseDir = durableStoreBaseDir;
+    this.storeChangelogs = storeChangelogs;
+    this.storageManagerUtil = storageManagerUtil;
+  }
+
+  public void init() {
+    // Assuming that container storage manager has already started and created 
to stores
+    storageEngines = containerStorageManager.getAllStores(taskName);
+    if (checkpointManager != null) {
+      Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName);
+      LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, 
checkpoint);
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> 
storageBackupManager.init(checkpoint));
+    } else {
+      stateBackendToBackupManager.values()
+          .forEach(storageBackupManager -> storageBackupManager.init(null));
+    }
+  }
+
+  /**
+   * Synchronously captures the current state of the stores in order to 
persist it to the backup manager
+   * in the async {@link #upload(CheckpointId, Map)} phase. Returns a map of 
state backend factory name to
+   * a map of store name to state checkpoint markers for all configured state 
backends and stores.
+   *
+   * @param checkpointId {@link CheckpointId} of the current commit
+   * @return a map of state backend factory name to a map of store name to 
state checkpoint markers
+   */
+  public Map<String, Map<String, String>> snapshot(CheckpointId checkpointId) {
+    // Flush all stores
+    storageEngines.values().forEach(StorageEngine::flush);
+
+    // Checkpoint all persisted and durable stores
+    storageEngines.forEach((storeName, storageEngine) -> {
+      if (storageEngine.getStoreProperties().isPersistedToDisk() &&
+          storageEngine.getStoreProperties().isDurableStore()) {
+        storageEngine.checkpoint(checkpointId);
+      }
+    });
+
+    // state backend factory -> store Name -> state checkpoint marker
+    Map<String, Map<String, String>> stateBackendToStoreSCMs = new HashMap<>();
+
+    // for each configured state backend factory, backup the state for all 
stores in this task.
+    stateBackendToBackupManager.forEach((stateBackendFactoryName, 
backupManager) -> {
+      Map<String, String> snapshotSCMs = backupManager.snapshot(checkpointId);
+      LOG.debug("Created snapshot for taskName: {}, checkpoint id: {}, state 
backend: {}. Snapshot SCMs: {}",
+          taskName, checkpointId, stateBackendFactoryName, snapshotSCMs);
+      stateBackendToStoreSCMs.put(stateBackendFactoryName, snapshotSCMs);
+    });

Review comment:
       We need to back up all the stores that are durable (or logged), and write 
to file, as a checkpoint, all the state that is persisted. Similarly, 
changelog-enabled stores are persisted to disk only if they are persisted stores 
(i.e., not in-memory), which should preserve the existing behavior.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to