Repository: kafka
Updated Branches:
  refs/heads/0.11.0 58125ced7 -> 2a4eeb1c6


KAFKA-5562; Do streams state directory cleanup on a single thread

Backported from trunk: https://github.com/apache/kafka/pull/3516

Author: Damian Guy <damian....@gmail.com>

Reviewers: Guozhang Wang <wangg...@gmail.com>, Eno Thereska 
<eno.there...@gmail.com>

Closes #3654 from dguy/cherry-pick-stream-thread-cleanup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a4eeb1c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a4eeb1c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a4eeb1c

Branch: refs/heads/0.11.0
Commit: 2a4eeb1c6f32c84ea73058bc502b10c2232b011f
Parents: 58125ce
Author: Damian Guy <damian....@gmail.com>
Authored: Fri Aug 11 16:40:28 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Fri Aug 11 16:40:28 2017 +0100

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../org/apache/kafka/streams/KafkaStreams.java  |  48 ++-
 .../processor/internals/StateDirectory.java     |  69 ++--
 .../processor/internals/StreamThread.java       |  16 +-
 .../apache/kafka/streams/KafkaStreamsTest.java  |  55 +++
 .../integration/RegexSourceIntegrationTest.java |   3 +-
 .../processor/internals/StateDirectoryTest.java |  57 +++
 .../internals/StreamPartitionAssignorTest.java  | 209 ++++++++--
 .../processor/internals/StreamThreadTest.java   | 392 ++++++++-----------
 .../StreamThreadStateStoreProviderTest.java     |   3 +-
 10 files changed, 536 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4d54271..9a23747 100644
--- a/build.gradle
+++ b/build.gradle
@@ -841,6 +841,7 @@ project(':streams') {
     testCompile project(':core')
     testCompile project(':core').sourceSets.test.output
     testCompile libs.junit
+    testCompile libs.easymock
 
     testRuntime libs.slf4jlog4j
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c16f379..0c7c598 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -56,7 +56,6 @@ import 
org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -69,6 +68,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
@@ -123,6 +125,7 @@ public class KafkaStreams {
     private static final int DEFAULT_CLOSE_TIMEOUT = 0;
     private GlobalStreamThread globalStreamThread;
 
+    private final ScheduledExecutorService stateDirCleaner;
     private final StreamThread[] threads;
     private final Metrics metrics;
     private final QueryableStoreProvider queryableStoreProvider;
@@ -136,6 +139,7 @@ public class KafkaStreams {
     private final StreamsMetadataState streamsMetadataState;
 
     private final StreamsConfig config;
+    private final StateDirectory stateDirectory;
 
     // container states
     /**
@@ -358,13 +362,13 @@ public class KafkaStreams {
         final long cacheSizeBytes = Math.max(0, 
config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
                 (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 
(globalTaskTopology == null ? 0 : 1)));
 
-
+        stateDirectory = new StateDirectory(applicationId, 
config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
         if (globalTaskTopology != null) {
             final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
                                                         
clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + 
"-global")),
-                                                        new 
StateDirectory(applicationId, globalThreadId, 
config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
+                                                        stateDirectory,
                                                         metrics,
                                                         time,
                                                         globalThreadId);
@@ -381,13 +385,24 @@ public class KafkaStreams {
                                           metrics,
                                           time,
                                           streamsMetadataState,
-                                          cacheSizeBytes);
+                                          cacheSizeBytes,
+                                          stateDirectory);
+
             threads[i].setStateListener(streamStateListener);
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
         final GlobalStateStoreProvider globalStateStoreProvider = new 
GlobalStateStoreProvider(builder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, 
globalStateStoreProvider);
+        final String cleanupThreadName = clientId + "-CleanupThread";
+        stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactory() {
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread thread = new Thread(r, cleanupThreadName);
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
     }
 
     private static HostInfo parseHostInfo(final String endPoint) {
@@ -450,10 +465,23 @@ public class KafkaStreams {
                 thread.start();
             }
 
+            final Long cleanupDelay = 
config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
+            stateDirCleaner.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    synchronized (stateLock) {
+                        if (state == State.RUNNING) {
+                            stateDirectory.cleanRemovedTasks(cleanupDelay);
+                        }
+                    }
+                }
+            }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
+
             log.info("{} Started Kafka Stream process", logPrefix);
         } else {
             throw new IllegalStateException("Cannot start again.");
         }
+
     }
 
     /**
@@ -478,6 +506,7 @@ public class KafkaStreams {
         log.debug("{} Stopping Kafka Stream process.", logPrefix);
         if (state.isCreatedOrRunning()) {
             setState(State.PENDING_SHUTDOWN);
+            stateDirCleaner.shutdownNow();
             // save the current thread so that if it is a stream thread
             // we don't attempt to join it and cause a deadlock
             final Thread shutdown = new Thread(new Runnable() {
@@ -576,17 +605,6 @@ public class KafkaStreams {
         if (state.isRunning()) {
             throw new IllegalStateException("Cannot clean up while running.");
         }
-
-        final String appId = 
config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        final String stateDir = 
config.getString(StreamsConfig.STATE_DIR_CONFIG);
-
-        final String localApplicationDir = stateDir + File.separator + appId;
-        log.debug("{} Removing local Kafka Streams application data in {} for 
application {}.",
-            logPrefix,
-            localApplicationDir,
-            appId);
-
-        final StateDirectory stateDirectory = new StateDirectory(appId, 
"cleanup", stateDir, Time.SYSTEM);
         stateDirectory.cleanRemovedTasks(0);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 3e547eb..8d46da1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -45,21 +45,25 @@ public class StateDirectory {
     private static final Logger log = 
LoggerFactory.getLogger(StateDirectory.class);
 
     private final File stateDir;
-    private final String logPrefix;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
-    private final HashMap<TaskId, FileLock> locks = new HashMap<>();
+    private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
     private final Time time;
 
     private FileChannel globalStateChannel;
     private FileLock globalStateLock;
 
-    public StateDirectory(final String applicationId, final String 
stateDirConfig, final Time time) {
-        this(applicationId, "", stateDirConfig, time);
+    private static class LockAndOwner {
+        final FileLock lock;
+        final String owningThread;
+
+        LockAndOwner(final String owningThread, final FileLock lock) {
+            this.owningThread = owningThread;
+            this.lock = lock;
+        }
     }
 
-    public StateDirectory(final String applicationId, final String threadId, 
final String stateDirConfig, final Time time) {
+    public StateDirectory(final String applicationId, final String 
stateDirConfig, final Time time) {
         this.time = time;
-        this.logPrefix = String.format("stream-thread [%s]", threadId);
         final File baseDir = new File(stateDirConfig);
         if (!baseDir.exists() && !baseDir.mkdirs()) {
             throw new ProcessorStateException(String.format("state directory 
[%s] doesn't exist and couldn't be created",
@@ -95,6 +99,10 @@ public class StateDirectory {
         return dir;
     }
 
+    private String logPrefix() {
+        return String.format("stream-thread [%s]", 
Thread.currentThread().getName());
+    }
+
     /**
      * Get the lock for the {@link TaskId}s directory if it is available
      * @param taskId
@@ -102,13 +110,19 @@ public class StateDirectory {
      * @return true if successful
      * @throws IOException
      */
-    boolean lock(final TaskId taskId, int retry) throws IOException {
+    synchronized boolean lock(final TaskId taskId, int retry) throws 
IOException {
+
         final File lockFile;
         // we already have the lock so bail out here
-        if (locks.containsKey(taskId)) {
-            log.trace("{} Found cached state dir lock for task {}", logPrefix, 
taskId);
+        final LockAndOwner lockAndOwner = locks.get(taskId);
+        if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+            log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
             return true;
+        } else if (lockAndOwner != null) {
+            // another thread owns the lock
+            return false;
         }
+
         try {
             lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
         } catch (ProcessorStateException e) {
@@ -130,16 +144,16 @@ public class StateDirectory {
 
         final FileLock lock = tryLock(retry, channel);
         if (lock != null) {
-            locks.put(taskId, lock);
+            locks.put(taskId, new 
LockAndOwner(Thread.currentThread().getName(), lock));
 
-            log.debug("{} Acquired state dir lock for task {}", logPrefix, 
taskId);
+            log.debug("{} Acquired state dir lock for task {}", logPrefix(), 
taskId);
         }
         return lock != null;
     }
 
-    boolean lockGlobalState(final int retry) throws IOException {
+    synchronized boolean lockGlobalState(final int retry) throws IOException {
         if (globalStateLock != null) {
-            log.trace("{} Found cached state dir lock for the global task", 
logPrefix);
+            log.trace("{} Found cached state dir lock for the global task", 
logPrefix());
             return true;
         }
 
@@ -161,12 +175,12 @@ public class StateDirectory {
         globalStateChannel = channel;
         globalStateLock = fileLock;
 
-        log.debug("{} Acquired global state dir lock", logPrefix);
+        log.debug("{} Acquired global state dir lock", logPrefix());
 
         return true;
     }
 
-    void unlockGlobalState() throws IOException {
+    synchronized void unlockGlobalState() throws IOException {
         if (globalStateLock == null) {
             return;
         }
@@ -175,7 +189,7 @@ public class StateDirectory {
         globalStateLock = null;
         globalStateChannel = null;
 
-        log.debug("{} Released global state dir lock", logPrefix);
+        log.debug("{} Released global state dir lock", logPrefix());
     }
 
     /**
@@ -183,18 +197,20 @@ public class StateDirectory {
      * @param taskId
      * @throws IOException
      */
-    void unlock(final TaskId taskId) throws IOException {
-        final FileLock lock = locks.remove(taskId);
-        if (lock != null) {
-            lock.release();
-
-            log.debug("{} Released state dir lock for task {}", logPrefix, 
taskId);
+    synchronized boolean unlock(final TaskId taskId) throws IOException {
+        final LockAndOwner lockAndOwner = locks.get(taskId);
+        if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+            locks.remove(taskId);
+            lockAndOwner.lock.release();
+            log.debug("{} Released state dir lock for task {}", logPrefix(), 
taskId);
 
             final FileChannel fileChannel = channels.remove(taskId);
             if (fileChannel != null) {
                 fileChannel.close();
             }
+            return true;
         }
+        return false;
     }
 
     /**
@@ -204,12 +220,11 @@ public class StateDirectory {
      * @param cleanupDelayMs only remove directories if they haven't been 
modified for at least
      *                       this amount of time (milliseconds)
      */
-    public void cleanRemovedTasks(final long cleanupDelayMs) {
+    public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         final File[] taskDirs = listTaskDirectories();
         if (taskDirs == null || taskDirs.length == 0) {
             return; // nothing to do
         }
-
         for (File taskDir : taskDirs) {
             final String dirName = taskDir.getName();
             TaskId id = TaskId.parse(dirName);
@@ -219,19 +234,19 @@ public class StateDirectory {
                         long now = time.milliseconds();
                         long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
-                            log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix, dirName, 
id, now - lastModifiedMs, cleanupDelayMs);
+                            log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, 
id, now - lastModifiedMs, cleanupDelayMs);
                             Utils.delete(taskDir);
                         }
                     }
                 } catch (OverlappingFileLockException e) {
                     // locked by another thread
                 } catch (IOException e) {
-                    log.error("{} Failed to lock the state directory due to an 
unexpected exception", logPrefix, e);
+                    log.error("{} Failed to lock the state directory due to an 
unexpected exception", logPrefix(), e);
                 } finally {
                     try {
                         unlock(id);
                     } catch (IOException e) {
-                        log.error("{} Failed to release the state directory 
lock", logPrefix);
+                        log.error("{} Failed to release the state directory 
lock", logPrefix());
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9774627..c2da0cb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -438,7 +438,8 @@ public class StreamThread extends Thread {
                         final Metrics metrics,
                         final Time time,
                         final StreamsMetadataState streamsMetadataState,
-                        final long cacheSizeBytes) {
+                        final long cacheSizeBytes,
+                        final StateDirectory stateDirectory) {
         super(clientId + "-StreamThread-" + 
STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
         this.applicationId = applicationId;
         this.config = config;
@@ -488,7 +489,7 @@ public class StreamThread extends Thread {
         // standby KTables
         standbyRecords = new HashMap<>();
 
-        stateDirectory = new StateDirectory(applicationId, threadClientId, 
config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
+        this.stateDirectory = stateDirectory;
         final Object maxPollInterval = 
consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
         rebalanceTimeoutMs =  (Integer) 
ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
maxPollInterval, Type.INT);
         pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
@@ -557,7 +558,6 @@ public class StreamThread extends Thread {
 
             maybeCommit(timerStartedMs);
             maybeUpdateStandbyTasks(timerStartedMs);
-            maybeClean(timerStartedMs);
         }
         log.info("{} Shutting down at user request", logPrefix);
     }
@@ -881,16 +881,6 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Cleanup any states of the tasks that have been removed from this thread
-     */
-    protected void maybeClean(final long now) {
-        if (now > lastCleanMs + cleanTimeMs) {
-            stateDirectory.cleanRemovedTasks(cleanTimeMs);
-            lastCleanMs = now;
-        }
-    }
-
-    /**
      * Compute the latency based on the current marked timestamp, and update 
the marked timestamp
      * with the current system timestamp.
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 4ebc42b..a03b7cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,6 +38,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -302,6 +304,59 @@ public class KafkaStreamsTest {
     }
 
 
+    @Test
+    public void shouldCleanupOldStateDirs() throws InterruptedException {
+        final Properties props = new Properties();
+        final String appId = "cleanupOldStateDirs";
+        final String stateDir = TestUtils.tempDirectory().getPath();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+
+
+        final String topic = "topic";
+        CLUSTER.createTopic(topic);
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        builder.stream(Serdes.String(), Serdes.String(), topic);
+
+        final KafkaStreams streams = new KafkaStreams(builder, props);
+        final CountDownLatch latch = new CountDownLatch(1);
+        streams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final 
KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == 
KafkaStreams.State.REBALANCING) {
+                    latch.countDown();
+                }
+            }
+        });
+        final String appDir = stateDir + File.separator + appId;
+        final File oldTaskDir = new File(appDir, "10_1");
+        assertTrue(oldTaskDir.mkdirs());
+        try {
+            streams.start();
+            latch.await(30, TimeUnit.SECONDS);
+            verifyCleanupStateDir(appDir, oldTaskDir);
+            assertTrue(oldTaskDir.mkdirs());
+            verifyCleanupStateDir(appDir, oldTaskDir);
+        } finally {
+            streams.close();
+        }
+    }
+
+    private void verifyCleanupStateDir(final String appDir, final File 
oldTaskDir) throws InterruptedException {
+        final File taskDir = new File(appDir, "0_0");
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return !oldTaskDir.exists() && taskDir.exists();
+            }
+        }, 30000, "cleanup has not successfully run");
+        assertTrue(taskDir.exists());
+    }
+
+
     public static class StateListenerStub implements 
KafkaStreams.StateListener {
         public int numChanges = 0;
         public KafkaStreams.State oldState;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 0b5c5e9..8b749be 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -440,7 +441,7 @@ public class RegexSourceIntegrationTest {
 
         public TestStreamThread(final TopologyBuilder builder, final 
StreamsConfig config, final KafkaClientSupplier clientSupplier, final String 
applicationId, final String clientId, final UUID processId, final Metrics 
metrics, final Time time) {
             super(builder, config, clientSupplier, applicationId, clientId, 
processId, metrics, time, new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-                  0);
+                  0, new StateDirectory(applicationId, 
config.getString(StreamsConfig.STATE_DIR_CONFIG), time));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index f1b5efe..815d640 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -32,9 +32,13 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -266,4 +270,57 @@ public class StateDirectoryTest {
         }
     }
 
+    @Test
+    public void shouldNotLockStateDirLockedByAnotherThread() throws 
IOException, InterruptedException {
+        final TaskId taskId = new TaskId(0, 0);
+        final AtomicReference<IOException> exceptionOnThread = new 
AtomicReference<>();
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    directory.lock(taskId, 1);
+                } catch (final IOException e) {
+                    exceptionOnThread.set(e);
+                }
+            }
+        });
+        thread.start();
+        thread.join(30000);
+        assertNull("should not have had an exception during locking on other 
thread", exceptionOnThread.get());
+        assertFalse(directory.lock(taskId, 1));
+    }
+
+    @Test
+    public void shouldNotUnLockStateDirLockedByAnotherThread() throws 
IOException, InterruptedException {
+        final TaskId taskId = new TaskId(0, 0);
+        final CountDownLatch lockLatch = new CountDownLatch(1);
+        final CountDownLatch unlockLatch = new CountDownLatch(1);
+        final AtomicReference<Exception> exceptionOnThread = new 
AtomicReference<>();
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    assertTrue(directory.lock(taskId, 1));
+                    lockLatch.countDown();
+                    unlockLatch.await();
+                    directory.unlock(taskId);
+                } catch (final Exception e) {
+                    exceptionOnThread.set(e);
+                }
+            }
+        });
+        thread.start();
+        lockLatch.await(5, TimeUnit.SECONDS);
+
+        assertNull("should not have had an exception on other thread", 
exceptionOnThread.get());
+        assertFalse(directory.unlock(taskId));
+        assertFalse(directory.lock(taskId, 1));
+
+        unlockLatch.countDown();
+        thread.join(30000);
+
+        assertNull("should not have had an exception on other thread", 
exceptionOnThread.get());
+        assertTrue(directory.lock(taskId, 1));
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 17eb50a..98cd20a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -106,11 +107,12 @@ public class StreamPartitionAssignorTest {
     private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
     private final TopologyBuilder builder = new TopologyBuilder();
     private final StreamsConfig config = new StreamsConfig(configProps());
+    private final StateDirectory stateDirectory = new StateDirectory("appId", 
TestUtils.tempDirectory().getPath(), new MockTime());
     private final StreamThread mockStreamThread = new StreamThread(builder, 
config,
                                                                    
mockClientSupplier, "appID",
                                                                    "clientId", 
UUID.randomUUID(),
                                                                    new 
Metrics(), new MockTime(),
-                                                                   null, 1L);
+                                                                   null, 1L, 
stateDirectory);
     private final Map<String, Object> configurationMap = new HashMap<>();
 
     private Properties configProps() {
@@ -146,8 +148,20 @@ public class StreamPartitionAssignorTest {
 
         String clientId = "client-id";
         UUID processId = UUID.randomUUID();
-        StreamThread thread = new StreamThread(builder, config, new 
MockClientSupplier(), "test", clientId, processId, new Metrics(), Time.SYSTEM, 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                               0) {
+        StreamThread thread = new StreamThread(
+            builder,
+            config,
+            new MockClientSupplier(),
+            "test",
+            clientId,
+            processId,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder,
+                StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory) {
+
             @Override
             public Set<TaskId> prevActiveTasks() {
                 return prevTasks;
@@ -192,10 +206,19 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
 
-        StreamThread thread10 = new StreamThread(builder, config, 
mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new 
StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
-
 
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
         partitionAssignor.configure(config.getConsumerConfigs(thread10, 
"test", client1));
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
 
@@ -258,7 +281,19 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, 
mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new 
StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
+
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, 
"test", client1));
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -297,7 +332,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new 
MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new 
StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            new MockClientSupplier(),
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, 
"test", client1));
 
@@ -355,8 +401,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, 
mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new 
StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, 
"test", client1));
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -423,8 +479,19 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
 
-        StreamThread thread10 = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
+
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, 
applicationId, client1));
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -507,8 +574,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, 
mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new 
StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, 
"test", client1));
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));
@@ -568,8 +645,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread = new StreamThread(builder, config, 
mockClientSupplier, "test", client1, uuid, new Metrics(), Time.SYSTEM, new 
StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                               0);
+        StreamThread thread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            "test",
+            client1,
+            uuid,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread, "test", 
client1));
 
@@ -605,9 +692,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid1 = UUID.randomUUID();
         String client1 = "client1";
 
-
-        StreamThread thread10 = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+        StreamThread thread10 = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, 
applicationId, client1));
         MockInternalTopicManager internalTopicManager = new 
MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
@@ -645,7 +741,7 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
         StreamThread thread10 = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0);
+                                                 0, stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(thread10, 
applicationId, client1));
         MockInternalTopicManager internalTopicManager = new 
MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
@@ -678,7 +774,7 @@ public class StreamPartitionAssignorTest {
         final String client1 = "client1";
 
         final StreamThread streamThread = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                           0);
+                                                           0, stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, 
applicationId, client1));
         final PartitionAssignor.Subscription subscription = 
partitionAssignor.subscription(Utils.mkSet("input"));
@@ -703,8 +799,18 @@ public class StreamPartitionAssignorTest {
         final UUID uuid1 = UUID.randomUUID();
         final String client1 = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                           0);
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         final StreamPartitionAssignor partitionAssignor = new 
StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, 
applicationId, client1));
@@ -735,9 +841,18 @@ public class StreamPartitionAssignorTest {
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
 
-        final StreamThread streamThread = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client1, uuid1,
-                                                           new Metrics(), 
Time.SYSTEM, new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-                                                           0);
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(streamThread.config, 
mockClientSupplier.restoreConsumer));
 
@@ -760,10 +875,18 @@ public class StreamPartitionAssignorTest {
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
 
-
-        final StreamThread streamThread = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client1, uuid1,
-                                                           new Metrics(), 
Time.SYSTEM, new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-                                                           0);
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client1,
+            uuid1,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         try {
             
partitionAssignor.configure(config.getConsumerConfigs(streamThread, 
applicationId, client1));
@@ -875,7 +998,18 @@ public class StreamPartitionAssignorTest {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client,
+            uuid,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, 
applicationId, client));
         final MockInternalTopicManager mockInternalTopicManager = new 
MockInternalTopicManager(streamThread.config, 
mockClientSupplier.restoreConsumer);
@@ -954,7 +1088,18 @@ public class StreamPartitionAssignorTest {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
-        final StreamThread streamThread = new StreamThread(builder, config, 
mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, 
new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0);
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            mockClientSupplier,
+            applicationId,
+            client,
+            uuid,
+            new Metrics(),
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
 
         partitionAssignor.configure(config.getConsumerConfigs(streamThread, 
applicationId, client));
         partitionAssignor.setInternalTopicManager(new 
MockInternalTopicManager(streamThread.config, 
mockClientSupplier.restoreConsumer));

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d8d2e4f..3b280f1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -48,11 +48,13 @@ import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
@@ -92,7 +94,8 @@ public class StreamThreadTest {
     private UUID processId = UUID.randomUUID();
     final KStreamBuilder builder = new KStreamBuilder();
     private final StreamsConfig config = new StreamsConfig(configProps(false));
-
+    private final String stateDir = TestUtils.tempDirectory().getPath();
+    private final StateDirectory stateDirectory  = new 
StateDirectory("applicationId", stateDir, mockTime);
 
     @Before
     public void setUp() throws Exception {
@@ -441,8 +444,47 @@ public class StreamThreadTest {
         assertTrue(thread.tasks().isEmpty());
 
         thread.close();
+
         assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN) ||
             (thread.state() == StreamThread.State.CREATED));
+
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testStateChangeStartClose() throws InterruptedException {
+
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            metrics,
+            Time.SYSTEM,
+            new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory);
+
+        final StateListenerStub stateListener = new StateListenerStub();
+        thread.setStateListener(stateListener);
+        thread.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return thread.state() == StreamThread.State.RUNNING;
+            }
+        }, 10 * 1000, "Thread never started.");
+        thread.close();
+        assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN);
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return thread.state() == StreamThread.State.DEAD;
+            }
+        }, 10 * 1000, "Thread never shut down.");
+        thread.close();
     }
 
     private final static String TOPIC = "topic";
@@ -474,7 +516,8 @@ public class StreamThreadTest {
             metrics,
             Time.SYSTEM,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
         final StreamThread thread2 = new StreamThread(
             builder,
             config,
@@ -485,7 +528,8 @@ public class StreamThreadTest {
             metrics,
             Time.SYSTEM,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> task0 = 
Collections.singletonMap(new TaskId(0, 0), task0Assignment);
         final Map<TaskId, Set<TopicPartition>> task1 = 
Collections.singletonMap(new TaskId(0, 1), task1Assignment);
@@ -580,7 +624,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
         final String defaultGroupName = "stream-metrics";
         final String defaultPrefix = "thread." + thread.threadClientId();
         final Map<String, String> defaultTags = 
Collections.singletonMap("client-id", thread.threadClientId());
@@ -611,157 +656,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void testMaybeClean() throws Exception {
-        final File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            final long cleanupDelay = 1000L;
-            final Properties props = configProps(false);
-            props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 
Long.toString(cleanupDelay));
-            props.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
baseDir.getCanonicalPath());
-            final StreamsConfig config = new StreamsConfig(props);
-            final File applicationDir = new File(baseDir, applicationId);
-            applicationDir.mkdir();
-            final File stateDir1 = new File(applicationDir, task1.toString());
-            final File stateDir2 = new File(applicationDir, task2.toString());
-            final File stateDir3 = new File(applicationDir, task3.toString());
-            final File extraDir = new File(applicationDir, applicationId);
-            stateDir1.mkdir();
-            stateDir2.mkdir();
-            stateDir3.mkdir();
-            extraDir.mkdir();
-
-            builder.addSource("source1", "topic1");
-
-            final StreamThread thread = new StreamThread(
-                builder,
-                config,
-                clientSupplier,
-                applicationId,
-                clientId,
-                processId,
-                metrics,
-                mockTime,
-                new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-                0) {
-
-                @Override
-                public void maybeClean(final long now) {
-                    super.maybeClean(now);
-                }
-
-                @Override
-                protected StreamTask createStreamTask(final TaskId id, final 
Collection<TopicPartition> partitionsForTask) {
-                    final ProcessorTopology topology = 
builder.build(id.topicGroupId);
-                    return new TestStreamTask(
-                        id,
-                        applicationId,
-                        partitionsForTask,
-                        topology,
-                        consumer,
-                        clientSupplier.getProducer(new HashMap<String, 
Object>()),
-                        restoreConsumer,
-                        config,
-                        new MockStreamsMetrics(new Metrics()),
-                        stateDirectory);
-                }
-            };
-
-            initPartitionGrouper(config, thread, clientSupplier);
-            assertTrue(thread.tasks().isEmpty());
-            mockTime.sleep(cleanupDelay);
-
-            // all directories exist since an assignment didn't happen
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertTrue(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            List<TopicPartition> revokedPartitions;
-            List<TopicPartition> assignedPartitions;
-            Map<TaskId, StreamTask> prevTasks;
-
-            //
-            // Assign t1p1 and t1p2. This should create task1 & task2
-            //
-            final Map<TaskId, Set<TopicPartition>> activeTasks = new 
HashMap<>();
-            activeTasks.put(task1, Collections.singleton(t1p1));
-            activeTasks.put(task2, Collections.singleton(t1p2));
-            thread.setPartitionAssignor(new 
MockStreamsPartitionAssignor(activeTasks));
-
-            revokedPartitions = Collections.emptyList();
-            assignedPartitions = Arrays.asList(t1p1, t1p2);
-            prevTasks = new HashMap<>(thread.tasks());
-
-            final ConsumerRebalanceListener rebalanceListener = 
thread.rebalanceListener;
-            rebalanceListener.onPartitionsRevoked(revokedPartitions);
-            rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-            // there shouldn't be any previous task
-            assertTrue(prevTasks.isEmpty());
-
-            // task 1 & 2 are created
-            assertEquals(2, thread.tasks().size());
-
-            // all directories should still exit before the cleanup delay time
-            mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean(mockTime.milliseconds());
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertTrue(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            // all state directories except for task task2 & task3 will be 
removed. the extra directory should still exists
-            mockTime.sleep(11L);
-            thread.maybeClean(mockTime.milliseconds());
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertFalse(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            //
-            // Revoke t1p1 and t1p2. This should remove task1 & task2
-            //
-            activeTasks.clear();
-
-            revokedPartitions = assignedPartitions;
-            assignedPartitions = Collections.emptyList();
-            prevTasks = new HashMap<>(thread.tasks());
-
-            rebalanceListener.onPartitionsRevoked(revokedPartitions);
-            rebalanceListener.onPartitionsAssigned(assignedPartitions);
-
-            // previous tasks should be committed
-            assertEquals(2, prevTasks.size());
-            for (final StreamTask task : prevTasks.values()) {
-                assertTrue(((TestStreamTask) task).committed);
-                ((TestStreamTask) task).committed = false;
-            }
-
-            // no task
-            assertTrue(thread.tasks().isEmpty());
-
-            // all state directories for task task1 & task2 still exist before 
the cleanup delay time
-            mockTime.sleep(cleanupDelay - 10L);
-            thread.maybeClean(mockTime.milliseconds());
-            assertTrue(stateDir1.exists());
-            assertTrue(stateDir2.exists());
-            assertFalse(stateDir3.exists());
-            assertTrue(extraDir.exists());
-
-            // all state directories for task task1 & task2 are removed
-            mockTime.sleep(11L);
-            thread.maybeClean(mockTime.milliseconds());
-            assertFalse(stateDir1.exists());
-            assertFalse(stateDir2.exists());
-            assertFalse(stateDir3.exists());
-            assertTrue(extraDir.exists());
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testMaybeCommit() throws Exception {
+    public void testMaybeCommit() throws IOException, InterruptedException {
         final File baseDir = Files.createTempDirectory("test").toFile();
         try {
             final long commitInterval = 1000L;
@@ -783,7 +678,9 @@ public class StreamThreadTest {
                 metrics,
                 mockTime,
                 new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-                0) {
+                0,
+                stateDirectory) {
+
 
                 @Override
                 public void maybeCommit(final long now) {
@@ -873,7 +770,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new 
TopicPartition("someTopic", 0)));
@@ -907,7 +805,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new 
TopicPartition("someTopic", 0)));
@@ -944,7 +843,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new 
TopicPartition("someTopic", 0)));
@@ -975,7 +875,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.put(new TaskId(0, 0), Collections.singleton(new 
TopicPartition("someTopic", 0)));
@@ -1004,7 +905,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         thread.setPartitionAssignor(new StreamPartitionAssignor() {
             @Override
@@ -1043,7 +945,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final 
Collection<TopicPartition> partitionsForTask) {
@@ -1095,7 +998,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final MockConsumer<byte[], byte[]> restoreConsumer = 
clientSupplier.restoreConsumer;
         
restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
@@ -1153,7 +1057,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
         final MockConsumer<byte[], byte[]> restoreConsumer = 
clientSupplier.restoreConsumer;
         
restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog",
                                          Collections.singletonList(new 
PartitionInfo("stream-thread-test-count-one-changelog",
@@ -1226,7 +1131,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final 
Collection<TopicPartition> partitions) {
@@ -1304,7 +1210,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final MockConsumer consumer = clientSupplier.consumer;
         consumer.updatePartitions(TOPIC, Collections.singletonList(new 
PartitionInfo(TOPIC, 0, null, null, null)));
@@ -1395,7 +1302,8 @@ public class StreamThreadTest {
             new Metrics(),
             new MockTime(),
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         activeTasks.put(task1, task0Assignment);
@@ -1450,7 +1358,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final 
Collection<TopicPartition> partitions) {
@@ -1504,7 +1413,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final 
Collection<TopicPartition> partitions) {
@@ -1564,7 +1474,8 @@ public class StreamThreadTest {
                 metrics,
                 mockTime,
                 new StreamsMetadataState(builder, 
StreamsMetadataState.UNKNOWN_HOST),
-                0) {
+                0,
+                stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final 
Collection<TopicPartition> partitions) {
@@ -1620,7 +1531,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1679,7 +1591,9 @@ public class StreamThreadTest {
             processId,
             metrics,
             mockTime,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1723,7 +1637,8 @@ public class StreamThreadTest {
             metrics,
             mockTime,
             new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0);
+            0,
+            stateDirectory);
 
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         final Map<String, Object> configurationMap = new HashMap<>();
@@ -1796,56 +1711,40 @@ public class StreamThreadTest {
     public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
-        final StreamThread thread = setupTest(taskId);
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
+        final StreamThread thread = setupTest(taskId, stateDirMock);
 
-        final StateDirectory testStateDir = new StateDirectory(
-            applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
 
-        assertFalse(testStateDir.lock(taskId, 0));
         try {
             thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             fail("Should have thrown exception");
         } catch (final Exception e) {
-            assertTrue(testStateDir.lock(taskId, 0));
+            //
         } finally {
-            testStateDir.unlock(taskId);
+            thread.close();
         }
+
+        EasyMock.verify(stateDirMock);
     }
 
     @Test
     public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
-        final StreamThread thread = setupTest(taskId);
-        thread.start();
-
-        final StateDirectory testStateDir = new StateDirectory(
-            applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
 
-        assertFalse(testStateDir.lock(taskId, 0));
-        try {
-            thread.close();
-            thread.join();
-            assertTrue(testStateDir.lock(taskId, 0));
-        } finally {
-            testStateDir.unlock(taskId);
-        }
+        final StreamThread thread = setupTest(taskId, stateDirMock);
+        thread.close();
+        thread.join();
+        EasyMock.verify(stateDirMock);
     }
 
-    private StreamThread setupTest(final TaskId taskId) {
+    private StreamThread setupTest(final TaskId taskId, final StateDirectory stateDirectory) throws InterruptedException {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId(applicationId);
         builder.addSource("source", "topic");
 
         final MockClientSupplier clientSupplier = new MockClientSupplier();
-        final StateDirectory stateDirectory = new StateDirectory(
-            applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
 
         final TestStreamTask testStreamTask = new TestStreamTask(taskId,
             applicationId,
@@ -1864,9 +1763,19 @@ public class StreamThreadTest {
             }
         };
 
-        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-            clientId, processId, new Metrics(), new MockTime(),
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            new Metrics(),
+            new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0, stateDirectory) {
+
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
                 return testStreamTask;
@@ -1876,6 +1785,8 @@ public class StreamThreadTest {
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         activeTasks.put(testStreamTask.id, testStreamTask.partitions);
         thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+        thread.start();
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
         thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
 
         return thread;
@@ -1885,45 +1796,62 @@ public class StreamThreadTest {
     public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
-        final StreamThread thread = setupStandbyTest(taskId);
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
+        final StreamThread thread = setupStandbyTest(taskId, stateDirMock);
 
-        final StateDirectory testStateDir = new StateDirectory(applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
+        startThreadAndRebalance(thread);
 
-        assertFalse(testStateDir.lock(taskId, 0));
         try {
             thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
             fail("Should have thrown exception");
         } catch (final Exception e) {
-            assertTrue(testStateDir.lock(taskId, 0));
+            // ok
         } finally {
-            testStateDir.unlock(taskId);
+            thread.close();
+
         }
+        EasyMock.verify(stateDirMock);
+    }
+
+    private void startThreadAndRebalance(final StreamThread thread) throws InterruptedException {
+        thread.start();
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return thread.state() == StreamThread.State.RUNNING;
+            }
+        }, "thread didn't transition to running");
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
     }
 
     @Test
     public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
         final TaskId taskId = new TaskId(0, 0);
 
-        final StreamThread thread = setupStandbyTest(taskId);
-        thread.start();
-
-        final StateDirectory testStateDir = new StateDirectory(applicationId,
-            config.getString(StreamsConfig.STATE_DIR_CONFIG),
-            mockTime);
-
-        assertFalse(testStateDir.lock(taskId, 0));
+        final StateDirectory stateDirMock = mockStateDirInteractions(taskId);
+        final StreamThread thread = setupStandbyTest(taskId, stateDirMock);
+        startThreadAndRebalance(thread);
         try {
             thread.close();
             thread.join();
-            assertTrue(testStateDir.lock(taskId, 0));
         } finally {
-            testStateDir.unlock(taskId);
+            thread.close();
         }
+        EasyMock.verify(stateDirMock);
     }
 
-    private StreamThread setupStandbyTest(final TaskId taskId) {
+    private StateDirectory mockStateDirInteractions(final TaskId taskId) throws IOException {
+        final StateDirectory stateDirMock = EasyMock.createNiceMock(StateDirectory.class);
+        EasyMock.expect(stateDirMock.lock(EasyMock.eq(taskId), EasyMock.anyInt())).andReturn(true);
+        EasyMock.expect(stateDirMock.directoryForTask(taskId)).andReturn(new File(stateDir));
+        EasyMock.expect(stateDirMock.unlock(taskId)).andReturn(true);
+        EasyMock.expectLastCall();
+        EasyMock.replay(stateDirMock);
+        return stateDirMock;
+    }
+
+    private StreamThread setupStandbyTest(final TaskId taskId, final StateDirectory stateDirectory) {
         final String storeName = "store";
         final String changelogTopic = applicationId + "-" + storeName + "-changelog";
 
@@ -1945,9 +1873,18 @@ public class StreamThreadTest {
             }
         });
 
-        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-            clientId, processId, new Metrics(), new MockTime(),
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+        final StreamThread thread = new StreamThread(
+            builder,
+            config,
+            clientSupplier,
+            applicationId,
+            clientId,
+            processId,
+            new Metrics(),
+            new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
+            0,
+            stateDirectory) {
 
             @Override
             protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) {
@@ -1978,8 +1915,6 @@ public class StreamThreadTest {
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
         standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0)));
         thread.setPartitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks));
-        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
-
         return thread;
     }
 
@@ -2028,7 +1963,8 @@ public class StreamThreadTest {
             metrics,
             Time.SYSTEM,
             new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0) {
+            0,
+            stateDirectory) {
 
             @Override
             protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4eeb1c/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index bf55b47..f04f80a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -122,7 +122,8 @@ public class StreamThreadStateStoreProviderTest {
                 new Metrics(),
                 Time.SYSTEM,
                 new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                0) {
+                0,
+                stateDirectory) {
 
                 @Override
                 public Map<TaskId, StreamTask> tasks() {

Reply via email to