Repository: kafka
Updated Branches:
  refs/heads/trunk f27a6f319 -> 896ad63f1


MINOR: Allow creation of statestore without logging enabled or explicit source 
topic

guozhangwang

Author: dan norwood <norw...@confluent.io>

Reviewers: Eno Thereska, Damian Guy, Guozhang Wang

Closes #1828 from norwood/manual-store


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/896ad63f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/896ad63f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/896ad63f

Branch: refs/heads/trunk
Commit: 896ad63f14c5240207daeedea304c62c8fc7bb7e
Parents: f27a6f3
Author: Dan Norwood <norw...@confluent.io>
Authored: Mon Sep 19 12:37:21 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Sep 19 12:37:21 2016 -0700

----------------------------------------------------------------------
 .../processor/internals/ProcessorStateManager.java   | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/896ad63f/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 5e5eaa9..51a50e9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -133,13 +133,16 @@ public class ProcessorStateManager {
         }
         
         // check that the underlying change log topic exist or not
-        String topic;
+        String topic = null;
         if (loggingEnabled) {
             topic = storeChangelogTopic(this.applicationId, store.name());
         } else if (sourceStoreToSourceTopic != null && 
sourceStoreToSourceTopic.containsKey(store.name())) {
             topic = sourceStoreToSourceTopic.get(store.name());
-        } else {
-            throw new IllegalArgumentException(String.format("task [%s]  Store 
is neither built from source topic, nor has a changelog.", taskId));
+        }
+
+        if (topic == null) {
+            this.stores.put(store.name(), store);
+            return;
         }
 
         // block until the partition is ready for this state changelog topic 
or time has elapsed
@@ -167,12 +170,14 @@ public class ProcessorStateManager {
             }
         } while (partitionNotFound && System.currentTimeMillis() < startTime + 
waitTime);
 
-        if (partitionNotFound)
+        if (partitionNotFound) {
             throw new StreamsException(String.format("task [%s]  Store %s's 
change log (%s) does not contain partition %s", taskId, store.name(), topic, 
partition));
+        }
 
         if (isStandby) {
-            if (store.persistent())
+            if (store.persistent()) {
                 restoreCallbacks.put(topic, stateRestoreCallback);
+            }
         } else {
             restoreActiveState(topic, stateRestoreCallback);
         }

Reply via email to