Repository: kafka
Updated Branches:
  refs/heads/trunk eba0ede87 -> 69ebf6f7b


http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index b9a1cf6..2d9b9a5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -35,10 +35,12 @@ import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -50,8 +52,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests related to internal topics in streams
@@ -65,6 +69,8 @@ public class InternalTopicIntegrationTest {
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
+    private Properties streamsConfiguration;
+    private String applicationId = "compact-topics-integration-test";
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
@@ -72,14 +78,20 @@ public class InternalTopicIntegrationTest {
         CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
     }
 
-    /**
-     * Validates that any state changelog topics are compacted
-     *
-     * @return true if topics have a valid config, false otherwise
-     */
-    private boolean isUsingCompactionForStateChangelogTopics() {
-        boolean valid = true;
+    @Before
+    public void before() {
+        streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, 
CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+    }
+
 
+    private Properties getTopicConfigProperties(final String changelog) {
         // Note: You must initialize the ZkClient with ZKStringSerializer.  If 
you don't, then
         // createTopic() will only seem to work (it will return without 
error).  The topic will exist in
         // only ZooKeeper and will be returned when listing topics, but Kafka 
itself does not create the
@@ -89,33 +101,28 @@ public class InternalTopicIntegrationTest {
             DEFAULT_ZK_SESSION_TIMEOUT_MS,
             DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
             ZKStringSerializer$.MODULE$);
-        final boolean isSecure = false;
-        final ZkUtils zkUtils = new ZkUtils(zkClient, new 
ZkConnection(CLUSTER.zKConnectString()), isSecure);
-
-        final Map<String, Properties> topicConfigs = 
AdminUtils.fetchAllTopicConfigs(zkUtils);
-        final Iterator it = topicConfigs.iterator();
-        while (it.hasNext()) {
-            final Tuple2<String, Properties> topicConfig = (Tuple2<String, 
Properties>) it.next();
-            final String topic = topicConfig._1;
-            final Properties prop = topicConfig._2;
-
-            // state changelogs should be compacted
-            if 
(topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) {
-                if (!prop.containsKey(LogConfig.CleanupPolicyProp()) ||
-                    
!prop.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact())) {
-                    valid = false;
-                    break;
+        try {
+            final boolean isSecure = false;
+            final ZkUtils zkUtils = new ZkUtils(zkClient, new 
ZkConnection(CLUSTER.zKConnectString()), isSecure);
+
+            final Map<String, Properties> topicConfigs = 
AdminUtils.fetchAllTopicConfigs(zkUtils);
+            final Iterator it = topicConfigs.iterator();
+            while (it.hasNext()) {
+                final Tuple2<String, Properties> topicConfig = (Tuple2<String, 
Properties>) it.next();
+                final String topic = topicConfig._1;
+                final Properties prop = topicConfig._2;
+                if (topic.equals(changelog)) {
+                    return prop;
                 }
             }
+            return new Properties();
+        } finally {
+            zkClient.close();
         }
-        zkClient.close();
-        return valid;
     }
 
     @Test
     public void shouldCompactTopicsForStateChangelogs() throws Exception {
-        final List<String> inputValues = Arrays.asList("hello", "world", 
"world", "hello world");
-
         //
         // Step 1: Configure and start a simple word count topology
         //
@@ -154,6 +161,17 @@ public class InternalTopicIntegrationTest {
         //
         // Step 2: Produce some input data to the input topic.
         //
+        produceData(Arrays.asList("hello", "world", "world", "hello world"));
+
+        //
+        // Step 3: Verify the state changelog topics are compact
+        //
+        streams.close();
+        final Properties properties = 
getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId,
 "Counts"));
+        assertEquals(LogConfig.Compact(), 
properties.getProperty(LogConfig.CleanupPolicyProp()));
+    }
+
+    private void produceData(final List<String> inputValues) throws 
java.util.concurrent.ExecutionException, InterruptedException {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
@@ -161,11 +179,47 @@ public class InternalTopicIntegrationTest {
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
         IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, 
inputValues, producerConfig, mockTime);
+    }
+
+    @Test
+    public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws 
Exception {
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> textLines = 
builder.stream(DEFAULT_INPUT_TOPIC);
+
+        final int durationMs = 2000;
+        textLines
+                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(String value) {
+                        return 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                }).groupBy(MockKeyValueMapper.<String, 
String>SelectValueMapper())
+                .count(TimeWindows.of(1000).until(durationMs), 
"CountWindows").toStream();
+
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        //
+        // Step 2: Produce some input data to the input topic.
+        //
+        produceData(Arrays.asList("hello", "world", "world", "hello world"));
 
         //
         // Step 3: Verify the state changelog topics are compact
         //
         streams.close();
-        assertEquals(isUsingCompactionForStateChangelogTopics(), true);
+        final Properties properties = 
getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId,
 "CountWindows"));
+        final List<String> policies = 
Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
+        assertEquals(2, policies.size());
+        assertTrue(policies.contains(LogConfig.Compact()));
+        assertTrue(policies.contains(LogConfig.Delete()));
+        // retention should be 1 day + the window duration
+        final Long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) 
+ durationMs;
+        assertEquals(retention, 
Long.valueOf(properties.getProperty(LogConfig.RetentionMsProp())));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index fe66acb..a4c008a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -19,19 +19,24 @@ package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo;
+import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
+import org.apache.kafka.streams.processor.internals.InternalTopicManager;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -264,9 +269,9 @@ public class TopologyBuilderTest {
         Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", 
"topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
-        expectedTopicGroups.put(1, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), 
Collections.<String>emptySet(), Collections.<String>emptySet()));
-        expectedTopicGroups.put(2, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), 
Collections.<String>emptySet(), Collections.<String>emptySet()));
+        expectedTopicGroups.put(0, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", 
"topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), 
Collections.<String, InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(1, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), 
Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, 
InternalTopicConfig>emptyMap()));
+        expectedTopicGroups.put(2, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), 
Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, 
InternalTopicConfig>emptyMap()));
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -302,9 +307,32 @@ public class TopologyBuilderTest {
         Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
-        expectedTopicGroups.put(0, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", 
"topic-2"), Collections.<String>emptySet(), 
mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1"))));
-        expectedTopicGroups.put(1, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), 
Collections.<String>emptySet(), 
mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-2"))));
-        expectedTopicGroups.put(2, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), 
Collections.<String>emptySet(), 
mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-3"))));
+        final String store1 = ProcessorStateManager.storeChangelogTopic("X", 
"store-1");
+        final String store2 = ProcessorStateManager.storeChangelogTopic("X", 
"store-2");
+        final String store3 = ProcessorStateManager.storeChangelogTopic("X", 
"store-3");
+        expectedTopicGroups.put(0, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", 
"topic-2"),
+                                                  Collections.<String, 
InternalTopicConfig>emptyMap(),
+                                                  
Collections.singletonMap(store1,
+                                                                           new 
InternalTopicConfig(
+                                                                               
    store1,
+                                                                               
    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                                               
    Collections.<String, String>emptyMap()))));
+        expectedTopicGroups.put(1, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"),
+                                                  Collections.<String, 
InternalTopicConfig>emptyMap(),
+                                                  
Collections.singletonMap(store2,
+                                                                           new 
InternalTopicConfig(
+                                                                               
    store2,
+                                                                               
    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                                               
    Collections.<String, String>emptyMap()))));
+        expectedTopicGroups.put(2, new 
TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"),
+                                                  Collections.<String, 
InternalTopicConfig>emptyMap(),
+                                                  
Collections.singletonMap(store3,
+                                                                           new 
InternalTopicConfig(
+                                                                               
    store3,
+                                                                               
    Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                                               
    Collections.<String, String>emptyMap()))));
+
+
 
         assertEquals(3, topicGroups.size());
         assertEquals(expectedTopicGroups, topicGroups);
@@ -390,7 +418,7 @@ public class TopologyBuilderTest {
     @Test(expected = NullPointerException.class)
     public void shouldNotAddNullStateStoreSupplier() throws Exception {
         final TopologyBuilder builder = new TopologyBuilder();
-        builder.addStateStore(null, true);
+        builder.addStateStore(null);
     }
 
     private Set<String> nodeNames(Collection<ProcessorNode> nodes) {
@@ -406,7 +434,7 @@ public class TopologyBuilderTest {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), 
true, "processor");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), 
"processor");
         final Map<String, Set<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singleton("topic"), 
stateStoreNameToSourceTopic.get("store"));
@@ -417,7 +445,7 @@ public class TopologyBuilderTest {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), 
false, "processor");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), 
"processor");
         final Map<String, Set<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singleton("topic"), 
stateStoreNameToSourceTopic.get("store"));
@@ -430,10 +458,63 @@ public class TopologyBuilderTest {
         builder.addInternalTopic("internal-topic");
         builder.addSource("source", "internal-topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
-        builder.addStateStore(new MockStateStoreSupplier("store", false), 
true, "processor");
+        builder.addStateStore(new MockStateStoreSupplier("store", false), 
"processor");
         final Map<String, Set<String>> stateStoreNameToSourceTopic = 
builder.stateStoreNameToSourceTopics();
         assertEquals(1, stateStoreNameToSourceTopic.size());
         assertEquals(Collections.singleton("appId-internal-topic"), 
stateStoreNameToSourceTopic.get("store"));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() throws 
Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId("appId");
+        builder.addSource("source", "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
+        builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 
3, false, null, null, true, Collections.<String, String>emptyMap()), 
"processor");
+        final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final InternalTopicConfig topicConfig = 
topicsInfo.stateChangelogTopics.get("appId-store-changelog");
+        final Properties properties = topicConfig.toProperties(0);
+        final List<String> policies = 
Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(","));
+        assertEquals("appId-store-changelog", topicConfig.name());
+        assertTrue(policies.contains("compact"));
+        assertTrue(policies.contains("delete"));
+        assertEquals(2, policies.size());
+        assertEquals("30000", 
properties.getProperty(InternalTopicManager.RETENTION_MS));
+        assertEquals(2, properties.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() 
throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId("appId");
+        builder.addSource("source", "topic");
+        builder.addProcessor("processor", new MockProcessorSupplier(), 
"source");
+        builder.addStateStore(new MockStateStoreSupplier("name", true), 
"processor");
+        final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
+        final TopicsInfo topicsInfo = topicGroups.values().iterator().next();
+        final InternalTopicConfig topicConfig = 
topicsInfo.stateChangelogTopics.get("appId-name-changelog");
+        final Properties properties = topicConfig.toProperties(0);
+        assertEquals("appId-name-changelog", topicConfig.name());
+        assertEquals("compact", 
properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+        assertEquals(1, properties.size());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws 
Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId("appId");
+        builder.addInternalTopic("foo");
+        builder.addSource("source", "foo");
+        final TopicsInfo topicsInfo = 
builder.topicGroups().values().iterator().next();
+        final InternalTopicConfig topicConfig = 
topicsInfo.interSourceTopics.get("appId-foo");
+        final Properties properties = topicConfig.toProperties(0);
+        assertEquals("appId-foo", topicConfig.name());
+        assertEquals("delete", 
properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+        assertEquals(1, properties.size());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
new file mode 100644
index 0000000..b0a198b
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class InternalTopicConfigTest {
+
+    @Test
+    public void shouldHaveCompactionPropSetIfSupplied() throws Exception {
+        final Properties properties = new InternalTopicConfig("name",
+                                                              
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                              
Collections.<String, String>emptyMap()).toProperties(0);
+        assertEquals("compact", 
properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP));
+    }
+
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfNameIsNull() throws Exception {
+        new InternalTopicConfig(null, 
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), 
Collections.<String, String>emptyMap());
+    }
+
+    @Test
+    public void 
shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() throws 
Exception {
+        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
+                                                                        
Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, 
InternalTopicConfig.CleanupPolicy.delete),
+                                                                        
Collections.<String, String>emptyMap());
+        final int additionalRetentionMs = 20;
+        topicConfig.setRetentionMs(10);
+        final Properties properties = 
topicConfig.toProperties(additionalRetentionMs);
+        assertEquals("30", 
properties.getProperty(InternalTopicManager.RETENTION_MS));
+    }
+
+    @Test
+    public void shouldNotConfigureRetentionMsWhenCompact() throws Exception {
+        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
+                                                                        
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                                                        
Collections.<String, String>emptyMap());
+        topicConfig.setRetentionMs(10);
+        final Properties properties = topicConfig.toProperties(0);
+        assertNull(null, 
properties.getProperty(InternalTopicManager.RETENTION_MS));
+    }
+
+    @Test
+    public void shouldNotConfigureRetentionMsWhenDelete() throws Exception {
+        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
+                                                                        
Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                                                        
Collections.<String, String>emptyMap());
+        topicConfig.setRetentionMs(10);
+        final Properties properties = topicConfig.toProperties(0);
+        assertNull(null, 
properties.getProperty(InternalTopicManager.RETENTION_MS));
+    }
+
+
+    @Test
+    public void shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete() 
throws Exception {
+        assertTrue(new InternalTopicConfig("name",
+                                           
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+                                           Collections.<String, 
String>emptyMap()).isCompacted());
+        assertTrue(new InternalTopicConfig("name", 
Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact,
+                                                               
InternalTopicConfig.CleanupPolicy.delete),
+                                           Collections.<String, 
String>emptyMap()).isCompacted());
+    }
+
+    @Test
+    public void shouldNotBeCompactedWhenCleanupPolicyIsDelete() throws 
Exception {
+        assertFalse(new InternalTopicConfig("name",
+                                            
Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                            Collections.<String, 
String>emptyMap()).isCompacted());
+    }
+
+    @Test
+    public void shouldUseCleanupPolicyFromConfigIfSupplied() throws Exception {
+        final InternalTopicConfig config = new InternalTopicConfig("name",
+                                                                   
Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                                                   
Collections.singletonMap("cleanup.policy", "compact"));
+
+        final Properties properties = config.toProperties(0);
+        assertEquals("compact", properties.getProperty("cleanup.policy"));
+    }
+
+    @Test
+    public void shouldHavePropertiesSuppliedByUser() throws Exception {
+        final Map<String, String> configs = new HashMap<>();
+        configs.put("retention.ms", "1000");
+        configs.put("retention.bytes", "10000");
+
+        final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
+                                                                 
Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
+                                                                 configs);
+
+        final Properties properties = topicConfig.toProperties(0);
+        assertEquals("1000", properties.getProperty("retention.ms"));
+        assertEquals("10000", properties.getProperty("retention.bytes"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e300966..e5ae7d8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -735,15 +735,15 @@ public class StreamPartitionAssignorTest {
         }
 
         @Override
-        public void makeReady(String topic, int numPartitions, boolean 
compactTopic) {
-            readyTopics.put(topic, numPartitions);
+        public void makeReady(InternalTopicConfig topic, int numPartitions) {
+            readyTopics.put(topic.name(), numPartitions);
 
             List<PartitionInfo> partitions = new ArrayList<>();
             for (int i = 0; i < numPartitions; i++) {
-                partitions.add(new PartitionInfo(topic, i, null, null, null));
+                partitions.add(new PartitionInfo(topic.name(), i, null, null, 
null));
             }
 
-            restoreConsumer.updatePartitions(topic, partitions);
+            restoreConsumer.updatePartitions(topic.name(), partitions);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
new file mode 100644
index 0000000..18d158d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StoresTest {
+
+    @Test
+    public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws 
Exception {
+        final StateStoreSupplier supplier = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .inMemory()
+                .enableLogging(Collections.singletonMap("retention.ms", 
"1000"))
+                .build();
+
+        final Map<String, String> config = supplier.logConfig();
+        assertTrue(supplier.loggingEnabled());
+        assertEquals("1000", config.get("retention.ms"));
+    }
+
+    @Test
+    public void shouldCreateInMemoryStoreSupplierNotLogged() throws Exception {
+        final StateStoreSupplier supplier = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .inMemory()
+                .disableLogging()
+                .build();
+
+        assertFalse(supplier.loggingEnabled());
+    }
+
+    @Test
+    public void shouldCreatePersistentStoreSupplierWithLoggedConfig() throws 
Exception {
+        final StateStoreSupplier supplier = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .persistent()
+                .enableLogging(Collections.singletonMap("retention.ms", 
"1000"))
+                .build();
+
+        final Map<String, String> config = supplier.logConfig();
+        assertTrue(supplier.loggingEnabled());
+        assertEquals("1000", config.get("retention.ms"));
+    }
+
+    @Test
+    public void shouldCreatePersistentStoreSupplierNotLogged() throws Exception 
{
+        final StateStoreSupplier supplier = Stores.create("store")
+                .withKeys(Serdes.String())
+                .withValues(Serdes.String())
+                .persistent()
+                .disableLogging()
+                .build();
+
+        assertFalse(supplier.loggingEnabled());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 8a22d37..700655e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -26,125 +26,128 @@ import 
org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
+import org.apache.kafka.test.MockProcessorContext;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public abstract class AbstractKeyValueStoreTest {
 
+
+
     protected abstract <K, V> KeyValueStore<K, V> 
createKeyValueStore(ProcessorContext context,
                                                                       Class<K> 
keyClass, Class<V> valueClass,
                                                                       boolean 
useContextSerdes);
 
+    protected KeyValueStore<Integer, String> store;
+    protected KeyValueStoreTestDriver<Integer, String> driver;
+
+    @Before
+    public void before() {
+        driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        final MockProcessorContext context = (MockProcessorContext) 
driver.context();
+        context.setTime(10);
+        store = createKeyValueStore(context, Integer.class, String.class, 
false);
+    }
+
+    @After
+    public void after() {
+        store.close();
+    }
+
     @Test
     public void testPutGetRange() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, false);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly 
flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-
-            // Check range iteration ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
-                while (iter.hasNext()) {
-                    KeyValue<Integer, String> entry = iter.next();
-                    if (entry.key.equals(2))
-                        assertEquals("two", entry.value);
-                    else if (entry.key.equals(4))
-                        assertEquals("four", entry.value);
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
+        // Verify that the store reads and writes correctly ...
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(4, "four");
+        store.put(5, "five");
+        assertEquals(5, driver.sizeOf(store));
+        assertEquals("zero", store.get(0));
+        assertEquals("one", store.get(1));
+        assertEquals("two", store.get(2));
+        assertNull(store.get(3));
+        assertEquals("four", store.get(4));
+        assertEquals("five", store.get(5));
+        store.delete(5);
+
+        // Flush the store and verify all current entries were properly 
flushed ...
+        store.flush();
+        assertEquals("zero", driver.flushedEntryStored(0));
+        assertEquals("one", driver.flushedEntryStored(1));
+        assertEquals("two", driver.flushedEntryStored(2));
+        assertEquals("four", driver.flushedEntryStored(4));
+        assertEquals(null, driver.flushedEntryStored(5));
+
+        assertEquals(false, driver.flushedEntryRemoved(0));
+        assertEquals(false, driver.flushedEntryRemoved(1));
+        assertEquals(false, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertEquals(true, driver.flushedEntryRemoved(5));
+
+        // Check range iteration ...
+        try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
+            while (iter.hasNext()) {
+                KeyValue<Integer, String> entry = iter.next();
+                if (entry.key.equals(2))
+                    assertEquals("two", entry.value);
+                else if (entry.key.equals(4))
+                    assertEquals("four", entry.value);
+                else
+                    fail("Unexpected entry: " + entry);
             }
+        }
 
-            // Check range iteration ...
-            try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
-                while (iter.hasNext()) {
-                    KeyValue<Integer, String> entry = iter.next();
-                    if (entry.key.equals(2))
-                        assertEquals("two", entry.value);
-                    else if (entry.key.equals(4))
-                        assertEquals("four", entry.value);
-                    else
-                        fail("Unexpected entry: " + entry);
-                }
+        // Check range iteration ...
+        try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
+            while (iter.hasNext()) {
+                KeyValue<Integer, String> entry = iter.next();
+                if (entry.key.equals(2))
+                    assertEquals("two", entry.value);
+                else if (entry.key.equals(4))
+                    assertEquals("four", entry.value);
+                else
+                    fail("Unexpected entry: " + entry);
             }
-        } finally {
-            store.close();
         }
     }
 
     @Test
     public void testPutGetRangeWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-            assertEquals("five", store.get(5));
-            store.delete(5);
-
-            // Flush the store and verify all current entries were properly 
flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-            assertEquals(null, driver.flushedEntryStored(5));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-            assertEquals(true, driver.flushedEntryRemoved(5));
-        } finally {
-            store.close();
-        }
+        // Verify that the store reads and writes correctly ...
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(4, "four");
+        store.put(5, "five");
+        assertEquals(5, driver.sizeOf(store));
+        assertEquals("zero", store.get(0));
+        assertEquals("one", store.get(1));
+        assertEquals("two", store.get(2));
+        assertNull(store.get(3));
+        assertEquals("four", store.get(4));
+        assertEquals("five", store.get(5));
+        store.delete(5);
+
+        // Flush the store and verify all current entries were properly 
flushed ...
+        store.flush();
+        assertEquals("zero", driver.flushedEntryStored(0));
+        assertEquals("one", driver.flushedEntryStored(1));
+        assertEquals("two", driver.flushedEntryStored(2));
+        assertEquals("four", driver.flushedEntryStored(4));
+        assertEquals(null, driver.flushedEntryStored(5));
+
+        assertEquals(false, driver.flushedEntryRemoved(0));
+        assertEquals(false, driver.flushedEntryRemoved(1));
+        assertEquals(false, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertEquals(true, driver.flushedEntryRemoved(5));
     }
 
     @Test
     public void testRestore() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
-
+        store.close();
         // Add any entries that will be restored to any store
         // that uses the driver's context ...
         driver.addEntryToRestoreLog(0, "zero");
@@ -154,23 +157,17 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and 
automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, false);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
+        store = createKeyValueStore(driver.context(), Integer.class, 
String.class, false);
+        // Verify that the store's contents were properly restored ...
+        assertEquals(0, driver.checkForRestoredEntries(store));
+
+        // and there are no other entries ...
+        assertEquals(4, driver.sizeOf(store));
     }
 
     @Test
     public void testRestoreWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
-
+        store.close();
         // Add any entries that will be restored to any store
         // that uses the driver's context ...
         driver.addEntryToRestoreLog(0, "zero");
@@ -180,70 +177,51 @@ public abstract class AbstractKeyValueStoreTest {
 
         // Create the store, which should register with the context and 
automatically
         // receive the restore entries ...
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-            // Verify that the store's contents were properly restored ...
-            assertEquals(0, driver.checkForRestoredEntries(store));
-
-            // and there are no other entries ...
-            assertEquals(4, driver.sizeOf(store));
-        } finally {
-            store.close();
-        }
+        store = createKeyValueStore(driver.context(), Integer.class, 
String.class, true);
+        // Verify that the store's contents were properly restored ...
+        assertEquals(0, driver.checkForRestoredEntries(store));
+
+        // and there are no other entries ...
+        assertEquals(4, driver.sizeOf(store));
     }
 
     @Test
     public void testPutIfAbsent() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-
-            // Verify that the store reads and writes correctly ...
-            assertNull(store.putIfAbsent(0, "zero"));
-            assertNull(store.putIfAbsent(1, "one"));
-            assertNull(store.putIfAbsent(2, "two"));
-            assertNull(store.putIfAbsent(4, "four"));
-            assertEquals("four", store.putIfAbsent(4, "unexpected value"));
-            assertEquals(4, driver.sizeOf(store));
-            assertEquals("zero", store.get(0));
-            assertEquals("one", store.get(1));
-            assertEquals("two", store.get(2));
-            assertNull(store.get(3));
-            assertEquals("four", store.get(4));
-
-            // Flush the store and verify all current entries were properly 
flushed ...
-            store.flush();
-            assertEquals("zero", driver.flushedEntryStored(0));
-            assertEquals("one", driver.flushedEntryStored(1));
-            assertEquals("two", driver.flushedEntryStored(2));
-            assertEquals("four", driver.flushedEntryStored(4));
-
-            assertEquals(false, driver.flushedEntryRemoved(0));
-            assertEquals(false, driver.flushedEntryRemoved(1));
-            assertEquals(false, driver.flushedEntryRemoved(2));
-            assertEquals(false, driver.flushedEntryRemoved(4));
-        } finally {
-            store.close();
-        }
+        // Verify that the store reads and writes correctly ...
+        assertNull(store.putIfAbsent(0, "zero"));
+        assertNull(store.putIfAbsent(1, "one"));
+        assertNull(store.putIfAbsent(2, "two"));
+        assertNull(store.putIfAbsent(4, "four"));
+        assertEquals("four", store.putIfAbsent(4, "unexpected value"));
+        assertEquals(4, driver.sizeOf(store));
+        assertEquals("zero", store.get(0));
+        assertEquals("one", store.get(1));
+        assertEquals("two", store.get(2));
+        assertNull(store.get(3));
+        assertEquals("four", store.get(4));
+
+        // Flush the store and verify all current entries were properly 
flushed ...
+        store.flush();
+        assertEquals("zero", driver.flushedEntryStored(0));
+        assertEquals("one", driver.flushedEntryStored(1));
+        assertEquals("two", driver.flushedEntryStored(2));
+        assertEquals("four", driver.flushedEntryStored(4));
+
+        assertEquals(false, driver.flushedEntryRemoved(0));
+        assertEquals(false, driver.flushedEntryRemoved(1));
+        assertEquals(false, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(4));
     }
 
     @Test
     public void testSize() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, true);
-        try {
-            assertEquals("A newly created store should have no entries", 0, 
store.approximateNumEntries());
-
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(4, "four");
-            store.put(5, "five");
-            assertEquals(5, store.approximateNumEntries());
-        } finally {
-            store.close();
-        }
+        assertEquals("A newly created store should have no entries", 0, 
store.approximateNumEntries());
+
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(4, "four");
+        store.put(5, "five");
+        assertEquals(5, store.approximateNumEntries());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 82071b7..fd9ea96 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
@@ -51,54 +50,47 @@ public class InMemoryLRUCacheStoreTest extends 
AbstractKeyValueStoreTest {
     @Test
     public void testEvict() {
         // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = 
KeyValueStoreTestDriver.create(Integer.class, String.class);
-        KeyValueStore<Integer, String> store = 
createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(3, "three");
+        store.put(4, "four");
+        store.put(5, "five");
+        store.put(6, "six");
+        store.put(7, "seven");
+        store.put(8, "eight");
+        store.put(9, "nine");
+        assertEquals(10, driver.sizeOf(store));
 
-        try {
-            store.put(0, "zero");
-            store.put(1, "one");
-            store.put(2, "two");
-            store.put(3, "three");
-            store.put(4, "four");
-            store.put(5, "five");
-            store.put(6, "six");
-            store.put(7, "seven");
-            store.put(8, "eight");
-            store.put(9, "nine");
-            assertEquals(10, driver.sizeOf(store));
+        store.put(10, "ten");
+        store.flush();
+        assertEquals(10, driver.sizeOf(store));
+        assertTrue(driver.flushedEntryRemoved(0));
+        assertEquals(1, driver.numFlushedEntryRemoved());
 
-            store.put(10, "ten");
-            store.flush();
-            assertEquals(10, driver.sizeOf(store));
-            assertTrue(driver.flushedEntryRemoved(0));
-            assertEquals(1, driver.numFlushedEntryRemoved());
+        store.delete(1);
+        store.flush();
+        assertEquals(9, driver.sizeOf(store));
+        assertTrue(driver.flushedEntryRemoved(0));
+        assertTrue(driver.flushedEntryRemoved(1));
+        assertEquals(2, driver.numFlushedEntryRemoved());
 
-            store.delete(1);
-            store.flush();
-            assertEquals(9, driver.sizeOf(store));
-            assertTrue(driver.flushedEntryRemoved(0));
-            assertTrue(driver.flushedEntryRemoved(1));
-            assertEquals(2, driver.numFlushedEntryRemoved());
+        store.put(11, "eleven");
+        store.flush();
+        assertEquals(10, driver.sizeOf(store));
+        assertEquals(2, driver.numFlushedEntryRemoved());
 
-            store.put(11, "eleven");
-            store.flush();
-            assertEquals(10, driver.sizeOf(store));
-            assertEquals(2, driver.numFlushedEntryRemoved());
+        store.put(2, "two-again");
+        store.flush();
+        assertEquals(10, driver.sizeOf(store));
+        assertEquals(2, driver.numFlushedEntryRemoved());
 
-            store.put(2, "two-again");
-            store.flush();
-            assertEquals(10, driver.sizeOf(store));
-            assertEquals(2, driver.numFlushedEntryRemoved());
-
-            store.put(12, "twelve");
-            store.flush();
-            assertEquals(10, driver.sizeOf(store));
-            assertTrue(driver.flushedEntryRemoved(0));
-            assertTrue(driver.flushedEntryRemoved(1));
-            assertTrue(driver.flushedEntryRemoved(3));
-            assertEquals(3, driver.numFlushedEntryRemoved());
-        } finally {
-            store.close();
-        }
+        store.put(12, "twelve");
+        store.flush();
+        assertEquals(10, driver.sizeOf(store));
+        assertTrue(driver.flushedEntryRemoved(0));
+        assertTrue(driver.flushedEntryRemoved(1));
+        assertTrue(driver.flushedEntryRemoved(3));
+        assertEquals(3, driver.numFlushedEntryRemoved());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 84c0320..521fa32 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -40,6 +40,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -64,7 +65,14 @@ public class RocksDBWindowStoreTest {
 
     @SuppressWarnings("unchecked")
     protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext 
context) {
-        StateStoreSupplier supplier = new 
RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, 
intSerde, stringSerde);
+        StateStoreSupplier supplier = new 
RocksDBWindowStoreSupplier<>(windowName,
+                                                                       
retentionPeriod,
+                                                                       
numSegments,
+                                                                       true,
+                                                                       
intSerde,
+                                                                       
stringSerde,
+                                                                       true,
+                                                                       
Collections.<String, String>emptyMap());
 
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context, store);

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
index ec5d841..39b127f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java
@@ -29,12 +29,18 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.MockProcessorContext;
 
+import java.util.Collections;
+
 @SuppressWarnings("unchecked")
 public class StateStoreTestUtils {
 
     public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, 
Class<K> keyType, Class<V> valueType) {
         final InMemoryKeyValueStoreSupplier<K, V> supplier = new 
InMemoryKeyValueStoreSupplier<>(name,
-                null, null, new MockTime());
+                                                                               
                  null,
+                                                                               
                  null,
+                                                                               
                  new MockTime(),
+                                                                               
                  false,
+                                                                               
                  Collections.<String, String>emptyMap());
 
         final StateStore stateStore = supplier.get();
         stateStore.init(new 
MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType),

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 19cd8e9..7675f9b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -25,7 +25,6 @@ import java.util.Map;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -41,7 +40,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
     private final Map<Integer, String> written = new HashMap<>();
 
-    private final ProcessorContext context = new 
MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, 
String.class),
+    private final MockProcessorContext context = new 
MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, 
String.class),
             new RecordCollector(null, "StoreChangeLoggerTest") {
                 @SuppressWarnings("unchecked")
                 @Override
@@ -69,6 +68,7 @@ public class StoreChangeLoggerTest {
 
     @Test
     public void testAddRemove() {
+        context.setTime(1);
         written.put(0, "zero");
         changeLogger.add(0);
         written.put(1, "one");

http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java 
b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index f24dfda..3532623 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -25,6 +25,8 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
 
 public class MockStateStoreSupplier implements StateStoreSupplier {
     private final String name;
@@ -55,6 +57,16 @@ public class MockStateStoreSupplier implements 
StateStoreSupplier {
         }
     }
 
+    @Override
+    public Map<String, String> logConfig() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public boolean loggingEnabled() {
+        return loggingEnabled;
+    }
+
     public static class MockStateStore implements StateStore {
         private final String name;
         private final boolean persistent;

Reply via email to