This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cf092ae  KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645)
cf092ae is described below

commit cf092aeecc473b70d81c00b604e29de8c9f6d84b
Author: nafshartous <n...@afshartous.com>
AuthorDate: Tue Mar 6 20:49:33 2018 -0500

    KAFKA-5660 Don't throw TopologyBuilderException during runtime (#4645)
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .../processor/internals/ProcessorContextImpl.java  |  8 ++---
 .../internals/StreamsPartitionAssignor.java        |  3 +-
 .../org/apache/kafka/streams/TopologyTest.java     | 16 ++++------
 .../streams/processor/TopologyBuilderTest.java     | 37 ++++++++++------------
 .../internals/InternalTopologyBuilderTest.java     | 17 +++++-----
 5 files changed, 37 insertions(+), 44 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 317581a..42d3d70 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -54,13 +55,12 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext implements Re
     }
 
     /**
-     * @throws org.apache.kafka.streams.errors.TopologyBuilderException if an 
attempt is made to access this state store from an unknown node
+     * @throws StreamsException if an attempt is made to access this state 
store from an unknown node
      */
-    @SuppressWarnings("deprecation")
     @Override
     public StateStore getStateStore(final String name) {
         if (currentNode() == null) {
-            throw new 
org.apache.kafka.streams.errors.TopologyBuilderException("Accessing from an 
unknown node");
+            throw new StreamsException("Accessing from an unknown node");
         }
 
         final StateStore global = stateManager.getGlobalStore(name);
@@ -69,7 +69,7 @@ public class ProcessorContextImpl extends 
AbstractProcessorContext implements Re
         }
 
         if (!currentNode().stateStores.contains(name)) {
-            throw new 
org.apache.kafka.streams.errors.TopologyBuilderException("Processor " + 
currentNode().name() + " has no access to StateStore " + name);
+            throw new StreamsException("Processor " + currentNode().name() + " 
has no access to StateStore " + name);
         }
 
         return stateManager.getStore(name);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 71a84b2..0edbe2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
@@ -694,7 +695,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                 continue;
             }
             if (numPartitions < 0) {
-                throw new 
org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic 
[%s] number of partitions not defined", logPrefix, topic.name()));
+                throw new StreamsException(String.format("%sTopic [%s] number 
of partitions not defined", logPrefix, topic.name()));
             }
 
             topic.setNumberOfPartitions(numPartitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 992ffd8..6834091 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -252,7 +251,7 @@ public class TopologyTest {
         } catch (final TopologyException expected) { }
     }
 
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
@@ -276,15 +275,12 @@ public class TopologyTest {
 
         try {
             new ProcessorTopologyTestDriver(streamsConfig, 
topology.internalTopologyBuilder);
+            fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: 
Processor " + badNodeName + " has no access to StateStore " + 
LocalMockProcessorSupplier.STORE_NAME)) {
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. 
Did catch:", e);
-            }
+            final String error = e.toString();
+            final String expectedMessage = 
"org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor " + badNodeName;
+            
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 7a81594..f67b634 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -51,6 +51,7 @@ import java.util.regex.Pattern;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -590,8 +591,7 @@ public class TopologyBuilderTest {
         assertEquals("appId-foo", topicConfig.name());
     }
 
-
-    @Test(expected = TopologyBuilderException.class)
+    @Test
     public void shouldThroughOnUnassignedStateStoreAccess() throws Exception {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
@@ -603,27 +603,24 @@ public class TopologyBuilderTest {
         config.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
         final StreamsConfig streamsConfig = new StreamsConfig(config);
 
-        try {
-            final TopologyBuilder builder = new TopologyBuilder();
-            builder
-                .addSource(sourceNodeName, "topic")
-                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(), 
sourceNodeName)
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource(sourceNodeName, "topic")
+                .addProcessor(goodNodeName, new LocalMockProcessorSupplier(),
+                        sourceNodeName)
                 .addStateStore(
-                    
Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
-                    goodNodeName)
-                .addProcessor(badNodeName, new LocalMockProcessorSupplier(), 
sourceNodeName);
-
+                        Stores.create(LocalMockProcessorSupplier.STORE_NAME)
+                                .withStringKeys().withStringValues().inMemory()
+                                .build(), goodNodeName)
+                .addProcessor(badNodeName, new LocalMockProcessorSupplier(),
+                        sourceNodeName);     
+        try {
             final ProcessorTopologyTestDriver driver = new 
ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder);
-            driver.process("topic", null, null);
+            fail("Should have thrown StreamsException");
         } catch (final StreamsException e) {
-            final Throwable cause = e.getCause();
-            if (cause != null
-                && cause instanceof TopologyBuilderException
-                && cause.getMessage().equals("Invalid topology building: 
Processor " + badNodeName + " has no access to StateStore " + 
LocalMockProcessorSupplier.STORE_NAME)) {
-                throw (TopologyBuilderException) cause;
-            } else {
-                throw new RuntimeException("Did expect different exception. 
Did catch:", e);
-            }
+            final String error = e.toString();
+            final String expectedMessage = 
"org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor " + badNodeName;
+
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index a39e545..901fc4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyDescription;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -53,7 +52,9 @@ import java.util.regex.Pattern;
 
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -578,17 +579,15 @@ public class InternalTopologyBuilderTest {
             
Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(),
             goodNodeName);
         builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), 
sourceNodeName);
-
+        
         try {
             new ProcessorTopologyTestDriver(streamsConfig, builder);
             fail("Should have throw StreamsException");
-        } catch (final StreamsException expected) {
-            final Throwable cause = expected.getCause();
-            if (cause == null
-                || !(cause instanceof TopologyBuilderException)
-                || !cause.getMessage().equals("Invalid topology building: 
Processor " + badNodeName + " has no access to StateStore " + 
LocalMockProcessorSupplier.STORE_NAME)) {
-                throw new RuntimeException("Did expect different exception. 
Did catch:", expected);
-            }
+        } catch (final StreamsException e) {
+            final String error = e.toString();
+            final String expectedMessage = 
"org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor " + badNodeName;
+            
+            assertThat(error, equalTo(expectedMessage));
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.

Reply via email to