Repository: kafka
Updated Branches:
  refs/heads/trunk 67f1e5b91 -> 859113786
KAFKA-4509: Task reusage on rebalance fails for threads on same host

Author: Matthias J. Sax <[email protected]>

Reviewers: Damian Guy, Guozhang Wang

Closes #2233 from mjsax/kafka-4509-task-reusage-fix

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85911378
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85911378
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85911378

Branch: refs/heads/trunk
Commit: 859113786957a36381222f21287a940767e92f1c
Parents: 67f1e5b
Author: Matthias J. Sax <[email protected]>
Authored: Tue Dec 13 12:11:09 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Tue Dec 13 12:11:09 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/errors/LockException.java    |  36 ++++++
 .../internals/ProcessorStateManager.java       |  11 +-
 .../processor/internals/StreamThread.java      | 123 ++++++++++++++++---
 .../processor/internals/StreamThreadTest.java  |  88 ++++++++++++-
 4 files changed, 233 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
new file mode 100644
index 0000000..00c75ec
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.errors;
+
+/**
+ * Indicates that the state store directory lock could not be acquired because another thread holds the lock.
+ */
+public class LockException extends StreamsException {
+
+    private static final long serialVersionUID = 1L;
+
+    public LockException(final String s) {
+        super(s);
+    }
+
+    public LockException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public LockException(Throwable throwable) {
+        super(throwable);
+    }
+
+}
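The new LockException extends StreamsException but is meant to signal a recoverable condition: the state directory lock is simply still held by another thread of the same application. Below is a minimal caller-side sketch of that contract (a hypothetical helper, not code from this patch; in the patch itself the exception surfaces wrapped in a ProcessorStateException and is handled by StreamThread's AbstractTaskCreator further down in this diff):

import org.apache.kafka.streams.errors.LockException;

final class LockRetryExample {

    // Hypothetical helper illustrating the intended contract of LockException:
    // retry task creation until the other thread releases the directory lock,
    // while any other StreamsException keeps propagating as a fatal error.
    static void createWithRetry(final Runnable createTask) throws InterruptedException {
        long backoffMs = 50L;                  // same initial back-off the patch uses
        while (true) {
            try {
                createTask.run();              // may throw LockException
                return;                        // lock was free, task created
            } catch (final LockException recoverable) {
                Thread.sleep(backoffMs);       // lock still held elsewhere: wait and retry
                backoffMs <<= 1;               // exponential back-off
            }
        }
    }

    private LockRetryExample() { }
}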
http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 795949f..30b84f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -37,8 +38,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -71,11 +72,13 @@ public class ProcessorStateManager {
     private final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap;
 
     /**
-     * @throws IOException if any error happens while creating or locking the state directory
+     * @throws LockException if the state directory cannot be locked because another thread holds the lock
+     *                       (this might be recoverable by retrying)
+     * @throws IOException if any severe error happens while creating or locking the state directory
      */
     public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby, StateDirectory stateDirectory, final Map<String, String> sourceStoreToSourceTopic,
-                                 final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap) throws IOException {
+                                 final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap) throws LockException, IOException {
         this.applicationId = applicationId;
         this.defaultPartition = taskId.partition;
         this.taskId = taskId;
@@ -98,7 +101,7 @@ public class ProcessorStateManager {
         this.logPrefix = String.format("task [%s]", taskId);
 
         if (!stateDirectory.lock(taskId, 5)) {
-            throw new IOException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
+            throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
         }
 
         // load the checkpoint information
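stateDirectory.lock(taskId, 5) fails in exactly the situation named by the ticket title: another StreamThread in the same JVM still holds the lock of the task's state directory because it has only suspended, not yet closed, the task. The snippet below only illustrates the JVM-level file-locking behavior that makes the second attempt fail; it is not the StateDirectory implementation, and the file name and location are invented for the demo.

import java.io.File;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;

public final class SameJvmLockDemo {

    public static void main(final String[] args) throws Exception {
        // stand-in for a per-task lock file under the application's state directory
        final File lockFile = new File(System.getProperty("java.io.tmpdir"), "0_1.lock");

        try (FileChannel owner = FileChannel.open(lockFile.toPath(),
                 StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileChannel latecomer = FileChannel.open(lockFile.toPath(),
                 StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {

            final FileLock held = owner.tryLock();       // "thread 1": task suspended, lock kept
            System.out.println("first lock acquired: " + (held != null));

            try {
                latecomer.tryLock();                     // "thread 2", same JVM: cannot get the lock
            } catch (final OverlappingFileLockException e) {
                System.out.println("second attempt failed, lock still held within this JVM");
            }

            held.release();                              // thread 1 finally closes the task ...
            System.out.println("retry succeeded: " + (latecomer.tryLock() != null));  // ... and a retry works
        }
    }
}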
http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 151bfd5..a7793f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
@@ -56,6 +58,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -216,6 +219,9 @@ public class StreamThread extends Thread {
 
     private ThreadCache cache;
 
+    private final TaskCreator taskCreator = new TaskCreator();
+    private final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator();
+
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -811,10 +817,12 @@ public class StreamThread extends Thread {
         if (partitionAssignor == null)
             throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
 
-        // create the active tasks
+        final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>();
+
+        // collect newly assigned tasks and reopen re-assigned tasks
         for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.activeTasks().entrySet()) {
-            TaskId taskId = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
+            final TaskId taskId = entry.getKey();
+            final Set<TopicPartition> partitions = entry.getValue();
 
             if (assignment.containsAll(partitions)) {
                 try {
@@ -823,14 +831,15 @@
                         log.debug("{} recycling old task {}", logPrefix, taskId);
                         suspendedTasks.remove(taskId);
                         task.initTopology();
+
+                        activeTasks.put(taskId, task);
+
+                        for (TopicPartition partition : partitions) {
+                            activeTasksByPartition.put(partition, task);
+                        }
                     } else {
-                        log.debug("{} creating new task {}", logPrefix, taskId);
-                        task = createStreamTask(taskId, partitions);
+                        newTasks.put(taskId, partitions);
                     }
-                    activeTasks.put(taskId, task);
-
-                    for (TopicPartition partition : partitions)
-                        activeTasksByPartition.put(partition, task);
                 } catch (StreamsException e) {
                     log.error("{} Failed to create an active task {}: ", logPrefix, taskId, e);
                     throw e;
@@ -840,8 +849,12 @@
             }
         }
 
-        // finally destroy any remaining suspended tasks
+        // destroy any remaining suspended tasks
        removeSuspendedTasks();
+
+        // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
+        // -> other thread will call removeSuspendedTasks(); eventually
+        taskCreator.retryWithBackoff(newTasks);
     }
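The comment above ("other thread will call removeSuspendedTasks(); eventually") is the crux of the fix: creation of re-assigned tasks is deferred and retried until the sibling thread has released the state directory locks of the tasks it merely suspended. A toy model of that hand-over, using a plain ReentrantLock in place of the per-task directory lock (all names invented, not code from the patch):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public final class HandoverModel {

    public static void main(final String[] args) throws InterruptedException {
        final ReentrantLock taskDirLock = new ReentrantLock();

        taskDirLock.lock();                              // thread A: task suspended, lock still held

        final Thread threadB = new Thread(() -> {
            long backoffMs = 50L;
            while (!taskDirLock.tryLock()) {             // thread B: newly assigned task, lock busy
                System.out.println("lock busy, retrying in " + backoffMs + " ms");
                try {
                    TimeUnit.MILLISECONDS.sleep(backoffMs);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                backoffMs <<= 1;                         // exponential back-off, as in retryWithBackoff()
            }
            System.out.println("thread B created the task");
            taskDirLock.unlock();
        });
        threadB.start();

        Thread.sleep(200L);                              // thread A finishes its own rebalance work ...
        taskDirLock.unlock();                            // ... and releases the lock (removeSuspendedTasks)

        threadB.join();
    }
}

The test added at the bottom of this commit drives the real classes through the same sequence, with two StreamThread instances sharing one state directory and swapping their tasks.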
 
     private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
@@ -864,10 +877,12 @@ public class StreamThread extends Thread {
 
         Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
-        // create the standby tasks
+        final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>();
+
+        // collect newly assigned standby tasks and reopen re-assigned standby tasks
         for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.standbyTasks().entrySet()) {
-            TaskId taskId = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
+            final TaskId taskId = entry.getKey();
+            final Set<TopicPartition> partitions = entry.getValue();
             StandbyTask task = findMatchingSuspendedStandbyTask(taskId, partitions);
 
             if (task != null) {
@@ -875,9 +890,9 @@
                 suspendedStandbyTasks.remove(taskId);
                 task.initTopology();
             } else {
-                log.debug("{} creating new standby task {}", logPrefix, taskId);
-                task = createStandbyTask(taskId, partitions);
+                newStandbyTasks.put(taskId, partitions);
             }
+
             if (task != null) {
                 standbyTasks.put(taskId, task);
                 for (TopicPartition partition : partitions) {
@@ -891,9 +906,14 @@
                 checkpointedOffsets.putAll(task.checkpointedOffsets());
             }
         }
-        // finally destroy any remaining suspended tasks
+
+        // destroy any remaining suspended tasks
         removeSuspendedStandbyTasks();
 
+        // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
+        // -> other thread will call removeSuspendedStandbyTasks(); eventually
+        standbyTaskCreator.retryWithBackoff(newStandbyTasks);
+
         restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
 
         for (Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
@@ -1153,4 +1173,75 @@ public class StreamThread extends Thread {
             sensor.add(name, stat);
         }
     }
+
+    abstract class AbstractTaskCreator {
+        void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
+            long backoffTimeMs = 50L;
+            while (true) {
+                final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> it = tasksToBeCreated.entrySet().iterator();
+                while (it.hasNext()) {
+                    final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions = it.next();
+                    final TaskId taskId = newTaskAndPartitions.getKey();
+                    final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
+
+                    try {
+                        createTask(taskId, partitions);
+                        it.remove();
+                    } catch (final ProcessorStateException e) {
+                        if (e.getCause() instanceof LockException) {
+                            // ignore and retry
+                            log.warn("Could not create task {}. Will retry.", taskId, e);
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+
+                if (tasksToBeCreated.isEmpty()) {
+                    break;
+                }
+
+                try {
+                    Thread.sleep(backoffTimeMs);
+                    backoffTimeMs <<= 1;
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+        }
+
+        abstract void createTask(final TaskId id, final Collection<TopicPartition> partitions);
+    }
+
+    class TaskCreator extends AbstractTaskCreator {
+        void createTask(final TaskId taskId, final Collection<TopicPartition> partitions) {
+            log.debug("{} creating new task {}", logPrefix, taskId);
+            final StreamTask task = createStreamTask(taskId, partitions);
+
+            activeTasks.put(taskId, task);
+
+            for (TopicPartition partition : partitions) {
+                activeTasksByPartition.put(partition, task);
+            }
+        }
+    }
+
+    class StandbyTaskCreator extends AbstractTaskCreator {
+        void createTask(final TaskId taskId, final Collection<TopicPartition> partitions) {
+            log.debug("{} creating new standby task {}", logPrefix, taskId);
+            final StandbyTask task = createStandbyTask(taskId, partitions);
+
+            standbyTasks.put(taskId, task);
+
+            for (TopicPartition partition : partitions) {
+                standbyTasksByPartition.put(partition, task);
+            }
+            // collect checked pointed offsets to position the restore consumer
+            // this include all partitions from which we restore states
+            for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
+                standbyTasksByPartition.put(partition, task);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c491657..0c2ace9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -37,9 +32,11 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -57,6 +54,13 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
@@ -118,6 +122,7 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
             }
         };
     }
@@ -290,6 +295,79 @@ public class StreamThreadTest {
                                        (thread.state() == StreamThread.State.NOT_RUNNING));
     }
 
+    final static String TOPIC = "topic";
+    final Set<TopicPartition> assignmentThread1 = Collections.singleton(new TopicPartition(TOPIC, 0));
+    final Set<TopicPartition> assignmentThread2 = Collections.singleton(new TopicPartition(TOPIC, 1));
+
+    @Test
+    public void testHandingOverTaskFromOneToAnotherThread() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addStateStore(
+            Stores
+                .create("store")
+                .withByteArrayKeys()
+                .withByteArrayValues()
+                .persistent()
+                .build()
+        );
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+        mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
+
+        final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        thread1.partitionAssignor(new MockStreamsPartitionAssignor());
+        thread2.partitionAssignor(new MockStreamsPartitionAssignor());
+
+        // revoke (to get threads in correct state)
+        thread1.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
+        thread2.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
+
+        // assign
+        thread1.rebalanceListener.onPartitionsAssigned(assignmentThread1);
+        thread2.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+
+        final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>();
+        for (TaskId tid : thread1.tasks().keySet()) {
+            originalTaskAssignmentThread1.add(tid);
+        }
+        final Set<TaskId> originalTaskAssignmentThread2 = new HashSet<>();
+        for (TaskId tid : thread2.tasks().keySet()) {
+            originalTaskAssignmentThread2.add(tid);
+        }
+
+        // revoke (task will be suspended)
+        thread1.rebalanceListener.onPartitionsRevoked(assignmentThread1);
+        thread2.rebalanceListener.onPartitionsRevoked(assignmentThread2);
+
+        // assign reverted
+        Thread runIt = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                thread1.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+            }
+        });
+        runIt.start();
+
+        thread2.rebalanceListener.onPartitionsAssigned(assignmentThread1);
+
+        runIt.join();
+
+        assertThat(thread1.tasks().keySet(), equalTo(originalTaskAssignmentThread2));
+        assertThat(thread2.tasks().keySet(), equalTo(originalTaskAssignmentThread1));
+        assertThat(thread1.prevTasks(), equalTo(originalTaskAssignmentThread1));
+        assertThat(thread2.prevTasks(), equalTo(originalTaskAssignmentThread2));
+    }
+
+    private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
+        @Override
+        Map<TaskId, Set<TopicPartition>> activeTasks() {
+            Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+            activeTasks.put(new TaskId(0, 0), assignmentThread1);
+            activeTasks.put(new TaskId(0, 1), assignmentThread2);
+            return activeTasks;
+        }
+    }
+
     @Test
     public void testMaybeClean() throws Exception {
