This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 70f25b9 MINOR: Improve on reset integration test (#4436) 70f25b9 is described below commit 70f25b95192bb7d56c62c8b8bc80b478b8e08ef9 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Thu Jan 18 13:54:59 2018 -0800 MINOR: Improve on reset integration test (#4436) * Parameterize abstract reset integration test Reviewers: Damian Guy <damian....@gmail.com>, Matthias J. Sax <mj...@apache.org> --- .../integration/AbstractResetIntegrationTest.java | 534 +++++++++------------ .../streams/integration/ResetIntegrationTest.java | 64 +-- .../integration/ResetIntegrationWithSslTest.java | 62 ++- 3 files changed, 275 insertions(+), 385 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index 26673ca..5819b6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -21,11 +21,13 @@ import kafka.tools.StreamsResetter; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KafkaStreams; @@ -40,271 +42,319 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; +import org.junit.AfterClass; import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.junit.Rule; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; -import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -abstract class AbstractResetIntegrationTest { - private final static Logger log = LoggerFactory.getLogger(AbstractResetIntegrationTest.class); +@Category({IntegrationTest.class}) +public abstract class AbstractResetIntegrationTest { + static String testId; + static EmbeddedKafkaCluster cluster; + static Map<String, Object> sslConfig = null; + private static KafkaStreams streams; + private static MockTime mockTime; + private static AdminClient adminClient = null; + private static KafkaAdminClient kafkaAdminClient = null; + + @AfterClass + public static void afterClassCleanup() { + if (adminClient != null) { + adminClient.close(); + adminClient = null; + } + if (kafkaAdminClient != null) { + kafkaAdminClient.close(10, TimeUnit.SECONDS); + kafkaAdminClient = null; + } + } + + private String appID; + private Properties commonClientConfig; + + private void prepareEnvironment() { + if (adminClient == null) { + adminClient = AdminClient.create(commonClientConfig); + } + if (kafkaAdminClient == null) { + kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig); + } + + // we align time to seconds to get clean window boundaries and thus ensure the same result for each run + // otherwise, input records could fall into different windows for different runs depending on the initial mock time + final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000; + mockTime = cluster.time; + mockTime.setCurrentTimeMs(alignedTime); + } + + private void prepareConfigs() { + commonClientConfig = new Properties(); + commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); - static final int NUM_BROKERS = 1; + if (sslConfig != null) { + commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + commonClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value()); + commonClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + } + + PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all"); + PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0); + PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + PRODUCER_CONFIG.putAll(commonClientConfig); + + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, testId + "-result-consumer"); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + RESULT_CONSUMER_CONFIG.putAll(commonClientConfig); + + STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + STREAMS_CONFIG.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + STREAMS_CONFIG.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); + STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); + STREAMS_CONFIG.putAll(commonClientConfig); + } + + @Rule + public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory()); - private static final String APP_ID = "cleanup-integration-test"; private static final String INPUT_TOPIC = "inputTopic"; private static final String OUTPUT_TOPIC = "outputTopic"; private static final String OUTPUT_TOPIC_2 = "outputTopic2"; private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun"; private static final String INTERMEDIATE_USER_TOPIC = "userTopic"; - private static final String NON_EXISTING_TOPIC = "nonExistingTopic2"; + private static final String NON_EXISTING_TOPIC = "nonExistingTopic"; private static final long STREAMS_CONSUMER_TIMEOUT = 2000L; private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; private static final int TIMEOUT_MULTIPLIER = 5; - private static AdminClient adminClient = null; - private static KafkaAdminClient kafkaAdminClient = null; - private static int testNo = 0; - - static EmbeddedKafkaCluster cluster; - static String bootstrapServers; - static MockTime mockTime; - - private final AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed consumerGroupInactive = new AbstractResetIntegrationTest.WaitUntilConsumerGroupGotClosed(); - - private class WaitUntilConsumerGroupGotClosed implements TestCondition { + private final TestCondition consumerGroupInactiveCondition = new TestCondition() { @Override public boolean conditionMet() { - return adminClient.describeConsumerGroup(APP_ID, 0).consumers().get().isEmpty(); + return adminClient.describeConsumerGroup(testId + "-result-consumer", 0).consumers().get().isEmpty(); } + }; + + private static final Properties STREAMS_CONFIG = new Properties(); + private final static Properties PRODUCER_CONFIG = new Properties(); + private final static Properties RESULT_CONSUMER_CONFIG = new Properties(); + + void prepareTest() throws Exception { + cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN); + + prepareConfigs(); + prepareEnvironment(); + + add10InputElements(); } - static void afterClassGlobalCleanup() { - if (adminClient != null) { - adminClient.close(); - adminClient = null; + void cleanupTest() throws Exception { + if (streams != null) { + streams.close(30, TimeUnit.SECONDS); } + IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); + } - if (kafkaAdminClient != null) { - kafkaAdminClient.close(10, TimeUnit.SECONDS); - kafkaAdminClient = null; + private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException { + List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"), + KeyValue.pair(1L, "bbb"), + KeyValue.pair(0L, "ccc"), + KeyValue.pair(1L, "ddd"), + KeyValue.pair(0L, "eee"), + KeyValue.pair(1L, "fff"), + KeyValue.pair(0L, "ggg"), + KeyValue.pair(1L, "hhh"), + KeyValue.pair(0L, "iii"), + KeyValue.pair(1L, "jjj")); + + for (KeyValue<Long, String> record : records) { + mockTime.sleep(10); + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(record), PRODUCER_CONFIG, mockTime.milliseconds()); } } - void beforePrepareTest() throws Exception { - ++testNo; - mockTime = cluster.time; - bootstrapServers = cluster.bootstrapServers(); - - // we align time to seconds to get clean window boundaries and thus ensure the same result for each run - // otherwise, input records could fall into different windows for different runs depending on the initial mock time - final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000; - mockTime.setCurrentTimeMs(alignedTime); + void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception { + appID = testId + "-not-reset-during-runtime"; + final String[] parameters = new String[] { + "--application-id", appID, + "--bootstrap-servers", cluster.bootstrapServers(), + "--input-topics", NON_EXISTING_TOPIC }; + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); - Properties sslConfig = getClientSslConfig(); - if (sslConfig == null) { - sslConfig = new Properties(); - sslConfig.put("bootstrap.servers", bootstrapServers); - } + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); - if (adminClient == null) { - adminClient = AdminClient.create(sslConfig); - } + // RUN + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); + streams.start(); - if (kafkaAdminClient == null) { - kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(sslConfig); - } + final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); + Assert.assertEquals(1, exitCode); - // busy wait until cluster (ie, ConsumerGroupCoordinator) is available - while (true) { - Thread.sleep(50); + streams.close(); + } - try { - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, - "Test consumer group active even after waiting " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - } catch (final TimeoutException e) { - continue; - } - break; - } + public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception { + appID = testId + "-not-reset-without-input-topic"; + final String[] parameters = new String[] { + "--application-id", appID, + "--bootstrap-servers", cluster.bootstrapServers(), + "--input-topics", NON_EXISTING_TOPIC }; + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); - prepareInputData(); + final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); + Assert.assertEquals(1, exitCode); } - Properties getClientSslConfig() { - return null; + public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception { + appID = testId + "-not-reset-without-intermediate-topic"; + final String[] parameters = new String[] { + "--application-id", appID, + "--bootstrap-servers", cluster.bootstrapServers(), + "--input-topics", NON_EXISTING_TOPIC }; + final Properties cleanUpConfig = new Properties(); + cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); + cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); + + final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); + Assert.assertEquals(1, exitCode); } void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { - final Properties sslConfig = getClientSslConfig(); - final Properties streamsConfiguration = prepareTest(); - - final Properties resultTopicConsumerConfig = new Properties(); - if (sslConfig != null) { - resultTopicConsumerConfig.putAll(sslConfig); - } - resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( - bootstrapServers, - APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, - LongDeserializer.class, - LongDeserializer.class)); + appID = testId + "-from-scratch"; + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); streams.cleanUp(); - cleanGlobal(sslConfig, false, null, null); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + cleanGlobal(false, null, null); + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(null); // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); streams.close(); assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - cleanGlobal(sslConfig, false, null, null); + cleanGlobal(false, null, null); } void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { cluster.createTopic(INTERMEDIATE_USER_TOPIC); - final Properties sslConfig = getClientSslConfig(); - final Properties streamsConfiguration = prepareTest(); - - final Properties resultTopicConsumerConfig = new Properties(); - if (sslConfig != null) { - resultTopicConsumerConfig.putAll(sslConfig); - } - resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( - bootstrapServers, - APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, - LongDeserializer.class, - LongDeserializer.class)); + appID = testId + "-from-scratch-with-intermediate-topic"; + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - KafkaStreams streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2), STREAMS_CONFIG); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); // receive only first values to make sure intermediate user topic is not consumed completely // => required to test "seekToEnd" for intermediate topics - final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC_2, - 40 - ); + final List<KeyValue<Long, Long>> result2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2, 40); streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // insert bad record to make sure intermediate user topic gets seekToEnd() mockTime.sleep(1); - Properties producerConfig = sslConfig; - if (producerConfig == null) { - producerConfig = new Properties(); - } - producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class)); + KeyValue<Long, String> badMessage = new KeyValue<>(-1L, "badRecord-ShouldBeSkipped"); IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( INTERMEDIATE_USER_TOPIC, - Collections.singleton(new KeyValue<>(-1L, "badRecord-ShouldBeSkipped")), - producerConfig, + Collections.singleton(badMessage), + PRODUCER_CONFIG, mockTime.milliseconds()); // RESET - streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithIntermediateUserTopic(OUTPUT_TOPIC_2_RERUN), STREAMS_CONFIG); streams.cleanUp(); - cleanGlobal(sslConfig, true, null, null); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + cleanGlobal(true, null, null); + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC); // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); - final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC_2_RERUN, - 40); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); + final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC_2_RERUN, 40); streams.close(); assertThat(resultRerun, equalTo(result)); assertThat(resultRerun2, equalTo(result2)); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + final Properties props = TestUtils.consumerConfig(cluster.bootstrapServers(), testId + "-result-consumer", LongDeserializer.class, StringDeserializer.class, commonClientConfig); + final List<KeyValue<Long, String>> resultIntermediate = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(props, INTERMEDIATE_USER_TOPIC, 21); + + for (int i = 0; i < 10; i++) { + assertThat(resultIntermediate.get(i), equalTo(resultIntermediate.get(i + 11))); + } + assertThat(resultIntermediate.get(10), equalTo(badMessage)); + + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - cleanGlobal(sslConfig, true, null, null); + cleanGlobal(true, null, null); cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC); } void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception { - final Properties sslConfig = getClientSslConfig(); - final Properties streamsConfiguration = prepareTest(); - - final Properties resultTopicConsumerConfig = new Properties(); - if (sslConfig != null) { - resultTopicConsumerConfig.putAll(sslConfig); - } - resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( - bootstrapServers, - APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, - LongDeserializer.class, - LongDeserializer.class)); + appID = testId + "-from-file"; + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET @@ -314,11 +364,11 @@ abstract class AbstractResetIntegrationTest { writer.close(); } - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); streams.cleanUp(); - cleanGlobal(sslConfig, false, "--from-file", resetFile.getAbsolutePath()); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + cleanGlobal(false, "--from-file", resetFile.getAbsolutePath()); + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(null); @@ -327,44 +377,28 @@ abstract class AbstractResetIntegrationTest { // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 5); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 5); streams.close(); result.remove(0); assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - cleanGlobal(sslConfig, false, null, null); + cleanGlobal(false, null, null); } void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception { - final Properties sslConfig = getClientSslConfig(); - final Properties streamsConfiguration = prepareTest(); - - final Properties resultTopicConsumerConfig = new Properties(); - if (sslConfig != null) { - resultTopicConsumerConfig.putAll(sslConfig); - } - resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( - bootstrapServers, - APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, - LongDeserializer.class, - LongDeserializer.class)); + appID = testId + "-from-datetime"; + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET @@ -374,7 +408,7 @@ abstract class AbstractResetIntegrationTest { writer.close(); } - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); streams.cleanUp(); @@ -382,8 +416,8 @@ abstract class AbstractResetIntegrationTest { final Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.DATE, -1); - cleanGlobal(sslConfig, false, "--to-datetime", format.format(calendar.getTime())); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + cleanGlobal(false, "--to-datetime", format.format(calendar.getTime())); + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(null); @@ -392,43 +426,27 @@ abstract class AbstractResetIntegrationTest { // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); streams.close(); assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - cleanGlobal(sslConfig, false, null, null); + cleanGlobal(false, null, null); } void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception { - final Properties sslConfig = getClientSslConfig(); - final Properties streamsConfiguration = prepareTest(); - - final Properties resultTopicConsumerConfig = new Properties(); - if (sslConfig != null) { - resultTopicConsumerConfig.putAll(sslConfig); - } - resultTopicConsumerConfig.putAll(TestUtils.consumerConfig( - bootstrapServers, - APP_ID + "-standard-consumer-" + OUTPUT_TOPIC, - LongDeserializer.class, - LongDeserializer.class)); + appID = testId + "-from-duration"; + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN - KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); streams.start(); - final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); + final List<KeyValue<Long, Long>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); streams.close(); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT, "Streams Application consumer group did not time out after " + (TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT) + " ms."); // RESET @@ -438,11 +456,11 @@ abstract class AbstractResetIntegrationTest { writer.close(); } - streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); + streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), STREAMS_CONFIG); streams.cleanUp(); - cleanGlobal(sslConfig, false, "--by-duration", "PT1M"); + cleanGlobal(false, "--by-duration", "PT1M"); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); assertInternalTopicsGotDeleted(null); @@ -451,74 +469,14 @@ abstract class AbstractResetIntegrationTest { // RE-RUN streams.start(); - final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( - resultTopicConsumerConfig, - OUTPUT_TOPIC, - 10); + final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(RESULT_CONSUMER_CONFIG, OUTPUT_TOPIC, 10); streams.close(); assertThat(resultRerun, equalTo(result)); - TestUtils.waitForCondition(consumerGroupInactive, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, + TestUtils.waitForCondition(consumerGroupInactiveCondition, TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT, "Reset Tool consumer group did not time out after " + (TIMEOUT_MULTIPLIER * CLEANUP_CONSUMER_TIMEOUT) + " ms."); - cleanGlobal(sslConfig, false, null, null); - } - - private Properties prepareTest() throws IOException { - Properties streamsConfiguration = getClientSslConfig(); - if (streamsConfiguration == null) { - streamsConfiguration = new Properties(); - } - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo); - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); - streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); - streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - - IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); - - return streamsConfiguration; - } - - private void prepareInputData() throws Exception { - cluster.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN); - - add10InputElements(); - } - - private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException { - Properties producerConfig = getClientSslConfig(); - if (producerConfig == null) { - producerConfig = new Properties(); - } - producerConfig.putAll(TestUtils.producerConfig(bootstrapServers, LongSerializer.class, StringSerializer.class)); - - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(10); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(1); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(1); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(1); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds()); - mockTime.sleep(1); - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds()); + cleanGlobal(false, null, null); } private Topology setupTopologyWithIntermediateUserTopic(final String outputTopic2) { @@ -570,14 +528,13 @@ abstract class AbstractResetIntegrationTest { return builder.build(); } - private void cleanGlobal(final Properties sslConfig, - final boolean withIntermediateTopics, + private void cleanGlobal(final boolean withIntermediateTopics, final String resetScenario, final String resetScenarioArg) throws Exception { // leaving --zookeeper arg here to ensure tool works if users add it final List<String> parameterList = new ArrayList<>( - Arrays.asList("--application-id", APP_ID + testNo, - "--bootstrap-servers", bootstrapServers, + Arrays.asList("--application-id", appID, + "--bootstrap-servers", cluster.bootstrapServers(), "--input-topics", INPUT_TOPIC)); if (withIntermediateTopics) { parameterList.add("--intermediate-topics"); @@ -588,7 +545,7 @@ abstract class AbstractResetIntegrationTest { final BufferedWriter writer = new BufferedWriter(new FileWriter(configFile)); writer.write(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG + "=SSL\n"); writer.write(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\n"); - writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\n"); + writer.write(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + "=" + ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value() + "\n"); writer.close(); parameterList.add("--config-file"); @@ -607,36 +564,10 @@ abstract class AbstractResetIntegrationTest { cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); - log.info("Calling StreamsResetter with parameters {} and configs {}", parameters, cleanUpConfig); - final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); Assert.assertEquals(0, exitCode); } - void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception { - - final Properties streamsConfiguration = prepareTest(); - final List<String> parameterList = new ArrayList<>( - Arrays.asList("--application-id", APP_ID + testNo, - "--bootstrap-servers", bootstrapServers, - "--input-topics", NON_EXISTING_TOPIC)); - - final String[] parameters = parameterList.toArray(new String[parameterList.size()]); - final Properties cleanUpConfig = new Properties(); - cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); - cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); - - // RUN - KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration); - streams.start(); - - final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); - Assert.assertEquals(1, exitCode); - - streams.close(); - - } - private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception { // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm if (intermediateUserTopic != null) { @@ -645,5 +576,4 @@ abstract class AbstractResetIntegrationTest { cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN, TestUtils.GROUP_METADATA_TOPIC_NAME); } } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index ef9a67d..6c0cc5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -18,20 +18,15 @@ package org.apache.kafka.streams.integration; import kafka.server.KafkaConfig$; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.test.IntegrationTest; -import org.junit.AfterClass; -import org.junit.Assert; + +import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import kafka.tools.StreamsResetter; import java.util.Properties; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.List; /** @@ -42,31 +37,29 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER; - private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L; - private static final String APP_ID = "Integration-test"; - private static final String NON_EXISTING_TOPIC = "nonExistingTopic"; - private static int testNo = 1; + + private static final String TEST_ID = "reset-integration-test"; static { - final Properties props = new Properties(); + final Properties brokerProps = new Properties(); // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially // very long sleep times - props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L); - CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props); - cluster = CLUSTER; - } - - @AfterClass - public static void globalCleanup() { - afterClassGlobalCleanup(); + brokerProps.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L); + CLUSTER = new EmbeddedKafkaCluster(1, brokerProps); } @Before public void before() throws Exception { - beforePrepareTest(); + testId = TEST_ID; + cluster = CLUSTER; + prepareTest(); } + @After + public void after() throws Exception { + cleanupTest(); + } @Test public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { @@ -100,36 +93,11 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception { - - final List<String> parameterList = new ArrayList<>( - Arrays.asList("--application-id", APP_ID + testNo, - "--bootstrap-servers", bootstrapServers, - "--input-topics", NON_EXISTING_TOPIC)); - - final String[] parameters = parameterList.toArray(new String[parameterList.size()]); - final Properties cleanUpConfig = new Properties(); - cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); - cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); - - final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); - Assert.assertEquals(1, exitCode); + super.shouldNotAllowToResetWhenInputTopicAbsent(); } @Test public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception { - - final List<String> parameterList = new ArrayList<>( - Arrays.asList("--application-id", APP_ID + testNo, - "--bootstrap-servers", bootstrapServers, - "--intermediate-topics", NON_EXISTING_TOPIC)); - - final String[] parameters = parameterList.toArray(new String[parameterList.size()]); - final Properties cleanUpConfig = new Properties(); - cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); - cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT); - - final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig); - Assert.assertEquals(1, exitCode); + super.shouldNotAllowToResetWhenIntermediateTopicAbsent(); } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java index abf4c38..cfdcfb0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java @@ -17,21 +17,17 @@ package org.apache.kafka.streams.integration; import kafka.server.KafkaConfig$; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.network.Mode; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestUtils; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.util.Map; import java.util.Properties; /** @@ -40,54 +36,50 @@ import java.util.Properties; @Category({IntegrationTest.class}) public class ResetIntegrationWithSslTest extends AbstractResetIntegrationTest { - private static Map<String, Object> sslConfig; - static { - try { - sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - @ClassRule public static final EmbeddedKafkaCluster CLUSTER; + private static final String TEST_ID = "reset-with-ssl-integration-test"; + static { - final Properties props = new Properties(); + final Properties brokerProps = new Properties(); // we double the value passed to `time.sleep` in each iteration in one of the map functions, so we disable // expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially // very long sleep times - props.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L); - props.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0"); - props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL"); - props.putAll(sslConfig); - CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, props); - cluster = CLUSTER; - } + brokerProps.put(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), -1L); + + try { + sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); + + brokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0"); + brokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL"); + brokerProps.putAll(sslConfig); + } catch (final Exception e) { + throw new RuntimeException(e); + } - @AfterClass - public static void globalCleanup() { - afterClassGlobalCleanup(); + CLUSTER = new EmbeddedKafkaCluster(1, brokerProps); } @Before public void before() throws Exception { - beforePrepareTest(); + testId = TEST_ID; + cluster = CLUSTER; + prepareTest(); } - Properties getClientSslConfig() { - final Properties props = new Properties(); - - props.put("bootstrap.servers", CLUSTER.bootstrapServers()); - props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); - props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value()); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); - - return props; + @After + public void after() throws Exception { + cleanupTest(); } @Test public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(); } + + @Test + public void testReprocessingFromScratchAfterResetWithIntermediateUserTopic() throws Exception { + super.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(); + } } -- To stop receiving notification emails like this one, please contact ['"commits@kafka.apache.org" <commits@kafka.apache.org>'].