This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e981627 improve internal topic integration test (#4437) e981627 is described below commit e98162792684b0874c60003c6a596ec739c934a3 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Thu Jan 18 08:57:42 2018 -0800 improve internal topic integration test (#4437) Reviewers: Damian Guy <damian....@gmail.com> --- .../integration/InternalTopicIntegrationTest.java | 179 +++++++++++---------- 1 file changed, 90 insertions(+), 89 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 65a6de7..1469d18 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -22,7 +22,6 @@ import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Time; @@ -32,24 +31,27 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; 
import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; +import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import scala.Tuple2; -import scala.collection.Iterator; -import scala.collection.Map; +import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; @@ -61,93 +63,90 @@ import static org.junit.Assert.assertTrue; */ @Category({IntegrationTest.class}) public class InternalTopicIntegrationTest { - private static final int NUM_BROKERS = 1; @ClassRule - public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - private final MockTime mockTime = CLUSTER.time; + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + private static final String APP_ID = "internal-topics-integration-test"; private static final String DEFAULT_INPUT_TOPIC = "inputTopic"; - private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic"; private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000; private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000; - private Properties streamsConfiguration; - private String applicationId = "compact-topics-integration-test"; + + private final MockTime mockTime = CLUSTER.time; + + private Properties streamsProp; @BeforeClass public static void startKafkaCluster() throws InterruptedException { - CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC); + CLUSTER.createTopics(DEFAULT_INPUT_TOPIC); } @Before public void before() { - streamsConfiguration = new Properties(); - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - 
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsProp = new Properties(); + streamsProp.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsProp.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsProp.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsProp.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsProp.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + streamsProp.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsProp.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + } + + @After + public void after() throws IOException { + // Remove any state from previous test runs + IntegrationTestUtils.purgeLocalStreamsState(streamsProp); } - private Properties getTopicConfigProperties(final String changelog) { - final KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false, - DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, - "testMetricGroup", "testMetricType"); - try { + private void produceData(final List<String> inputValues) throws Exception { + final Properties producerProp = new Properties(); + 
producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + producerProp.put(ProducerConfig.ACKS_CONFIG, "all"); + producerProp.put(ProducerConfig.RETRIES_CONFIG, 0); + producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerProp, mockTime); + } + + private Properties getTopicProperties(final String changelog) { + try (KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(), false, + DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, + Time.SYSTEM, "testMetricGroup", "testMetricType")) { final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); + final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs()); - final Map<String, Properties> topicConfigs = adminZkClient.getAllTopicConfigs(); - final Iterator it = topicConfigs.iterator(); - while (it.hasNext()) { - final Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next(); - final String topic = topicConfig._1; - final Properties prop = topicConfig._2; - if (topic.equals(changelog)) { - return prop; - } + for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) { + if (topicConfig.getKey().equals(changelog)) + return topicConfig.getValue(); } + return new Properties(); - } finally { - kafkaZkClient.close(); } } @Test - public void shouldCompactTopicsForStateChangelogs() throws Exception { + public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception { + final String appID = APP_ID + "-compact"; + streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + // // Step 1: Configure and start a simple word count topology // - final Serde<String> stringSerde = Serdes.String(); - final 
Serde<Long> longSerde = Serdes.Long(); - - final Properties streamsConfiguration = new Properties(); - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test"); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); final StreamsBuilder builder = new StreamsBuilder(); - final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC); - final KStream<String, Long> wordCounts = textLines - .flatMapValues(new ValueMapper<String, Iterable<String>>() { - @Override - public Iterable<String> apply(final String value) { - return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); - } - }).groupBy(MockMapper.<String, String>selectValueMapper()) - .count("Counts").toStream(); - - wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC); - - // Remove any state from previous test runs - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(final String value) { + return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); + } + }) + .groupBy(MockMapper.<String, String>selectValueMapper()) + .count(Materialized.<String, Long, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("Counts")); - final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); + final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp); streams.start(); // @@ -159,41 +158,39 @@ public class 
InternalTopicIntegrationTest { // Step 3: Verify the state changelog topics are compact // streams.close(); - final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "Counts")); - assertEquals(LogConfig.Compact(), properties.getProperty(LogConfig.CleanupPolicyProp())); - } - private void produceData(final List<String> inputValues) throws Exception { - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig, mockTime); + final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts")); + assertEquals(LogConfig.Compact(), changelogProps.getProperty(LogConfig.CleanupPolicyProp())); + + final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition"); + assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp())); + assertEquals(4, repartitionProps.size()); } @Test - public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception { - StreamsBuilder builder = new StreamsBuilder(); + public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Exception { + final String appID = APP_ID + "-compact-delete"; + streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + // + // Step 1: Configure and start a simple word count topology + // + StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC); final int durationMs = 2000; - textLines - 
.flatMapValues(new ValueMapper<String, Iterable<String>>() { - @Override - public Iterable<String> apply(String value) { - return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); - } - }).groupBy(MockMapper.<String, String>selectValueMapper()) - .count(TimeWindows.of(1000).until(durationMs), "CountWindows").toStream(); - - // Remove any state from previous test runs - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + textLines.flatMapValues(new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(final String value) { + return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); + } + }) + .groupBy(MockMapper.<String, String>selectValueMapper()) + .windowedBy(TimeWindows.of(1000).until(2000)) + .count(Materialized.<String, Long, WindowStore<org.apache.kafka.common.utils.Bytes, byte[]>>as("CountWindows")); - KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); + KafkaStreams streams = new KafkaStreams(builder.build(), streamsProp); streams.start(); // @@ -205,7 +202,7 @@ public class InternalTopicIntegrationTest { // Step 3: Verify the state changelog topics are compact // streams.close(); - final Properties properties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(applicationId, "CountWindows")); + final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows")); final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(",")); assertEquals(2, policies.size()); assertTrue(policies.contains(LogConfig.Compact())); @@ -213,5 +210,9 @@ public class InternalTopicIntegrationTest { // retention should be 1 day + the window duration final long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs; assertEquals(retention, Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp()))); + + final Properties repartitionProps 
= getTopicProperties(appID + "-CountWindows-repartition"); + assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp())); + assertEquals(4, repartitionProps.size()); } } -- To stop receiving notification emails like this one, please contact "commits@kafka.apache.org" <commits@kafka.apache.org>.