http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index cfd702e..2dd4553 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -32,18 +32,24 @@ import org.junit.Test; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Properties; +import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED; +import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; -import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; public class StreamsConfigTest { - private Properties props = new Properties(); + private final Properties props = new Properties(); private StreamsConfig streamsConfig; @Before @@ -58,40 +64,56 @@ public class StreamsConfigTest { streamsConfig = new StreamsConfig(props); } + @Test(expected = ConfigException.class) + public void shouldThrowExceptionIfApplicationIdIsNotSet() { + props.remove(StreamsConfig.APPLICATION_ID_CONFIG); + new StreamsConfig(props); + } + + @Test(expected = ConfigException.class) + public void shouldThrowExceptionIfBootstrapServersIsNotSet() { + props.remove(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + new StreamsConfig(props); + } + @Test public void testGetProducerConfigs() throws Exception { - Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client"); - assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer"); + final String clientId = "client"; + final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId); + assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer"); assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100"); assertNull(returnedProps.get("DUMMY")); } @Test public void testGetConsumerConfigs() throws Exception { - Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client"); - assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer"); - assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application"); + final String groupId = "example-application"; + final String clientId = "client"; + final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, groupId, clientId); + assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer"); + assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId); assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000"); assertNull(returnedProps.get("DUMMY")); } @Test public void testGetRestoreConsumerConfigs() throws Exception { - Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client"); - assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer"); + final String clientId = "client"; + final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId); + assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer"); assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); assertNull(returnedProps.get("DUMMY")); } @Test public void defaultSerdeShouldBeConfigured() { - Map<String, Object> serializerConfigs = new HashMap<>(); + final Map<String, Object> serializerConfigs = new HashMap<>(); serializerConfigs.put("key.serializer.encoding", "UTF8"); serializerConfigs.put("value.serializer.encoding", "UTF-16"); - Serializer<String> serializer = Serdes.String().serializer(); + final Serializer<String> serializer = Serdes.String().serializer(); - String str = "my string for testing"; - String topic = "my topic"; + final String str = "my string for testing"; + final String topic = "my topic"; serializer.configure(serializerConfigs, true); assertEquals("Should get the original string after serialization and deserialization with the configured encoding", @@ -104,14 +126,14 @@ public class StreamsConfigTest { @Test public void shouldSupportMultipleBootstrapServers() { - List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092"); - String bootstrapServersString = Utils.join(expectedBootstrapServers, ","); - Properties props = new Properties(); + final List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092"); + final String bootstrapServersString = Utils.join(expectedBootstrapServers, ","); + final Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "irrelevant"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersString); - StreamsConfig config = new StreamsConfig(props); + final StreamsConfig config = new StreamsConfig(props); - List<String> actualBootstrapServers = config.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + final List<String> actualBootstrapServers = config.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); assertEquals(expectedBootstrapServers, actualBootstrapServers); } @@ -165,7 +187,7 @@ public class StreamsConfigTest { props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10); props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map<String, Object> configs = streamsConfig.getProducerConfigs("client"); + final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId"); assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)); assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -195,7 +217,7 @@ public class StreamsConfigTest { props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10); props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map<String, Object> configs = streamsConfig.getProducerConfigs("client"); + final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId"); assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG)); assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); } @@ -230,7 +252,7 @@ public class StreamsConfigTest { public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception { props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client"); + final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId"); assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG)); } @@ -239,7 +261,7 @@ public class StreamsConfigTest { props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest"); props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10"); final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client"); + final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); } @@ -261,10 +283,113 @@ public class StreamsConfigTest { @Test public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception { final StreamsConfig streamsConfig = new StreamsConfig(props); - final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "group", "client"); + final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId"); assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false)); } + @Test + public void shouldAcceptAtLeastOnce() { + // don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "at_least_once"); + new StreamsConfig(props); + } + + @Test + public void shouldAcceptExactlyOnce() { + // don't use `StreamsConfig.EXACLTY_ONCE` to actually do a useful test + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); + new StreamsConfig(props); + } + + @Test(expected = ConfigException.class) + public void shouldThrowExceptionIfNotAtLestOnceOrExactlyOnce() { + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "bad_value"); + new StreamsConfig(props); + } + + @Test(expected = ConfigException.class) + public void shouldThrowExceptionIfConsumerIsolationLevelIsOverriddenIfEosEnabled() { + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue"); + final StreamsConfig streamsConfig = new StreamsConfig(props); + streamsConfig.getConsumerConfigs(null, "groupId", "clientId"); + } + + @Test + public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() { + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)); + final StreamsConfig streamsConfig = new StreamsConfig(props); + streamsConfig.getConsumerConfigs(null, "groupId", "clientId"); + } + + + @Test(expected = ConfigException.class) + public void shouldThrowExceptionIfProducerEnableIdempotenceIsOverriddenIfEosEnabled() { + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue"); + final StreamsConfig streamsConfig = new StreamsConfig(props); + streamsConfig.getProducerConfigs("clientId"); + } + + @Test + public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() { + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + final StreamsConfig streamsConfig = new StreamsConfig(props); + streamsConfig.getProducerConfigs("clientId"); + } + + @Test(expected = ConfigException.class) + public void shouldThrowExceptionIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled() { + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue"); + final StreamsConfig streamsConfig = new StreamsConfig(props); + streamsConfig.getProducerConfigs("clientId"); + } + + @Test + public void shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() { + props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue"); + final StreamsConfig streamsConfig = new StreamsConfig(props); + streamsConfig.getProducerConfigs("clientId"); + } + + @Test + public void shouldSetDifferentDefaultsIfEosEnabled() { + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + final StreamsConfig streamsConfig = new StreamsConfig(props); + + final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId"); + final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId"); + + assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT))); + assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)); + assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE)); + assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), equalTo(1)); + assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L)); + } + + @Test + public void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() { + final int numberOfRetries = 42; + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries); + final StreamsConfig streamsConfig = new StreamsConfig(props); + + final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId"); + + assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries)); + } + + @Test + public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() { + final long commitIntervalMs = 73L; + props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs); + final StreamsConfig streamsConfig = new StreamsConfig(props); + + assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs)); + } + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index 5705ea6..ba3230a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -35,6 +36,7 @@ import org.apache.kafka.test.TestUtils; import org.junit.Test; import java.util.Collections; +import java.util.Properties; public class AbstractTaskTest { @@ -61,6 +63,10 @@ public class AbstractTaskTest { private AbstractTask createTask(final Consumer consumer) { final MockTime time = new MockTime(); + final Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id"); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyhost:9092"); + final StreamsConfig config = new StreamsConfig(properties); return new AbstractTask(new TaskId(0, 0), "app", Collections.singletonList(new TopicPartition("t", 0)), @@ -74,7 +80,8 @@ public class AbstractTaskTest { new StoreChangelogReader(consumer, Time.SYSTEM, 5000), false, new StateDirectory("app", TestUtils.tempDirectory().getPath(), time), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) { + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())), + config) { @Override public void resume() {} @@ -85,7 +92,7 @@ public class AbstractTaskTest { public void suspend() {} @Override - public void close() {} + public void close(final boolean clean) {} }; } @@ -98,4 +105,4 @@ public class AbstractTaskTest { }; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 9f54b4a..476b009 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import static org.junit.Assert.assertEquals; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; @@ -33,6 +31,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; +import static org.junit.Assert.assertEquals; + public class PartitionGroupTest { private final Serializer<Integer> intSerializer = new IntegerSerializer(); private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); @@ -58,7 +58,7 @@ public class PartitionGroupTest { assertEquals(0, group.numBuffered()); // add three 3 records with timestamp 1, 3, 5 to partition-1 - List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( + final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList( new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue), new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); @@ -66,7 +66,7 @@ public class PartitionGroupTest { group.addRawRecords(partition1, list1); // add three 3 records with timestamp 2, 4, 6 to partition-2 - List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList( + final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList( new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue), new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue), new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue)); @@ -79,7 +79,7 @@ public class PartitionGroupTest { assertEquals(1L, group.timestamp()); StampedRecord record; - PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); + final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo(); // get one record, now the time should be advanced record = group.nextRecord(info); @@ -100,7 +100,7 @@ public class PartitionGroupTest { assertEquals(3L, group.timestamp()); // add three 3 records with timestamp 2, 4, 6 to partition-1 again - List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( + final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList( new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue), new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue)); http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index cfc1022..f454216 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -45,6 +45,7 @@ import java.util.Set; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -90,15 +91,23 @@ public class ProcessorStateManagerTest { public void testRegisterPersistentStore() throws IOException { final TaskId taskId = new TaskId(0, 2); - MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, new HashMap<String, String>() { - { - put(persistentStoreName, persistentStoreTopicName); - put(nonPersistentStoreName, nonPersistentStoreName); - } - }, changelogReader); - try { + final MockStateStoreSupplier.MockStateStore persistentStore + = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + new HashMap<String, String>() { + { + put(persistentStoreName, persistentStoreTopicName); + put(nonPersistentStoreName, nonPersistentStoreName); + } + }, + changelogReader, + false); + try { stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2))); } finally { @@ -108,18 +117,25 @@ public class ProcessorStateManagerTest { @Test public void testRegisterNonPersistentStore() throws IOException { - MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, false, stateDirectory, new HashMap<String, String>() { - { - put(persistentStoreName, persistentStoreTopicName); - put(nonPersistentStoreName, nonPersistentStoreTopicName); - } - }, changelogReader); - try { + final MockStateStoreSupplier.MockStateStore nonPersistentStore + = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store + final ProcessorStateManager stateMgr = new ProcessorStateManager( + new TaskId(0, 2), + noPartitions, + false, + stateDirectory, + new HashMap<String, String>() { + { + put(persistentStoreName, persistentStoreTopicName); + put(nonPersistentStoreName, nonPersistentStoreTopicName); + } + }, + changelogReader, + false); + try { stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2))); - } finally { stateMgr.close(Collections.<TopicPartition, Long>emptyMap()); } @@ -128,41 +144,49 @@ public class ProcessorStateManagerTest { @Test public void testChangeLogOffsets() throws IOException { final TaskId taskId = new TaskId(0, 0); - long lastCheckpointedOffset = 10L; - String storeName1 = "store1"; - String storeName2 = "store2"; - String storeName3 = "store3"; + final long lastCheckpointedOffset = 10L; + final String storeName1 = "store1"; + final String storeName2 = "store2"; + final String storeName3 = "store3"; - String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1); - String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2); - String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3); + final String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1); + final String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2); + final String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3); - Map<String, String> storeToChangelogTopic = new HashMap<>(); + final Map<String, String> storeToChangelogTopic = new HashMap<>(); storeToChangelogTopic.put(storeName1, storeTopicName1); storeToChangelogTopic.put(storeName2, storeTopicName2); storeToChangelogTopic.put(storeName3, storeTopicName3); - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); - TopicPartition partition1 = new TopicPartition(storeTopicName1, 0); - TopicPartition partition2 = new TopicPartition(storeTopicName2, 0); - TopicPartition partition3 = new TopicPartition(storeTopicName3, 1); + final TopicPartition partition1 = new TopicPartition(storeTopicName1, 0); + final TopicPartition partition2 = new TopicPartition(storeTopicName2, 0); + final TopicPartition partition3 = new TopicPartition(storeTopicName3, 1); - MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true); - MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true); - MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true); + final MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true); + final MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true); + final MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true); // if there is a source partition, inherit the partition id - Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); + final Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); + + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + sourcePartitions, + true, // standby + stateDirectory, + storeToChangelogTopic, + changelogReader, + false); - ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, sourcePartitions, true, stateDirectory, storeToChangelogTopic, changelogReader); // standby try { stateMgr.register(store1, true, store1.stateRestoreCallback); stateMgr.register(store2, true, store2.stateRestoreCallback); stateMgr.register(store3, true, store3.stateRestoreCallback); - Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed(); + final Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed(); assertEquals(3, changeLogOffsets.size()); assertTrue(changeLogOffsets.containsKey(partition1)); @@ -180,7 +204,14 @@ public class ProcessorStateManagerTest { @Test public void testGetStore() throws IOException { final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + new TaskId(0, 1), + noPartitions, + false, + stateDirectory, + Collections.<String, String>emptyMap(), + changelogReader, + false); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); @@ -202,12 +233,19 @@ public class ProcessorStateManagerTest { ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L); ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L); - ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, new HashMap<String, String>() { - { - put(persistentStoreName, persistentStoreTopicName); - put(nonPersistentStoreName, nonPersistentStoreTopicName); - } - }, changelogReader); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + new HashMap<String, String>() { + { + put(persistentStoreName, persistentStoreTopicName); + put(nonPersistentStoreName, nonPersistentStoreTopicName); + } + }, + changelogReader, + false); try { // make sure the checkpoint file isn't deleted assertTrue(checkpointFile.exists()); @@ -234,7 +272,14 @@ public class ProcessorStateManagerTest { @Test public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { - final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + new TaskId(0, 1), + noPartitions, + false, + stateDirectory, + Collections.<String, String>emptyMap(), + changelogReader, + false); stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback); assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } @@ -245,7 +290,14 @@ public class ProcessorStateManagerTest { checkpoint.write(offsets); final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + Collections.<String, String>emptyMap(), + changelogReader, + false); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); stateMgr.close(null); final Map<TopicPartition, Long> read = checkpoint.read(); @@ -254,13 +306,14 @@ public class ProcessorStateManagerTest { @Test public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception { - final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, - noPartitions, - false, - stateDirectory, - Collections.singletonMap(persistentStore.name(), - persistentStoreTopicName), - changelogReader); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + Collections.singletonMap(persistentStore.name(), persistentStoreTopicName), + changelogReader, + false); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); @@ -271,13 +324,14 @@ public class ProcessorStateManagerTest { @Test public void shouldWriteCheckpointForStandbyReplica() throws Exception { - final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, - noPartitions, - true, - stateDirectory, - Collections.singletonMap(persistentStore.name(), - persistentStoreTopicName), - changelogReader); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + true, // standby + stateDirectory, + Collections.singletonMap(persistentStore.name(), persistentStoreTopicName), + changelogReader, + false); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); final byte[] bytes = Serdes.Integer().serializer().serialize("", 10); @@ -300,14 +354,14 @@ public class ProcessorStateManagerTest { public void shouldNotWriteCheckpointForNonPersistent() throws Exception { final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1); - - final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, - noPartitions, - true, - stateDirectory, - Collections.singletonMap(nonPersistentStoreName, - nonPersistentStoreTopicName), - changelogReader); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + true, // standby + stateDirectory, + Collections.singletonMap(nonPersistentStoreName, nonPersistentStoreTopicName), + changelogReader, + false); stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L)); @@ -318,12 +372,14 @@ public class ProcessorStateManagerTest { @Test public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception { - final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, - noPartitions, - true, - stateDirectory, - Collections.<String, String>emptyMap(), - changelogReader); + final ProcessorStateManager stateMgr = new ProcessorStateManager( + taskId, + noPartitions, + true, // standby + stateDirectory, + Collections.<String, String>emptyMap(), + changelogReader, + false); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); @@ -344,7 +400,14 @@ public class ProcessorStateManagerTest { final FileLock lock = channel.lock(); try { - new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader); + new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + Collections.<String, String>emptyMap(), + changelogReader, + false); fail("Should have thrown LockException"); } catch (final LockException e) { // pass @@ -356,11 +419,14 @@ public class ProcessorStateManagerTest { @Test public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws Exception { - final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, - noPartitions, - false, - stateDirectory, - Collections.<String, String>emptyMap(), changelogReader); + final ProcessorStateManager stateManager = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + Collections.<String, String>emptyMap(), + changelogReader, + false); try { stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), true, null); @@ -372,11 +438,14 @@ public class ProcessorStateManagerTest { @Test public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws Exception { - final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, - noPartitions, - false, - stateDirectory, - Collections.<String, String>emptyMap(), changelogReader); + final ProcessorStateManager stateManager = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + Collections.<String, String>emptyMap(), + changelogReader, + false); stateManager.register(mockStateStore, false, null); try { @@ -391,12 +460,14 @@ public class ProcessorStateManagerTest { @Test public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws Exception { - final ProcessorStateManager stateManager = new ProcessorStateManager(taskId, - Collections.singleton(changelogTopicPartition), - false, - stateDirectory, - Collections.singletonMap(storeName, changelogTopic), - changelogReader); + final ProcessorStateManager stateManager = new ProcessorStateManager( + taskId, + Collections.singleton(changelogTopicPartition), + false, + stateDirectory, + Collections.singletonMap(storeName, changelogTopic), + changelogReader, + false); final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) { @Override @@ -414,5 +485,28 @@ public class ProcessorStateManagerTest { } } + @Test + public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws Exception { + checkpoint.write(Collections.<TopicPartition, Long>emptyMap()); + assertTrue(checkpointFile.exists()); + + ProcessorStateManager stateManager = null; + try { + stateManager = new ProcessorStateManager( + taskId, + noPartitions, + false, + stateDirectory, + Collections.<String, String>emptyMap(), + changelogReader, + true); + + assertFalse(checkpointFile.exists()); + } finally { + if (stateManager != null) { + stateManager.close(null); + } + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 3add508..b8c86f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -43,7 +43,6 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; -import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; @@ -112,14 +111,14 @@ public class StreamTaskTest { private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics); private final TaskId taskId00 = new TaskId(0, 0); private final MockTime time = new MockTime(); - private File baseDir; + private File baseDir = TestUtils.tempDirectory(); private StateDirectory stateDirectory; private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId"); - private final ThreadCache testCache = new ThreadCache("testCache", 0, streamsMetrics); private StreamsConfig config; + private StreamsConfig eosConfig; private StreamTask task; - private StreamsConfig createConfig(final File baseDir) throws Exception { + private StreamsConfig createConfig(final boolean enableEoS) throws Exception { return new StreamsConfig(new Properties() { { setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test"); @@ -127,6 +126,9 @@ public class StreamTaskTest { setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); + if (enableEoS) { + setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + } } }); } @@ -139,23 +141,22 @@ public class StreamTaskTest { consumer.assign(Arrays.asList(partition1, partition2)); source1.addChild(processor); source2.addChild(processor); - baseDir = TestUtils.tempDirectory(); - config = createConfig(baseDir); + config = createConfig(false); + eosConfig = createConfig(true); stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime()); task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, - changelogReader, config, streamsMetrics, stateDirectory, null, time, recordCollector); + changelogReader, config, streamsMetrics, stateDirectory, null, time, producer); } @After public void cleanup() throws IOException { - if (task != null) { - try { - task.close(); - } catch (final Exception e) { - // ignore exceptions + try { + if (task != null) { + task.close(true); } + } finally { + Utils.delete(baseDir); } - Utils.delete(baseDir); } @SuppressWarnings("unchecked") @@ -354,8 +355,12 @@ public class StreamTaskTest { }; final List<ProcessorNode> processorNodes = Collections.<ProcessorNode>singletonList(processorNode); - final Map<String, SourceNode> sourceNodes - = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode); + final Map<String, SourceNode> sourceNodes = new HashMap() { + { + put(topic1[0], processorNode); + put(topic2[0], processorNode); + } + }; final ProcessorTopology topology = new ProcessorTopology(processorNodes, sourceNodes, Collections.<String, SinkNode>emptyMap(), @@ -363,10 +368,10 @@ public class StreamTaskTest { Collections.<String, String>emptyMap(), Collections.<StateStore>emptyList()); - task.close(); + task.close(true); - task = new StreamTask(taskId00, applicationId, Utils.mkSet(partition1), - topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, + streamsMetrics, stateDirectory, null, time, producer); final int offset = 20; task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); @@ -415,16 +420,20 @@ public class StreamTaskTest { @Test public void shouldFlushRecordCollectorOnFlushState() throws Exception { final AtomicBoolean flushed = new AtomicBoolean(false); - final NoOpRecordCollector recordCollector = new NoOpRecordCollector() { + final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); + final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer, + changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) { + @Override - public void flush() { - flushed.set(true); + RecordCollector createRecordCollector() { + return new NoOpRecordCollector() { + @Override + public void flush() { + flushed.set(true); + } + }; } }; - final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); - final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer, - changelogReader, createConfig(baseDir), streamsMetrics, - stateDirectory, testCache, time, recordCollector); streamTask.flushState(); assertTrue(flushed.get()); } @@ -458,13 +467,6 @@ public class StreamTaskTest { Collections.<StateStore>emptyList()); final TopicPartition partition = new TopicPartition(changelogTopic, 0); - final NoOpRecordCollector recordCollector = new NoOpRecordCollector() { - @Override - public Map<TopicPartition, Long> offsets() { - - return Collections.singletonMap(partition, 543L); - } - }; restoreStateConsumer.updatePartitions(changelogTopic, Collections.singletonList( @@ -472,22 +474,29 @@ public class StreamTaskTest { restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L)); restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L)); - final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); - final TaskId taskId = new TaskId(0, 0); - final MockTime time = new MockTime(); - final StreamsConfig config = createConfig(baseDir); - final StreamTask streamTask = new StreamTask(taskId, "appId", partitions, topology, consumer, - changelogReader, config, streamsMetrics, - stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), - time, recordCollector); + final long offset = 543L; + final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer, + changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) { + + @Override + RecordCollector createRecordCollector() { + return new NoOpRecordCollector() { + @Override + public Map<TopicPartition, Long> offsets() { + + return Collections.singletonMap(partition, offset); + } + }; + } + }; time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); streamTask.commit(); - final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId00), ProcessorStateManager.CHECKPOINT_FILE_NAME)); - assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, 544L))); + assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, offset + 1))); } @Test @@ -528,45 +537,161 @@ public class StreamTaskTest { @SuppressWarnings("unchecked") @Test - public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseTopology() throws Exception { - task.close(); + public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception { + task.close(true); task = createTaskThatThrowsExceptionOnClose(); try { - task.close(); + task.close(true); fail("should have thrown runtime exception"); } catch (final RuntimeException e) { - // ok - } - } - - @Test - public void shouldCloseAllProcessorNodesWhenExceptionsRaised() throws Exception { - task.close(); - task = createTaskThatThrowsExceptionOnClose(); - try { - task.close(); - } catch (final RuntimeException e) { - // expected + task = null; } assertTrue(processor.closed); assertTrue(source1.closed); assertTrue(source2.closed); } - @SuppressWarnings("unchecked") @Test - public void shouldCloseProducerWhenExactlyOneEnabled() { - final Map properties = config.values(); - properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once"); - final StreamsConfig config = new StreamsConfig(properties); + public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + eosConfig, streamsMetrics, stateDirectory, null, time, producer); + assertTrue(producer.transactionInitialized()); + assertTrue(producer.transactionInFlight()); + } + + @Test + public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception { final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + config, streamsMetrics, stateDirectory, null, time, producer); - task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, - changelogReader, config, streamsMetrics, stateDirectory, null, time, new RecordCollectorImpl(producer, "taskId")); + assertFalse(producer.transactionInitialized()); + assertFalse(producer.transactionInFlight()); + } + + @Test + public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + eosConfig, streamsMetrics, stateDirectory, null, time, producer); + + task.addRecords(partition1, Collections.singletonList( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); + task.process(); - task.close(); + task.suspend(); + assertTrue(producer.sentOffsets()); + assertTrue(producer.transactionCommitted()); + assertFalse(producer.transactionInFlight()); + } + + @Test + public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + config, streamsMetrics, stateDirectory, null, time, producer); + + task.addRecords(partition1, Collections.singletonList( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); + task.process(); + + task.suspend(); + assertFalse(producer.sentOffsets()); + assertFalse(producer.transactionCommitted()); + assertFalse(producer.transactionInFlight()); + } + + @Test + public void shouldStartNewTransactionOnResumeIfEosEnabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + eosConfig, streamsMetrics, stateDirectory, null, time, producer); + + task.addRecords(partition1, Collections.singletonList( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); + task.process(); + task.suspend(); + task.resume(); + assertTrue(producer.transactionInFlight()); + } + + @Test + public void shouldNotStartNewTransactionOnResumeIfEosDisabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + config, streamsMetrics, stateDirectory, null, time, producer); + + task.addRecords(partition1, Collections.singletonList( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); + task.process(); + task.suspend(); + + task.resume(); + assertFalse(producer.transactionInFlight()); + } + + @Test + public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + eosConfig, streamsMetrics, stateDirectory, null, time, producer); + + task.addRecords(partition1, Collections.singletonList( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); + task.process(); + + task.commit(); + assertTrue(producer.transactionInFlight()); + } + + @Test + public void shouldNotStartNewTransactionOnCommitIfEosDisabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + config, streamsMetrics, stateDirectory, null, time, producer); + + task.addRecords(partition1, Collections.singletonList( + new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); + task.process(); + + task.commit(); + assertFalse(producer.transactionInFlight()); + } + + @Test + public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + eosConfig, streamsMetrics, stateDirectory, null, time, producer); + + task.close(false); + task = null; + assertTrue(producer.transactionAborted()); + } + + @Test + public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception { + final MockProducer producer = new MockProducer(); + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, + config, streamsMetrics, stateDirectory, null, time, producer); + + task.close(false); + assertFalse(producer.transactionAborted()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCloseProducerOnCloseWhenEosEnabled() throws Exception { + final MockProducer producer = new MockProducer(); + + task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, + changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer); + + task.close(true); + task = null; assertTrue(producer.closed()); } @@ -579,8 +704,12 @@ public class StreamTaskTest { } }; final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processor, source1, source2); - final Map<String, SourceNode> sourceNodes - = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode); + final Map<String, SourceNode> sourceNodes = new HashMap() { + { + put(topic1[0], processorNode); + put(topic2[0], processorNode); + } + }; final ProcessorTopology topology = new ProcessorTopology(processorNodes, sourceNodes, Collections.<String, SinkNode>emptyMap(), @@ -588,34 +717,12 @@ public class StreamTaskTest { Collections.<String, String>emptyMap(), Collections.<StateStore>emptyList()); - return new StreamTask(taskId00, applicationId, Utils.mkSet(partition1), - topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector); + return new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config, + streamsMetrics, stateDirectory, null, time, producer); } private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... recs) { return Arrays.asList(recs); } - private final static class MockedProducer extends MockProducer { - private final AtomicBoolean flushed; - boolean closed = false; - - MockedProducer(final AtomicBoolean flushed) { - super(false, null, null); - this.flushed = flushed; - } - - @Override - public void flush() { - if (flushed != null) { - flushed.set(true); - } - } - - @Override - public void close() { - closed = true; - } - } - }
