dxichen commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r578000163



##########
File path: 
samza-api/src/main/java/org/apache/samza/storage/TaskBackupManager.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.StateCheckpointMarker;
+
+/**
+ * <p>
+ * TaskBackupManager is the interface that must be implemented for
+ * any remote system that Samza persists its state to. The interface will be
+ * evoked in the following way:
+ * </p>
+ *
+ * <ul>
+ *   <li>Snapshot will be called before Upload.</li>
+ *   <li>persistToFilesystem will be called after Upload is completed</li>
+ *   <li>Cleanup is only called after Upload and persistToFilesystem has 
successfully completed</li>
+ * </ul>
+ */
+public interface TaskBackupManager {
+
+  /**
+   * Initiates the TaskBackupManagerIntance
+   */
+  default void init() {}

Review comment:
       The default implementation exists because the KafkaBackupManagers 
currently do not need to be initialized, so I don't think a BackupManager 
should be forced to implement this method.

##########
File path: samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
##########
@@ -30,22 +33,53 @@
  * of restarting a failed container within a running job.
  */
 public class Checkpoint {
-  private final Map<SystemStreamPartition, String> offsets;
+  private static final short PROTOCOL_VERSION = 1;
+  private final CheckpointId checkpointId;
+  private final Map<SystemStreamPartition, String> inputOffsets;
+  private final Map<String, List<StateCheckpointMarker>> stateCheckpoints;
 
   /**
-   * Constructs a new checkpoint based off a map of Samza stream offsets.
-   * @param offsets Map of Samza streams to their current offset.
+   * Constructs a new checkpoint based off a map of Samza stream offsets, 
using a default checkpoint id
+   * @param inputOffsets Map of Samza streams to their current offset.
    */
-  public Checkpoint(Map<SystemStreamPartition, String> offsets) {
-    this.offsets = offsets;
+  public Checkpoint(Map<SystemStreamPartition, String>  inputOffsets) {
+    this(CheckpointId.getPlaceholderCheckpointId(), inputOffsets, new 
HashMap<>());

Review comment:
       This is useful when we are using the new reading scheme which looks for 
the checkpoint id on every checkpoint.

##########
File path: 
samza-kafka/src/main/scala/org/apache/samza/storage/KafkaNonTransactionalStateTaskBackupManager.scala
##########
@@ -20,62 +20,70 @@
 package org.apache.samza.storage
 
 import java.io._
+import java.util
+import java.util.concurrent.CompletableFuture
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.collect.ImmutableSet
-import org.apache.samza.checkpoint.CheckpointId
+import org.apache.samza.checkpoint.kafka.KafkaStateCheckpointMarker
+import org.apache.samza.checkpoint.{CheckpointId, StateCheckpointMarker}
 import org.apache.samza.container.TaskName
 import org.apache.samza.job.model.TaskMode
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
-import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.{Partition, SamzaException}
 
+import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Manage all the storage engines for a given task
  */
-class NonTransactionalStateTaskStorageManager(
+class KafkaNonTransactionalStateTaskBackupManager(
   taskName: TaskName,
-  containerStorageManager: ContainerStorageManager,
-  storeChangelogs: Map[String, SystemStream] = Map(),
+  taskStores: util.Map[String, StorageEngine],
+  storeChangelogs: util.Map[String, SystemStream] = new util.HashMap[String, 
SystemStream](),
   systemAdmins: SystemAdmins,
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
-  partition: Partition) extends Logging with TaskStorageManager {
+  partition: Partition) extends Logging with TaskBackupManager {
 
   private val storageManagerUtil = new StorageManagerUtil
-  private val persistedStores = 
containerStorageManager.getAllStores(taskName).asScala
+  private val persistedStores = taskStores.asScala
     .filter { case (storeName, storageEngine) => 
storageEngine.getStoreProperties.isPersistedToDisk }
 
-  def getStore(storeName: String): Option[StorageEngine] =  
JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, 
storeName)).toOption
-
-  def flush(): Map[SystemStreamPartition, Option[String]] = {
+  override def snapshot(checkpointId: CheckpointId): util.Map[String, 
StateCheckpointMarker] = {
     debug("Flushing stores.")
-    
containerStorageManager.getAllStores(taskName).asScala.values.foreach(_.flush)
+    taskStores.asScala.values.foreach(_.flush)
     val newestChangelogSSPOffsets = getNewestChangelogSSPOffsets()
-    writeChangelogOffsetFiles(newestChangelogSSPOffsets)
+    writeChangelogOffsetFiles(KafkaStateCheckpointMarker
+      .stateCheckpointMarkerToSSPmap(newestChangelogSSPOffsets))
     newestChangelogSSPOffsets
   }
 
-  override def checkpoint(checkpointId: CheckpointId,
-    newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit 
= {}
+  override def upload(checkpointId: CheckpointId,
+    stateCheckpointMarkers: util.Map[String, StateCheckpointMarker]): 
CompletableFuture[util.Map[String, StateCheckpointMarker]] = {
+     CompletableFuture.completedFuture(stateCheckpointMarkers)
+  }
+
+  override def persistToFilesystem(checkpointId: CheckpointId,

Review comment:
       It was originally in the snapshot call above; I moved it to this method.

##########
File path: 
samza-api/src/main/java/org/apache/samza/checkpoint/KafkaStateChangelogOffset.java
##########
@@ -26,57 +26,58 @@
  * Checkpointed changelog offset has the format: [checkpointId, offset], 
separated by a colon.
  */
 @InterfaceStability.Unstable
-public class CheckpointedChangelogOffset {
+public class KafkaStateChangelogOffset {

Review comment:
       This is kept for backwards compatibility: the old checkpoints in the 
Kafka topic will be using this format.

##########
File path: 
samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock 
clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);

Review comment:
       Good point. Since this is called for the SamzaContainer from a single 
thread, it does not need to be thread-safe. This may change with further patches.

##########
File path: 
samza-kafka/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.SamzaContainer;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+
+
+public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
+  private static StreamMetadataCache streamCache;
+  private static SSPMetadataCache sspCache;
+
+  private static StreamMetadataCache getStreamCache(SystemAdmins admins, Clock 
clock) {
+    if (streamCache == null) {
+      streamCache = new StreamMetadataCache(admins, 5000, clock);
+    }
+    return streamCache;
+  }
+
+  private static SSPMetadataCache getSspCache(SystemAdmins admins, Clock 
clock, Set<SystemStreamPartition> ssps) {
+    if (sspCache == null) {
+      sspCache = new SSPMetadataCache(admins, Duration.ofSeconds(5), clock, 
ssps);
+    }
+    return sspCache;
+  }
+
+  @Override
+  public TaskBackupManager getBackupManager(JobModel jobModel, ContainerModel 
containerModel, TaskModel taskModel,
+      Map<String, StorageEngine> taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StorageConfig storageConfig = new StorageConfig(config);
+    Map<String, SystemStream> storeChangelogs = 
storageConfig.getStoreChangelogs();
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new 
JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateCheckpointEnabled()) {
+      return new 
KafkaTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, 
taskModel.getChangelogPartition(),
+          taskModel.getTaskMode(), new StorageManagerUtil());
+    } else {
+      return new 
KafkaNonTransactionalStateTaskBackupManager(taskModel.getTaskName(),
+          taskStores, storeChangelogs, systemAdmins, loggedStoreBaseDir, 
taskModel.getChangelogPartition());
+    }
+  }
+
+  @Override
+  public TaskRestoreManager getRestoreManager(JobModel jobModel, 
ContainerModel containerModel, TaskModel taskModel, Map<String, StorageEngine> 
taskStores, Config config, Clock clock) {
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    Map<String, SystemStream> storeChangelogs = new 
StorageConfig(config).getStoreChangelogs();
+    Map<String, SystemStream> filteredStoreChangelogs = ContainerStorageManager
+        .getChangelogSystemStreams(containerModel, storeChangelogs, null);
+
+    File defaultFileDir = new File(System.getProperty("user.dir"), "state");
+    File loggedStoreBaseDir = SamzaContainer.getLoggedStorageBaseDir(new 
JobConfig(config), defaultFileDir);
+    File nonLoggedStoreBaseDir = SamzaContainer.getNonLoggedStorageBaseDir(new 
JobConfig(config), defaultFileDir);
+
+    if (new TaskConfig(config).getTransactionalStateRestoreEnabled()) {
+      return new TransactionalStateTaskRestoreManager(
+          taskModel,
+          taskStores,
+          filteredStoreChangelogs,
+          systemAdmins,
+          null, // TODO @dchen have the restore managers create and manage 
Kafka consume lifecycle
+          KafkaChangelogStateBackendFactory
+              .getSspCache(systemAdmins, clock, Collections.emptySet()),
+          loggedStoreBaseDir,
+          nonLoggedStoreBaseDir,
+          config,
+          clock
+      );
+    } else {
+      return new NonTransactionalStateTaskRestoreManager(
+          taskModel,
+          filteredStoreChangelogs,
+          taskStores,
+          systemAdmins,
+          KafkaChangelogStateBackendFactory.getStreamCache(systemAdmins, 
clock),
+          null, // TODO  @dchen have the restore managers create and manage 
Kafka consume lifecycle

Review comment:
       This path is not yet used, but it will eventually need to be once the 
restore managers are refactored.

##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -224,8 +228,8 @@ class TaskInstance(
     val allCheckpointOffsets = new java.util.HashMap[SystemStreamPartition, 
String]()
     val inputCheckpoint = offsetManager.buildCheckpoint(taskName)
     if (inputCheckpoint != null) {
-      trace("Got input offsets for taskName: %s as: %s" format(taskName, 
inputCheckpoint.getOffsets))
-      allCheckpointOffsets.putAll(inputCheckpoint.getOffsets)
+      trace("Got input offsets for taskName: %s as: %s" format(taskName, 
inputCheckpoint.getInputOffsets))
+      allCheckpointOffsets.putAll(inputCheckpoint.getInputOffsets)

Review comment:
       removed allCheckpointOffsets

##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -73,8 +76,9 @@ class TaskInstance(
 
   private val kvStoreSupplier = ScalaJavaUtil.toJavaFunction(
     (storeName: String) => {
-      if (storageManager != null && 
storageManager.getStore(storeName).isDefined) {
-        storageManager.getStore(storeName).get.asInstanceOf[KeyValueStore[_, 
_]]
+      if (containerStorageManager != null) {
+        val storeOption = 
JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, 
storeName)).toOption

Review comment:
       removed java optional




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to