Repository: kafka Updated Branches: refs/heads/0.11.0 58125ced7 -> 2a4eeb1c6
KAFKA-5562; Do streams state directory cleanup on a single thread Backported from trunk: https://github.com/apache/kafka/pull/3516 Author: Damian Guy <damian....@gmail.com> Reviewers: Guozhang Wang <wangg...@gmail.com>, Eno Thereska <eno.there...@gmail.com> Closes #3654 from dguy/cherry-pick-stream-thread-cleanup Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a4eeb1c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a4eeb1c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a4eeb1c Branch: refs/heads/0.11.0 Commit: 2a4eeb1c6f32c84ea73058bc502b10c2232b011f Parents: 58125ce Author: Damian Guy <damian....@gmail.com> Authored: Fri Aug 11 16:40:28 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Fri Aug 11 16:40:28 2017 +0100 ---------------------------------------------------------------------- build.gradle | 1 + .../org/apache/kafka/streams/KafkaStreams.java | 48 ++- .../processor/internals/StateDirectory.java | 69 ++-- .../processor/internals/StreamThread.java | 16 +- .../apache/kafka/streams/KafkaStreamsTest.java | 55 +++ .../integration/RegexSourceIntegrationTest.java | 3 +- .../processor/internals/StateDirectoryTest.java | 57 +++ .../internals/StreamPartitionAssignorTest.java | 209 ++++++++-- .../processor/internals/StreamThreadTest.java | 392 ++++++++----------- .../StreamThreadStateStoreProviderTest.java | 3 +- 10 files changed, 536 insertions(+), 317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 4d54271..9a23747 100644 --- a/build.gradle +++ b/build.gradle @@ -841,6 +841,7 @@ project(':streams') { testCompile project(':core') testCompile project(':core').sourceSets.test.output testCompile libs.junit + testCompile libs.easymock testRuntime libs.slf4jlog4j } http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index c16f379..0c7c598 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -56,7 +56,6 @@ import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -69,6 +68,9 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static org.apache.kafka.common.utils.Utils.getHost; @@ -123,6 +125,7 @@ public class KafkaStreams { private static final int DEFAULT_CLOSE_TIMEOUT = 0; private GlobalStreamThread globalStreamThread; + private final ScheduledExecutorService stateDirCleaner; private final StreamThread[] threads; private final Metrics metrics; private final QueryableStoreProvider queryableStoreProvider; @@ -136,6 +139,7 @@ public class KafkaStreams { private final StreamsMetadataState streamsMetadataState; private final StreamsConfig config; + private final StateDirectory stateDirectory; // container states /** @@ -358,13 +362,13 @@ public class KafkaStreams { final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) / (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1))); - + stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time); if (globalTaskTopology != null) { final String globalThreadId = clientId + "-GlobalStreamThread"; globalStreamThread = new GlobalStreamThread(globalTaskTopology, config, clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")), - new StateDirectory(applicationId, globalThreadId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time), + stateDirectory, metrics, time, globalThreadId); @@ -381,13 +385,24 @@ public class KafkaStreams { metrics, time, streamsMetadataState, - cacheSizeBytes); + cacheSizeBytes, + stateDirectory); + threads[i].setStateListener(streamStateListener); threadState.put(threads[i].getId(), threads[i].state()); storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); } final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores()); queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); + final String cleanupThreadName = clientId + "-CleanupThread"; + stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + final Thread thread = new Thread(r, cleanupThreadName); + thread.setDaemon(true); + return thread; + } + }); } private static HostInfo parseHostInfo(final String endPoint) { @@ -450,10 +465,23 @@ public class KafkaStreams { thread.start(); } + final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); + stateDirCleaner.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + synchronized (stateLock) { + if (state == State.RUNNING) { + stateDirectory.cleanRemovedTasks(cleanupDelay); + } + } + } + }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS); + log.info("{} Started Kafka Stream process", logPrefix); } else { throw new IllegalStateException("Cannot start again."); } + } /** @@ -478,6 +506,7 @@ public class KafkaStreams { log.debug("{} Stopping Kafka Stream process.", logPrefix); if (state.isCreatedOrRunning()) { setState(State.PENDING_SHUTDOWN); + stateDirCleaner.shutdownNow(); // save the current thread so that if it is a stream thread // we don't attempt to join it and cause a deadlock final Thread shutdown = new Thread(new Runnable() { @@ -576,17 +605,6 @@ public class KafkaStreams { if (state.isRunning()) { throw new IllegalStateException("Cannot clean up while running."); } - - final String appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); - final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG); - - final String localApplicationDir = stateDir + File.separator + appId; - log.debug("{} Removing local Kafka Streams application data in {} for application {}.", - logPrefix, - localApplicationDir, - appId); - - final StateDirectory stateDirectory = new StateDirectory(appId, "cleanup", stateDir, Time.SYSTEM); stateDirectory.cleanRemovedTasks(0); } http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 3e547eb..8d46da1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -45,21 +45,25 @@ public class StateDirectory { private static final Logger log = LoggerFactory.getLogger(StateDirectory.class); private final File stateDir; - private final String logPrefix; private final HashMap<TaskId, FileChannel> channels = new HashMap<>(); - private final HashMap<TaskId, FileLock> locks = new HashMap<>(); + private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>(); private final Time time; private FileChannel globalStateChannel; private FileLock globalStateLock; - public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) { - this(applicationId, "", stateDirConfig, time); + private static class LockAndOwner { + final FileLock lock; + final String owningThread; + + LockAndOwner(final String owningThread, final FileLock lock) { + this.owningThread = owningThread; + this.lock = lock; + } } - public StateDirectory(final String applicationId, final String threadId, final String stateDirConfig, final Time time) { + public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) { this.time = time; - this.logPrefix = String.format("stream-thread [%s]", threadId); final File baseDir = new File(stateDirConfig); if (!baseDir.exists() && !baseDir.mkdirs()) { throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", @@ -95,6 +99,10 @@ public class StateDirectory { return dir; } + private String logPrefix() { + return String.format("stream-thread [%s]", Thread.currentThread().getName()); + } + /** * Get the lock for the {@link TaskId}s directory if it is available * @param taskId @@ -102,13 +110,19 @@ public class StateDirectory { * @return true if successful * @throws IOException */ - boolean lock(final TaskId taskId, int retry) throws IOException { + synchronized boolean lock(final TaskId taskId, int retry) throws IOException { + final File lockFile; // we already have the lock so bail out here - if (locks.containsKey(taskId)) { - log.trace("{} Found cached state dir lock for task {}", logPrefix, taskId); + final LockAndOwner lockAndOwner = locks.get(taskId); + if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) { + log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); return true; + } else if (lockAndOwner != null) { + // another thread owns the lock + return false; } + try { lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); } catch (ProcessorStateException e) { @@ -130,16 +144,16 @@ public class StateDirectory { final FileLock lock = tryLock(retry, channel); if (lock != null) { - locks.put(taskId, lock); + locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock)); - log.debug("{} Acquired state dir lock for task {}", logPrefix, taskId); + log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId); } return lock != null; } - boolean lockGlobalState(final int retry) throws IOException { + synchronized boolean lockGlobalState(final int retry) throws IOException { if (globalStateLock != null) { - log.trace("{} Found cached state dir lock for the global task", logPrefix); + log.trace("{} Found cached state dir lock for the global task", logPrefix()); return true; } @@ -161,12 +175,12 @@ public class StateDirectory { globalStateChannel = channel; globalStateLock = fileLock; - log.debug("{} Acquired global state dir lock", logPrefix); + log.debug("{} Acquired global state dir lock", logPrefix()); return true; } - void unlockGlobalState() throws IOException { + synchronized void unlockGlobalState() throws IOException { if (globalStateLock == null) { return; } @@ -175,7 +189,7 @@ public class StateDirectory { globalStateLock = null; globalStateChannel = null; - log.debug("{} Released global state dir lock", logPrefix); + log.debug("{} Released global state dir lock", logPrefix()); } /** @@ -183,18 +197,20 @@ public class StateDirectory { * @param taskId * @throws IOException */ - void unlock(final TaskId taskId) throws IOException { - final FileLock lock = locks.remove(taskId); - if (lock != null) { - lock.release(); - - log.debug("{} Released state dir lock for task {}", logPrefix, taskId); + synchronized boolean unlock(final TaskId taskId) throws IOException { + final LockAndOwner lockAndOwner = locks.get(taskId); + if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) { + locks.remove(taskId); + lockAndOwner.lock.release(); + log.debug("{} Released state dir lock for task {}", logPrefix(), taskId); final FileChannel fileChannel = channels.remove(taskId); if (fileChannel != null) { fileChannel.close(); } + return true; } + return false; } /** @@ -204,12 +220,11 @@ public class StateDirectory { * @param cleanupDelayMs only remove directories if they haven't been modified for at least * this amount of time (milliseconds) */ - public void cleanRemovedTasks(final long cleanupDelayMs) { + public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { final File[] taskDirs = listTaskDirectories(); if (taskDirs == null || taskDirs.length == 0) { return; // nothing to do } - for (File taskDir : taskDirs) { final String dirName = taskDir.getName(); TaskId id = TaskId.parse(dirName); @@ -219,19 +234,19 @@ public class StateDirectory { long now = time.milliseconds(); long lastModifiedMs = taskDir.lastModified(); if (now > lastModifiedMs + cleanupDelayMs) { - log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix, dirName, id, now - lastModifiedMs, cleanupDelayMs); + log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); Utils.delete(taskDir); } } } catch (OverlappingFileLockException e) { // locked by another thread } catch (IOException e) { - log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix, e); + log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix(), e); } finally { try { unlock(id); } catch (IOException e) { - log.error("{} Failed to release the state directory lock", logPrefix); + log.error("{} Failed to release the state directory lock", logPrefix()); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 9774627..c2da0cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -438,7 +438,8 @@ public class StreamThread extends Thread { final Metrics metrics, final Time time, final StreamsMetadataState streamsMetadataState, - final long cacheSizeBytes) { + final long cacheSizeBytes, + final StateDirectory stateDirectory) { super(clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement()); this.applicationId = applicationId; this.config = config; @@ -488,7 +489,7 @@ public class StreamThread extends Thread { // standby KTables standbyRecords = new HashMap<>(); - stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time); + this.stateDirectory = stateDirectory; final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); rebalanceTimeoutMs = (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT); pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG); @@ -557,7 +558,6 @@ public class StreamThread extends Thread { maybeCommit(timerStartedMs); maybeUpdateStandbyTasks(timerStartedMs); - maybeClean(timerStartedMs); } log.info("{} Shutting down at user request", logPrefix); } @@ -881,16 +881,6 @@ public class StreamThread extends Thread { } /** - * Cleanup any states of the tasks that have been removed from this thread - */ - protected void maybeClean(final long now) { - if (now > lastCleanMs + cleanTimeMs) { - stateDirectory.cleanRemovedTasks(cleanTimeMs); - lastCleanMs = now; - } - } - - /** * Compute the latency based on the current marked timestamp, and update the marked timestamp * with the current system timestamp. * http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 4ebc42b..a03b7cc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMetricsReporter; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.Assert; import org.junit.Before; @@ -37,6 +38,7 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -302,6 +304,59 @@ public class KafkaStreamsTest { } + @Test + public void shouldCleanupOldStateDirs() throws InterruptedException { + final Properties props = new Properties(); + final String appId = "cleanupOldStateDirs"; + final String stateDir = TestUtils.tempDirectory().getPath(); + props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1"); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); + + + final String topic = "topic"; + CLUSTER.createTopic(topic); + final KStreamBuilder builder = new KStreamBuilder(); + + builder.stream(Serdes.String(), Serdes.String(), topic); + + final KafkaStreams streams = new KafkaStreams(builder, props); + final CountDownLatch latch = new CountDownLatch(1); + streams.setStateListener(new KafkaStreams.StateListener() { + @Override + public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) { + if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) { + latch.countDown(); + } + } + }); + final String appDir = stateDir + File.separator + appId; + final File oldTaskDir = new File(appDir, "10_1"); + assertTrue(oldTaskDir.mkdirs()); + try { + streams.start(); + latch.await(30, TimeUnit.SECONDS); + verifyCleanupStateDir(appDir, oldTaskDir); + assertTrue(oldTaskDir.mkdirs()); + verifyCleanupStateDir(appDir, oldTaskDir); + } finally { + streams.close(); + } + } + + private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException { + final File taskDir = new File(appDir, "0_0"); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return !oldTaskDir.exists() && taskDir.exists(); + } + }, 30000, "cleanup has not successfully run"); + assertTrue(taskDir.exists()); + } + + public static class StateListenerStub implements KafkaStreams.StateListener { public int numChanges = 0; public KafkaStreams.State oldState; http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 0b5c5e9..8b749be 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; @@ -440,7 +441,7 @@ public class RegexSourceIntegrationTest { public TestStreamThread(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) { super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index f1b5efe..815d640 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -32,9 +32,13 @@ import java.nio.channels.OverlappingFileLockException; import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -266,4 +270,57 @@ public class StateDirectoryTest { } } + @Test + public void shouldNotLockStateDirLockedByAnotherThread() throws IOException, InterruptedException { + final TaskId taskId = new TaskId(0, 0); + final AtomicReference<IOException> exceptionOnThread = new AtomicReference<>(); + final Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + directory.lock(taskId, 1); + } catch (final IOException e) { + exceptionOnThread.set(e); + } + } + }); + thread.start(); + thread.join(30000); + assertNull("should not have had an exception during locking on other thread", exceptionOnThread.get()); + assertFalse(directory.lock(taskId, 1)); + } + + @Test + public void shouldNotUnLockStateDirLockedByAnotherThread() throws IOException, InterruptedException { + final TaskId taskId = new TaskId(0, 0); + final CountDownLatch lockLatch = new CountDownLatch(1); + final CountDownLatch unlockLatch = new CountDownLatch(1); + final AtomicReference<Exception> exceptionOnThread = new AtomicReference<>(); + final Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try { + assertTrue(directory.lock(taskId, 1)); + lockLatch.countDown(); + unlockLatch.await(); + directory.unlock(taskId); + } catch (final Exception e) { + exceptionOnThread.set(e); + } + } + }); + thread.start(); + lockLatch.await(5, TimeUnit.SECONDS); + + assertNull("should not have had an exception on other thread", exceptionOnThread.get()); + assertFalse(directory.unlock(taskId)); + assertFalse(directory.lock(taskId, 1)); + + unlockLatch.countDown(); + thread.join(30000); + + assertNull("should not have had an exception on other thread", exceptionOnThread.get()); + assertTrue(directory.lock(taskId, 1)); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 17eb50a..98cd20a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.test.MockInternalTopicManager; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; +import org.apache.kafka.test.TestUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -106,11 +107,12 @@ public class StreamPartitionAssignorTest { private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); private final TopologyBuilder builder = new TopologyBuilder(); private final StreamsConfig config = new StreamsConfig(configProps()); + private final StateDirectory stateDirectory = new StateDirectory("appId", TestUtils.tempDirectory().getPath(), new MockTime()); private final StreamThread mockStreamThread = new StreamThread(builder, config, mockClientSupplier, "appID", "clientId", UUID.randomUUID(), new Metrics(), new MockTime(), - null, 1L); + null, 1L, stateDirectory); private final Map<String, Object> configurationMap = new HashMap<>(); private Properties configProps() { @@ -146,8 +148,20 @@ public class StreamPartitionAssignorTest { String clientId = "client-id"; UUID processId = UUID.randomUUID(); - StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + StreamThread thread = new StreamThread( + builder, + config, + new MockClientSupplier(), + "test", + clientId, + processId, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, + StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory) { + @Override public Set<TaskId> prevActiveTasks() { return prevTasks; @@ -192,10 +206,19 @@ public class StreamPartitionAssignorTest { String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); - + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -258,7 +281,19 @@ public class StreamPartitionAssignorTest { UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -297,7 +332,18 @@ public class StreamPartitionAssignorTest { UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + StreamThread thread10 = new StreamThread( + builder, + config, + new MockClientSupplier(), + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); @@ -355,8 +401,18 @@ public class StreamPartitionAssignorTest { UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -423,8 +479,19 @@ public class StreamPartitionAssignorTest { String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); + partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -507,8 +574,18 @@ public class StreamPartitionAssignorTest { UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -568,8 +645,18 @@ public class StreamPartitionAssignorTest { UUID uuid = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1)); @@ -605,9 +692,18 @@ public class StreamPartitionAssignorTest { UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; - - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer); @@ -645,7 +741,7 @@ public class StreamPartitionAssignorTest { String client1 = "client1"; StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer); @@ -678,7 +774,7 @@ public class StreamPartitionAssignorTest { final String client1 = "client1"; final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input")); @@ -703,8 +799,18 @@ public class StreamPartitionAssignorTest { final UUID uuid1 = UUID.randomUUID(); final String client1 = "client1"; - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); @@ -735,9 +841,18 @@ public class StreamPartitionAssignorTest { final String applicationId = "application-id"; builder.setApplicationId(applicationId); - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, - new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer)); @@ -760,10 +875,18 @@ public class StreamPartitionAssignorTest { final String applicationId = "application-id"; builder.setApplicationId(applicationId); - - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, - new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); try { partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); @@ -875,7 +998,18 @@ public class StreamPartitionAssignorTest { final UUID uuid = UUID.randomUUID(); final String client = "client1"; - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client, + uuid, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client)); final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer); @@ -954,7 +1088,18 @@ public class StreamPartitionAssignorTest { final UUID uuid = UUID.randomUUID(); final String client = "client1"; - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client, + uuid, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer)); http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d8d2e4f..3b280f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -48,11 +48,13 @@ import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.File; +import java.io.IOException; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.file.Files; @@ -92,7 +94,8 @@ public class StreamThreadTest { private UUID processId = UUID.randomUUID(); final KStreamBuilder builder = new KStreamBuilder(); private final StreamsConfig config = new StreamsConfig(configProps(false)); - + private final String stateDir = TestUtils.tempDirectory().getPath(); + private final StateDirectory stateDirectory = new StateDirectory("applicationId", stateDir, mockTime); @Before public void setUp() throws Exception { @@ -441,8 +444,47 @@ public class StreamThreadTest { assertTrue(thread.tasks().isEmpty()); thread.close(); + assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN) || (thread.state() == StreamThread.State.CREATED)); + + } + + @SuppressWarnings("unchecked") + @Test + public void testStateChangeStartClose() throws InterruptedException { + + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory); + + final StateListenerStub stateListener = new StateListenerStub(); + thread.setStateListener(stateListener); + thread.start(); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return thread.state() == StreamThread.State.RUNNING; + } + }, 10 * 1000, "Thread never started."); + thread.close(); + assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return thread.state() == StreamThread.State.DEAD; + } + }, 10 * 1000, "Thread never shut down."); + thread.close(); } private final static String TOPIC = "topic"; @@ -474,7 +516,8 @@ public class StreamThreadTest { metrics, Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final StreamThread thread2 = new StreamThread( builder, config, @@ -485,7 +528,8 @@ public class StreamThreadTest { metrics, Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment); final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment); @@ -580,7 +624,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final String defaultGroupName = "stream-metrics"; final String defaultPrefix = "thread." + thread.threadClientId(); final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId()); @@ -611,157 +656,7 @@ public class StreamThreadTest { } @Test - public void testMaybeClean() throws Exception { - final File baseDir = Files.createTempDirectory("test").toFile(); - try { - final long cleanupDelay = 1000L; - final Properties props = configProps(false); - props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay)); - props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); - final StreamsConfig config = new StreamsConfig(props); - final File applicationDir = new File(baseDir, applicationId); - applicationDir.mkdir(); - final File stateDir1 = new File(applicationDir, task1.toString()); - final File stateDir2 = new File(applicationDir, task2.toString()); - final File stateDir3 = new File(applicationDir, task3.toString()); - final File extraDir = new File(applicationDir, applicationId); - stateDir1.mkdir(); - stateDir2.mkdir(); - stateDir3.mkdir(); - extraDir.mkdir(); - - builder.addSource("source1", "topic1"); - - final StreamThread thread = new StreamThread( - builder, - config, - clientSupplier, - applicationId, - clientId, - processId, - metrics, - mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { - - @Override - public void maybeClean(final long now) { - super.maybeClean(now); - } - - @Override - protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) { - final ProcessorTopology topology = builder.build(id.topicGroupId); - return new TestStreamTask( - id, - applicationId, - partitionsForTask, - topology, - consumer, - clientSupplier.getProducer(new HashMap<String, Object>()), - restoreConsumer, - config, - new MockStreamsMetrics(new Metrics()), - stateDirectory); - } - }; - - initPartitionGrouper(config, thread, clientSupplier); - assertTrue(thread.tasks().isEmpty()); - mockTime.sleep(cleanupDelay); - - // all directories exist since an assignment didn't happen - assertTrue(stateDir1.exists()); - assertTrue(stateDir2.exists()); - assertTrue(stateDir3.exists()); - assertTrue(extraDir.exists()); - - List<TopicPartition> revokedPartitions; - List<TopicPartition> assignedPartitions; - Map<TaskId, StreamTask> prevTasks; - - // - // Assign t1p1 and t1p2. This should create task1 & task2 - // - final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); - activeTasks.put(task1, Collections.singleton(t1p1)); - activeTasks.put(task2, Collections.singleton(t1p2)); - thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); - - revokedPartitions = Collections.emptyList(); - assignedPartitions = Arrays.asList(t1p1, t1p2); - prevTasks = new HashMap<>(thread.tasks()); - - final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; - rebalanceListener.onPartitionsRevoked(revokedPartitions); - rebalanceListener.onPartitionsAssigned(assignedPartitions); - - // there shouldn't be any previous task - assertTrue(prevTasks.isEmpty()); - - // task 1 & 2 are created - assertEquals(2, thread.tasks().size()); - - // all directories should still exit before the cleanup delay time - mockTime.sleep(cleanupDelay - 10L); - thread.maybeClean(mockTime.milliseconds()); - assertTrue(stateDir1.exists()); - assertTrue(stateDir2.exists()); - assertTrue(stateDir3.exists()); - assertTrue(extraDir.exists()); - - // all state directories except for task task2 & task3 will be removed. the extra directory should still exists - mockTime.sleep(11L); - thread.maybeClean(mockTime.milliseconds()); - assertTrue(stateDir1.exists()); - assertTrue(stateDir2.exists()); - assertFalse(stateDir3.exists()); - assertTrue(extraDir.exists()); - - // - // Revoke t1p1 and t1p2. This should remove task1 & task2 - // - activeTasks.clear(); - - revokedPartitions = assignedPartitions; - assignedPartitions = Collections.emptyList(); - prevTasks = new HashMap<>(thread.tasks()); - - rebalanceListener.onPartitionsRevoked(revokedPartitions); - rebalanceListener.onPartitionsAssigned(assignedPartitions); - - // previous tasks should be committed - assertEquals(2, prevTasks.size()); - for (final StreamTask task : prevTasks.values()) { - assertTrue(((TestStreamTask) task).committed); - ((TestStreamTask) task).committed = false; - } - - // no task - assertTrue(thread.tasks().isEmpty()); - - // all state directories for task task1 & task2 still exist before the cleanup delay time - mockTime.sleep(cleanupDelay - 10L); - thread.maybeClean(mockTime.milliseconds()); - assertTrue(stateDir1.exists()); - assertTrue(stateDir2.exists()); - assertFalse(stateDir3.exists()); - assertTrue(extraDir.exists()); - - // all state directories for task task1 & task2 are removed - mockTime.sleep(11L); - thread.maybeClean(mockTime.milliseconds()); - assertFalse(stateDir1.exists()); - assertFalse(stateDir2.exists()); - assertFalse(stateDir3.exists()); - assertTrue(extraDir.exists()); - } finally { - Utils.delete(baseDir); - } - } - - @Test - public void testMaybeCommit() throws Exception { + public void testMaybeCommit() throws IOException, InterruptedException { final File baseDir = Files.createTempDirectory("test").toFile(); try { final long commitInterval = 1000L; @@ -783,7 +678,9 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + 0, + stateDirectory) { + @Override public void maybeCommit(final long now) { @@ -873,7 +770,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); @@ -907,7 +805,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); @@ -944,7 +843,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); @@ -975,7 +875,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); @@ -1004,7 +905,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); thread.setPartitionAssignor(new StreamPartitionAssignor() { @Override @@ -1043,7 +945,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + 0, + stateDirectory) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) { @@ -1095,7 +998,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", @@ -1153,7 +1057,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", @@ -1226,7 +1131,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + 0, + stateDirectory) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { @@ -1304,7 +1210,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final MockConsumer consumer = clientSupplier.consumer; consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null))); @@ -1395,7 +1302,8 @@ public class StreamThreadTest { new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); activeTasks.put(task1, task0Assignment); @@ -1450,7 +1358,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + 0, + stateDirectory) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { @@ -1504,7 +1413,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + 0, + stateDirectory) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { @@ -1564,7 +1474,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + 0, + stateDirectory) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { @@ -1620,7 +1531,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + 0, + stateDirectory) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { @@ -1679,7 +1591,9 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { @@ -1723,7 +1637,8 @@ public class StreamThreadTest { metrics, mockTime, new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST), - 0); + 0, + stateDirectory); final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); final Map<String, Object> configurationMap = new HashMap<>(); @@ -1796,56 +1711,40 @@ public class StreamThreadTest { public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception { final TaskId taskId = new TaskId(0, 0); - final StreamThread thread = setupTest(taskId); + final StateDirectory stateDirMock = mockStateDirInteractions(taskId); + final StreamThread thread = setupTest(taskId, stateDirMock); - final StateDirectory testStateDir = new StateDirectory( - applicationId, - config.getString(StreamsConfig.STATE_DIR_CONFIG), - mockTime); - assertFalse(testStateDir.lock(taskId, 0)); try { thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); fail("Should have thrown exception"); } catch (final Exception e) { - assertTrue(testStateDir.lock(taskId, 0)); + // } finally { - testStateDir.unlock(taskId); + thread.close(); } + + EasyMock.verify(stateDirMock); } @Test public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception { final TaskId taskId = new TaskId(0, 0); - final StreamThread thread = setupTest(taskId); - thread.start(); - - final StateDirectory testStateDir = new StateDirectory( - applicationId, - config.getString(StreamsConfig.STATE_DIR_CONFIG), - mockTime); + final StateDirectory stateDirMock = mockStateDirInteractions(taskId); - assertFalse(testStateDir.lock(taskId, 0)); - try { - thread.close(); - thread.join(); - assertTrue(testStateDir.lock(taskId, 0)); - } finally { - testStateDir.unlock(taskId); - } + final StreamThread thread = setupTest(taskId, stateDirMock); + thread.close(); + thread.join(); + EasyMock.verify(stateDirMock); } - private StreamThread setupTest(final TaskId taskId) { + private StreamThread setupTest(final TaskId taskId, final StateDirectory stateDirectory) throws InterruptedException { final TopologyBuilder builder = new TopologyBuilder(); builder.setApplicationId(applicationId); builder.addSource("source", "topic"); final MockClientSupplier clientSupplier = new MockClientSupplier(); - final StateDirectory stateDirectory = new StateDirectory( - applicationId, - config.getString(StreamsConfig.STATE_DIR_CONFIG), - mockTime); final TestStreamTask testStreamTask = new TestStreamTask(taskId, applicationId, @@ -1864,9 +1763,19 @@ public class StreamThreadTest { } }; - final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + new Metrics(), + new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, stateDirectory) { + @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { return testStreamTask; @@ -1876,6 +1785,8 @@ public class StreamThreadTest { final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); activeTasks.put(testStreamTask.id, testStreamTask.partitions); thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + thread.start(); + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet()); thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions); return thread; @@ -1885,45 +1796,62 @@ public class StreamThreadTest { public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception { final TaskId taskId = new TaskId(0, 0); - final StreamThread thread = setupStandbyTest(taskId); + final StateDirectory stateDirMock = mockStateDirInteractions(taskId); + final StreamThread thread = setupStandbyTest(taskId, stateDirMock); - final StateDirectory testStateDir = new StateDirectory(applicationId, - config.getString(StreamsConfig.STATE_DIR_CONFIG), - mockTime); + startThreadAndRebalance(thread); - assertFalse(testStateDir.lock(taskId, 0)); try { thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); fail("Should have thrown exception"); } catch (final Exception e) { - assertTrue(testStateDir.lock(taskId, 0)); + // ok } finally { - testStateDir.unlock(taskId); + thread.close(); + } + EasyMock.verify(stateDirMock); + } + + private void startThreadAndRebalance(final StreamThread thread) throws InterruptedException { + thread.start(); + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return thread.state() == StreamThread.State.RUNNING; + } + }, "thread didn't transition to running"); + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet()); + thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet()); } @Test public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception { final TaskId taskId = new TaskId(0, 0); - final StreamThread thread = setupStandbyTest(taskId); - thread.start(); - - final StateDirectory testStateDir = new StateDirectory(applicationId, - config.getString(StreamsConfig.STATE_DIR_CONFIG), - mockTime); - - assertFalse(testStateDir.lock(taskId, 0)); + final StateDirectory stateDirMock = mockStateDirInteractions(taskId); + final StreamThread thread = setupStandbyTest(taskId, stateDirMock); + startThreadAndRebalance(thread); try { thread.close(); thread.join(); - assertTrue(testStateDir.lock(taskId, 0)); } finally { - testStateDir.unlock(taskId); + thread.close(); } + EasyMock.verify(stateDirMock); } - private StreamThread setupStandbyTest(final TaskId taskId) { + private StateDirectory mockStateDirInteractions(final TaskId taskId) throws IOException { + final StateDirectory stateDirMock = EasyMock.createNiceMock(StateDirectory.class); + EasyMock.expect(stateDirMock.lock(EasyMock.eq(taskId), EasyMock.anyInt())).andReturn(true); + EasyMock.expect(stateDirMock.directoryForTask(taskId)).andReturn(new File(stateDir)); + EasyMock.expect(stateDirMock.unlock(taskId)).andReturn(true); + EasyMock.expectLastCall(); + EasyMock.replay(stateDirMock); + return stateDirMock; + } + + private StreamThread setupStandbyTest(final TaskId taskId, final StateDirectory stateDirectory) { final String storeName = "store"; final String changelogTopic = applicationId + "-" + storeName + "-changelog"; @@ -1945,9 +1873,18 @@ public class StreamThreadTest { } }); - final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + new Metrics(), + new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0, + stateDirectory) { @Override protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) { @@ -1978,8 +1915,6 @@ public class StreamThreadTest { final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0))); thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks)); - thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet()); - return thread; } @@ -2028,7 +1963,8 @@ public class StreamThreadTest { metrics, Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + 0, + stateDirectory) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) { http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index bf55b47..f04f80a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -122,7 +122,8 @@ public class StreamThreadStateStoreProviderTest { new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + 0, + stateDirectory) { @Override public Map<TaskId, StreamTask> tasks() {