Repository: kafka
Updated Branches:
  refs/heads/trunk 67f1e5b91 -> 859113786


KAFKA-4509: Task reusage on rebalance fails for threads on same host

Author: Matthias J. Sax <[email protected]>

Reviewers: Damian Guy, Guozhang Wang

Closes #2233 from mjsax/kafka-4509-task-reusage-fix
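
Background on the fix: with multiple StreamThreads in one process, a rebalance can hand a task to one thread while another thread in the same JVM still holds the file lock on that task's state directory. ProcessorStateManager now signals this case with a LockException (instead of a plain IOException), and StreamThread defers creation of the affected tasks and retries with exponential backoff until the previous owner releases the lock. Below is a minimal, self-contained sketch of that retry-with-backoff idea only; BackoffRetrySketch, TaskFactory and tryCreateTask are illustrative names, not part of the patch.

    import java.util.Iterator;
    import java.util.Map;

    // Minimal sketch of the retry-with-backoff idea used by the patch (illustrative names only).
    public class BackoffRetrySketch {

        // Hypothetical stand-in for task creation; throws while the state directory lock is still held.
        interface TaskFactory {
            void tryCreateTask(Integer taskId);
        }

        static void createAllWithBackoff(final Map<Integer, String> pending, final TaskFactory factory)
                throws InterruptedException {
            long backoffTimeMs = 50L;                                   // same starting value as the patch
            while (!pending.isEmpty()) {
                final Iterator<Map.Entry<Integer, String>> it = pending.entrySet().iterator();
                while (it.hasNext()) {
                    final Map.Entry<Integer, String> entry = it.next();
                    try {
                        factory.tryCreateTask(entry.getKey());
                        it.remove();                                    // created; stop retrying this task
                    } catch (final IllegalStateException recoverable) {
                        // lock still held by the other thread: keep the task for the next round
                    }
                }
                if (!pending.isEmpty()) {
                    Thread.sleep(backoffTimeMs);
                    backoffTimeMs <<= 1;                                // exponential backoff, as in the patch
                }
            }
        }
    }

The actual patch keys the pending map by TaskId and treats only a LockException cause as retriable; see retryWithBackoff in the StreamThread.java changes below.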


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85911378
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85911378
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85911378

Branch: refs/heads/trunk
Commit: 859113786957a36381222f21287a940767e92f1c
Parents: 67f1e5b
Author: Matthias J. Sax <[email protected]>
Authored: Tue Dec 13 12:11:09 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Tue Dec 13 12:11:09 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/errors/LockException.java     |  36 ++++++
 .../internals/ProcessorStateManager.java        |  11 +-
 .../processor/internals/StreamThread.java       | 123 ++++++++++++++++---
 .../processor/internals/StreamThreadTest.java   |  88 ++++++++++++-
 4 files changed, 233 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
new file mode 100644
index 0000000..00c75ec
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.errors;
+
+/**
+ * Indicates that the state store directory lock could not be acquired because another thread holds the lock.
+ */
+public class LockException extends StreamsException {
+
+    private static final long serialVersionUID = 1L;
+
+    public LockException(final String s) {
+        super(s);
+    }
+
+    public LockException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public LockException(Throwable throwable) {
+        super(throwable);
+    }
+
+}
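
Note on intended use: by the time StreamThread observes the failure, the LockException generally arrives wrapped as the cause of a ProcessorStateException thrown during task construction, so the recoverable case is detected via getCause() (see retryWithBackoff in the StreamThread.java changes below). A hedged, self-contained sketch of that distinction; LockExceptionHandlingSketch, attemptInit and initOnce are hypothetical helpers, not part of the patch.

    import org.apache.kafka.streams.errors.LockException;
    import org.apache.kafka.streams.errors.ProcessorStateException;

    // Illustrative only: telling the recoverable lock conflict apart from fatal state errors.
    public class LockExceptionHandlingSketch {

        // Hypothetical task-construction step; wraps the lock failure the way the task path does.
        static void attemptInit(final boolean lockHeldByOtherThread) {
            if (lockHeldByOtherThread) {
                throw new ProcessorStateException("task init failed",
                    new LockException("state directory is locked by another thread"));
            }
        }

        // Returns true if the task was created, false if the caller should back off and retry.
        static boolean initOnce(final boolean lockHeldByOtherThread) {
            try {
                attemptInit(lockHeldByOtherThread);
                return true;
            } catch (final ProcessorStateException e) {
                if (e.getCause() instanceof LockException) {
                    return false;   // recoverable: another local thread still holds the lock
                }
                throw e;            // anything else remains fatal
            }
        }
    }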

http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 795949f..30b84f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -37,8 +38,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -71,11 +72,13 @@ public class ProcessorStateManager {
     private final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap;
 
     /**
-     * @throws IOException if any error happens while creating or locking the state directory
+     * @throws LockException if the state directory cannot be locked because another thread holds the lock
+     *                       (this might be recoverable by retrying)
+     * @throws IOException if any severe error happens while creating or locking the state directory
      */
     public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby,
                                  StateDirectory stateDirectory, final Map<String, String> sourceStoreToSourceTopic,
-                                 final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap) throws IOException {
+                                 final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap) throws LockException, IOException {
         this.applicationId = applicationId;
         this.defaultPartition = taskId.partition;
         this.taskId = taskId;
@@ -98,7 +101,7 @@ public class ProcessorStateManager {
         this.logPrefix = String.format("task [%s]", taskId);
 
         if (!stateDirectory.lock(taskId, 5)) {
-            throw new IOException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
+            throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
         }
 
         // load the checkpoint information

http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 151bfd5..a7793f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
@@ -56,6 +58,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -216,6 +219,9 @@ public class StreamThread extends Thread {
 
     private ThreadCache cache;
 
+    private final TaskCreator taskCreator = new TaskCreator();
+    private final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator();
+
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -811,10 +817,12 @@ public class StreamThread extends Thread {
         if (partitionAssignor == null)
             throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
 
-        // create the active tasks
+        final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>();
+
+        // collect newly assigned tasks and reopen re-assigned tasks
         for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.activeTasks().entrySet()) {
-            TaskId taskId = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
+            final TaskId taskId = entry.getKey();
+            final Set<TopicPartition> partitions = entry.getValue();
 
             if (assignment.containsAll(partitions)) {
                 try {
@@ -823,14 +831,15 @@ public class StreamThread extends Thread {
                         log.debug("{} recycling old task {}", logPrefix, 
taskId);
                         suspendedTasks.remove(taskId);
                         task.initTopology();
+
+                        activeTasks.put(taskId, task);
+
+                        for (TopicPartition partition : partitions) {
+                            activeTasksByPartition.put(partition, task);
+                        }
                     } else {
-                        log.debug("{} creating new task {}", logPrefix, taskId);
-                        task = createStreamTask(taskId, partitions);
+                        newTasks.put(taskId, partitions);
                     }
-                    activeTasks.put(taskId, task);
-
-                    for (TopicPartition partition : partitions)
-                        activeTasksByPartition.put(partition, task);
                 } catch (StreamsException e) {
                     log.error("{} Failed to create an active task {}: ", 
logPrefix, taskId, e);
                     throw e;
@@ -840,8 +849,12 @@ public class StreamThread extends Thread {
             }
         }
 
-        // finally destroy any remaining suspended tasks
+        // destroy any remaining suspended tasks
         removeSuspendedTasks();
+
+        // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
+        // -> other thread will call removeSuspendedTasks(); eventually
+        taskCreator.retryWithBackoff(newTasks);
     }
 
     private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
@@ -864,10 +877,12 @@ public class StreamThread extends Thread {
 
         Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
-        // create the standby tasks
+        final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>();
+
+        // collect newly assigned standby tasks and reopen re-assigned standby tasks
         for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.standbyTasks().entrySet()) {
-            TaskId taskId = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
+            final TaskId taskId = entry.getKey();
+            final Set<TopicPartition> partitions = entry.getValue();
             StandbyTask task = findMatchingSuspendedStandbyTask(taskId, partitions);
 
             if (task != null) {
@@ -875,9 +890,9 @@ public class StreamThread extends Thread {
                 suspendedStandbyTasks.remove(taskId);
                 task.initTopology();
             } else {
-                log.debug("{} creating new standby task {}", logPrefix, 
taskId);
-                task = createStandbyTask(taskId, partitions);
+                newStandbyTasks.put(taskId, partitions);
             }
+
             if (task != null) {
                 standbyTasks.put(taskId, task);
                 for (TopicPartition partition : partitions) {
@@ -891,9 +906,14 @@ public class StreamThread extends Thread {
                 checkpointedOffsets.putAll(task.checkpointedOffsets());
             }
         }
-        // finally destroy any remaining suspended tasks
+
+        // destroy any remaining suspended tasks
         removeSuspendedStandbyTasks();
 
+        // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
+        // -> other thread will call removeSuspendedStandbyTasks(); eventually
+        standbyTaskCreator.retryWithBackoff(newStandbyTasks);
+
         restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
 
         for (Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
@@ -1153,4 +1173,75 @@ public class StreamThread extends Thread {
                 sensor.add(name, stat);
         }
     }
+
+    abstract class AbstractTaskCreator {
+        void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
+            long backoffTimeMs = 50L;
+            while (true) {
+                final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> it = tasksToBeCreated.entrySet().iterator();
+                while (it.hasNext()) {
+                    final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions = it.next();
+                    final TaskId taskId = newTaskAndPartitions.getKey();
+                    final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
+
+                    try {
+                        createTask(taskId, partitions);
+                        it.remove();
+                    } catch (final ProcessorStateException e) {
+                        if (e.getCause() instanceof LockException) {
+                            // ignore and retry
+                            log.warn("Could not create task {}. Will retry.", 
taskId, e);
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+
+                if (tasksToBeCreated.isEmpty()) {
+                    break;
+                }
+
+                try {
+                    Thread.sleep(backoffTimeMs);
+                    backoffTimeMs <<= 1;
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+        }
+
+        abstract void createTask(final TaskId id, final Collection<TopicPartition> partitions);
+    }
+
+    class TaskCreator extends AbstractTaskCreator {
+        void createTask(final TaskId taskId, final Collection<TopicPartition> partitions) {
+            log.debug("{} creating new task {}", logPrefix, taskId);
+            final StreamTask task = createStreamTask(taskId, partitions);
+
+            activeTasks.put(taskId, task);
+
+            for (TopicPartition partition : partitions) {
+                activeTasksByPartition.put(partition, task);
+            }
+        }
+    }
+
+    class StandbyTaskCreator extends AbstractTaskCreator {
+        void createTask(final TaskId taskId, final Collection<TopicPartition> partitions) {
+            log.debug("{} creating new standby task {}", logPrefix, taskId);
+            final StandbyTask task = createStandbyTask(taskId, partitions);
+
+            standbyTasks.put(taskId, task);
+
+            for (TopicPartition partition : partitions) {
+                standbyTasksByPartition.put(partition, task);
+            }
+            // collect checkpointed offsets to position the restore consumer
+            // this includes all partitions from which we restore state
+            for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
+                standbyTasksByPartition.put(partition, task);
+            }
+        }
+    }
+
 }
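
For a sense of the pacing implied by retryWithBackoff above: the sleep starts at 50 ms and doubles after every unsuccessful round, with no upper bound, so six failed rounds accumulate roughly 3.1 seconds of waiting. A tiny illustrative sketch (not part of the patch):

    // Illustrative only: cumulative wait of the doubling backoff above (50 ms start, no cap).
    public class BackoffTimingSketch {
        public static void main(final String[] args) {
            long backoffTimeMs = 50L;
            long totalMs = 0L;
            for (int round = 1; round <= 6; round++) {
                totalMs += backoffTimeMs;      // 50, 150, 350, 750, 1550, 3150 ms cumulative
                System.out.printf("after round %d: slept %d ms in total%n", round, totalMs);
                backoffTimeMs <<= 1;           // next sleep doubles: 100, 200, 400, 800, 1600 ms
            }
        }
    }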

http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c491657..0c2ace9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -37,9 +32,11 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -57,6 +54,13 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
@@ -118,6 +122,7 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
             }
         };
     }
@@ -290,6 +295,79 @@ public class StreamThreadTest {
             (thread.state() == StreamThread.State.NOT_RUNNING));
     }
 
+    final static String TOPIC = "topic";
+    final Set<TopicPartition> assignmentThread1 = Collections.singleton(new TopicPartition(TOPIC, 0));
+    final Set<TopicPartition> assignmentThread2 = Collections.singleton(new TopicPartition(TOPIC, 1));
+
+    @Test
+    public void testHandingOverTaskFromOneToAnotherThread() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addStateStore(
+            Stores
+                .create("store")
+                .withByteArrayKeys()
+                .withByteArrayValues()
+                .persistent()
+                .build()
+        );
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+        mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
+
+        final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        thread1.partitionAssignor(new MockStreamsPartitionAssignor());
+        thread2.partitionAssignor(new MockStreamsPartitionAssignor());
+
+        // revoke (to get threads in correct state)
+        thread1.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
+        thread2.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
+
+        // assign
+        thread1.rebalanceListener.onPartitionsAssigned(assignmentThread1);
+        thread2.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+
+        final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>();
+        for (TaskId tid : thread1.tasks().keySet()) {
+            originalTaskAssignmentThread1.add(tid);
+        }
+        final Set<TaskId> originalTaskAssignmentThread2 = new HashSet<>();
+        for (TaskId tid : thread2.tasks().keySet()) {
+            originalTaskAssignmentThread2.add(tid);
+        }
+
+        // revoke (task will be suspended)
+        thread1.rebalanceListener.onPartitionsRevoked(assignmentThread1);
+        thread2.rebalanceListener.onPartitionsRevoked(assignmentThread2);
+
+        // assign reverted
+        Thread runIt = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                thread1.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+            }
+        });
+        runIt.start();
+
+        thread2.rebalanceListener.onPartitionsAssigned(assignmentThread1);
+
+        runIt.join();
+
+        assertThat(thread1.tasks().keySet(), equalTo(originalTaskAssignmentThread2));
+        assertThat(thread2.tasks().keySet(), equalTo(originalTaskAssignmentThread1));
+        assertThat(thread1.prevTasks(), equalTo(originalTaskAssignmentThread1));
+        assertThat(thread2.prevTasks(), equalTo(originalTaskAssignmentThread2));
+    }
+
+    private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
+        @Override
+        Map<TaskId, Set<TopicPartition>> activeTasks() {
+            Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+            activeTasks.put(new TaskId(0, 0), assignmentThread1);
+            activeTasks.put(new TaskId(0, 1), assignmentThread2);
+            return activeTasks;
+        }
+    }
 
     @Test
     public void testMaybeClean() throws Exception {
