Repository: kafka Updated Branches: refs/heads/trunk eba0ede87 -> 69ebf6f7b
http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index b9a1cf6..2d9b9a5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -35,10 +35,12 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.TestUtils; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -50,8 +52,10 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Properties; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Tests related to internal topics in streams @@ -65,6 +69,8 @@ public class InternalTopicIntegrationTest { private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic"; private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000; private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000; + private Properties streamsConfiguration; + private String applicationId = "compact-topics-integration-test"; @BeforeClass public static void startKafkaCluster() throws Exception { @@ -72,14 +78,20 @@ public class InternalTopicIntegrationTest { CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC); } - /** - * Validates that any state changelog topics are compacted - * - * @return true if topics have a valid config, false otherwise - */ - private boolean isUsingCompactionForStateChangelogTopics() { - boolean valid = true; + @Before + public void before() { + streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); + streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + } + + private Properties getTopicConfigProperties(final String changelog) { // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then // createTopic() will only seem to work (it will return without error). The topic will exist in // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the @@ -89,33 +101,28 @@ public class InternalTopicIntegrationTest { DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$); - final boolean isSecure = false; - final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure); - - final Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils); - final Iterator it = topicConfigs.iterator(); - while (it.hasNext()) { - final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next(); - final String topic = topicConfig._1; - final Properties prop = topicConfig._2; - - // state changelogs should be compacted - if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) { - if (!prop.containsKey(LogConfig.CleanupPolicyProp()) || - !prop.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact())) { - valid = false; - break; + try { + final boolean isSecure = false; + final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure); + + final Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils); + final Iterator it = topicConfigs.iterator(); + while (it.hasNext()) { + final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next(); + final String topic = topicConfig._1; + final Properties prop = topicConfig._2; + if (topic.equals(changelog)) { + return prop; } } + return new Properties(); + } finally { + zkClient.close(); } - zkClient.close(); - return valid; } @Test public void shouldCompactTopicsForStateChangelogs() throws Exception { - final List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world"); - // // Step 1: Configure and start a simple word count topology // @@ -154,6 +161,17 @@ public class InternalTopicIntegrationTest { // // Step 2: Produce some input data to the input topic. // + produceData(Arrays.asList("hello", "world", "world", "hello world")); + + // + // Step 3: Verify the state changelog topics are compact + // + streams.close(); + final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "Counts")); + assertEquals(LogConfig.Compact(), properties.getProperty(LogConfig.CleanupPolicyProp())); + } + + private void produceData(final List<String> inputValues) throws java.util.concurrent.ExecutionException, InterruptedException { final Properties producerConfig = new Properties(); producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); @@ -161,11 +179,47 @@ public class InternalTopicIntegrationTest { producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig, mockTime); + } + + @Test + public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception { + KStreamBuilder builder = new KStreamBuilder(); + + KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC); + + final int durationMs = 2000; + textLines + .flatMapValues(new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(String value) { + return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); + } + }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper()) + .count(TimeWindows.of(1000).until(durationMs), "CountWindows").toStream(); + + + // Remove any state from previous test runs + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + + KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + + // + // Step 2: Produce some input data to the input topic. + // + produceData(Arrays.asList("hello", "world", "world", "hello world")); // // Step 3: Verify the state changelog topics are compact // streams.close(); - assertEquals(isUsingCompactionForStateChangelogTopics(), true); + final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "CountWindows")); + final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(",")); + assertEquals(2, policies.size()); + assertTrue(policies.contains(LogConfig.Compact())); + assertTrue(policies.contains(LogConfig.Delete())); + // retention should be 1 day + the window duration + final Long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs; + assertEquals(retention, Long.valueOf(properties.getProperty(LogConfig.RetentionMsProp()))); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index fe66acb..a4c008a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -19,19 +19,24 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.TopologyBuilder.TopicsInfo; +import org.apache.kafka.streams.processor.internals.InternalTopicConfig; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.junit.Test; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; @@ -264,9 +269,9 @@ public class TopologyBuilderTest { Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet())); - expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet())); - expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet())); + expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); + expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); + expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -302,9 +307,32 @@ public class TopologyBuilderTest { Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); - expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1")))); - expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-2")))); - expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-3")))); + final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1"); + final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2"); + final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3"); + expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), + Collections.<String, InternalTopicConfig>emptyMap(), + Collections.singletonMap(store1, + new InternalTopicConfig( + store1, + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.<String, String>emptyMap())))); + expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), + Collections.<String, InternalTopicConfig>emptyMap(), + Collections.singletonMap(store2, + new InternalTopicConfig( + store2, + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.<String, String>emptyMap())))); + expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), + Collections.<String, InternalTopicConfig>emptyMap(), + Collections.singletonMap(store3, + new InternalTopicConfig( + store3, + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.<String, String>emptyMap())))); + + assertEquals(3, topicGroups.size()); assertEquals(expectedTopicGroups, topicGroups); @@ -390,7 +418,7 @@ public class TopologyBuilderTest { @Test(expected = NullPointerException.class) public void shouldNotAddNullStateStoreSupplier() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); - builder.addStateStore(null, true); + builder.addStateStore(null); } private Set<String> nodeNames(Collection<ProcessorNode> nodes) { @@ -406,7 +434,7 @@ public class TopologyBuilderTest { final TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source", "topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new MockStateStoreSupplier("store", false), true, "processor"); + builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store")); @@ -417,7 +445,7 @@ public class TopologyBuilderTest { final TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source", "topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new MockStateStoreSupplier("store", false), false, "processor"); + builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singleton("topic"), stateStoreNameToSourceTopic.get("store")); @@ -430,10 +458,63 @@ public class TopologyBuilderTest { builder.addInternalTopic("internal-topic"); builder.addSource("source", "internal-topic"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addStateStore(new MockStateStoreSupplier("store", false), true, "processor"); + builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); final Map<String, Set<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); assertEquals(1, stateStoreNameToSourceTopic.size()); assertEquals(Collections.singleton("appId-internal-topic"), stateStoreNameToSourceTopic.get("store")); } + @SuppressWarnings("unchecked") + @Test + public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + builder.setApplicationId("appId"); + builder.addSource("source", "topic"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, true, Collections.<String, String>emptyMap()), "processor"); + final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); + final TopicsInfo topicsInfo = topicGroups.values().iterator().next(); + final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); + final Properties properties = topicConfig.toProperties(0); + final List<String> policies = Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(",")); + assertEquals("appId-store-changelog", topicConfig.name()); + assertTrue(policies.contains("compact")); + assertTrue(policies.contains("delete")); + assertEquals(2, policies.size()); + assertEquals("30000", properties.getProperty(InternalTopicManager.RETENTION_MS)); + assertEquals(2, properties.size()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + builder.setApplicationId("appId"); + builder.addSource("source", "topic"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addStateStore(new MockStateStoreSupplier("name", true), "processor"); + final Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); + final TopicsInfo topicsInfo = topicGroups.values().iterator().next(); + final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-name-changelog"); + final Properties properties = topicConfig.toProperties(0); + assertEquals("appId-name-changelog", topicConfig.name()); + assertEquals("compact", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP)); + assertEquals(1, properties.size()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception { + final TopologyBuilder builder = new TopologyBuilder(); + builder.setApplicationId("appId"); + builder.addInternalTopic("foo"); + builder.addSource("source", "foo"); + final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next(); + final InternalTopicConfig topicConfig = topicsInfo.interSourceTopics.get("appId-foo"); + final Properties properties = topicConfig.toProperties(0); + assertEquals("appId-foo", topicConfig.name()); + assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP)); + assertEquals(1, properties.size()); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java new file mode 100644 index 0000000..b0a198b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.utils.Utils; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class InternalTopicConfigTest { + + @Test + public void shouldHaveCompactionPropSetIfSupplied() throws Exception { + final Properties properties = new InternalTopicConfig("name", + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.<String, String>emptyMap()).toProperties(0); + assertEquals("compact", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP)); + } + + + @Test(expected = NullPointerException.class) + public void shouldThrowIfNameIsNull() throws Exception { + new InternalTopicConfig(null, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap()); + } + + @Test + public void shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() throws Exception { + final InternalTopicConfig topicConfig = new InternalTopicConfig("name", + Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, InternalTopicConfig.CleanupPolicy.delete), + Collections.<String, String>emptyMap()); + final int additionalRetentionMs = 20; + topicConfig.setRetentionMs(10); + final Properties properties = topicConfig.toProperties(additionalRetentionMs); + assertEquals("30", properties.getProperty(InternalTopicManager.RETENTION_MS)); + } + + @Test + public void shouldNotConfigureRetentionMsWhenCompact() throws Exception { + final InternalTopicConfig topicConfig = new InternalTopicConfig("name", + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.<String, String>emptyMap()); + topicConfig.setRetentionMs(10); + final Properties properties = topicConfig.toProperties(0); + assertNull(null, properties.getProperty(InternalTopicManager.RETENTION_MS)); + } + + @Test + public void shouldNotConfigureRetentionMsWhenDelete() throws Exception { + final InternalTopicConfig topicConfig = new InternalTopicConfig("name", + Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), + Collections.<String, String>emptyMap()); + topicConfig.setRetentionMs(10); + final Properties properties = topicConfig.toProperties(0); + assertNull(null, properties.getProperty(InternalTopicManager.RETENTION_MS)); + } + + + @Test + public void shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete() throws Exception { + assertTrue(new InternalTopicConfig("name", + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.<String, String>emptyMap()).isCompacted()); + assertTrue(new InternalTopicConfig("name", Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, + InternalTopicConfig.CleanupPolicy.delete), + Collections.<String, String>emptyMap()).isCompacted()); + } + + @Test + public void shouldNotBeCompactedWhenCleanupPolicyIsDelete() throws Exception { + assertFalse(new InternalTopicConfig("name", + Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), + Collections.<String, String>emptyMap()).isCompacted()); + } + + @Test + public void shouldUseCleanupPolicyFromConfigIfSupplied() throws Exception { + final InternalTopicConfig config = new InternalTopicConfig("name", + Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), + Collections.singletonMap("cleanup.policy", "compact")); + + final Properties properties = config.toProperties(0); + assertEquals("compact", properties.getProperty("cleanup.policy")); + } + + @Test + public void shouldHavePropertiesSuppliedByUser() throws Exception { + final Map<String, String> configs = new HashMap<>(); + configs.put("retention.ms", "1000"); + configs.put("retention.bytes", "10000"); + + final InternalTopicConfig topicConfig = new InternalTopicConfig("name", + Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), + configs); + + final Properties properties = topicConfig.toProperties(0); + assertEquals("1000", properties.getProperty("retention.ms")); + assertEquals("10000", properties.getProperty("retention.bytes")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index e300966..e5ae7d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -735,15 +735,15 @@ public class StreamPartitionAssignorTest { } @Override - public void makeReady(String topic, int numPartitions, boolean compactTopic) { - readyTopics.put(topic, numPartitions); + public void makeReady(InternalTopicConfig topic, int numPartitions) { + readyTopics.put(topic.name(), numPartitions); List<PartitionInfo> partitions = new ArrayList<>(); for (int i = 0; i < numPartitions; i++) { - partitions.add(new PartitionInfo(topic, i, null, null, null)); + partitions.add(new PartitionInfo(topic.name(), i, null, null, null)); } - restoreConsumer.updatePartitions(topic, partitions); + restoreConsumer.updatePartitions(topic.name(), partitions); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java new file mode 100644 index 0000000..18d158d --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class StoresTest { + + @Test + public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws Exception { + final StateStoreSupplier supplier = Stores.create("store") + .withKeys(Serdes.String()) + .withValues(Serdes.String()) + .inMemory() + .enableLogging(Collections.singletonMap("retention.ms", "1000")) + .build(); + + final Map<String, String> config = supplier.logConfig(); + assertTrue(supplier.loggingEnabled()); + assertEquals("1000", config.get("retention.ms")); + } + + @Test + public void shouldCreateInMemoryStoreSupplierNotLogged() throws Exception { + final StateStoreSupplier supplier = Stores.create("store") + .withKeys(Serdes.String()) + .withValues(Serdes.String()) + .inMemory() + .disableLogging() + .build(); + + assertFalse(supplier.loggingEnabled()); + } + + @Test + public void shouldCreatePersistenStoreSupplierWithLoggedConfig() throws Exception { + final StateStoreSupplier supplier = Stores.create("store") + .withKeys(Serdes.String()) + .withValues(Serdes.String()) + .persistent() + .enableLogging(Collections.singletonMap("retention.ms", "1000")) + .build(); + + final Map<String, String> config = supplier.logConfig(); + assertTrue(supplier.loggingEnabled()); + assertEquals("1000", config.get("retention.ms")); + } + + @Test + public void shouldCreatePersistenStoreSupplierNotLogged() throws Exception { + final StateStoreSupplier supplier = Stores.create("store") + .withKeys(Serdes.String()) + .withValues(Serdes.String()) + .persistent() + .disableLogging() + .build(); + + assertFalse(supplier.loggingEnabled()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 8a22d37..700655e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -26,125 +26,128 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStoreTestDriver; +import org.apache.kafka.test.MockProcessorContext; +import org.junit.After; +import org.junit.Before; import org.junit.Test; public abstract class AbstractKeyValueStoreTest { + + protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context, Class<K> keyClass, Class<V> valueClass, boolean useContextSerdes); + protected KeyValueStore<Integer, String> store; + protected KeyValueStoreTestDriver<Integer, String> driver; + + @Before + public void before() { + driver = KeyValueStoreTestDriver.create(Integer.class, String.class); + final MockProcessorContext context = (MockProcessorContext) driver.context(); + context.setTime(10); + store = createKeyValueStore(context, Integer.class, String.class, false); + } + + @After + public void after() { + store.close(); + } + @Test public void testPutGetRange() { - // Create the test driver ... - KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false); - try { - - // Verify that the store reads and writes correctly ... - store.put(0, "zero"); - store.put(1, "one"); - store.put(2, "two"); - store.put(4, "four"); - store.put(5, "five"); - assertEquals(5, driver.sizeOf(store)); - assertEquals("zero", store.get(0)); - assertEquals("one", store.get(1)); - assertEquals("two", store.get(2)); - assertNull(store.get(3)); - assertEquals("four", store.get(4)); - assertEquals("five", store.get(5)); - store.delete(5); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); - assertEquals(null, driver.flushedEntryStored(5)); - - assertEquals(false, driver.flushedEntryRemoved(0)); - assertEquals(false, driver.flushedEntryRemoved(1)); - assertEquals(false, driver.flushedEntryRemoved(2)); - assertEquals(false, driver.flushedEntryRemoved(4)); - assertEquals(true, driver.flushedEntryRemoved(5)); - - // Check range iteration ... - try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) { - while (iter.hasNext()) { - KeyValue<Integer, String> entry = iter.next(); - if (entry.key.equals(2)) - assertEquals("two", entry.value); - else if (entry.key.equals(4)) - assertEquals("four", entry.value); - else - fail("Unexpected entry: " + entry); - } + // Verify that the store reads and writes correctly ... + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(4, "four"); + store.put(5, "five"); + assertEquals(5, driver.sizeOf(store)); + assertEquals("zero", store.get(0)); + assertEquals("one", store.get(1)); + assertEquals("two", store.get(2)); + assertNull(store.get(3)); + assertEquals("four", store.get(4)); + assertEquals("five", store.get(5)); + store.delete(5); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + assertEquals("zero", driver.flushedEntryStored(0)); + assertEquals("one", driver.flushedEntryStored(1)); + assertEquals("two", driver.flushedEntryStored(2)); + assertEquals("four", driver.flushedEntryStored(4)); + assertEquals(null, driver.flushedEntryStored(5)); + + assertEquals(false, driver.flushedEntryRemoved(0)); + assertEquals(false, driver.flushedEntryRemoved(1)); + assertEquals(false, driver.flushedEntryRemoved(2)); + assertEquals(false, driver.flushedEntryRemoved(4)); + assertEquals(true, driver.flushedEntryRemoved(5)); + + // Check range iteration ... + try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) { + while (iter.hasNext()) { + KeyValue<Integer, String> entry = iter.next(); + if (entry.key.equals(2)) + assertEquals("two", entry.value); + else if (entry.key.equals(4)) + assertEquals("four", entry.value); + else + fail("Unexpected entry: " + entry); } + } - // Check range iteration ... - try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) { - while (iter.hasNext()) { - KeyValue<Integer, String> entry = iter.next(); - if (entry.key.equals(2)) - assertEquals("two", entry.value); - else if (entry.key.equals(4)) - assertEquals("four", entry.value); - else - fail("Unexpected entry: " + entry); - } + // Check range iteration ... + try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) { + while (iter.hasNext()) { + KeyValue<Integer, String> entry = iter.next(); + if (entry.key.equals(2)) + assertEquals("two", entry.value); + else if (entry.key.equals(4)) + assertEquals("four", entry.value); + else + fail("Unexpected entry: " + entry); } - } finally { - store.close(); } } @Test public void testPutGetRangeWithDefaultSerdes() { - // Create the test driver ... - KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true); - try { - - // Verify that the store reads and writes correctly ... - store.put(0, "zero"); - store.put(1, "one"); - store.put(2, "two"); - store.put(4, "four"); - store.put(5, "five"); - assertEquals(5, driver.sizeOf(store)); - assertEquals("zero", store.get(0)); - assertEquals("one", store.get(1)); - assertEquals("two", store.get(2)); - assertNull(store.get(3)); - assertEquals("four", store.get(4)); - assertEquals("five", store.get(5)); - store.delete(5); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); - assertEquals(null, driver.flushedEntryStored(5)); - - assertEquals(false, driver.flushedEntryRemoved(0)); - assertEquals(false, driver.flushedEntryRemoved(1)); - assertEquals(false, driver.flushedEntryRemoved(2)); - assertEquals(false, driver.flushedEntryRemoved(4)); - assertEquals(true, driver.flushedEntryRemoved(5)); - } finally { - store.close(); - } + // Verify that the store reads and writes correctly ... + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(4, "four"); + store.put(5, "five"); + assertEquals(5, driver.sizeOf(store)); + assertEquals("zero", store.get(0)); + assertEquals("one", store.get(1)); + assertEquals("two", store.get(2)); + assertNull(store.get(3)); + assertEquals("four", store.get(4)); + assertEquals("five", store.get(5)); + store.delete(5); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + assertEquals("zero", driver.flushedEntryStored(0)); + assertEquals("one", driver.flushedEntryStored(1)); + assertEquals("two", driver.flushedEntryStored(2)); + assertEquals("four", driver.flushedEntryStored(4)); + assertEquals(null, driver.flushedEntryStored(5)); + + assertEquals(false, driver.flushedEntryRemoved(0)); + assertEquals(false, driver.flushedEntryRemoved(1)); + assertEquals(false, driver.flushedEntryRemoved(2)); + assertEquals(false, driver.flushedEntryRemoved(4)); + assertEquals(true, driver.flushedEntryRemoved(5)); } @Test public void testRestore() { - // Create the test driver ... - KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - + store.close(); // Add any entries that will be restored to any store // that uses the driver's context ... driver.addEntryToRestoreLog(0, "zero"); @@ -154,23 +157,17 @@ public abstract class AbstractKeyValueStoreTest { // Create the store, which should register with the context and automatically // receive the restore entries ... - KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false); - try { - // Verify that the store's contents were properly restored ... - assertEquals(0, driver.checkForRestoredEntries(store)); - - // and there are no other entries ... - assertEquals(4, driver.sizeOf(store)); - } finally { - store.close(); - } + store = createKeyValueStore(driver.context(), Integer.class, String.class, false); + // Verify that the store's contents were properly restored ... + assertEquals(0, driver.checkForRestoredEntries(store)); + + // and there are no other entries ... + assertEquals(4, driver.sizeOf(store)); } @Test public void testRestoreWithDefaultSerdes() { - // Create the test driver ... - KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - + store.close(); // Add any entries that will be restored to any store // that uses the driver's context ... driver.addEntryToRestoreLog(0, "zero"); @@ -180,70 +177,51 @@ public abstract class AbstractKeyValueStoreTest { // Create the store, which should register with the context and automatically // receive the restore entries ... - KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true); - try { - // Verify that the store's contents were properly restored ... - assertEquals(0, driver.checkForRestoredEntries(store)); - - // and there are no other entries ... - assertEquals(4, driver.sizeOf(store)); - } finally { - store.close(); - } + store = createKeyValueStore(driver.context(), Integer.class, String.class, true); + // Verify that the store's contents were properly restored ... + assertEquals(0, driver.checkForRestoredEntries(store)); + + // and there are no other entries ... + assertEquals(4, driver.sizeOf(store)); } @Test public void testPutIfAbsent() { - // Create the test driver ... - KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true); - try { - - // Verify that the store reads and writes correctly ... - assertNull(store.putIfAbsent(0, "zero")); - assertNull(store.putIfAbsent(1, "one")); - assertNull(store.putIfAbsent(2, "two")); - assertNull(store.putIfAbsent(4, "four")); - assertEquals("four", store.putIfAbsent(4, "unexpected value")); - assertEquals(4, driver.sizeOf(store)); - assertEquals("zero", store.get(0)); - assertEquals("one", store.get(1)); - assertEquals("two", store.get(2)); - assertNull(store.get(3)); - assertEquals("four", store.get(4)); - - // Flush the store and verify all current entries were properly flushed ... - store.flush(); - assertEquals("zero", driver.flushedEntryStored(0)); - assertEquals("one", driver.flushedEntryStored(1)); - assertEquals("two", driver.flushedEntryStored(2)); - assertEquals("four", driver.flushedEntryStored(4)); - - assertEquals(false, driver.flushedEntryRemoved(0)); - assertEquals(false, driver.flushedEntryRemoved(1)); - assertEquals(false, driver.flushedEntryRemoved(2)); - assertEquals(false, driver.flushedEntryRemoved(4)); - } finally { - store.close(); - } + // Verify that the store reads and writes correctly ... + assertNull(store.putIfAbsent(0, "zero")); + assertNull(store.putIfAbsent(1, "one")); + assertNull(store.putIfAbsent(2, "two")); + assertNull(store.putIfAbsent(4, "four")); + assertEquals("four", store.putIfAbsent(4, "unexpected value")); + assertEquals(4, driver.sizeOf(store)); + assertEquals("zero", store.get(0)); + assertEquals("one", store.get(1)); + assertEquals("two", store.get(2)); + assertNull(store.get(3)); + assertEquals("four", store.get(4)); + + // Flush the store and verify all current entries were properly flushed ... + store.flush(); + assertEquals("zero", driver.flushedEntryStored(0)); + assertEquals("one", driver.flushedEntryStored(1)); + assertEquals("two", driver.flushedEntryStored(2)); + assertEquals("four", driver.flushedEntryStored(4)); + + assertEquals(false, driver.flushedEntryRemoved(0)); + assertEquals(false, driver.flushedEntryRemoved(1)); + assertEquals(false, driver.flushedEntryRemoved(2)); + assertEquals(false, driver.flushedEntryRemoved(4)); } @Test public void testSize() { - // Create the test driver ... - KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true); - try { - assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries()); - - store.put(0, "zero"); - store.put(1, "one"); - store.put(2, "two"); - store.put(4, "four"); - store.put(5, "five"); - assertEquals(5, store.approximateNumEntries()); - } finally { - store.close(); - } + assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries()); + + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(4, "four"); + store.put(5, "five"); + assertEquals(5, store.approximateNumEntries()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index 82071b7..fd9ea96 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.KeyValueStoreTestDriver; import org.apache.kafka.streams.state.Stores; import org.junit.Test; @@ -51,54 +50,47 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest { @Test public void testEvict() { // Create the test driver ... - KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class); - KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false); + store.put(0, "zero"); + store.put(1, "one"); + store.put(2, "two"); + store.put(3, "three"); + store.put(4, "four"); + store.put(5, "five"); + store.put(6, "six"); + store.put(7, "seven"); + store.put(8, "eight"); + store.put(9, "nine"); + assertEquals(10, driver.sizeOf(store)); - try { - store.put(0, "zero"); - store.put(1, "one"); - store.put(2, "two"); - store.put(3, "three"); - store.put(4, "four"); - store.put(5, "five"); - store.put(6, "six"); - store.put(7, "seven"); - store.put(8, "eight"); - store.put(9, "nine"); - assertEquals(10, driver.sizeOf(store)); + store.put(10, "ten"); + store.flush(); + assertEquals(10, driver.sizeOf(store)); + assertTrue(driver.flushedEntryRemoved(0)); + assertEquals(1, driver.numFlushedEntryRemoved()); - store.put(10, "ten"); - store.flush(); - assertEquals(10, driver.sizeOf(store)); - assertTrue(driver.flushedEntryRemoved(0)); - assertEquals(1, driver.numFlushedEntryRemoved()); + store.delete(1); + store.flush(); + assertEquals(9, driver.sizeOf(store)); + assertTrue(driver.flushedEntryRemoved(0)); + assertTrue(driver.flushedEntryRemoved(1)); + assertEquals(2, driver.numFlushedEntryRemoved()); - store.delete(1); - store.flush(); - assertEquals(9, driver.sizeOf(store)); - assertTrue(driver.flushedEntryRemoved(0)); - assertTrue(driver.flushedEntryRemoved(1)); - assertEquals(2, driver.numFlushedEntryRemoved()); + store.put(11, "eleven"); + store.flush(); + assertEquals(10, driver.sizeOf(store)); + assertEquals(2, driver.numFlushedEntryRemoved()); - store.put(11, "eleven"); - store.flush(); - assertEquals(10, driver.sizeOf(store)); - assertEquals(2, driver.numFlushedEntryRemoved()); + store.put(2, "two-again"); + store.flush(); + assertEquals(10, driver.sizeOf(store)); + assertEquals(2, driver.numFlushedEntryRemoved()); - store.put(2, "two-again"); - store.flush(); - assertEquals(10, driver.sizeOf(store)); - assertEquals(2, driver.numFlushedEntryRemoved()); - - store.put(12, "twelve"); - store.flush(); - assertEquals(10, driver.sizeOf(store)); - assertTrue(driver.flushedEntryRemoved(0)); - assertTrue(driver.flushedEntryRemoved(1)); - assertTrue(driver.flushedEntryRemoved(3)); - assertEquals(3, driver.numFlushedEntryRemoved()); - } finally { - store.close(); - } + store.put(12, "twelve"); + store.flush(); + assertEquals(10, driver.sizeOf(store)); + assertTrue(driver.flushedEntryRemoved(0)); + assertTrue(driver.flushedEntryRemoved(1)); + assertTrue(driver.flushedEntryRemoved(3)); + assertEquals(3, driver.numFlushedEntryRemoved()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 84c0320..521fa32 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -64,7 +65,14 @@ public class RocksDBWindowStoreTest { @SuppressWarnings("unchecked") protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context) { - StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, intSerde, stringSerde); + StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, + retentionPeriod, + numSegments, + true, + intSerde, + stringSerde, + true, + Collections.<String, String>emptyMap()); WindowStore<K, V> store = (WindowStore<K, V>) supplier.get(); store.init(context, store); http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java index ec5d841..39b127f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StateStoreTestUtils.java @@ -29,12 +29,18 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.test.MockProcessorContext; +import java.util.Collections; + @SuppressWarnings("unchecked") public class StateStoreTestUtils { public static <K, V> KeyValueStore<K, V> newKeyValueStore(String name, Class<K> keyType, Class<V> valueType) { final InMemoryKeyValueStoreSupplier<K, V> supplier = new InMemoryKeyValueStoreSupplier<>(name, - null, null, new MockTime()); + null, + null, + new MockTime(), + false, + Collections.<String, String>emptyMap()); final StateStore stateStore = supplier.get(); stateStore.init(new MockProcessorContext(StateSerdes.withBuiltinTypes(name, keyType, valueType), http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java index 19cd8e9..7675f9b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java @@ -25,7 +25,6 @@ import java.util.Map; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.state.StateSerdes; @@ -41,7 +40,7 @@ public class StoreChangeLoggerTest { private final Map<Integer, String> logged = new HashMap<>(); private final Map<Integer, String> written = new HashMap<>(); - private final ProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), + private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), new RecordCollector(null, "StoreChangeLoggerTest") { @SuppressWarnings("unchecked") @Override @@ -69,6 +68,7 @@ public class StoreChangeLoggerTest { @Test public void testAddRemove() { + context.setTime(1); written.put(0, "zero"); changeLogger.add(0); written.put(1, "one"); http://git-wip-us.apache.org/repos/asf/kafka/blob/69ebf6f7/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java index f24dfda..3532623 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java @@ -25,6 +25,8 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; public class MockStateStoreSupplier implements StateStoreSupplier { private final String name; @@ -55,6 +57,16 @@ public class MockStateStoreSupplier implements StateStoreSupplier { } } + @Override + public Map<String, String> logConfig() { + return Collections.emptyMap(); + } + + @Override + public boolean loggingEnabled() { + return loggingEnabled; + } + public static class MockStateStore implements StateStore { private final String name; private final boolean persistent;
