This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch state-backend-async-commit
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/state-backend-async-commit by 
this push:
     new c2b77f0  Fix StorageConfig bug from newly introduced BlobStoreRestore 
factory config (#1503)
c2b77f0 is described below

commit c2b77f099d062b2c303be932dfe4386818c0a4ac
Author: shekhars-li <[email protected]>
AuthorDate: Fri May 28 10:16:53 2021 -0700

    Fix StorageConfig bug from newly introduced BlobStoreRestore factory config 
(#1503)
---
 .../org/apache/samza/config/StorageConfig.java     |  6 ++++--
 .../storage/KafkaChangelogStateBackendFactory.java | 12 ++++++++++++
 .../apache/samza/container/SamzaContainer.scala    | 17 -----------------
 .../org/apache/samza/config/TestStorageConfig.java | 22 ++++++++++++++++++++++
 4 files changed, 38 insertions(+), 19 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index aa8df95..7535d65 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -74,7 +74,8 @@ public class StorageConfig extends MapConfig {
       KAFKA_STATE_BACKEND_FACTORY);
   public static final String STORE_BACKUP_FACTORIES = STORE_PREFIX + 
"%s.backup.factories";
   // TODO BLOCKER dchen make this per store
-  public static final String STORE_RESTORE_FACTORY = STORE_PREFIX + 
"restore.factory";
+  public static final String RESTORE_FACTORY_SUFFIX = "restore.factory";
+  public static final String STORE_RESTORE_FACTORY = STORE_PREFIX + 
RESTORE_FACTORY_SUFFIX;
 
   static final String CHANGELOG_SYSTEM = "job.changelog.system";
   static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + 
"%s.changelog.delete.retention.ms";
@@ -100,7 +101,8 @@ public class StorageConfig extends MapConfig {
     for (String key : subConfig.keySet()) {
       if (key.endsWith(SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX)) {
         storeNames.add(key.substring(0, key.length() - 
SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX.length()));
-      } else if (key.endsWith(FACTORY_SUFFIX)) {
+      } else if (key.endsWith(FACTORY_SUFFIX) && 
!key.equals(RESTORE_FACTORY_SUFFIX)) {
+        // TODO HIGH dchen the STORE_RESTORE_FACTORY key is excluded so it is not treated as a store name. 
Update/remove after changing restore factory -> factories
         storeNames.add(key.substring(0, key.length() - 
FACTORY_SUFFIX.length()));
       }
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
 
b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
index 54f20da..be419b6 100644
--- 
a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
@@ -48,6 +48,18 @@ import org.apache.samza.util.Clock;
 
 public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
   private StreamMetadataCache streamCache;
+  /*
+   * This keeps track of the changelog SSPs that are associated with the whole 
container. This is used so that we can
+   * prefetch the metadata for all of the changelog SSPs associated with 
the container whenever we need the
+   * metadata about some of the changelog SSPs.
+   * An example use case is when Samza writes offset files for stores ({@link 
TaskStorageManager}). Each task is
+   * responsible for its own offset file, but if we can do prefetching, then 
most tasks will already have cached
+   * metadata by the time they need the offset metadata.
+   * Note: By using all changelog streams to build the sspsToPrefetch, any 
fetches done for persisted stores will
+   * include the ssps for non-persisted stores, so this is slightly 
suboptimal. However, this does not increase the
+   * actual number of calls to the {@link SystemAdmin}, and we can decouple 
this logic from the per-task objects (e.g.
+   * {@link TaskStorageManager}).
+   */
   private SSPMetadataCache sspCache;
 
   @Override
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 1e16928..b3360f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -346,23 +346,6 @@ object SamzaContainer extends Logging {
 
     info("Got change log system streams: %s" format storeChangelogs)
 
-    /*
-     * This keeps track of the changelog SSPs that are associated with the 
whole container. This is used so that we can
-     * prefetch the metadata about the all of the changelog SSPs associated 
with the container whenever we need the
-     * metadata about some of the changelog SSPs.
-     * An example use case is when Samza writes offset files for stores 
({@link TaskStorageManager}). Each task is
-     * responsible for its own offset file, but if we can do prefetching, then 
most tasks will already have cached
-     * metadata by the time they need the offset metadata.
-     * Note: By using all changelog streams to build the sspsToPrefetch, any 
fetches done for persisted stores will
-     * include the ssps for non-persisted stores, so this is slightly 
suboptimal. However, this does not increase the
-     * actual number of calls to the {@link SystemAdmin}, and we can decouple 
this logic from the per-task objects (e.g.
-     * {@link TaskStorageManager}).
-     */
-    val changelogSSPMetadataCache = new SSPMetadataCache(systemAdmins,
-      Duration.ofSeconds(5),
-      SystemClock.instance,
-      getChangelogSSPsForContainer(containerModel, storeChangelogs).asJava)
-
     val intermediateStreams = streamConfig
       .getStreamIds()
       .asScala
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index ca7196a..91f1fa1 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -70,6 +70,28 @@ public class TestStorageConfig {
     assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
   }
 
+  /**
+   * Verifies that the {@link StorageConfig#STORE_RESTORE_FACTORY} key, which also 
matches the {@link StorageConfig#FACTORY} pattern,
+   * is not mistaken for a store name by {@link StorageConfig#getStoreNames()}.
+   */
+  @Test
+  public void testGetStoreNamesIgnoreStateRestoreFactory() {
+    // empty config, so no stores
+    assertEquals(Collections.emptyList(), new StorageConfig(new 
MapConfig()).getStoreNames());
+
+    Set<String> expectedStoreNames = ImmutableSet.of(STORE_NAME0, STORE_NAME1);
+    // two real stores plus the restore factory key, which must not be reported as a store
+    StorageConfig storageConfig = new StorageConfig(new MapConfig(
+        ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), 
"store0.factory.class",
+            String.format(StorageConfig.FACTORY, STORE_NAME1), 
"store1.factory.class",
+            STORE_RESTORE_FACTORY, "org.apache.class")));
+
+    List<String> actual = storageConfig.getStoreNames();
+    // ordering shouldn't matter
+    assertEquals(2, actual.size());
+    assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
+  }
+
   @Test
   public void testGetChangelogStream() {
     // empty config, so no changelog stream

Reply via email to