[ 
https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16388870#comment-16388870
 ] 

ASF GitHub Bot commented on KAFKA-5660:
---------------------------------------

guozhangwang closed pull request #4645: KAFKA-5660 Don't throw 
TopologyBuilderException during runtime
URL: https://github.com/apache/kafka/pull/4645
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (fork) pull request once
it is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 317581a6beb..42d3d70e396 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -54,13 +55,12 @@ public RecordCollector recordCollector() {
     }
 
     /**
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if an 
attempt is made to access this state store from an unknown node
+     * @throws StreamsException if an attempt is made to access this state 
store from an unknown node
      */
-    @SuppressWarnings("deprecation")
     @Override
     public StateStore getStateStore(final String name) {
         if (currentNode() == null) {
-            throw new 
org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an 
unknown node");
+            throw new StreamsException("Accessing from an unknown node");
         }
 
         final StateStore global = stateManager.getGlobalStore(name);
@@ -69,7 +69,7 @@ public StateStore getStateStore(final String name) {
         }
 
         if (!currentNode().stateStores.contains(name)) {
-            throw new 
org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + 
currentNode().name() + " has no access to StateStore " + name);
+            throw new StreamsException("Processor " + currentNode().name() + " 
has no access to StateStore " + name);
         }
 
         return stateManager.getStore(name);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 9aa0e94c8c1..d4e7a5d5c62 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
@@ -644,7 +645,7 @@ private void prepareTopic(final Map<String, 
InternalTopicMetadata> topicPartitio
                 continue;
             }
             if (numPartitions < 0) {
-                throw new 
org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic 
[%s] number of partitions not defined", logPrefix, topic.name()));
+                throw new StreamsException(String.format("%sTopic [%s] number 
of partitions not defined", logPrefix, topic.name()));
             }
 
             topic.setNumberOfPartitions(numPartitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 992ffd818ff..68340910c2b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -252,7 +251,7 @@ public void shouldNotAllowToAddStoreWithSameName() {
         } catch (final TopologyException expected) { }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
@@ -276,15 +275,12 @@ public void shouldThrowOnUnassignedStateStoreAccess() 
throws Exception {
 
         try {
             new ProcessorTopologyTestDriver(streamsConfig, 
topology.internalTopologyBuilder);
+            fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: 
Processor " + badNodeName + " has no access to StateStore " + 
LocalMockProcessorSupplier.STORE_NAME)) {
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. 
Did catch:", e);
-            }
+            final String error = e.toString();
+            final String expectedMessage = 
"org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor " + badNodeName;
+            
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 7a815944ecd..f67b6341f8d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -51,6 +51,7 @@
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -590,8 +591,7 @@ public void 
shouldAddInternalTopicConfigForRepartitionTopics() {
         assertEquals("appId-foo", topicConfig.name());
     }
 
-
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
@@ -603,27 +603,24 @@ public void shouldThroughOnUnassignedStateStoreAccess() 
throws Exception {
         config.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
         final StreamsConfig streamsConfig = new StreamsConfig(config);
 
-        try {
-            final TopologyBuilder builder = new TopologyBuilder();
-            builder
-                .addSource(sourceNodeName, "topic")
-                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), 
sourceNodeName)
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource(sourceNodeName, "topic")
+                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
+                        sourceNodeName)
                 .addStateStore(
-                    
Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
-                    goodNodeName)
-                .addProcessor(badNodeName, new LocalMockProcessorSupplier(), 
sourceNodeName);
-
+                        Stores.create(LocalMockProcessorSupplier.STORE_NAME)
+                                .withStringKeys().withStringValues().inMemory()
+                                .build(), goodNodeName)
+                .addProcessor(badNodeName, new LocalMockProcessorSupplier(),
+                        sourceNodeName);     
+        try {
             final ProcessorTopologyTestDriver driver = new 
ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder);
-            driver.process("topic", null, null);
+            fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: 
Processor " + badNodeName + " has no access to StateStore " + 
LocalMockProcessorSupplier.STORE_NAME)) {
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. 
Did catch:", e);
-            }
+            final String error = e.toString();
+            final String expectedMessage = 
"org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor " + badNodeName;
+
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index a39e545f513..901fc4b7e0a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -23,7 +23,6 @@
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -53,7 +52,9 @@
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -578,17 +579,15 @@ public void shouldThrowOnUnassignedStateStoreAccess() {
             
Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
             goodNodeName);
         builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), 
sourceNodeName);
-
+        
         try {
             new ProcessorTopologyTestDriver(streamsConfig, builder);
             fail("Should have throw StreamsException");
-        } catch (final StreamsException expected) {
-            final Throwable cause = expected.getCause();
-            if (cause == null
-                || !(cause instanceof TopologyBuilderException)
-                || !cause.getMessage().equals("Invalid topology building: 
Processor " + badNodeName + " has no access to StateStore " + 
LocalMockProcessorSupplier.STORE_NAME)) {
-                throw new RuntimeException("Did expect different exception. 
Did catch:", expected);
-            }
+        } catch (final StreamsException e) {
+            final String error = e.toString();
+            final String expectedMessage = 
"org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor " + badNodeName;
+            
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Don't throw TopologyBuilderException during runtime
> ---------------------------------------------------
>
>                 Key: KAFKA-5660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5660
>             Project: Kafka
>          Issue Type: Task
>          Components: streams
>    Affects Versions: 0.11.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Nick Afshartous
>            Priority: Major
>             Fix For: 1.2.0
>
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be 
> thrown before {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
>  - `SourceNodeFactory#getTopics`
>  - `ProcessorContextImpl#getStateStore`
>  - `StreamPartitionAssignor#prepareTopic`
> (and maybe somewhere else: we should double check if there are other places 
> in the code like those).
> We should replace those exceptions with either {{StreamsException}} or with a 
> new exception type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to