Repository: kafka Updated Branches: refs/heads/0.10.2 8bf70610b -> 6ca71c064
KAFKA-5167: fix unreleased state lock due to uncaught exception when closing a task Catch any exception thrown from user code during `task.close()` and close the state manager to release the state lock even in case of an exception. Author: Matthias J. Sax <[email protected]> Reviewers: Eno Thereska, Damian Guy, Guozhang Wang Closes #3001 from mjsax/kafka-5167-rebalance-lock-exception-0102 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ca71c06 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ca71c06 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ca71c06 Branch: refs/heads/0.10.2 Commit: 6ca71c064940605873e93ebf82ca8c404c8b43e2 Parents: 8bf7061 Author: Matthias J. Sax <[email protected]> Authored: Fri May 12 22:50:05 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri May 12 22:50:05 2017 -0700 ---------------------------------------------------------------------- .../processor/internals/StreamThread.java | 8 +- .../processor/internals/StreamThreadTest.java | 166 +++++++++++++++++-- 2 files changed, 155 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca71c06/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a0ee9a8..abaacce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -899,6 +899,7 @@ public class StreamThread extends Thread { } catch (Exception e) { log.error("{} Failed to remove suspended task {}", logPrefix, next.getKey(), e); 
} finally { + task.closeStateManager(false); suspendedTaskIterator.remove(); } } @@ -907,11 +908,11 @@ public class StreamThread extends Thread { } private void closeNonAssignedSuspendedStandbyTasks() { - final Set<TaskId> currentSuspendedTaskIds = partitionAssignor.standbyTasks().keySet(); + final Set<TaskId> currentStandbyTaskIds = partitionAssignor.standbyTasks().keySet(); final Iterator<Map.Entry<TaskId, StandbyTask>> standByTaskIterator = suspendedStandbyTasks.entrySet().iterator(); while (standByTaskIterator.hasNext()) { final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next(); - if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) { + if (!currentStandbyTaskIds.contains(suspendedTask.getKey())) { log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, suspendedTask.getKey()); final StandbyTask task = suspendedTask.getValue(); try { @@ -920,6 +921,7 @@ public class StreamThread extends Thread { } catch (Exception e) { log.error("{} Failed to remove suspended task standby {}", logPrefix, suspendedTask.getKey(), e); } finally { + task.closeStateManager(false); standByTaskIterator.remove(); } } @@ -1211,7 +1213,7 @@ public class StreamThread extends Thread { it.remove(); } catch (final LockException e) { // ignore and retry - log.warn("Could not create task {}. Will retry.", taskId, e); + log.warn("Could not create task {}. 
Will retry: {}", taskId, e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca71c06/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 0e98f56..c2f62d6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -17,19 +17,6 @@ package org.apache.kafka.streams.processor.internals; -import java.io.File; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.MockConsumer; @@ -54,11 +41,24 @@ import org.apache.kafka.test.MockInternalTopicManager; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; -import org.junit.Before; import org.apache.kafka.test.TestUtils; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; import 
java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; @@ -67,11 +67,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.Assert.assertThat; - public class StreamThreadTest { private final String clientId = "clientId"; @@ -414,15 +413,27 @@ public class StreamThreadTest { private class MockStreamsPartitionAssignor extends StreamPartitionAssignor { private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment; + private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment; public MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) { + this(activeTaskAssignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + } + + public MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment, + final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment) { this.activeTaskAssignment = activeTaskAssignment; + this.standbyTaskAssignment = standbyTaskAssignment; } @Override Map<TaskId, Set<TopicPartition>> activeTasks() { return activeTaskAssignment; } + + @Override + Map<TaskId, Set<TopicPartition>> standbyTasks() { + return standbyTaskAssignment; + } } @Test @@ -1104,6 +1115,129 @@ public class StreamThreadTest { } + @Test + public void shouldReleaseStateDirLockIfFailureOnTaskCloseForUnassignedSuspendedTask() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + builder.setApplicationId(applicationId); + + final TaskId taskId = new TaskId(0, 0); + final MockClientSupplier clientSupplier = new MockClientSupplier(); + final StreamsConfig config = new StreamsConfig(configProps()); + final StateDirectory stateDirectory = new StateDirectory(applicationId, 
config.getString(StreamsConfig.STATE_DIR_CONFIG)); + + final TestStreamTask testStreamTask = new TestStreamTask(taskId, + applicationId, + Utils.mkSet(new TopicPartition("topic", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.producer, + clientSupplier.restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + stateDirectory) { + + @Override + public void close() { + throw new RuntimeException("KABOOM!!!"); + } + }; + + final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + @Override + protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { + return testStreamTask; + } + }; + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + activeTasks.put(testStreamTask.id, testStreamTask.partitions); + thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); + + thread.partitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap())); + + final StateDirectory testStateDir = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG)); + try { + assertFalse(testStateDir.lock(taskId, 0)); + thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } + + @Test + public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception { + final String storeName = "store"; + final String changelogTopic = applicationId + "-" + storeName + "-changelog"; + + final KStreamBuilder builder = new KStreamBuilder(); + 
builder.setApplicationId(applicationId); + builder.stream("topic1").groupByKey().count(storeName); + + final TaskId taskId = new TaskId(0, 0); + final MockClientSupplier clientSupplier = new MockClientSupplier(); + final StreamsConfig config = new StreamsConfig(configProps()); + final StateDirectory stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG)); + + clientSupplier.restoreConsumer.updatePartitions(changelogTopic, + Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null))); + clientSupplier.restoreConsumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() { + { + put(new TopicPartition(changelogTopic, 0), 0L); + } + }); + clientSupplier.restoreConsumer.updateEndOffsets(new HashMap<TopicPartition, Long>() { + { + put(new TopicPartition(changelogTopic, 0), 0L); + } + }); + + final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, + clientId, processId, new Metrics(), new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) + { + @Override + protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) { + return new StandbyTask( + taskId, + applicationId, + partitions, + builder.build(0), + clientSupplier.consumer, + clientSupplier.restoreConsumer, + config, + new StreamsMetricsImpl(new Metrics(), "groupName", Collections.<String, String>emptyMap()), + stateDirectory) { + + @Override + public void close() { + throw new RuntimeException("KABOOM!!!"); + } + }; + } + }; + + final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); + standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0))); + thread.partitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks)); + thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet()); + 
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); + + thread.partitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap())); + + final StateDirectory testStateDir = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG)); + try { + assertFalse(testStateDir.lock(taskId, 0)); + thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); + assertTrue(testStateDir.lock(taskId, 0)); + } finally { + testStateDir.unlock(taskId); + } + } private void initPartitionGrouper(StreamsConfig config, StreamThread thread, MockClientSupplier clientSupplier) {
