This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch state-backend-async-commit
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/state-backend-async-commit by
this push:
new c2b77f0 Fix StorageConfig bug from newly introduced BlobStoreRestore
factory config (#1503)
c2b77f0 is described below
commit c2b77f099d062b2c303be932dfe4386818c0a4ac
Author: shekhars-li <[email protected]>
AuthorDate: Fri May 28 10:16:53 2021 -0700
Fix StorageConfig bug from newly introduced BlobStoreRestore factory config
(#1503)
---
.../org/apache/samza/config/StorageConfig.java | 6 ++++--
.../storage/KafkaChangelogStateBackendFactory.java | 12 ++++++++++++
.../apache/samza/container/SamzaContainer.scala | 17 -----------------
.../org/apache/samza/config/TestStorageConfig.java | 22 ++++++++++++++++++++++
4 files changed, 38 insertions(+), 19 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index aa8df95..7535d65 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -74,7 +74,8 @@ public class StorageConfig extends MapConfig {
KAFKA_STATE_BACKEND_FACTORY);
public static final String STORE_BACKUP_FACTORIES = STORE_PREFIX +
"%s.backup.factories";
// TODO BLOCKER dchen make this per store
- public static final String STORE_RESTORE_FACTORY = STORE_PREFIX +
"restore.factory";
+ public static final String RESTORE_FACTORY_SUFFIX = "restore.factory";
+ public static final String STORE_RESTORE_FACTORY = STORE_PREFIX +
RESTORE_FACTORY_SUFFIX;
static final String CHANGELOG_SYSTEM = "job.changelog.system";
static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX +
"%s.changelog.delete.retention.ms";
@@ -100,7 +101,8 @@ public class StorageConfig extends MapConfig {
for (String key : subConfig.keySet()) {
if (key.endsWith(SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX)) {
storeNames.add(key.substring(0, key.length() -
SIDE_INPUT_PROCESSOR_FACTORY_SUFFIX.length()));
- } else if (key.endsWith(FACTORY_SUFFIX)) {
+ } else if (key.endsWith(FACTORY_SUFFIX) &&
!key.equals(RESTORE_FACTORY_SUFFIX)) {
+ // TODO HIGH dchen STORE_RESTORE_FACTORY added here to be ignored.
Update/remove after changing restore factory -> factories
storeNames.add(key.substring(0, key.length() -
FACTORY_SUFFIX.length()));
}
}
diff --git
a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
index 54f20da..be419b6 100644
---
a/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
+++
b/samza-core/src/main/java/org/apache/samza/storage/KafkaChangelogStateBackendFactory.java
@@ -48,6 +48,18 @@ import org.apache.samza.util.Clock;
public class KafkaChangelogStateBackendFactory implements StateBackendFactory {
private StreamMetadataCache streamCache;
+ /*
+ * This keeps track of the changelog SSPs that are associated with the whole
container. This is used so that we can
+ * prefetch the metadata about all of the changelog SSPs associated with
the container whenever we need the
+ * metadata about some of the changelog SSPs.
+ * An example use case is when Samza writes offset files for stores ({@link
TaskStorageManager}). Each task is
+ * responsible for its own offset file, but if we can do prefetching, then
most tasks will already have cached
+ * metadata by the time they need the offset metadata.
+ * Note: By using all changelog streams to build the sspsToPrefetch, any
fetches done for persisted stores will
+ * include the ssps for non-persisted stores, so this is slightly
suboptimal. However, this does not increase the
+ * actual number of calls to the {@link SystemAdmin}, and we can decouple
this logic from the per-task objects (e.g.
+ * {@link TaskStorageManager}).
+ */
private SSPMetadataCache sspCache;
@Override
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 1e16928..b3360f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -346,23 +346,6 @@ object SamzaContainer extends Logging {
info("Got change log system streams: %s" format storeChangelogs)
- /*
- * This keeps track of the changelog SSPs that are associated with the
whole container. This is used so that we can
- * prefetch the metadata about the all of the changelog SSPs associated
with the container whenever we need the
- * metadata about some of the changelog SSPs.
- * An example use case is when Samza writes offset files for stores
({@link TaskStorageManager}). Each task is
- * responsible for its own offset file, but if we can do prefetching, then
most tasks will already have cached
- * metadata by the time they need the offset metadata.
- * Note: By using all changelog streams to build the sspsToPrefetch, any
fetches done for persisted stores will
- * include the ssps for non-persisted stores, so this is slightly
suboptimal. However, this does not increase the
- * actual number of calls to the {@link SystemAdmin}, and we can decouple
this logic from the per-task objects (e.g.
- * {@link TaskStorageManager}).
- */
- val changelogSSPMetadataCache = new SSPMetadataCache(systemAdmins,
- Duration.ofSeconds(5),
- SystemClock.instance,
- getChangelogSSPsForContainer(containerModel, storeChangelogs).asJava)
-
val intermediateStreams = streamConfig
.getStreamIds()
.asScala
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index ca7196a..91f1fa1 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -70,6 +70,28 @@ public class TestStorageConfig {
assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
}
+ /**
+ * Test verifies that the {@link StorageConfig#STORE_RESTORE_FACTORY} key, which
matches the pattern for store.%s.factory,
+ * is not picked up as a store name in the store names list.
+ */
+ @Test
+ public void testGetStoreNamesIgnoreStateRestoreFactory() {
+ // empty config, so no stores
+ assertEquals(Collections.emptyList(), new StorageConfig(new
MapConfig()).getStoreNames());
+
+ Set<String> expectedStoreNames = ImmutableSet.of(STORE_NAME0, STORE_NAME1);
+ // has stores
+ StorageConfig storageConfig = new StorageConfig(new MapConfig(
+ ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0),
"store0.factory.class",
+ String.format(StorageConfig.FACTORY, STORE_NAME1),
"store1.factory.class",
+ STORE_RESTORE_FACTORY, "org.apache.class")));
+
+ List<String> actual = storageConfig.getStoreNames();
+ // ordering shouldn't matter
+ assertEquals(2, actual.size());
+ assertEquals(expectedStoreNames, ImmutableSet.copyOf(actual));
+ }
+
@Test
public void testGetChangelogStream() {
// empty config, so no changelog stream