dxichen commented on a change in pull request #1491: URL: https://github.com/apache/samza/pull/1491#discussion_r627784231
########## File path: samza-api/src/main/java/org/apache/samza/storage/KafkaChangelogRestoreParams.java ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.storage; + +import java.util.Map; +import java.util.Set; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.task.MessageCollector; + +/** + * Provides the required for Kafka Changelog restore managers + */ +public class KafkaChangelogRestoreParams { + private final Map<String, SystemConsumer> storeConsumers; + private final Map<String, StorageEngine> inMemoryStores; + private final Map<String, SystemAdmin> systemAdmins; + private final Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories; + private final Map<String, Serde<Object>> serdes; + private final MessageCollector collector; + private final Set<String> storeNames; + + public KafkaChangelogRestoreParams( + Map<String, SystemConsumer> storeConsumers, + Map<String, StorageEngine> inMemoryStores, + Map<String, SystemAdmin> systemAdmins, + Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, + 
Map<String, Serde<Object>> serdes, + MessageCollector collector, + Set<String> storeNames) { + this.storeConsumers = storeConsumers; + this.inMemoryStores = inMemoryStores; + this.systemAdmins = systemAdmins; + this.storageEngineFactories = storageEngineFactories; + this.serdes = serdes; + this.collector = collector; + this.storeNames = storeNames; Review comment: These are only the stores that need to be restored, as filtered by ContainerStorageManager, and they should be a subset of the storageEngineFactories key set. ########## File path: samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java ########## @@ -557,6 +596,60 @@ private static void validateRestoreOffsets(RestoreOffsets restoreOffsets, System } } + private Map<String, KafkaStateCheckpointMarker> getCheckpointedChangelogOffsets(Checkpoint checkpoint) { Review comment: Yep, the stores are all non-side-input stores, and since this class is invoked at the task level, the store name can be used to infer the changelog SSP (from the <store, SystemStream> map and the task's changelog partition). ########## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ########## @@ -357,25 +374,35 @@ public ContainerStorageManager( } private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, SystemStream> changelogSystemStreams, - Map<String, SystemConsumer> storeSystemConsumers) { + Map<String, SystemConsumer> systemNameToSystemConsumers) { // Map of each storeName to its respective systemConsumer Map<String, SystemConsumer> storeConsumers = new HashMap<>(); // Populate the map of storeName to its relevant systemConsumer for (String storeName : changelogSystemStreams.keySet()) { - storeConsumers.put(storeName, storeSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem())); + storeConsumers.put(storeName, systemNameToSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem())); } return storeConsumers; } - private Map<TaskName, 
TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock, SamzaContainerMetrics samzaContainerMetrics) { + private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(StateBackendFactory factory, Clock clock, + SamzaContainerMetrics samzaContainerMetrics) { Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>(); + containerModel.getTasks().forEach((taskName, taskModel) -> { + MetricsRegistry taskMetricsRegistry = + taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap(); + Set<String> nonSideInputStoreNames = storageEngineFactories.keySet().stream() + .filter(storeName -> !sideInputStoreNames.contains(storeName)) + .collect(Collectors.toSet()); + KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers, + inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes, + taskInstanceCollectors.get(taskName), nonSideInputStoreNames); + taskRestoreManagers.put(taskName, - TaskRestoreManagerFactory.create( - taskModel, changelogSystemStreams, getNonSideInputStores(taskName), systemAdmins, - streamMetadataCache, sspMetadataCache, storeConsumers, maxChangeLogStreamPartitions, - loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock)); + factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor, + taskMetricsRegistry, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, + kafkaChangelogRestoreParams)); Review comment: The streamMetadataCache is now created in the KafkaChangelogBackendFactory in the getRestoreManager call for state restore management ########## File path: samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java ########## @@ -43,6 +43,10 @@ public SystemAdmins(Config config, String adminLabel) { this.systemAdminMap = systemConfig.getSystemAdmins(adminLabel); } + public SystemAdmins(Map<String, 
SystemAdmin> systemAdminMap) { + this.systemAdminMap = systemAdminMap; + } + Review comment: This is not just for testing; it is used in KafkaChangelogStateBackendFactory to reconstruct the SystemAdmins class (since it is passed through StateBackendFactory from samza-api). ########## File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ########## @@ -388,106 +415,77 @@ public ContainerStorageManager( } /** - * Create taskStores for all stores in storageEngineFactories. - * The store mode is chosen as bulk-load if its a non-sideinput store, and readWrite if its a sideInput store + * Create taskStores for all stores in storesToCreate. + * The store mode is chosen as read-write mode. */ - private Map<TaskName, Map<String, StorageEngine>> createTaskStores(ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext, + private Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<String> storesToCreate, Review comment: Bulk mode is specific to changelog restore, because it is only required during the initial load. That logic has therefore been moved to the respective restore managers. TransactionalRestoreManager is responsible for creating the stores in BULK mode, restoring the checkpoints for those stores, and then closing them. The stores are then recreated here after that restore process. For the blobStoreRestoreManager, we are just moving the files in the restore phase, so BULK-mode stores will never be created. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
