Sanil15 commented on a change in pull request #1501:
URL: https://github.com/apache/samza/pull/1501#discussion_r636308428



##########
File path: 
samza-api/src/main/java/org/apache/samza/storage/blobstore/Metadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import java.util.Optional;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+
+public class Metadata {

Review comment:
       Can you add docs on what this metadata is for? Would BlobMetadata be a 
better name? Metadata seems too generic.

##########
File path: 
samza-api/src/main/java/org/apache/samza/storage/blobstore/Metadata.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import java.util.Optional;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+
+public class Metadata {
+  public static final String SNAPSHOT_INDEX_PAYLOAD_PATH = "snapshot-index";
+
+  private final String payloadPath;
+  private final long payloadSize;
+  private final String jobName;
+  private final String jobId;
+  private final String taskName;
+  private final String storeName;
+
+  public Metadata(String payloadPath, Optional<Long> payloadSize,
+      String jobName, String jobId, String taskName, String storeName) {
+    this.payloadPath = payloadPath;

Review comment:
       No Preconditions check here?
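       A rough sketch of what I mean, assuming plain `java.util.Objects` checks (Guava's `Preconditions.checkNotNull` / `checkArgument` would read much the same). The class name `ValidatedMetadata`, the helper `requireNonEmpty`, the `-1` default for a missing size, and the validation rules themselves are hypothetical and only illustrate the idea:

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for the Metadata class in this diff; only the
// constructor validation is the point here.
final class ValidatedMetadata {
  private final String payloadPath;
  private final long payloadSize;
  private final String jobName;
  private final String jobId;
  private final String taskName;
  private final String storeName;

  ValidatedMetadata(String payloadPath, Optional<Long> payloadSize,
      String jobName, String jobId, String taskName, String storeName) {
    // Fail fast with a message that names the offending argument.
    this.payloadPath = requireNonEmpty(payloadPath, "payloadPath");
    Objects.requireNonNull(payloadSize, "payloadSize must not be null");
    // Assumption: -1 marks an unknown size; the PR may choose differently.
    this.payloadSize = payloadSize.orElse(-1L);
    this.jobName = requireNonEmpty(jobName, "jobName");
    this.jobId = requireNonEmpty(jobId, "jobId");
    this.taskName = requireNonEmpty(taskName, "taskName");
    this.storeName = requireNonEmpty(storeName, "storeName");
  }

  String getPayloadPath() {
    return payloadPath;
  }

  long getPayloadSize() {
    return payloadSize;
  }

  private static String requireNonEmpty(String value, String name) {
    if (value == null || value.isEmpty()) {
      throw new IllegalArgumentException(name + " must not be null or empty");
    }
    return value;
  }
}
```

       With checks like these, a bad argument surfaces at construction time rather than later when the blob is written or read.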

##########
File path: 
samza-api/src/main/java/org/apache/samza/storage/blobstore/exceptions/RetriableException.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore.exceptions;
+
+
+public class RetriableException extends RuntimeException {

Review comment:
       Same here: should this not extend SamzaException?
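       For illustration only, a minimal sketch of the hierarchy I have in mind. The local `SamzaException` below is a stand-in so the snippet compiles on its own (the real one is `org.apache.samza.SamzaException`, itself a `RuntimeException`), and the constructor shape is an assumption:

```java
// Stand-in for org.apache.samza.SamzaException so the snippet is
// self-contained; in the PR you would import the real class instead.
class SamzaException extends RuntimeException {
  SamzaException(String message, Throwable cause) {
    super(message, cause);
  }
}

// Assumed constructor shape; the PR's actual constructors may differ.
class RetriableException extends SamzaException {
  RetriableException(String message, Throwable cause) {
    super(message, cause);
  }
}
```

       Callers that already catch SamzaException would then also see blob store failures, while retry logic can still catch the narrower type.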
   
   

##########
File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
##########
@@ -267,6 +270,17 @@ public void run() {
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
       metadataResourceUtil.createResources();
 
+      // create all the resources required for state backend factories

Review comment:
       Can we abstract this logic into a helper and call the helper here?
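       Roughly something like the sketch below. The interfaces are simplified stand-ins so it compiles on its own: the real `getStateBackendAdmin` takes a `JobModel` and `Config`, and the `createResources()` method on the admin, the helper class name, and how the factories get loaded are all assumptions on my part:

```java
import java.util.List;

// Simplified stand-ins for the real Samza types (assumed shapes).
interface StateBackendAdmin {
  void createResources();
}

interface StateBackendFactory {
  StateBackendAdmin getStateBackendAdmin();
}

// Hypothetical helper: keeps run() short and makes this step testable in isolation.
final class StateBackendResourceHelper {
  private StateBackendResourceHelper() {
  }

  // Creates the resources for every configured state backend factory, in order.
  static void createResources(List<StateBackendFactory> factories) {
    for (StateBackendFactory factory : factories) {
      factory.getStateBackendAdmin().createResources();
    }
  }
}
```

       run() would then only need a single call to the helper, next to the existing metadataResourceUtil.createResources() call.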

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+  private static final Logger LOG = LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+  private final JobModel jobModel;
+  private final ExecutorService executor;
+  private final String jobName;
+  private final String jobId;
+  private final ContainerModel containerModel;
+  private final TaskModel taskModel;
+  private final String taskName;
+  private final Config config;
+  private final Clock clock;
+  private final StorageManagerUtil storageManagerUtil;
+  private final List<String> storesToBackup;
+  private final File loggedStoreBaseDir;
+  private final BlobStoreUtil blobStoreUtil;
+
+  private final BlobStoreBackupManagerMetrics metrics;
+
+  /**
+   * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} from
+   * last successful task checkpoint or {@link #upload}.
+   *
+   * After {@link #init}, the map reflects the contents of the last completed checkpoint for the task from the previous
+   * deployment, if any.
+   *
+   * During regular processing, this map is updated after each successful {@link #upload} with the blob id of
+   * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the upload.
+   *
+   * The contents of this map are used to calculate the diff for local state between the last and the current checkpoint
+   * during {@link #upload}.
+   *
+   * Since the task commit process guarantees that the async stage of the previous commit is complete before another
+   * commit can start, this future is guaranteed to be complete in the call to {@link #upload} during the next commit.
+   *
+   * This field is non-final, since the future itself is replaced in its entirety after init/upload.
+   * The internal map contents are never directly modified (e.g. using puts). It's volatile to ensure visibility
+   * across threads since the map assignment may happen on a different thread than the one reading the contents.
+   */
+  private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+      prevStoreSnapshotIndexesFuture;
+
+  public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config,
+      Clock clock, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil) {

Review comment:
       No Preconditions checks here either?

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -152,8 +152,8 @@ public TaskRestoreManager getRestoreManager(JobContext jobContext,
   }
 
   @Override
-  public TaskStorageAdmin getAdmin() {
-    throw new SamzaException("getAdmin() method not supported for KafkaStateBackendFactory");
+  public StateBackendAdmin getStateBackendAdmin(JobModel jobModel, Config config) {
+    return new NoOpKafkaChangelogStateBackendAdmin();

Review comment:
       @shekhars-li please cut an ASF JIRA under my name and link it in the 
TODO here with details; otherwise we might forget about this.

##########
File path: 
samza-api/src/main/java/org/apache/samza/storage/blobstore/exceptions/DeletedException.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore.exceptions;
+
+public class DeletedException extends RuntimeException {

Review comment:
       Should this not extend SamzaException?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


