prateekm commented on a change in pull request #1501: URL: https://github.com/apache/samza/pull/1501#discussion_r632847404
########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java ########## @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.apache.samza.storage.blobstore.BlobStoreManager; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.Metadata; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.exceptions.RetriableException; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.FileBlob; +import org.apache.samza.storage.blobstore.index.FileIndex; +import org.apache.samza.storage.blobstore.index.FileMetadata; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; 
+import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFileAttributes; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper methods to interact with remote blob store service and GET/PUT/DELETE a + * {@link SnapshotIndex} or {@link DirDiff}. 
+ */ +public class BlobStoreUtil { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class); + + private final SnapshotIndexSerde snapshotIndexSerde = new SnapshotIndexSerde(); + private final BlobStoreManager blobStoreManager; + private final ExecutorService executor; + private final BlobStoreBackupManagerMetrics backupMetrics; + private final BlobStoreRestoreManagerMetrics restoreMetrics; + + public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, + BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) { + this.blobStoreManager = blobStoreManager; + this.executor = executor; + this.backupMetrics = backupMetrics; + this.restoreMetrics = restoreMetrics; + } + + /** + * Recursively upload all new files and upload or update contents of all subdirs in the {@link DirDiff} and return a + * Future containing the {@link DirIndex} associated with the directory. + * @param dirDiff diff for the contents of this directory + * @return A future with the {@link DirIndex} if the upload completed successfully. 
+ */ + public CompletionStage<DirIndex> putDir(DirDiff dirDiff, SnapshotMetadata snapshotMetadata) { + // Upload all new files in the dir + List<File> filesToUpload = dirDiff.getFilesAdded(); + List<CompletionStage<FileIndex>> fileFutures = filesToUpload.stream() + .map(file -> putFile(file, snapshotMetadata)) + .collect(Collectors.toList()); + + CompletableFuture<Void> allFilesFuture = + CompletableFuture.allOf(fileFutures.toArray(new CompletableFuture[0])); + + List<CompletionStage<DirIndex>> subDirFutures = new ArrayList<>(); + // recursively upload all new subdirs of this dir + for (DirDiff subDirAdded: dirDiff.getSubDirsAdded()) { + subDirFutures.add(putDir(subDirAdded, snapshotMetadata)); + } + // recursively update contents of all subdirs that are retained but might have been modified + for (DirDiff subDirRetained: dirDiff.getSubDirsRetained()) { + subDirFutures.add(putDir(subDirRetained, snapshotMetadata)); + } + CompletableFuture<Void> allDirBlobsFuture = + CompletableFuture.allOf(subDirFutures.toArray(new CompletableFuture[0])); + + // TODO LOW shesharm can we cancel other uploads if any one of them fails? 
+ // Check with Ambry: if client closes, what happens to inflight uploads + return CompletableFuture.allOf(allDirBlobsFuture, allFilesFuture) + .thenApplyAsync(f -> { + LOG.trace("All file and dir uploads complete for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + List<FileIndex> filesPresent = fileFutures.stream() + .map(blob -> blob.toCompletableFuture().join()) + .collect(Collectors.toList()); + + filesPresent.addAll(dirDiff.getFilesRetained()); + + List<DirIndex> subDirsPresent = subDirFutures.stream() + .map(subDir -> subDir.toCompletableFuture().join()) + .collect(Collectors.toList()); + + LOG.debug("Uploaded diff for task: {} store: {} with statistics: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName(), DirDiff.getStats(dirDiff)); + + LOG.trace("Returning new DirIndex for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + return new DirIndex(dirDiff.getDirName(), + filesPresent, + dirDiff.getFilesRemoved(), + subDirsPresent, + dirDiff.getSubDirsRemoved()); + }, executor); + } + + /** + * PUTs the {@link SnapshotIndex} to the blob store. + * @param snapshotIndex SnapshotIndex to put. + * @return a Future containing the blob ID of the {@link SnapshotIndex}. + */ + public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) { + byte[] bytes = snapshotIndexSerde.toBytes(snapshotIndex); + String opName = "putSnapshotIndex for checkpointId " + snapshotIndex.getSnapshotMetadata().getCheckpointId(); Review comment: Nitpick: "... for checkpointId: " (colon at end) ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java ########## @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.apache.samza.storage.blobstore.BlobStoreManager; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.Metadata; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.exceptions.RetriableException; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.FileBlob; +import org.apache.samza.storage.blobstore.index.FileIndex; +import org.apache.samza.storage.blobstore.index.FileMetadata; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFileAttributes; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import 
java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper methods to interact with remote blob store service and GET/PUT/DELETE a + * {@link SnapshotIndex} or {@link DirDiff}. + */ +public class BlobStoreUtil { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class); + + private final SnapshotIndexSerde snapshotIndexSerde = new SnapshotIndexSerde(); + private final BlobStoreManager blobStoreManager; + private final ExecutorService executor; + private final BlobStoreBackupManagerMetrics backupMetrics; + private final BlobStoreRestoreManagerMetrics restoreMetrics; + + public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, + BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) { + this.blobStoreManager = blobStoreManager; + this.executor = executor; + this.backupMetrics = backupMetrics; + this.restoreMetrics = restoreMetrics; + } + + /** + * Recursively upload all new files and upload or update contents of all subdirs in the {@link DirDiff} and return a + * Future containing the {@link DirIndex} associated with the directory. 
+ * @param dirDiff diff for the contents of this directory + * @return A future with the {@link DirIndex} if the upload completed successfully. + */ + public CompletionStage<DirIndex> putDir(DirDiff dirDiff, SnapshotMetadata snapshotMetadata) { + // Upload all new files in the dir + List<File> filesToUpload = dirDiff.getFilesAdded(); + List<CompletionStage<FileIndex>> fileFutures = filesToUpload.stream() + .map(file -> putFile(file, snapshotMetadata)) + .collect(Collectors.toList()); + + CompletableFuture<Void> allFilesFuture = + CompletableFuture.allOf(fileFutures.toArray(new CompletableFuture[0])); + + List<CompletionStage<DirIndex>> subDirFutures = new ArrayList<>(); + // recursively upload all new subdirs of this dir + for (DirDiff subDirAdded: dirDiff.getSubDirsAdded()) { + subDirFutures.add(putDir(subDirAdded, snapshotMetadata)); + } + // recursively update contents of all subdirs that are retained but might have been modified + for (DirDiff subDirRetained: dirDiff.getSubDirsRetained()) { + subDirFutures.add(putDir(subDirRetained, snapshotMetadata)); + } + CompletableFuture<Void> allDirBlobsFuture = + CompletableFuture.allOf(subDirFutures.toArray(new CompletableFuture[0])); + + // TODO LOW shesharm can we cancel other uploads if any one of them fails? 
+ // Check with Ambry: if client closes, what happens to inflight uploads + return CompletableFuture.allOf(allDirBlobsFuture, allFilesFuture) + .thenApplyAsync(f -> { + LOG.trace("All file and dir uploads complete for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + List<FileIndex> filesPresent = fileFutures.stream() + .map(blob -> blob.toCompletableFuture().join()) + .collect(Collectors.toList()); + + filesPresent.addAll(dirDiff.getFilesRetained()); + + List<DirIndex> subDirsPresent = subDirFutures.stream() + .map(subDir -> subDir.toCompletableFuture().join()) + .collect(Collectors.toList()); + + LOG.debug("Uploaded diff for task: {} store: {} with statistics: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName(), DirDiff.getStats(dirDiff)); + + LOG.trace("Returning new DirIndex for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + return new DirIndex(dirDiff.getDirName(), + filesPresent, + dirDiff.getFilesRemoved(), + subDirsPresent, + dirDiff.getSubDirsRemoved()); + }, executor); + } + + /** + * PUTs the {@link SnapshotIndex} to the blob store. + * @param snapshotIndex SnapshotIndex to put. + * @return a Future containing the blob ID of the {@link SnapshotIndex}. 
+ */ + public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) { + byte[] bytes = snapshotIndexSerde.toBytes(snapshotIndex); + String opName = "putSnapshotIndex for checkpointId " + snapshotIndex.getSnapshotMetadata().getCheckpointId(); + return FutureUtil.executeAsyncWithRetries(opName, () -> { + InputStream inputStream = new ByteArrayInputStream(bytes); // no need to close ByteArrayInputStream + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + Metadata metadata = new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.of((long) bytes.length), + snapshotMetadata.getJobName(), snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), + snapshotMetadata.getStoreName()); + return blobStoreManager.put(inputStream, metadata).toCompletableFuture(); + }, isCauseNonRetriable(), executor); + } + + /** + * GETs the {@link SnapshotIndex} from the blob store. + * @param blobId blob ID of the {@link SnapshotIndex} to get + * @return a Future containing the {@link SnapshotIndex} + */ + public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(blobId)); + String opName = "getSnapshotIndex: " + blobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> { + ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream + return blobStoreManager.get(blobId, indexBlobStream, metadata).toCompletableFuture() + .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor); + }, isCauseNonRetriable(), executor); + } + + /** + * Recursively issue delete requests for files and dirs marked to be removed in a previously created remote snapshot. + * Note: We do not immediately delete files/dirs to be removed when uploading a snapshot to the remote + * store. 
We just track them for deletion during the upload, and delete them AFTER the snapshot is uploaded, and the + * blob IDs have been persisted as part of the checkpoint. This is to prevent data loss if a failure happens + * part way through the commit. We issue delete these file/subdirs in cleanUp() phase of commit lifecycle. + * @param dirIndex the dir in the remote snapshot to clean up. + * @param metadata Metadata related to the request + * @return a future that completes when all the files and subdirs marked for deletion are cleaned up. + */ + public CompletionStage<Void> cleanUpDir(DirIndex dirIndex, Metadata metadata) { + String dirName = dirIndex.getDirName(); + if (DirIndex.ROOT_DIR_NAME.equals(dirName)) { + LOG.debug("Cleaning up root dir in blob store."); + } else { + LOG.debug("Cleaning up dir: {} in blob store.", dirIndex.getDirName()); + } + + List<CompletionStage<Void>> cleanUpFuture = new ArrayList<>(); + List<FileIndex> files = dirIndex.getFilesRemoved(); + for (FileIndex file: files) { + Metadata requestMetadata = + new Metadata(file.getFileName(), Optional.of(file.getFileMetadata().getSize()), metadata.getJobName(), + metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + cleanUpFuture.add(deleteFile(file, requestMetadata)); + } + + for (DirIndex subDirToDelete : dirIndex.getSubDirsRemoved()) { + // recursively delete ALL contents of the subDirToDelete. + cleanUpFuture.add(deleteDir(subDirToDelete, metadata)); + } + + for (DirIndex subDirToRetain : dirIndex.getSubDirsPresent()) { + // recursively clean up the subDir, only deleting files and subdirs marked for deletion. + cleanUpFuture.add(cleanUpDir(subDirToRetain, metadata)); + } + + return CompletableFuture.allOf(cleanUpFuture.toArray(new CompletableFuture[0])); + } + + /** + * WARNING: This method deletes the **SnapshotIndex blob** from the snapshot. 
This should only be called to clean + * up an older snapshot **AFTER** all the files and sub-dirs to be deleted from this snapshot are already deleted + * using {@link #cleanUpDir(DirIndex, Metadata)} + * + * @param snapshotIndexBlobId blob ID of SnapshotIndex blob to delete + * @return a future that completes when the index blob is deleted from remote store. + */ + public CompletionStage<Void> deleteSnapshotIndexBlob(String snapshotIndexBlobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(snapshotIndexBlobId)); + LOG.debug("Deleting SnapshotIndex blob {} from blob store", snapshotIndexBlobId); + String opName = "deleteSnapshotIndexBlob: " + snapshotIndexBlobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> + blobStoreManager.delete(snapshotIndexBlobId, metadata).toCompletableFuture(), isCauseNonRetriable(), executor); + } + + /** + * Marks all the blobs associated with an {@link SnapshotIndex} to never expire. + * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot + * @param metadata {@link Metadata} related to the request + * @return A future that completes when all the files and subdirs associated with this remote snapshot are marked to + * never expire. 
+ */ + public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex snapshotIndex, Metadata metadata) { + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + LOG.debug("Marking contents of SnapshotIndex: {} to never expire", snapshotMetadata.toString()); + + String opName = "removeTTL for SnapshotIndex for checkpointId: " + snapshotMetadata.getCheckpointId(); + Supplier<CompletionStage<Void>> removeDirIndexTTLAction = + () -> removeTTL(snapshotIndex.getDirIndex(), metadata).toCompletableFuture(); + CompletableFuture<Void> dirIndexTTLRemovalFuture = FutureUtil.executeAsyncWithRetries(opName, + removeDirIndexTTLAction, isCauseNonRetriable(), executor); + + return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> { + String op2Name = "removeTTL for indexBlobId: " + indexBlobId; + Supplier<CompletionStage<Void>> removeIndexBlobTTLAction = () -> + blobStoreManager.removeTTL(indexBlobId, metadata).toCompletableFuture(); + return FutureUtil.executeAsyncWithRetries(op2Name, removeIndexBlobTTLAction, isCauseNonRetriable(), executor); + }, executor); + } + + /** + * Upload a File to blob store. + * @param file File to upload to blob store. + * @return A future containing the {@link FileIndex} for the uploaded file. + */ + @VisibleForTesting + CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) { + if (file == null || !file.isFile()) { + String message = file != null ? 
"Dir or Symbolic link" : "null"; + throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message)); + } + long putFileStartTime = System.nanoTime(); + + String opName = "putFile: " + file.getAbsolutePath(); + Supplier<CompletionStage<FileIndex>> fileUploadAction = () -> { + LOG.debug("Putting file: {} to blob store.", file.getPath()); + CompletableFuture<FileIndex> fileBlobFuture; + CheckedInputStream inputStream = null; + try { + // TODO maybe use the more efficient CRC32C / PureJavaCRC32 impl Review comment: HIGH, followup for @shekhars-li. (Update to "TODO HIGH shesharm") ########## File path: samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java ########## @@ -36,9 +40,29 @@ public class RocksDbOptionsHelper { private static final Logger log = LoggerFactory.getLogger(RocksDbOptionsHelper.class); + public static final String ROCKSDB_WAL_ENABLED = "rocksdb.wal.enabled"; Review comment: @shekhars-li Leave a TODO HIGH for me to add these to RocksDBTableDescriptor. ########## File path: samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java ########## @@ -36,9 +40,29 @@ public class RocksDbOptionsHelper { private static final Logger log = LoggerFactory.getLogger(RocksDbOptionsHelper.class); + public static final String ROCKSDB_WAL_ENABLED = "rocksdb.wal.enabled"; Review comment: @dxichen FYI, there's more here to review below the tests. ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java ########## @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.apache.samza.storage.blobstore.BlobStoreManager; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.Metadata; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.exceptions.RetriableException; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.FileBlob; +import org.apache.samza.storage.blobstore.index.FileIndex; +import org.apache.samza.storage.blobstore.index.FileMetadata; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFileAttributes; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper methods to interact with remote blob store service and GET/PUT/DELETE a + * {@link SnapshotIndex} or {@link DirDiff}. + */ +public class BlobStoreUtil { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class); + + private final SnapshotIndexSerde snapshotIndexSerde = new SnapshotIndexSerde(); + private final BlobStoreManager blobStoreManager; + private final ExecutorService executor; + private final BlobStoreBackupManagerMetrics backupMetrics; + private final BlobStoreRestoreManagerMetrics restoreMetrics; + + public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, + BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) { + this.blobStoreManager = blobStoreManager; + this.executor = executor; + this.backupMetrics = backupMetrics; + this.restoreMetrics = restoreMetrics; + } + + /** + * Recursively upload all new files and upload or update contents of all subdirs in the {@link DirDiff} and return a + * Future containing the {@link DirIndex} associated with the directory. + * @param dirDiff diff for the contents of this directory + * @return A future with the {@link DirIndex} if the upload completed successfully. 
+ */ + public CompletionStage<DirIndex> putDir(DirDiff dirDiff, SnapshotMetadata snapshotMetadata) { + // Upload all new files in the dir + List<File> filesToUpload = dirDiff.getFilesAdded(); + List<CompletionStage<FileIndex>> fileFutures = filesToUpload.stream() + .map(file -> putFile(file, snapshotMetadata)) + .collect(Collectors.toList()); + + CompletableFuture<Void> allFilesFuture = + CompletableFuture.allOf(fileFutures.toArray(new CompletableFuture[0])); + + List<CompletionStage<DirIndex>> subDirFutures = new ArrayList<>(); + // recursively upload all new subdirs of this dir + for (DirDiff subDirAdded: dirDiff.getSubDirsAdded()) { + subDirFutures.add(putDir(subDirAdded, snapshotMetadata)); + } + // recursively update contents of all subdirs that are retained but might have been modified + for (DirDiff subDirRetained: dirDiff.getSubDirsRetained()) { + subDirFutures.add(putDir(subDirRetained, snapshotMetadata)); + } + CompletableFuture<Void> allDirBlobsFuture = + CompletableFuture.allOf(subDirFutures.toArray(new CompletableFuture[0])); + + // TODO LOW shesharm can we cancel other uploads if any one of them fails? Review comment: Delete these. ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java ########## @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.apache.samza.storage.blobstore.BlobStoreManager; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.Metadata; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.exceptions.RetriableException; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.FileBlob; +import org.apache.samza.storage.blobstore.index.FileIndex; +import org.apache.samza.storage.blobstore.index.FileMetadata; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFileAttributes; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper methods to interact with remote blob store service and GET/PUT/DELETE a + * {@link SnapshotIndex} or {@link DirDiff}. + */ +public class BlobStoreUtil { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class); + + private final SnapshotIndexSerde snapshotIndexSerde = new SnapshotIndexSerde(); + private final BlobStoreManager blobStoreManager; + private final ExecutorService executor; + private final BlobStoreBackupManagerMetrics backupMetrics; + private final BlobStoreRestoreManagerMetrics restoreMetrics; + + public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, + BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) { + this.blobStoreManager = blobStoreManager; + this.executor = executor; + this.backupMetrics = backupMetrics; + this.restoreMetrics = restoreMetrics; + } + + /** + * Recursively upload all new files and upload or update contents of all subdirs in the {@link DirDiff} and return a + * Future containing the {@link DirIndex} associated with the directory. + * @param dirDiff diff for the contents of this directory + * @return A future with the {@link DirIndex} if the upload completed successfully. 
+ */ + public CompletionStage<DirIndex> putDir(DirDiff dirDiff, SnapshotMetadata snapshotMetadata) { + // Upload all new files in the dir + List<File> filesToUpload = dirDiff.getFilesAdded(); + List<CompletionStage<FileIndex>> fileFutures = filesToUpload.stream() + .map(file -> putFile(file, snapshotMetadata)) + .collect(Collectors.toList()); + + CompletableFuture<Void> allFilesFuture = + CompletableFuture.allOf(fileFutures.toArray(new CompletableFuture[0])); + + List<CompletionStage<DirIndex>> subDirFutures = new ArrayList<>(); + // recursively upload all new subdirs of this dir + for (DirDiff subDirAdded: dirDiff.getSubDirsAdded()) { + subDirFutures.add(putDir(subDirAdded, snapshotMetadata)); + } + // recursively update contents of all subdirs that are retained but might have been modified + for (DirDiff subDirRetained: dirDiff.getSubDirsRetained()) { + subDirFutures.add(putDir(subDirRetained, snapshotMetadata)); + } + CompletableFuture<Void> allDirBlobsFuture = + CompletableFuture.allOf(subDirFutures.toArray(new CompletableFuture[0])); + + // TODO LOW shesharm can we cancel other uploads if any one of them fails? 
+ // Check with Ambry: if client closes, what happens to inflight uploads + return CompletableFuture.allOf(allDirBlobsFuture, allFilesFuture) + .thenApplyAsync(f -> { + LOG.trace("All file and dir uploads complete for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + List<FileIndex> filesPresent = fileFutures.stream() + .map(blob -> blob.toCompletableFuture().join()) + .collect(Collectors.toList()); + + filesPresent.addAll(dirDiff.getFilesRetained()); + + List<DirIndex> subDirsPresent = subDirFutures.stream() + .map(subDir -> subDir.toCompletableFuture().join()) + .collect(Collectors.toList()); + + LOG.debug("Uploaded diff for task: {} store: {} with statistics: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName(), DirDiff.getStats(dirDiff)); + + LOG.trace("Returning new DirIndex for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + return new DirIndex(dirDiff.getDirName(), + filesPresent, + dirDiff.getFilesRemoved(), + subDirsPresent, + dirDiff.getSubDirsRemoved()); + }, executor); + } + + /** + * PUTs the {@link SnapshotIndex} to the blob store. + * @param snapshotIndex SnapshotIndex to put. + * @return a Future containing the blob ID of the {@link SnapshotIndex}. 
+ */ + public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) { + byte[] bytes = snapshotIndexSerde.toBytes(snapshotIndex); + String opName = "putSnapshotIndex for checkpointId " + snapshotIndex.getSnapshotMetadata().getCheckpointId(); + return FutureUtil.executeAsyncWithRetries(opName, () -> { + InputStream inputStream = new ByteArrayInputStream(bytes); // no need to close ByteArrayInputStream + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + Metadata metadata = new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.of((long) bytes.length), + snapshotMetadata.getJobName(), snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), + snapshotMetadata.getStoreName()); + return blobStoreManager.put(inputStream, metadata).toCompletableFuture(); + }, isCauseNonRetriable(), executor); + } + + /** + * GETs the {@link SnapshotIndex} from the blob store. + * @param blobId blob ID of the {@link SnapshotIndex} to get + * @return a Future containing the {@link SnapshotIndex} + */ + public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(blobId)); + String opName = "getSnapshotIndex: " + blobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> { + ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream + return blobStoreManager.get(blobId, indexBlobStream, metadata).toCompletableFuture() + .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor); + }, isCauseNonRetriable(), executor); + } + + /** + * Recursively issue delete requests for files and dirs marked to be removed in a previously created remote snapshot. + * Note: We do not immediately delete files/dirs to be removed when uploading a snapshot to the remote + * store. 
We just track them for deletion during the upload, and delete them AFTER the snapshot is uploaded, and the + * blob IDs have been persisted as part of the checkpoint. This is to prevent data loss if a failure happens + * part way through the commit. We issue delete these file/subdirs in cleanUp() phase of commit lifecycle. + * @param dirIndex the dir in the remote snapshot to clean up. + * @param metadata Metadata related to the request + * @return a future that completes when all the files and subdirs marked for deletion are cleaned up. + */ + public CompletionStage<Void> cleanUpDir(DirIndex dirIndex, Metadata metadata) { + String dirName = dirIndex.getDirName(); + if (DirIndex.ROOT_DIR_NAME.equals(dirName)) { + LOG.debug("Cleaning up root dir in blob store."); + } else { + LOG.debug("Cleaning up dir: {} in blob store.", dirIndex.getDirName()); + } + + List<CompletionStage<Void>> cleanUpFuture = new ArrayList<>(); + List<FileIndex> files = dirIndex.getFilesRemoved(); + for (FileIndex file: files) { + Metadata requestMetadata = + new Metadata(file.getFileName(), Optional.of(file.getFileMetadata().getSize()), metadata.getJobName(), + metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + cleanUpFuture.add(deleteFile(file, requestMetadata)); + } + + for (DirIndex subDirToDelete : dirIndex.getSubDirsRemoved()) { + // recursively delete ALL contents of the subDirToDelete. + cleanUpFuture.add(deleteDir(subDirToDelete, metadata)); + } + + for (DirIndex subDirToRetain : dirIndex.getSubDirsPresent()) { + // recursively clean up the subDir, only deleting files and subdirs marked for deletion. + cleanUpFuture.add(cleanUpDir(subDirToRetain, metadata)); + } + + return CompletableFuture.allOf(cleanUpFuture.toArray(new CompletableFuture[0])); + } + + /** + * WARNING: This method deletes the **SnapshotIndex blob** from the snapshot. 
This should only be called to clean + * up an older snapshot **AFTER** all the files and sub-dirs to be deleted from this snapshot are already deleted + * using {@link #cleanUpDir(DirIndex, Metadata)} + * + * @param snapshotIndexBlobId blob ID of SnapshotIndex blob to delete + * @return a future that completes when the index blob is deleted from remote store. + */ + public CompletionStage<Void> deleteSnapshotIndexBlob(String snapshotIndexBlobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(snapshotIndexBlobId)); + LOG.debug("Deleting SnapshotIndex blob {} from blob store", snapshotIndexBlobId); + String opName = "deleteSnapshotIndexBlob: " + snapshotIndexBlobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> + blobStoreManager.delete(snapshotIndexBlobId, metadata).toCompletableFuture(), isCauseNonRetriable(), executor); + } + + /** + * Marks all the blobs associated with an {@link SnapshotIndex} to never expire. + * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot + * @param metadata {@link Metadata} related to the request + * @return A future that completes when all the files and subdirs associated with this remote snapshot are marked to + * never expire. 
+ */ + public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex snapshotIndex, Metadata metadata) { + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + LOG.debug("Marking contents of SnapshotIndex: {} to never expire", snapshotMetadata.toString()); + + String opName = "removeTTL for SnapshotIndex for checkpointId: " + snapshotMetadata.getCheckpointId(); + Supplier<CompletionStage<Void>> removeDirIndexTTLAction = + () -> removeTTL(snapshotIndex.getDirIndex(), metadata).toCompletableFuture(); + CompletableFuture<Void> dirIndexTTLRemovalFuture = FutureUtil.executeAsyncWithRetries(opName, + removeDirIndexTTLAction, isCauseNonRetriable(), executor); + + return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> { + String op2Name = "removeTTL for indexBlobId: " + indexBlobId; + Supplier<CompletionStage<Void>> removeIndexBlobTTLAction = () -> + blobStoreManager.removeTTL(indexBlobId, metadata).toCompletableFuture(); + return FutureUtil.executeAsyncWithRetries(op2Name, removeIndexBlobTTLAction, isCauseNonRetriable(), executor); + }, executor); + } + + /** + * Upload a File to blob store. + * @param file File to upload to blob store. + * @return A future containing the {@link FileIndex} for the uploaded file. + */ + @VisibleForTesting + CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) { + if (file == null || !file.isFile()) { + String message = file != null ? 
"Dir or Symbolic link" : "null"; + throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message)); + } + long putFileStartTime = System.nanoTime(); + + String opName = "putFile: " + file.getAbsolutePath(); + Supplier<CompletionStage<FileIndex>> fileUploadAction = () -> { + LOG.debug("Putting file: {} to blob store.", file.getPath()); + CompletableFuture<FileIndex> fileBlobFuture; + CheckedInputStream inputStream = null; + try { + // TODO maybe use the more efficient CRC32C / PureJavaCRC32 impl + inputStream = new CheckedInputStream(new FileInputStream(file), new CRC32()); + CheckedInputStream finalInputStream = inputStream; + FileMetadata fileMetadata = FileMetadata.fromFile(file); + if (backupMetrics != null) { + backupMetrics.avgFileSizeBytes.update(fileMetadata.getSize()); + } + + Metadata metadata = + new Metadata(file.getAbsolutePath(), Optional.of(fileMetadata.getSize()), snapshotMetadata.getJobName(), + snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + + fileBlobFuture = blobStoreManager.put(inputStream, metadata) + .thenApplyAsync(id -> { + LOG.trace("Put complete. Closing input stream for file: {}.", file.getPath()); + try { + finalInputStream.close(); + } catch (Exception e) { + throw new SamzaException(String.format("Error closing input stream for file %s", + file.getAbsolutePath()), e); + } + + LOG.trace("Returning new FileIndex for file: {}.", file.getPath()); + return new FileIndex( + file.getName(), + Collections.singletonList(new FileBlob(id, 0)), + fileMetadata, + finalInputStream.getChecksum().getValue()); + }, executor).toCompletableFuture(); + } catch (Exception e) { + try { + if (inputStream != null) { + inputStream.close(); + } + } catch (Exception err) { + LOG.error("Error closing input stream for file {}", file.getName(), err); Review comment: Nitpick: Use a colon before variables in all log lines and exception messages. E.g.: "for file: {}". 
########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java ########## @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.apache.samza.storage.blobstore.BlobStoreManager; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.Metadata; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.exceptions.RetriableException; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.FileBlob; +import org.apache.samza.storage.blobstore.index.FileIndex; +import org.apache.samza.storage.blobstore.index.FileMetadata; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; 
+import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFileAttributes; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper methods to interact with remote blob store service and GET/PUT/DELETE a + * {@link SnapshotIndex} or {@link DirDiff}. 
+ */ +public class BlobStoreUtil { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class); + + private final SnapshotIndexSerde snapshotIndexSerde = new SnapshotIndexSerde(); + private final BlobStoreManager blobStoreManager; + private final ExecutorService executor; + private final BlobStoreBackupManagerMetrics backupMetrics; + private final BlobStoreRestoreManagerMetrics restoreMetrics; + + public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, + BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) { + this.blobStoreManager = blobStoreManager; + this.executor = executor; + this.backupMetrics = backupMetrics; + this.restoreMetrics = restoreMetrics; + } + + /** + * Recursively upload all new files and upload or update contents of all subdirs in the {@link DirDiff} and return a + * Future containing the {@link DirIndex} associated with the directory. + * @param dirDiff diff for the contents of this directory + * @return A future with the {@link DirIndex} if the upload completed successfully. 
+ */ + public CompletionStage<DirIndex> putDir(DirDiff dirDiff, SnapshotMetadata snapshotMetadata) { + // Upload all new files in the dir + List<File> filesToUpload = dirDiff.getFilesAdded(); + List<CompletionStage<FileIndex>> fileFutures = filesToUpload.stream() + .map(file -> putFile(file, snapshotMetadata)) + .collect(Collectors.toList()); + + CompletableFuture<Void> allFilesFuture = + CompletableFuture.allOf(fileFutures.toArray(new CompletableFuture[0])); + + List<CompletionStage<DirIndex>> subDirFutures = new ArrayList<>(); + // recursively upload all new subdirs of this dir + for (DirDiff subDirAdded: dirDiff.getSubDirsAdded()) { + subDirFutures.add(putDir(subDirAdded, snapshotMetadata)); + } + // recursively update contents of all subdirs that are retained but might have been modified + for (DirDiff subDirRetained: dirDiff.getSubDirsRetained()) { + subDirFutures.add(putDir(subDirRetained, snapshotMetadata)); + } + CompletableFuture<Void> allDirBlobsFuture = + CompletableFuture.allOf(subDirFutures.toArray(new CompletableFuture[0])); + + // TODO LOW shesharm can we cancel other uploads if any one of them fails? 
+ // Check with Ambry: if client closes, what happens to inflight uploads + return CompletableFuture.allOf(allDirBlobsFuture, allFilesFuture) + .thenApplyAsync(f -> { + LOG.trace("All file and dir uploads complete for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + List<FileIndex> filesPresent = fileFutures.stream() + .map(blob -> blob.toCompletableFuture().join()) + .collect(Collectors.toList()); + + filesPresent.addAll(dirDiff.getFilesRetained()); + + List<DirIndex> subDirsPresent = subDirFutures.stream() + .map(subDir -> subDir.toCompletableFuture().join()) + .collect(Collectors.toList()); + + LOG.debug("Uploaded diff for task: {} store: {} with statistics: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName(), DirDiff.getStats(dirDiff)); + + LOG.trace("Returning new DirIndex for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + return new DirIndex(dirDiff.getDirName(), + filesPresent, + dirDiff.getFilesRemoved(), + subDirsPresent, + dirDiff.getSubDirsRemoved()); + }, executor); + } + + /** + * PUTs the {@link SnapshotIndex} to the blob store. + * @param snapshotIndex SnapshotIndex to put. + * @return a Future containing the blob ID of the {@link SnapshotIndex}. 
+ */ + public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) { + byte[] bytes = snapshotIndexSerde.toBytes(snapshotIndex); + String opName = "putSnapshotIndex for checkpointId " + snapshotIndex.getSnapshotMetadata().getCheckpointId(); + return FutureUtil.executeAsyncWithRetries(opName, () -> { + InputStream inputStream = new ByteArrayInputStream(bytes); // no need to close ByteArrayInputStream + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + Metadata metadata = new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.of((long) bytes.length), + snapshotMetadata.getJobName(), snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), + snapshotMetadata.getStoreName()); + return blobStoreManager.put(inputStream, metadata).toCompletableFuture(); + }, isCauseNonRetriable(), executor); + } + + /** + * GETs the {@link SnapshotIndex} from the blob store. + * @param blobId blob ID of the {@link SnapshotIndex} to get + * @return a Future containing the {@link SnapshotIndex} + */ + public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(blobId)); + String opName = "getSnapshotIndex: " + blobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> { + ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream + return blobStoreManager.get(blobId, indexBlobStream, metadata).toCompletableFuture() + .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor); + }, isCauseNonRetriable(), executor); + } + + /** + * Recursively issue delete requests for files and dirs marked to be removed in a previously created remote snapshot. + * Note: We do not immediately delete files/dirs to be removed when uploading a snapshot to the remote + * store. 
We just track them for deletion during the upload, and delete them AFTER the snapshot is uploaded, and the + * blob IDs have been persisted as part of the checkpoint. This is to prevent data loss if a failure happens + * part way through the commit. We issue delete these file/subdirs in cleanUp() phase of commit lifecycle. + * @param dirIndex the dir in the remote snapshot to clean up. + * @param metadata Metadata related to the request + * @return a future that completes when all the files and subdirs marked for deletion are cleaned up. + */ + public CompletionStage<Void> cleanUpDir(DirIndex dirIndex, Metadata metadata) { + String dirName = dirIndex.getDirName(); + if (DirIndex.ROOT_DIR_NAME.equals(dirName)) { + LOG.debug("Cleaning up root dir in blob store."); + } else { + LOG.debug("Cleaning up dir: {} in blob store.", dirIndex.getDirName()); + } + + List<CompletionStage<Void>> cleanUpFuture = new ArrayList<>(); + List<FileIndex> files = dirIndex.getFilesRemoved(); + for (FileIndex file: files) { + Metadata requestMetadata = + new Metadata(file.getFileName(), Optional.of(file.getFileMetadata().getSize()), metadata.getJobName(), + metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + cleanUpFuture.add(deleteFile(file, requestMetadata)); + } + + for (DirIndex subDirToDelete : dirIndex.getSubDirsRemoved()) { + // recursively delete ALL contents of the subDirToDelete. + cleanUpFuture.add(deleteDir(subDirToDelete, metadata)); + } + + for (DirIndex subDirToRetain : dirIndex.getSubDirsPresent()) { + // recursively clean up the subDir, only deleting files and subdirs marked for deletion. + cleanUpFuture.add(cleanUpDir(subDirToRetain, metadata)); + } + + return CompletableFuture.allOf(cleanUpFuture.toArray(new CompletableFuture[0])); + } + + /** + * WARNING: This method deletes the **SnapshotIndex blob** from the snapshot. 
This should only be called to clean + * up an older snapshot **AFTER** all the files and sub-dirs to be deleted from this snapshot are already deleted + * using {@link #cleanUpDir(DirIndex, Metadata)} + * + * @param snapshotIndexBlobId blob ID of SnapshotIndex blob to delete + * @return a future that completes when the index blob is deleted from remote store. + */ + public CompletionStage<Void> deleteSnapshotIndexBlob(String snapshotIndexBlobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(snapshotIndexBlobId)); + LOG.debug("Deleting SnapshotIndex blob {} from blob store", snapshotIndexBlobId); + String opName = "deleteSnapshotIndexBlob: " + snapshotIndexBlobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> + blobStoreManager.delete(snapshotIndexBlobId, metadata).toCompletableFuture(), isCauseNonRetriable(), executor); + } + + /** + * Marks all the blobs associated with an {@link SnapshotIndex} to never expire. + * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot + * @param metadata {@link Metadata} related to the request + * @return A future that completes when all the files and subdirs associated with this remote snapshot are marked to + * never expire. 
+ */ + public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex snapshotIndex, Metadata metadata) { + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + LOG.debug("Marking contents of SnapshotIndex: {} to never expire", snapshotMetadata.toString()); + + String opName = "removeTTL for SnapshotIndex for checkpointId: " + snapshotMetadata.getCheckpointId(); + Supplier<CompletionStage<Void>> removeDirIndexTTLAction = + () -> removeTTL(snapshotIndex.getDirIndex(), metadata).toCompletableFuture(); + CompletableFuture<Void> dirIndexTTLRemovalFuture = FutureUtil.executeAsyncWithRetries(opName, + removeDirIndexTTLAction, isCauseNonRetriable(), executor); + + return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> { + String op2Name = "removeTTL for indexBlobId: " + indexBlobId; + Supplier<CompletionStage<Void>> removeIndexBlobTTLAction = () -> + blobStoreManager.removeTTL(indexBlobId, metadata).toCompletableFuture(); + return FutureUtil.executeAsyncWithRetries(op2Name, removeIndexBlobTTLAction, isCauseNonRetriable(), executor); + }, executor); + } + + /** + * Upload a File to blob store. + * @param file File to upload to blob store. + * @return A future containing the {@link FileIndex} for the uploaded file. + */ + @VisibleForTesting + CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) { + if (file == null || !file.isFile()) { + String message = file != null ? 
"Dir or Symbolic link" : "null"; + throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message)); + } + long putFileStartTime = System.nanoTime(); + + String opName = "putFile: " + file.getAbsolutePath(); + Supplier<CompletionStage<FileIndex>> fileUploadAction = () -> { + LOG.debug("Putting file: {} to blob store.", file.getPath()); + CompletableFuture<FileIndex> fileBlobFuture; + CheckedInputStream inputStream = null; + try { + // TODO maybe use the more efficient CRC32C / PureJavaCRC32 impl + inputStream = new CheckedInputStream(new FileInputStream(file), new CRC32()); + CheckedInputStream finalInputStream = inputStream; + FileMetadata fileMetadata = FileMetadata.fromFile(file); + if (backupMetrics != null) { + backupMetrics.avgFileSizeBytes.update(fileMetadata.getSize()); + } + + Metadata metadata = + new Metadata(file.getAbsolutePath(), Optional.of(fileMetadata.getSize()), snapshotMetadata.getJobName(), + snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + + fileBlobFuture = blobStoreManager.put(inputStream, metadata) + .thenApplyAsync(id -> { + LOG.trace("Put complete. 
Closing input stream for file: {}.", file.getPath()); + try { + finalInputStream.close(); + } catch (Exception e) { + throw new SamzaException(String.format("Error closing input stream for file %s", + file.getAbsolutePath()), e); + } + + LOG.trace("Returning new FileIndex for file: {}.", file.getPath()); + return new FileIndex( + file.getName(), + Collections.singletonList(new FileBlob(id, 0)), + fileMetadata, + finalInputStream.getChecksum().getValue()); + }, executor).toCompletableFuture(); + } catch (Exception e) { + try { + if (inputStream != null) { + inputStream.close(); + } + } catch (Exception err) { + LOG.error("Error closing input stream for file {}", file.getName(), err); + } + LOG.error("Error putting file {}", file.getName(), e); + throw new SamzaException(String.format("Error putting file %s", file.getAbsolutePath()), e); + } + return fileBlobFuture; + }; + + return FutureUtil.executeAsyncWithRetries(opName, fileUploadAction, isCauseNonRetriable(), executor) + .whenComplete((res, ex) -> { + if (backupMetrics != null) { + backupMetrics.avgFileUploadNs.update(System.nanoTime() - putFileStartTime); + + long fileSize = file.length(); + backupMetrics.uploadRate.inc(fileSize); + backupMetrics.filesUploaded.getValue().addAndGet(1); + backupMetrics.bytesUploaded.getValue().addAndGet(fileSize); + backupMetrics.filesRemaining.getValue().addAndGet(-1); + backupMetrics.bytesRemaining.getValue().addAndGet(-1 * fileSize); + } + }); + } + + /** + * Non-blocking restore of a {@link SnapshotIndex} to local store by downloading all the files and sub-dirs associated + * with this remote snapshot. 
+ * @return A list of future for all the async downloads + */ + @VisibleForTesting + public List<CompletableFuture<Void>> restoreDir(File baseDir, DirIndex dirIndex, Metadata metadata) { + LOG.debug("Restoring contents of directory: {} from remote snapshot.", baseDir); + + List<CompletableFuture<Void>> downloadFutures = new ArrayList<>(); + + try { + // create parent directories if they don't exist + Files.createDirectories(baseDir.toPath()); + } catch (IOException exception) { + LOG.error("Error creating directory: {} for restore", baseDir.getAbsolutePath(), exception); + throw new SamzaException(String.format("Error creating directory: %s for restore", + baseDir.getAbsolutePath()), exception); + } + + // restore all files in the directory + for (FileIndex fileIndex : dirIndex.getFilesPresent()) { + File fileToRestore = Paths.get(baseDir.getAbsolutePath(), fileIndex.getFileName()).toFile(); + Metadata requestMetadata = + new Metadata(fileToRestore.getAbsolutePath(), Optional.of(fileToRestore.length()), + metadata.getJobName(), metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + List<FileBlob> fileBlobs = fileIndex.getBlobs(); + + String opName = "restoreFile: " + fileToRestore.getAbsolutePath(); Review comment: Does it make sense to extract getFile as a separate method, like putFile above? ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java ########## @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.apache.samza.storage.blobstore.BlobStoreManager; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.Metadata; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.exceptions.RetriableException; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.FileBlob; +import org.apache.samza.storage.blobstore.index.FileIndex; +import org.apache.samza.storage.blobstore.index.FileMetadata; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFileAttributes; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import 
java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper methods to interact with remote blob store service and GET/PUT/DELETE a + * {@link SnapshotIndex} or {@link DirDiff}. + */ +public class BlobStoreUtil { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class); + + private final SnapshotIndexSerde snapshotIndexSerde = new SnapshotIndexSerde(); + private final BlobStoreManager blobStoreManager; + private final ExecutorService executor; + private final BlobStoreBackupManagerMetrics backupMetrics; + private final BlobStoreRestoreManagerMetrics restoreMetrics; + + public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, + BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) { + this.blobStoreManager = blobStoreManager; + this.executor = executor; + this.backupMetrics = backupMetrics; + this.restoreMetrics = restoreMetrics; + } + + /** + * Recursively upload all new files and upload or update contents of all subdirs in the {@link DirDiff} and return a + * Future containing the {@link DirIndex} associated with the directory. + * @param dirDiff diff for the contents of this directory + * @return A future with the {@link DirIndex} if the upload completed successfully. 
+ */ + public CompletionStage<DirIndex> putDir(DirDiff dirDiff, SnapshotMetadata snapshotMetadata) { + // Upload all new files in the dir + List<File> filesToUpload = dirDiff.getFilesAdded(); + List<CompletionStage<FileIndex>> fileFutures = filesToUpload.stream() + .map(file -> putFile(file, snapshotMetadata)) + .collect(Collectors.toList()); + + CompletableFuture<Void> allFilesFuture = + CompletableFuture.allOf(fileFutures.toArray(new CompletableFuture[0])); + + List<CompletionStage<DirIndex>> subDirFutures = new ArrayList<>(); + // recursively upload all new subdirs of this dir + for (DirDiff subDirAdded: dirDiff.getSubDirsAdded()) { + subDirFutures.add(putDir(subDirAdded, snapshotMetadata)); + } + // recursively update contents of all subdirs that are retained but might have been modified + for (DirDiff subDirRetained: dirDiff.getSubDirsRetained()) { + subDirFutures.add(putDir(subDirRetained, snapshotMetadata)); + } + CompletableFuture<Void> allDirBlobsFuture = + CompletableFuture.allOf(subDirFutures.toArray(new CompletableFuture[0])); + + // TODO LOW shesharm can we cancel other uploads if any one of them fails? 
+ // Check with Ambry: if client closes, what happens to inflight uploads + return CompletableFuture.allOf(allDirBlobsFuture, allFilesFuture) + .thenApplyAsync(f -> { + LOG.trace("All file and dir uploads complete for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + List<FileIndex> filesPresent = fileFutures.stream() + .map(blob -> blob.toCompletableFuture().join()) + .collect(Collectors.toList()); + + filesPresent.addAll(dirDiff.getFilesRetained()); + + List<DirIndex> subDirsPresent = subDirFutures.stream() + .map(subDir -> subDir.toCompletableFuture().join()) + .collect(Collectors.toList()); + + LOG.debug("Uploaded diff for task: {} store: {} with statistics: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName(), DirDiff.getStats(dirDiff)); + + LOG.trace("Returning new DirIndex for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + return new DirIndex(dirDiff.getDirName(), + filesPresent, + dirDiff.getFilesRemoved(), + subDirsPresent, + dirDiff.getSubDirsRemoved()); + }, executor); + } + + /** + * PUTs the {@link SnapshotIndex} to the blob store. + * @param snapshotIndex SnapshotIndex to put. + * @return a Future containing the blob ID of the {@link SnapshotIndex}. 
+ */ + public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) { + byte[] bytes = snapshotIndexSerde.toBytes(snapshotIndex); + String opName = "putSnapshotIndex for checkpointId " + snapshotIndex.getSnapshotMetadata().getCheckpointId(); + return FutureUtil.executeAsyncWithRetries(opName, () -> { + InputStream inputStream = new ByteArrayInputStream(bytes); // no need to close ByteArrayInputStream + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + Metadata metadata = new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.of((long) bytes.length), + snapshotMetadata.getJobName(), snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), + snapshotMetadata.getStoreName()); + return blobStoreManager.put(inputStream, metadata).toCompletableFuture(); + }, isCauseNonRetriable(), executor); + } + + /** + * GETs the {@link SnapshotIndex} from the blob store. + * @param blobId blob ID of the {@link SnapshotIndex} to get + * @return a Future containing the {@link SnapshotIndex} + */ + public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(blobId)); + String opName = "getSnapshotIndex: " + blobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> { + ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream + return blobStoreManager.get(blobId, indexBlobStream, metadata).toCompletableFuture() + .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor); + }, isCauseNonRetriable(), executor); + } + + /** + * Recursively issue delete requests for files and dirs marked to be removed in a previously created remote snapshot. + * Note: We do not immediately delete files/dirs to be removed when uploading a snapshot to the remote + * store. 
We just track them for deletion during the upload, and delete them AFTER the snapshot is uploaded, and the + * blob IDs have been persisted as part of the checkpoint. This is to prevent data loss if a failure happens + * part way through the commit. We issue deletes for these files/subdirs in the cleanUp() phase of the commit lifecycle. + * @param dirIndex the dir in the remote snapshot to clean up. + * @param metadata Metadata related to the request + * @return a future that completes when all the files and subdirs marked for deletion are cleaned up. + */ + public CompletionStage<Void> cleanUpDir(DirIndex dirIndex, Metadata metadata) { + String dirName = dirIndex.getDirName(); + if (DirIndex.ROOT_DIR_NAME.equals(dirName)) { + LOG.debug("Cleaning up root dir in blob store."); + } else { + LOG.debug("Cleaning up dir: {} in blob store.", dirIndex.getDirName()); + } + + List<CompletionStage<Void>> cleanUpFuture = new ArrayList<>(); + List<FileIndex> files = dirIndex.getFilesRemoved(); + for (FileIndex file: files) { + Metadata requestMetadata = + new Metadata(file.getFileName(), Optional.of(file.getFileMetadata().getSize()), metadata.getJobName(), + metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + cleanUpFuture.add(deleteFile(file, requestMetadata)); + } + + for (DirIndex subDirToDelete : dirIndex.getSubDirsRemoved()) { + // recursively delete ALL contents of the subDirToDelete. + cleanUpFuture.add(deleteDir(subDirToDelete, metadata)); + } + + for (DirIndex subDirToRetain : dirIndex.getSubDirsPresent()) { + // recursively clean up the subDir, only deleting files and subdirs marked for deletion. + cleanUpFuture.add(cleanUpDir(subDirToRetain, metadata)); + } + + return CompletableFuture.allOf(cleanUpFuture.toArray(new CompletableFuture[0])); + } + + /** + * WARNING: This method deletes the **SnapshotIndex blob** from the snapshot. 
This should only be called to clean + * up an older snapshot **AFTER** all the files and sub-dirs to be deleted from this snapshot are already deleted + * using {@link #cleanUpDir(DirIndex, Metadata)} + * + * @param snapshotIndexBlobId blob ID of SnapshotIndex blob to delete + * @return a future that completes when the index blob is deleted from remote store. + */ + public CompletionStage<Void> deleteSnapshotIndexBlob(String snapshotIndexBlobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(snapshotIndexBlobId)); + LOG.debug("Deleting SnapshotIndex blob {} from blob store", snapshotIndexBlobId); + String opName = "deleteSnapshotIndexBlob: " + snapshotIndexBlobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> + blobStoreManager.delete(snapshotIndexBlobId, metadata).toCompletableFuture(), isCauseNonRetriable(), executor); + } + + /** + * Marks all the blobs associated with an {@link SnapshotIndex} to never expire. + * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot + * @param metadata {@link Metadata} related to the request + * @return A future that completes when all the files and subdirs associated with this remote snapshot are marked to + * never expire. 
+ */ + public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex snapshotIndex, Metadata metadata) { + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + LOG.debug("Marking contents of SnapshotIndex: {} to never expire", snapshotMetadata.toString()); + + String opName = "removeTTL for SnapshotIndex for checkpointId: " + snapshotMetadata.getCheckpointId(); + Supplier<CompletionStage<Void>> removeDirIndexTTLAction = + () -> removeTTL(snapshotIndex.getDirIndex(), metadata).toCompletableFuture(); + CompletableFuture<Void> dirIndexTTLRemovalFuture = FutureUtil.executeAsyncWithRetries(opName, + removeDirIndexTTLAction, isCauseNonRetriable(), executor); + + return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> { + String op2Name = "removeTTL for indexBlobId: " + indexBlobId; + Supplier<CompletionStage<Void>> removeIndexBlobTTLAction = () -> + blobStoreManager.removeTTL(indexBlobId, metadata).toCompletableFuture(); + return FutureUtil.executeAsyncWithRetries(op2Name, removeIndexBlobTTLAction, isCauseNonRetriable(), executor); + }, executor); + } + + /** + * Upload a File to blob store. + * @param file File to upload to blob store. + * @return A future containing the {@link FileIndex} for the uploaded file. + */ + @VisibleForTesting + CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) { + if (file == null || !file.isFile()) { + String message = file != null ? 
"Dir or Symbolic link" : "null"; + throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message)); + } + long putFileStartTime = System.nanoTime(); + + String opName = "putFile: " + file.getAbsolutePath(); + Supplier<CompletionStage<FileIndex>> fileUploadAction = () -> { + LOG.debug("Putting file: {} to blob store.", file.getPath()); + CompletableFuture<FileIndex> fileBlobFuture; + CheckedInputStream inputStream = null; + try { + // TODO maybe use the more efficient CRC32C / PureJavaCRC32 impl + inputStream = new CheckedInputStream(new FileInputStream(file), new CRC32()); + CheckedInputStream finalInputStream = inputStream; + FileMetadata fileMetadata = FileMetadata.fromFile(file); + if (backupMetrics != null) { + backupMetrics.avgFileSizeBytes.update(fileMetadata.getSize()); + } + + Metadata metadata = + new Metadata(file.getAbsolutePath(), Optional.of(fileMetadata.getSize()), snapshotMetadata.getJobName(), + snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + + fileBlobFuture = blobStoreManager.put(inputStream, metadata) + .thenApplyAsync(id -> { + LOG.trace("Put complete. 
Closing input stream for file: {}.", file.getPath()); + try { + finalInputStream.close(); + } catch (Exception e) { + throw new SamzaException(String.format("Error closing input stream for file %s", + file.getAbsolutePath()), e); + } + + LOG.trace("Returning new FileIndex for file: {}.", file.getPath()); + return new FileIndex( + file.getName(), + Collections.singletonList(new FileBlob(id, 0)), + fileMetadata, + finalInputStream.getChecksum().getValue()); + }, executor).toCompletableFuture(); + } catch (Exception e) { + try { + if (inputStream != null) { + inputStream.close(); + } + } catch (Exception err) { + LOG.error("Error closing input stream for file {}", file.getName(), err); + } + LOG.error("Error putting file {}", file.getName(), e); + throw new SamzaException(String.format("Error putting file %s", file.getAbsolutePath()), e); + } + return fileBlobFuture; + }; + + return FutureUtil.executeAsyncWithRetries(opName, fileUploadAction, isCauseNonRetriable(), executor) + .whenComplete((res, ex) -> { + if (backupMetrics != null) { + backupMetrics.avgFileUploadNs.update(System.nanoTime() - putFileStartTime); + + long fileSize = file.length(); + backupMetrics.uploadRate.inc(fileSize); + backupMetrics.filesUploaded.getValue().addAndGet(1); + backupMetrics.bytesUploaded.getValue().addAndGet(fileSize); + backupMetrics.filesRemaining.getValue().addAndGet(-1); + backupMetrics.bytesRemaining.getValue().addAndGet(-1 * fileSize); + } + }); + } + + /** + * Non-blocking restore of a {@link SnapshotIndex} to local store by downloading all the files and sub-dirs associated + * with this remote snapshot. + * @return A list of future for all the async downloads + */ + @VisibleForTesting Review comment: Remove, this is a public method now. ########## File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java ########## @@ -66,6 +66,9 @@ public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX + "%s.changelog." 
+ MIN_COMPACTION_LAG_MS; public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS = TimeUnit.HOURS.toMillis(4); + public static final String BLOB_STORE_BACKEND_ADMIN_FACTORY = "blob.store.backend.admin.factory"; Review comment: Blob store state backend specific configs should not be in StorageConfig. ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java ########## @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.storage.blobstore.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.apache.samza.storage.blobstore.BlobStoreManager; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.Metadata; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.exceptions.RetriableException; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.FileBlob; +import org.apache.samza.storage.blobstore.index.FileIndex; +import org.apache.samza.storage.blobstore.index.FileMetadata; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFileAttributes; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import 
org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper methods to interact with remote blob store service and GET/PUT/DELETE a + * {@link SnapshotIndex} or {@link DirDiff}. + */ +public class BlobStoreUtil { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class); + + private final SnapshotIndexSerde snapshotIndexSerde = new SnapshotIndexSerde(); + private final BlobStoreManager blobStoreManager; + private final ExecutorService executor; + private final BlobStoreBackupManagerMetrics backupMetrics; + private final BlobStoreRestoreManagerMetrics restoreMetrics; + + public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, + BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) { + this.blobStoreManager = blobStoreManager; + this.executor = executor; + this.backupMetrics = backupMetrics; + this.restoreMetrics = restoreMetrics; + } + + /** + * Recursively upload all new files and upload or update contents of all subdirs in the {@link DirDiff} and return a + * Future containing the {@link DirIndex} associated with the directory. + * @param dirDiff diff for the contents of this directory + * @return A future with the {@link DirIndex} if the upload completed successfully. 
+ */ + public CompletionStage<DirIndex> putDir(DirDiff dirDiff, SnapshotMetadata snapshotMetadata) { + // Upload all new files in the dir + List<File> filesToUpload = dirDiff.getFilesAdded(); + List<CompletionStage<FileIndex>> fileFutures = filesToUpload.stream() + .map(file -> putFile(file, snapshotMetadata)) + .collect(Collectors.toList()); + + CompletableFuture<Void> allFilesFuture = + CompletableFuture.allOf(fileFutures.toArray(new CompletableFuture[0])); + + List<CompletionStage<DirIndex>> subDirFutures = new ArrayList<>(); + // recursively upload all new subdirs of this dir + for (DirDiff subDirAdded: dirDiff.getSubDirsAdded()) { + subDirFutures.add(putDir(subDirAdded, snapshotMetadata)); + } + // recursively update contents of all subdirs that are retained but might have been modified + for (DirDiff subDirRetained: dirDiff.getSubDirsRetained()) { + subDirFutures.add(putDir(subDirRetained, snapshotMetadata)); + } + CompletableFuture<Void> allDirBlobsFuture = + CompletableFuture.allOf(subDirFutures.toArray(new CompletableFuture[0])); + + // TODO LOW shesharm can we cancel other uploads if any one of them fails? 
+ // Check with Ambry: if client closes, what happens to inflight uploads + return CompletableFuture.allOf(allDirBlobsFuture, allFilesFuture) + .thenApplyAsync(f -> { + LOG.trace("All file and dir uploads complete for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + List<FileIndex> filesPresent = fileFutures.stream() + .map(blob -> blob.toCompletableFuture().join()) + .collect(Collectors.toList()); + + filesPresent.addAll(dirDiff.getFilesRetained()); + + List<DirIndex> subDirsPresent = subDirFutures.stream() + .map(subDir -> subDir.toCompletableFuture().join()) + .collect(Collectors.toList()); + + LOG.debug("Uploaded diff for task: {} store: {} with statistics: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName(), DirDiff.getStats(dirDiff)); + + LOG.trace("Returning new DirIndex for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + return new DirIndex(dirDiff.getDirName(), + filesPresent, + dirDiff.getFilesRemoved(), + subDirsPresent, + dirDiff.getSubDirsRemoved()); + }, executor); + } + + /** + * PUTs the {@link SnapshotIndex} to the blob store. + * @param snapshotIndex SnapshotIndex to put. + * @return a Future containing the blob ID of the {@link SnapshotIndex}. 
+ */ + public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) { + byte[] bytes = snapshotIndexSerde.toBytes(snapshotIndex); + String opName = "putSnapshotIndex for checkpointId " + snapshotIndex.getSnapshotMetadata().getCheckpointId(); + return FutureUtil.executeAsyncWithRetries(opName, () -> { + InputStream inputStream = new ByteArrayInputStream(bytes); // no need to close ByteArrayInputStream + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + Metadata metadata = new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.of((long) bytes.length), + snapshotMetadata.getJobName(), snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), + snapshotMetadata.getStoreName()); + return blobStoreManager.put(inputStream, metadata).toCompletableFuture(); + }, isCauseNonRetriable(), executor); + } + + /** + * GETs the {@link SnapshotIndex} from the blob store. + * @param blobId blob ID of the {@link SnapshotIndex} to get + * @return a Future containing the {@link SnapshotIndex} + */ + public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(blobId)); + String opName = "getSnapshotIndex: " + blobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> { + ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream + return blobStoreManager.get(blobId, indexBlobStream, metadata).toCompletableFuture() + .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor); + }, isCauseNonRetriable(), executor); + } + + /** + * Recursively issue delete requests for files and dirs marked to be removed in a previously created remote snapshot. + * Note: We do not immediately delete files/dirs to be removed when uploading a snapshot to the remote + * store. 
We just track them for deletion during the upload, and delete them AFTER the snapshot is uploaded, and the + * blob IDs have been persisted as part of the checkpoint. This is to prevent data loss if a failure happens + * part way through the commit. We issue deletes for these files/subdirs in the cleanUp() phase of the commit lifecycle. + * @param dirIndex the dir in the remote snapshot to clean up. + * @param metadata Metadata related to the request + * @return a future that completes when all the files and subdirs marked for deletion are cleaned up. + */ + public CompletionStage<Void> cleanUpDir(DirIndex dirIndex, Metadata metadata) { + String dirName = dirIndex.getDirName(); + if (DirIndex.ROOT_DIR_NAME.equals(dirName)) { + LOG.debug("Cleaning up root dir in blob store."); + } else { + LOG.debug("Cleaning up dir: {} in blob store.", dirIndex.getDirName()); + } + + List<CompletionStage<Void>> cleanUpFuture = new ArrayList<>(); + List<FileIndex> files = dirIndex.getFilesRemoved(); + for (FileIndex file: files) { + Metadata requestMetadata = + new Metadata(file.getFileName(), Optional.of(file.getFileMetadata().getSize()), metadata.getJobName(), + metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + cleanUpFuture.add(deleteFile(file, requestMetadata)); + } + + for (DirIndex subDirToDelete : dirIndex.getSubDirsRemoved()) { + // recursively delete ALL contents of the subDirToDelete. + cleanUpFuture.add(deleteDir(subDirToDelete, metadata)); + } + + for (DirIndex subDirToRetain : dirIndex.getSubDirsPresent()) { + // recursively clean up the subDir, only deleting files and subdirs marked for deletion. + cleanUpFuture.add(cleanUpDir(subDirToRetain, metadata)); + } + + return CompletableFuture.allOf(cleanUpFuture.toArray(new CompletableFuture[0])); + } + + /** + * WARNING: This method deletes the **SnapshotIndex blob** from the snapshot. 
This should only be called to clean + * up an older snapshot **AFTER** all the files and sub-dirs to be deleted from this snapshot are already deleted + * using {@link #cleanUpDir(DirIndex, Metadata)} + * + * @param snapshotIndexBlobId blob ID of SnapshotIndex blob to delete + * @return a future that completes when the index blob is deleted from remote store. + */ + public CompletionStage<Void> deleteSnapshotIndexBlob(String snapshotIndexBlobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(snapshotIndexBlobId)); + LOG.debug("Deleting SnapshotIndex blob {} from blob store", snapshotIndexBlobId); + String opName = "deleteSnapshotIndexBlob: " + snapshotIndexBlobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> + blobStoreManager.delete(snapshotIndexBlobId, metadata).toCompletableFuture(), isCauseNonRetriable(), executor); + } + + /** + * Marks all the blobs associated with an {@link SnapshotIndex} to never expire. + * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot + * @param metadata {@link Metadata} related to the request + * @return A future that completes when all the files and subdirs associated with this remote snapshot are marked to + * never expire. 
+ */ + public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex snapshotIndex, Metadata metadata) { + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + LOG.debug("Marking contents of SnapshotIndex: {} to never expire", snapshotMetadata.toString()); + + String opName = "removeTTL for SnapshotIndex for checkpointId: " + snapshotMetadata.getCheckpointId(); + Supplier<CompletionStage<Void>> removeDirIndexTTLAction = + () -> removeTTL(snapshotIndex.getDirIndex(), metadata).toCompletableFuture(); + CompletableFuture<Void> dirIndexTTLRemovalFuture = FutureUtil.executeAsyncWithRetries(opName, + removeDirIndexTTLAction, isCauseNonRetriable(), executor); + + return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> { + String op2Name = "removeTTL for indexBlobId: " + indexBlobId; + Supplier<CompletionStage<Void>> removeIndexBlobTTLAction = () -> + blobStoreManager.removeTTL(indexBlobId, metadata).toCompletableFuture(); + return FutureUtil.executeAsyncWithRetries(op2Name, removeIndexBlobTTLAction, isCauseNonRetriable(), executor); + }, executor); + } + + /** + * Upload a File to blob store. + * @param file File to upload to blob store. + * @return A future containing the {@link FileIndex} for the uploaded file. + */ + @VisibleForTesting + CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) { + if (file == null || !file.isFile()) { + String message = file != null ? 
"Dir or Symbolic link" : "null"; + throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message)); + } + long putFileStartTime = System.nanoTime(); + + String opName = "putFile: " + file.getAbsolutePath(); + Supplier<CompletionStage<FileIndex>> fileUploadAction = () -> { + LOG.debug("Putting file: {} to blob store.", file.getPath()); + CompletableFuture<FileIndex> fileBlobFuture; + CheckedInputStream inputStream = null; + try { + // TODO maybe use the more efficient CRC32C / PureJavaCRC32 impl + inputStream = new CheckedInputStream(new FileInputStream(file), new CRC32()); + CheckedInputStream finalInputStream = inputStream; + FileMetadata fileMetadata = FileMetadata.fromFile(file); + if (backupMetrics != null) { + backupMetrics.avgFileSizeBytes.update(fileMetadata.getSize()); + } + + Metadata metadata = + new Metadata(file.getAbsolutePath(), Optional.of(fileMetadata.getSize()), snapshotMetadata.getJobName(), + snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + + fileBlobFuture = blobStoreManager.put(inputStream, metadata) + .thenApplyAsync(id -> { + LOG.trace("Put complete. 
Closing input stream for file: {}.", file.getPath()); + try { + finalInputStream.close(); + } catch (Exception e) { + throw new SamzaException(String.format("Error closing input stream for file %s", + file.getAbsolutePath()), e); + } + + LOG.trace("Returning new FileIndex for file: {}.", file.getPath()); + return new FileIndex( + file.getName(), + Collections.singletonList(new FileBlob(id, 0)), + fileMetadata, + finalInputStream.getChecksum().getValue()); + }, executor).toCompletableFuture(); + } catch (Exception e) { + try { + if (inputStream != null) { + inputStream.close(); + } + } catch (Exception err) { + LOG.error("Error closing input stream for file {}", file.getName(), err); + } + LOG.error("Error putting file {}", file.getName(), e); + throw new SamzaException(String.format("Error putting file %s", file.getAbsolutePath()), e); + } + return fileBlobFuture; + }; + + return FutureUtil.executeAsyncWithRetries(opName, fileUploadAction, isCauseNonRetriable(), executor) + .whenComplete((res, ex) -> { + if (backupMetrics != null) { + backupMetrics.avgFileUploadNs.update(System.nanoTime() - putFileStartTime); + + long fileSize = file.length(); + backupMetrics.uploadRate.inc(fileSize); + backupMetrics.filesUploaded.getValue().addAndGet(1); + backupMetrics.bytesUploaded.getValue().addAndGet(fileSize); + backupMetrics.filesRemaining.getValue().addAndGet(-1); + backupMetrics.bytesRemaining.getValue().addAndGet(-1 * fileSize); + } + }); + } + + /** + * Non-blocking restore of a {@link SnapshotIndex} to local store by downloading all the files and sub-dirs associated + * with this remote snapshot. 
+ * @return A list of future for all the async downloads + */ + @VisibleForTesting + public List<CompletableFuture<Void>> restoreDir(File baseDir, DirIndex dirIndex, Metadata metadata) { + LOG.debug("Restoring contents of directory: {} from remote snapshot.", baseDir); + + List<CompletableFuture<Void>> downloadFutures = new ArrayList<>(); + + try { + // create parent directories if they don't exist + Files.createDirectories(baseDir.toPath()); + } catch (IOException exception) { + LOG.error("Error creating directory: {} for restore", baseDir.getAbsolutePath(), exception); + throw new SamzaException(String.format("Error creating directory: %s for restore", + baseDir.getAbsolutePath()), exception); + } + + // restore all files in the directory + for (FileIndex fileIndex : dirIndex.getFilesPresent()) { + File fileToRestore = Paths.get(baseDir.getAbsolutePath(), fileIndex.getFileName()).toFile(); + Metadata requestMetadata = + new Metadata(fileToRestore.getAbsolutePath(), Optional.of(fileToRestore.length()), + metadata.getJobName(), metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + List<FileBlob> fileBlobs = fileIndex.getBlobs(); + + String opName = "restoreFile: " + fileToRestore.getAbsolutePath(); + CompletableFuture<Void> fileRestoreFuture = FutureUtil.executeAsyncWithRetries(opName, () -> { + long restoreFileStartTime = System.nanoTime(); + FileOutputStream outputStream = null; + try { + // TODO HIGH shesharm ensure that ambry + standby is handled correctly (i.e. no continuous restore for ambry Review comment: Remove comment, address separately. ########## File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/util/BlobStoreUtil.java ########## @@ -0,0 +1,721 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage.blobstore.util; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.apache.samza.storage.blobstore.BlobStoreManager; +import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; +import org.apache.samza.storage.blobstore.Metadata; +import org.apache.samza.storage.blobstore.diff.DirDiff; +import org.apache.samza.storage.blobstore.exceptions.RetriableException; +import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.FileBlob; +import org.apache.samza.storage.blobstore.index.FileIndex; +import org.apache.samza.storage.blobstore.index.FileMetadata; +import org.apache.samza.storage.blobstore.index.SnapshotIndex; +import org.apache.samza.storage.blobstore.index.SnapshotMetadata; +import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.PosixFileAttributes; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.ArrayList; +import 
java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics; +import org.apache.samza.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Helper methods to interact with remote blob store service and GET/PUT/DELETE a + * {@link SnapshotIndex} or {@link DirDiff}. + */ +public class BlobStoreUtil { + private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtil.class); + + private final SnapshotIndexSerde snapshotIndexSerde = new SnapshotIndexSerde(); + private final BlobStoreManager blobStoreManager; + private final ExecutorService executor; + private final BlobStoreBackupManagerMetrics backupMetrics; + private final BlobStoreRestoreManagerMetrics restoreMetrics; + + public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, + BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) { + this.blobStoreManager = blobStoreManager; + this.executor = executor; + this.backupMetrics = backupMetrics; + this.restoreMetrics = restoreMetrics; + } + + /** + * Recursively upload all new files and upload or update contents of all subdirs in the {@link DirDiff} and return a + * Future containing the {@link DirIndex} associated with the directory. 
+ * @param dirDiff diff for the contents of this directory + * @return A future with the {@link DirIndex} if the upload completed successfully. + */ + public CompletionStage<DirIndex> putDir(DirDiff dirDiff, SnapshotMetadata snapshotMetadata) { + // Upload all new files in the dir + List<File> filesToUpload = dirDiff.getFilesAdded(); + List<CompletionStage<FileIndex>> fileFutures = filesToUpload.stream() + .map(file -> putFile(file, snapshotMetadata)) + .collect(Collectors.toList()); + + CompletableFuture<Void> allFilesFuture = + CompletableFuture.allOf(fileFutures.toArray(new CompletableFuture[0])); + + List<CompletionStage<DirIndex>> subDirFutures = new ArrayList<>(); + // recursively upload all new subdirs of this dir + for (DirDiff subDirAdded: dirDiff.getSubDirsAdded()) { + subDirFutures.add(putDir(subDirAdded, snapshotMetadata)); + } + // recursively update contents of all subdirs that are retained but might have been modified + for (DirDiff subDirRetained: dirDiff.getSubDirsRetained()) { + subDirFutures.add(putDir(subDirRetained, snapshotMetadata)); + } + CompletableFuture<Void> allDirBlobsFuture = + CompletableFuture.allOf(subDirFutures.toArray(new CompletableFuture[0])); + + // TODO LOW shesharm can we cancel other uploads if any one of them fails? 
+ // Check with Ambry: if client closes, what happens to inflight uploads + return CompletableFuture.allOf(allDirBlobsFuture, allFilesFuture) + .thenApplyAsync(f -> { + LOG.trace("All file and dir uploads complete for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + List<FileIndex> filesPresent = fileFutures.stream() + .map(blob -> blob.toCompletableFuture().join()) + .collect(Collectors.toList()); + + filesPresent.addAll(dirDiff.getFilesRetained()); + + List<DirIndex> subDirsPresent = subDirFutures.stream() + .map(subDir -> subDir.toCompletableFuture().join()) + .collect(Collectors.toList()); + + LOG.debug("Uploaded diff for task: {} store: {} with statistics: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName(), DirDiff.getStats(dirDiff)); + + LOG.trace("Returning new DirIndex for task: {} store: {}", + snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + return new DirIndex(dirDiff.getDirName(), + filesPresent, + dirDiff.getFilesRemoved(), + subDirsPresent, + dirDiff.getSubDirsRemoved()); + }, executor); + } + + /** + * PUTs the {@link SnapshotIndex} to the blob store. + * @param snapshotIndex SnapshotIndex to put. + * @return a Future containing the blob ID of the {@link SnapshotIndex}. 
+ */ + public CompletableFuture<String> putSnapshotIndex(SnapshotIndex snapshotIndex) { + byte[] bytes = snapshotIndexSerde.toBytes(snapshotIndex); + String opName = "putSnapshotIndex for checkpointId " + snapshotIndex.getSnapshotMetadata().getCheckpointId(); + return FutureUtil.executeAsyncWithRetries(opName, () -> { + InputStream inputStream = new ByteArrayInputStream(bytes); // no need to close ByteArrayInputStream + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + Metadata metadata = new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.of((long) bytes.length), + snapshotMetadata.getJobName(), snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), + snapshotMetadata.getStoreName()); + return blobStoreManager.put(inputStream, metadata).toCompletableFuture(); + }, isCauseNonRetriable(), executor); + } + + /** + * GETs the {@link SnapshotIndex} from the blob store. + * @param blobId blob ID of the {@link SnapshotIndex} to get + * @return a Future containing the {@link SnapshotIndex} + */ + public CompletableFuture<SnapshotIndex> getSnapshotIndex(String blobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(blobId)); + String opName = "getSnapshotIndex: " + blobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> { + ByteArrayOutputStream indexBlobStream = new ByteArrayOutputStream(); // no need to close ByteArrayOutputStream + return blobStoreManager.get(blobId, indexBlobStream, metadata).toCompletableFuture() + .thenApplyAsync(f -> snapshotIndexSerde.fromBytes(indexBlobStream.toByteArray()), executor); + }, isCauseNonRetriable(), executor); + } + + /** + * Recursively issue delete requests for files and dirs marked to be removed in a previously created remote snapshot. + * Note: We do not immediately delete files/dirs to be removed when uploading a snapshot to the remote + * store. 
We just track them for deletion during the upload, and delete them AFTER the snapshot is uploaded, and the + * blob IDs have been persisted as part of the checkpoint. This is to prevent data loss if a failure happens + * part way through the commit. We issue delete these file/subdirs in cleanUp() phase of commit lifecycle. + * @param dirIndex the dir in the remote snapshot to clean up. + * @param metadata Metadata related to the request + * @return a future that completes when all the files and subdirs marked for deletion are cleaned up. + */ + public CompletionStage<Void> cleanUpDir(DirIndex dirIndex, Metadata metadata) { + String dirName = dirIndex.getDirName(); + if (DirIndex.ROOT_DIR_NAME.equals(dirName)) { + LOG.debug("Cleaning up root dir in blob store."); + } else { + LOG.debug("Cleaning up dir: {} in blob store.", dirIndex.getDirName()); + } + + List<CompletionStage<Void>> cleanUpFuture = new ArrayList<>(); + List<FileIndex> files = dirIndex.getFilesRemoved(); + for (FileIndex file: files) { + Metadata requestMetadata = + new Metadata(file.getFileName(), Optional.of(file.getFileMetadata().getSize()), metadata.getJobName(), + metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + cleanUpFuture.add(deleteFile(file, requestMetadata)); + } + + for (DirIndex subDirToDelete : dirIndex.getSubDirsRemoved()) { + // recursively delete ALL contents of the subDirToDelete. + cleanUpFuture.add(deleteDir(subDirToDelete, metadata)); + } + + for (DirIndex subDirToRetain : dirIndex.getSubDirsPresent()) { + // recursively clean up the subDir, only deleting files and subdirs marked for deletion. + cleanUpFuture.add(cleanUpDir(subDirToRetain, metadata)); + } + + return CompletableFuture.allOf(cleanUpFuture.toArray(new CompletableFuture[0])); + } + + /** + * WARNING: This method deletes the **SnapshotIndex blob** from the snapshot. 
This should only be called to clean + * up an older snapshot **AFTER** all the files and sub-dirs to be deleted from this snapshot are already deleted + * using {@link #cleanUpDir(DirIndex, Metadata)} + * + * @param snapshotIndexBlobId blob ID of SnapshotIndex blob to delete + * @return a future that completes when the index blob is deleted from remote store. + */ + public CompletionStage<Void> deleteSnapshotIndexBlob(String snapshotIndexBlobId, Metadata metadata) { + Preconditions.checkState(StringUtils.isNotBlank(snapshotIndexBlobId)); + LOG.debug("Deleting SnapshotIndex blob {} from blob store", snapshotIndexBlobId); + String opName = "deleteSnapshotIndexBlob: " + snapshotIndexBlobId; + return FutureUtil.executeAsyncWithRetries(opName, () -> + blobStoreManager.delete(snapshotIndexBlobId, metadata).toCompletableFuture(), isCauseNonRetriable(), executor); + } + + /** + * Marks all the blobs associated with an {@link SnapshotIndex} to never expire. + * @param snapshotIndex {@link SnapshotIndex} of the remote snapshot + * @param metadata {@link Metadata} related to the request + * @return A future that completes when all the files and subdirs associated with this remote snapshot are marked to + * never expire. 
+ */ + public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex snapshotIndex, Metadata metadata) { + SnapshotMetadata snapshotMetadata = snapshotIndex.getSnapshotMetadata(); + LOG.debug("Marking contents of SnapshotIndex: {} to never expire", snapshotMetadata.toString()); + + String opName = "removeTTL for SnapshotIndex for checkpointId: " + snapshotMetadata.getCheckpointId(); + Supplier<CompletionStage<Void>> removeDirIndexTTLAction = + () -> removeTTL(snapshotIndex.getDirIndex(), metadata).toCompletableFuture(); + CompletableFuture<Void> dirIndexTTLRemovalFuture = FutureUtil.executeAsyncWithRetries(opName, + removeDirIndexTTLAction, isCauseNonRetriable(), executor); + + return dirIndexTTLRemovalFuture.thenComposeAsync(aVoid -> { + String op2Name = "removeTTL for indexBlobId: " + indexBlobId; + Supplier<CompletionStage<Void>> removeIndexBlobTTLAction = () -> + blobStoreManager.removeTTL(indexBlobId, metadata).toCompletableFuture(); + return FutureUtil.executeAsyncWithRetries(op2Name, removeIndexBlobTTLAction, isCauseNonRetriable(), executor); + }, executor); + } + + /** + * Upload a File to blob store. + * @param file File to upload to blob store. + * @return A future containing the {@link FileIndex} for the uploaded file. + */ + @VisibleForTesting + CompletableFuture<FileIndex> putFile(File file, SnapshotMetadata snapshotMetadata) { + if (file == null || !file.isFile()) { + String message = file != null ? 
"Dir or Symbolic link" : "null"; + throw new SamzaException(String.format("Required a non-null parameter of type file, provided: %s", message)); + } + long putFileStartTime = System.nanoTime(); + + String opName = "putFile: " + file.getAbsolutePath(); + Supplier<CompletionStage<FileIndex>> fileUploadAction = () -> { + LOG.debug("Putting file: {} to blob store.", file.getPath()); + CompletableFuture<FileIndex> fileBlobFuture; + CheckedInputStream inputStream = null; + try { + // TODO maybe use the more efficient CRC32C / PureJavaCRC32 impl + inputStream = new CheckedInputStream(new FileInputStream(file), new CRC32()); + CheckedInputStream finalInputStream = inputStream; + FileMetadata fileMetadata = FileMetadata.fromFile(file); + if (backupMetrics != null) { + backupMetrics.avgFileSizeBytes.update(fileMetadata.getSize()); + } + + Metadata metadata = + new Metadata(file.getAbsolutePath(), Optional.of(fileMetadata.getSize()), snapshotMetadata.getJobName(), + snapshotMetadata.getJobId(), snapshotMetadata.getTaskName(), snapshotMetadata.getStoreName()); + + fileBlobFuture = blobStoreManager.put(inputStream, metadata) + .thenApplyAsync(id -> { + LOG.trace("Put complete. 
Closing input stream for file: {}.", file.getPath()); + try { + finalInputStream.close(); + } catch (Exception e) { + throw new SamzaException(String.format("Error closing input stream for file %s", + file.getAbsolutePath()), e); + } + + LOG.trace("Returning new FileIndex for file: {}.", file.getPath()); + return new FileIndex( + file.getName(), + Collections.singletonList(new FileBlob(id, 0)), + fileMetadata, + finalInputStream.getChecksum().getValue()); + }, executor).toCompletableFuture(); + } catch (Exception e) { + try { + if (inputStream != null) { + inputStream.close(); + } + } catch (Exception err) { + LOG.error("Error closing input stream for file {}", file.getName(), err); + } + LOG.error("Error putting file {}", file.getName(), e); + throw new SamzaException(String.format("Error putting file %s", file.getAbsolutePath()), e); + } + return fileBlobFuture; + }; + + return FutureUtil.executeAsyncWithRetries(opName, fileUploadAction, isCauseNonRetriable(), executor) + .whenComplete((res, ex) -> { + if (backupMetrics != null) { + backupMetrics.avgFileUploadNs.update(System.nanoTime() - putFileStartTime); + + long fileSize = file.length(); + backupMetrics.uploadRate.inc(fileSize); + backupMetrics.filesUploaded.getValue().addAndGet(1); + backupMetrics.bytesUploaded.getValue().addAndGet(fileSize); + backupMetrics.filesRemaining.getValue().addAndGet(-1); + backupMetrics.bytesRemaining.getValue().addAndGet(-1 * fileSize); + } + }); + } + + /** + * Non-blocking restore of a {@link SnapshotIndex} to local store by downloading all the files and sub-dirs associated + * with this remote snapshot. 
+ * @return A list of future for all the async downloads + */ + @VisibleForTesting + public List<CompletableFuture<Void>> restoreDir(File baseDir, DirIndex dirIndex, Metadata metadata) { + LOG.debug("Restoring contents of directory: {} from remote snapshot.", baseDir); + + List<CompletableFuture<Void>> downloadFutures = new ArrayList<>(); + + try { + // create parent directories if they don't exist + Files.createDirectories(baseDir.toPath()); + } catch (IOException exception) { + LOG.error("Error creating directory: {} for restore", baseDir.getAbsolutePath(), exception); + throw new SamzaException(String.format("Error creating directory: %s for restore", + baseDir.getAbsolutePath()), exception); + } + + // restore all files in the directory + for (FileIndex fileIndex : dirIndex.getFilesPresent()) { + File fileToRestore = Paths.get(baseDir.getAbsolutePath(), fileIndex.getFileName()).toFile(); + Metadata requestMetadata = + new Metadata(fileToRestore.getAbsolutePath(), Optional.of(fileToRestore.length()), + metadata.getJobName(), metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + List<FileBlob> fileBlobs = fileIndex.getBlobs(); + + String opName = "restoreFile: " + fileToRestore.getAbsolutePath(); + CompletableFuture<Void> fileRestoreFuture = FutureUtil.executeAsyncWithRetries(opName, () -> { + long restoreFileStartTime = System.nanoTime(); + FileOutputStream outputStream = null; + try { + // TODO HIGH shesharm ensure that ambry + standby is handled correctly (i.e. no continuous restore for ambry + // backed stores, but restore is done correctly on a failover). + if (fileToRestore.exists()) { + // delete the file if it already exists, e.g. from a previous retry. + Files.delete(fileToRestore.toPath()); + } + + // TODO HIGH shesharm add integration tests to ensure empty files and directories are handled correctly E2E. + fileToRestore.createNewFile(); // create file for 0 byte files (fileIndex entry but no fileBlobs). 
+ + outputStream = new FileOutputStream(fileToRestore); + final FileOutputStream finalOutputStream = outputStream; + // create a copy to ensure list being sorted is mutable. + List<FileBlob> fileBlobsCopy = new ArrayList<>(fileBlobs); + fileBlobsCopy.sort(Comparator.comparingInt(FileBlob::getOffset)); // sort by offset. + + // chain the futures such that write to file for blobs is sequential. + // can be optimized to write concurrently to the file later. + CompletableFuture<Void> resultFuture = CompletableFuture.completedFuture(null); + for (FileBlob fileBlob : fileBlobsCopy) { + resultFuture = resultFuture + .thenComposeAsync(v -> { + LOG.debug("Starting restore for file: {} with blob id: {} at offset: {}", + fileToRestore, fileBlob.getBlobId(), fileBlob.getOffset()); + return blobStoreManager.get(fileBlob.getBlobId(), finalOutputStream, requestMetadata); + }, executor); + } + + resultFuture = resultFuture.thenRunAsync(() -> { + LOG.debug("Finished restore for file: {}. Closing output stream.", fileToRestore); + try { + // flush the file contents to disk + finalOutputStream.getFD().sync(); + finalOutputStream.close(); + } catch (Exception e) { + throw new SamzaException(String.format("Error closing output stream for file: %s", + fileToRestore.getAbsolutePath()), e); + } + }, executor); + + resultFuture.whenComplete((res, ex) -> { + if (restoreMetrics != null) { + restoreMetrics.avgFileRestoreNs.update(System.nanoTime() - restoreFileStartTime); + + long fileSize = fileIndex.getFileMetadata().getSize(); + restoreMetrics.restoreRate.inc(fileSize); + restoreMetrics.filesRestored.getValue().addAndGet(1); + restoreMetrics.bytesRestored.getValue().addAndGet(fileSize); + restoreMetrics.filesRemaining.getValue().addAndGet(-1); + restoreMetrics.bytesRemaining.getValue().addAndGet(-1 * fileSize); + } + }); + + return resultFuture; + } catch (Exception e) { + try { + if (outputStream != null) { + outputStream.close(); + } + } catch (Exception err) { + LOG.error("Error closing 
output stream for file: {}", fileToRestore.getAbsolutePath(), err); + } + + throw new SamzaException(String.format("Error restoring file: %s in directory: %s", + fileIndex.getFileName(), dirIndex.getDirName()), e); + } + }, isCauseNonRetriable(), executor); + + downloadFutures.add(fileRestoreFuture); + } + + // restore any sub-directories + List<DirIndex> subDirs = dirIndex.getSubDirsPresent(); + for (DirIndex subDir : subDirs) { + File subDirFile = Paths.get(baseDir.getAbsolutePath(), subDir.getDirName()).toFile(); + downloadFutures.addAll(restoreDir(subDirFile, subDir, metadata)); + } + + return downloadFutures; + } + + /** + * WARNING: Recursively delete **ALL** the associated files and subdirs within the provided {@link DirIndex}. + * @param dirIndex {@link DirIndex} whose entire contents are to be deleted. + * @param metadata {@link Metadata} related to the request + * @return a future that completes when ALL the files and subdirs associated with the dirIndex have been + * marked for deleted in the remote blob store. + */ + public CompletionStage<Void> deleteDir(DirIndex dirIndex, Metadata metadata) { + LOG.debug("Completely deleting dir: {} in blob store", dirIndex.getDirName()); + List<CompletionStage<Void>> deleteFutures = new ArrayList<>(); + // Delete all files present in subDir + for (FileIndex file: dirIndex.getFilesPresent()) { + Metadata requestMetadata = + new Metadata(file.getFileName(), Optional.of(file.getFileMetadata().getSize()), + metadata.getJobName(), metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + deleteFutures.add(deleteFile(file, requestMetadata)); + } + + // Delete all subDirs present recursively + for (DirIndex subDir: dirIndex.getSubDirsPresent()) { + deleteFutures.add(deleteDir(subDir, metadata)); + } + + return CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); + } + + /** + * Delete a {@link FileIndex} from the remote store by deleting all {@link FileBlob}s associated with it. 
+ * @param fileIndex FileIndex of the file to delete from the remote store. + * @param metadata + * @return a future that completes when the FileIndex has been marked for deletion in the remote blob store. + */ + private CompletionStage<Void> deleteFile(FileIndex fileIndex, Metadata metadata) { + List<CompletionStage<Void>> deleteFutures = new ArrayList<>(); + List<FileBlob> fileBlobs = fileIndex.getBlobs(); + for (FileBlob fileBlob : fileBlobs) { + LOG.debug("Deleting file: {} blobId: {} from blob store.", fileIndex.getFileName(), fileBlob.getBlobId()); + String opName = "deleteFile: " + fileIndex.getFileName() + " blobId: " + fileBlob.getBlobId(); + Supplier<CompletionStage<Void>> fileDeletionAction = () -> + blobStoreManager.delete(fileBlob.getBlobId(), metadata).toCompletableFuture(); + CompletableFuture<Void> fileDeletionFuture = + FutureUtil.executeAsyncWithRetries(opName, fileDeletionAction, isCauseNonRetriable(), executor); + deleteFutures.add(fileDeletionFuture); + } + + return CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])); + } + + /** + * Recursively mark all the blobs associated with the {@link DirIndex} to never expire (remove TTL). + * @param dirIndex the {@link DirIndex} whose contents' TTL needs to be removed + * @param metadata {@link Metadata} related to the request + * @return A future that completes when all the blobs associated with this dirIndex are marked to + * never expire. 
+ */ + private CompletableFuture<Void> removeTTL(DirIndex dirIndex, Metadata metadata) { + String dirName = dirIndex.getDirName(); + if (DirIndex.ROOT_DIR_NAME.equals(dirName)) { + LOG.debug("Removing TTL for files and dirs present in DirIndex for root dir."); + } else { + LOG.debug("Removing TTL for files and dirs present in DirIndex for dir: {}", dirName); + } + + List<CompletableFuture<Void>> updateTTLsFuture = new ArrayList<>(); + for (DirIndex subDir: dirIndex.getSubDirsPresent()) { + updateTTLsFuture.add(removeTTL(subDir, metadata)); + } + + for (FileIndex file: dirIndex.getFilesPresent()) { + Metadata requestMetadata = + new Metadata(file.getFileName(), Optional.of(file.getFileMetadata().getSize()), + metadata.getJobName(), metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName()); + List<FileBlob> fileBlobs = file.getBlobs(); + for (FileBlob fileBlob : fileBlobs) { + String opname = "removeTTL for fileBlob: " + file.getFileName() + " with blobId: {}" + fileBlob.getBlobId(); + Supplier<CompletionStage<Void>> ttlRemovalAction = () -> + blobStoreManager.removeTTL(fileBlob.getBlobId(), requestMetadata).toCompletableFuture(); + CompletableFuture<Void> ttlRemovalFuture = + FutureUtil.executeAsyncWithRetries(opname, ttlRemovalAction, isCauseNonRetriable(), executor); + updateTTLsFuture.add(ttlRemovalFuture); + } + } + + return CompletableFuture.allOf(updateTTLsFuture.toArray(new CompletableFuture[0])); + } + + /** + * Bipredicate to test a local file in the filesystem and a remote file {@link FileIndex} and find out if they represent + * the same file. Files with same attributes as well as content are same file. A SST file in a special case. They are + * immutable, so we only compare their attributes but not the content. + * @param compareLargeFileChecksums whether to compare checksums for large files (> 1 MB). 
+ * @return BiPredicate to test similarity of local and remote files + */ + public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileChecksums) { + return (localFile, remoteFile) -> { + if (localFile.getName().equals(remoteFile.getFileName())) { + FileMetadata remoteFileMetadata = remoteFile.getFileMetadata(); + + PosixFileAttributes localFileAttrs = null; + try { + localFileAttrs = Files.readAttributes(localFile.toPath(), PosixFileAttributes.class); + } catch (IOException e) { + LOG.error("Error reading attributes for file {}", localFile.getAbsolutePath()); + throw new RuntimeException(String.format("Error reading attributes for file: %s", localFile.getAbsolutePath())); + } + + // Don't compare file timestamps. The ctime of a local file just restored will be different than the + // remote file, and will cause the file to be uploaded again during the first commit after restore. + + boolean areSameFiles = + localFileAttrs.size() == remoteFileMetadata.getSize() && + localFileAttrs.group().getName().equals(remoteFileMetadata.getGroup()) && + localFileAttrs.owner().getName().equals(remoteFileMetadata.getOwner()) && + PosixFilePermissions.toString(localFileAttrs.permissions()).equals(remoteFileMetadata.getPermissions()); + + if (!areSameFiles) { + LOG.debug("Local file {} and remote file {} are not same. " + + "Local file attributes: {}. Remote file attributes: {}.", + localFile.getAbsolutePath(), remoteFile.getFileName(), + fileAttributesToString(localFileAttrs), remoteFile.getFileMetadata().toString()); + return false; + } else { + LOG.trace("Local file {}. Remote file {}. " + + "Local file attributes: {}. 
Remote file attributes: {}.", + localFile.getAbsolutePath(), remoteFile.getFileName(), + fileAttributesToString(localFileAttrs), remoteFile.getFileMetadata().toString()); + } + + boolean isLargeFile = localFileAttrs.size() > 1024 * 1024; + if (!compareLargeFileChecksums && isLargeFile) { + // Since RocksDB SST files are immutable after creation, we can skip the expensive checksum computations + // which requires reading the entire file. + LOG.debug("Local file {} and remote file {} are same. " + + "Skipping checksum calculation for large file of size: {}.", + localFile.getAbsolutePath(), remoteFile.getFileName(), localFileAttrs.size()); + return true; + } else { + try { + FileInputStream fis = new FileInputStream(localFile); + CheckedInputStream cis = new CheckedInputStream(fis, new CRC32()); + byte[] buffer = new byte[8 * 1024]; // 8 KB + while (cis.read(buffer, 0, buffer.length) >= 0) {} + long localFileChecksum = cis.getChecksum().getValue(); + cis.close(); + + boolean areSameChecksum = localFileChecksum == remoteFile.getChecksum(); + if (!areSameChecksum) { + LOG.debug("Local file {} and remote file {} are not same. " + + "Local checksum: {}. Remote checksum: {}", + localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum()); + } else { + LOG.debug("Local file {} and remote file: {} are same. Local checksum: {}. Remote checksum: {}", + localFile.getAbsolutePath(), remoteFile.getFileName(), localFileChecksum, remoteFile.getChecksum()); + } + return areSameChecksum; + } catch (IOException e) { + throw new SamzaException("Error calculating checksum for local file: " + localFile.getAbsolutePath(), e); + } + } + } + + return false; + }; + } + + /** + * Checks if a local directory and a remote directory are identical. Local and remote directories are identical iff: + * 1. 
The local directory has exactly the same set of files as the remote directory, and the files are themselves + * identical, as determined by {@link #areSameFile(boolean)}, except for those allowed to differ according to + * {@param filesToIgnore}. + * 2. The local directory has exactly the same set of sub-directories as the remote directory. + * + * @param filesToIgnore a set of file names to ignore during the directory comparisons + * (does not exclude directory names) + * @param compareLargeFileChecksums whether to compare checksums for large files (> 1 MB). + * @return boolean indicating whether the local and remote directory are identical. + */ + // TODO HIGH shesharm add unit tests Review comment: Is this complete? Fix and remove before merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
