Repository: kafka
Updated Branches:
  refs/heads/trunk 8c07e0f31 -> 70afd5f9d


KAFKA-4175: Can't have StandbyTasks in KafkaStreams where NUM_STREAM_THREADS_CONFIG > 1

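Standby tasks should be assigned per consumer, not per process. The standby-task map in StreamPartitionAssignor is now created inside the per-consumer loop instead of once before it, so each consumer gets its own set of standby tasks.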

Author: Damian Guy <damian....@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #1862 from dguy/kafka-4175


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70afd5f9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70afd5f9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70afd5f9

Branch: refs/heads/trunk
Commit: 70afd5f9dd2eddc784a24fa2518992ef3371f0a4
Parents: 8c07e0f
Author: Damian Guy <damian....@gmail.com>
Authored: Mon Sep 19 10:30:58 2016 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Sep 19 10:30:58 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/StreamPartitionAssignor.java    | 3 ++-
 .../processor/internals/StreamPartitionAssignorTest.java        | 5 ++++-
 .../org/apache/kafka/streams/smoketest/SmokeTestClient.java     | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/70afd5f9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index b6cebf4..3be9c11 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -418,10 +418,11 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
             }
 
             final int numConsumers = consumers.size();
-            Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
+
 
             int i = 0;
             for (String consumer : consumers) {
+                Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
                 ArrayList<AssignedPartition> assignedPartitions = new 
ArrayList<>();
 
                 final int numTaskIds = taskIds.size();

http://git-wip-us.apache.org/repos/asf/kafka/blob/70afd5f9/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 4f4d2eb..e46a016 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -52,6 +52,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 
 public class StreamPartitionAssignorTest {
@@ -385,6 +386,8 @@ public class StreamPartitionAssignorTest {
         allActiveTasks.addAll(info11.activeTasks);
         allStandbyTasks.addAll(info11.standbyTasks.keySet());
 
+        assertNotEquals("same processId has same set of standby tasks", 
info11.standbyTasks.keySet(), info10.standbyTasks.keySet());
+
         // check active tasks assigned to the first client
         assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
         assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
@@ -650,7 +653,7 @@ public class StreamPartitionAssignorTest {
             // pass
         }
     }
-
+    
     @Test
     public void 
shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws 
Exception {
         final Properties properties = configProps();

http://git-wip-us.apache.org/repos/asf/kafka/blob/70afd5f9/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java 
b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 5302900..ba71e05 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -85,7 +85,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
TestTimestampExtractor.class.getName());
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
         props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);

Reply via email to