dxichen commented on a change in pull request #1490: URL: https://github.com/apache/samza/pull/1490#discussion_r625502173
########## File path: samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java ########## @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.samza.Partition; +import org.apache.samza.SamzaException; +import org.apache.samza.checkpoint.Checkpoint; +import org.apache.samza.checkpoint.CheckpointId; +import org.apache.samza.checkpoint.CheckpointManager; +import org.apache.samza.checkpoint.CheckpointV1; +import org.apache.samza.checkpoint.CheckpointV2; +import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset; +import org.apache.samza.config.Config; +import org.apache.samza.container.TaskName; +import 
org.apache.samza.job.model.TaskMode; +import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles the commit of the state stores of the task. + */ +public class TaskStorageCommitManager { + + private static final Logger LOG = LoggerFactory.getLogger(TaskStorageCommitManager.class); + + private final TaskName taskName; + private final CheckpointManager checkpointManager; + private final ContainerStorageManager containerStorageManager; + private final Map<String, TaskBackupManager> stateBackendToBackupManager; + private final Partition taskChangelogPartition; + private final StorageManagerUtil storageManagerUtil; + private final ExecutorService backupExecutor; + private final File durableStoreBaseDir; + private final Map<String, SystemStream> storeChangelogs; + + // Available after init(), since stores are created by ContainerStorageManager#start() + private Map<String, StorageEngine> storageEngines; + + public TaskStorageCommitManager(TaskName taskName, Map<String, TaskBackupManager> stateBackendToBackupManager, + ContainerStorageManager containerStorageManager, Map<String, SystemStream> storeChangelogs, Partition changelogPartition, + CheckpointManager checkpointManager, Config config, ExecutorService backupExecutor, + StorageManagerUtil storageManagerUtil, File durableStoreBaseDir) { + this.taskName = taskName; + this.containerStorageManager = containerStorageManager; + this.stateBackendToBackupManager = stateBackendToBackupManager; + this.taskChangelogPartition = changelogPartition; + this.checkpointManager = checkpointManager; + this.backupExecutor = backupExecutor; + this.durableStoreBaseDir = durableStoreBaseDir; + this.storeChangelogs = storeChangelogs; + this.storageManagerUtil = storageManagerUtil; + } + + public void init() { + // Assuming that container storage manager has already started and 
created to stores + storageEngines = containerStorageManager.getAllStores(taskName); + if (checkpointManager != null) { + Checkpoint checkpoint = checkpointManager.readLastCheckpoint(taskName); + LOG.debug("Last checkpoint on start for task: {} is: {}", taskName, checkpoint); + stateBackendToBackupManager.values() + .forEach(storageBackupManager -> storageBackupManager.init(checkpoint)); + } else { + stateBackendToBackupManager.values() + .forEach(storageBackupManager -> storageBackupManager.init(null)); + } + } + + /** + * Synchronously captures the current state of the stores in order to persist it to the backup manager + * in the async {@link #upload(CheckpointId, Map)} phase. Returns a map of state backend factory name to + * a map of store name to state checkpoint markers for all configured state backends and stores. + * + * @param checkpointId {@link CheckpointId} of the current commit + * @return a map of state backend factory name to a map of store name to state checkpoint markers + */ + public Map<String, Map<String, String>> snapshot(CheckpointId checkpointId) { + // Flush all stores + storageEngines.values().forEach(StorageEngine::flush); + + // Checkpoint all persisted and durable stores + storageEngines.forEach((storeName, storageEngine) -> { + if (storageEngine.getStoreProperties().isPersistedToDisk() && + storageEngine.getStoreProperties().isDurableStore()) { + storageEngine.checkpoint(checkpointId); + } + }); + + // state backend factory -> store Name -> state checkpoint marker + Map<String, Map<String, String>> stateBackendToStoreSCMs = new HashMap<>(); + + // for each configured state backend factory, backup the state for all stores in this task. + stateBackendToBackupManager.forEach((stateBackendFactoryName, backupManager) -> { + Map<String, String> snapshotSCMs = backupManager.snapshot(checkpointId); + LOG.debug("Created snapshot for taskName: {}, checkpoint id: {}, state backend: {}. 
Snapshot SCMs: {}", + taskName, checkpointId, stateBackendFactoryName, snapshotSCMs); + stateBackendToStoreSCMs.put(stateBackendFactoryName, snapshotSCMs); + }); Review comment: We need to back up all the stores that are durable (or logged), and write a checkpoint (to file) for all the stores that are persisted. Similarly, changelog-enabled stores are checkpointed to disk only if they are persisted (i.e., not in-memory), which should preserve the existing behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
