dxichen commented on a change in pull request #1491:
URL: https://github.com/apache/samza/pull/1491#discussion_r627784231



##########
File path: 
samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.task.MessageCollector;
+
+/**
+ * Provides the required for Kafka Changelog restore managers
+ */
+public class KafkaChangelogRestoreParams {
+  private final Map<String, SystemConsumer> storeConsumers;
+  private final Map<String, StorageEngine> inMemoryStores;
+  private final Map<String, SystemAdmin> systemAdmins;
+  private final Map<String, StorageEngineFactory<Object, Object>> 
storageEngineFactories;
+  private final Map<String, Serde<Object>> serdes;
+  private final MessageCollector collector;
+  private final Set<String> storeNames;
+
+  public KafkaChangelogRestoreParams(
+      Map<String, SystemConsumer> storeConsumers,
+      Map<String, StorageEngine> inMemoryStores,
+      Map<String, SystemAdmin> systemAdmins,
+      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+      Map<String, Serde<Object>> serdes,
+      MessageCollector collector,
+      Set<String> storeNames) {
+    this.storeConsumers = storeConsumers;
+    this.inMemoryStores = inMemoryStores;
+    this.systemAdmins = systemAdmins;
+    this.storageEngineFactories = storageEngineFactories;
+    this.serdes = serdes;
+    this.collector = collector;
+    this.storeNames = storeNames;

Review comment:
       These are only the stores that need to be restored, as filtered by ContainerStorageManager, so they 
should be a subset of the storageEngineFactories key set.

##########
File path: 
samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
##########
@@ -557,6 +596,60 @@ private static void validateRestoreOffsets(RestoreOffsets 
restoreOffsets, System
     }
   }
 
+  private Map<String, KafkaStateCheckpointMarker> 
getCheckpointedChangelogOffsets(Checkpoint checkpoint) {

Review comment:
       Yep, the stores are all non-side-input stores. Since this class is invoked at 
the task level, the store name can be used to infer the changelog SSP (from the 
<store, SystemStream> map and the task's changelog partition).

##########
File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -357,25 +374,35 @@ public ContainerStorageManager(
   }
 
   private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, 
SystemStream> changelogSystemStreams,
-      Map<String, SystemConsumer> storeSystemConsumers) {
+      Map<String, SystemConsumer> systemNameToSystemConsumers) {
     // Map of each storeName to its respective systemConsumer
     Map<String, SystemConsumer> storeConsumers = new HashMap<>();
 
     // Populate the map of storeName to its relevant systemConsumer
     for (String storeName : changelogSystemStreams.keySet()) {
-      storeConsumers.put(storeName, 
storeSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
+      storeConsumers.put(storeName, 
systemNameToSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
     }
     return storeConsumers;
   }
 
-  private Map<TaskName, TaskRestoreManager> 
createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock, 
SamzaContainerMetrics samzaContainerMetrics) {
+  private Map<TaskName, TaskRestoreManager> 
createTaskRestoreManagers(StateBackendFactory factory, Clock clock,
+      SamzaContainerMetrics samzaContainerMetrics) {
     Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
+
     containerModel.getTasks().forEach((taskName, taskModel) -> {
+      MetricsRegistry taskMetricsRegistry =
+          taskInstanceMetrics.get(taskName) != null ? 
taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
+      Set<String> nonSideInputStoreNames = 
storageEngineFactories.keySet().stream()
+          .filter(storeName -> !sideInputStoreNames.contains(storeName))
+          .collect(Collectors.toSet());
+      KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new 
KafkaChangelogRestoreParams(storeConsumers,
+          inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), 
storageEngineFactories, serdes,
+          taskInstanceCollectors.get(taskName), nonSideInputStoreNames);
+
       taskRestoreManagers.put(taskName,
-          TaskRestoreManagerFactory.create(
-              taskModel, changelogSystemStreams, 
getNonSideInputStores(taskName), systemAdmins,
-              streamMetadataCache, sspMetadataCache, storeConsumers, 
maxChangeLogStreamPartitions,
-              loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, 
clock));
+          factory.getRestoreManager(jobContext, containerContext, taskModel, 
restoreExecutor,
+              taskMetricsRegistry, config, clock, loggedStoreBaseDirectory, 
nonLoggedStoreBaseDirectory,
+              kafkaChangelogRestoreParams));

Review comment:
       The streamMetadataCache is now created in KafkaChangelogStateBackendFactory, 
inside the getRestoreManager call that sets up state restore 
management.

##########
File path: samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
##########
@@ -43,6 +43,10 @@ public SystemAdmins(Config config, String adminLabel) {
     this.systemAdminMap = systemConfig.getSystemAdmins(adminLabel);
   }
 
+  public SystemAdmins(Map<String, SystemAdmin> systemAdminMap) {
+    this.systemAdminMap = systemAdminMap;
+  }
+

Review comment:
       This is not just for testing. It is used in 
KafkaChangelogStateBackendFactory to reconstruct the SystemAdmins object, 
because only the map of system admins is passed through StateBackendFactory from samza-api.

##########
File path: 
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -388,106 +415,77 @@ public ContainerStorageManager(
   }
 
   /**
-   * Create taskStores for all stores in storageEngineFactories.
-   * The store mode is chosen as bulk-load if its a non-sideinput store, and 
readWrite if its a sideInput store
+   * Create taskStores for all stores in storesToCreate.
+   * The store mode is chosen as read-write mode.
    */
-  private Map<TaskName, Map<String, StorageEngine>> 
createTaskStores(ContainerModel containerModel, JobContext jobContext, 
ContainerContext containerContext,
+  private Map<TaskName, Map<String, StorageEngine>> 
createTaskStores(Set<String> storesToCreate,

Review comment:
       Bulk mode is specific to changelog restores because it is only needed 
during the initial load, so that logic has been moved to the respective restore 
managers:
   TransactionalStateTaskRestoreManager is responsible for creating the stores in BULK 
mode, restoring the checkpoints for those stores, and then closing them. The 
stores are then recreated here after that restore process completes.
   
   For the BlobStoreRestoreManager, we are just moving the files during the restore 
phase, so BULK-mode stores will never be created.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to