Repository: kafka Updated Branches: refs/heads/trunk a1c8e7d94 -> 9198467eb
http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index e9c4ef9..3add508 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -126,7 +126,7 @@ public class StreamTaskTest { setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); - setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); + setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); } }); } @@ -365,8 +365,8 @@ public class StreamTaskTest { task.close(); - task = new StreamTask(taskId00, applicationId, partitions, - topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector); + task = new StreamTask(taskId00, applicationId, Utils.mkSet(partition1), + topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector); final int offset = 20; task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); @@ -445,8 +445,13 @@ public class StreamTaskTest { return true; } }; + Map<String, SourceNode> sourceByTopics = new HashMap() { { + put(partition1.topic(), source1); + put(partition2.topic(), source2); + } + }; final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(), - Collections.<String, SourceNode>emptyMap(), + sourceByTopics, Collections.<String, SinkNode>emptyMap(), Collections.<StateStore>singletonList(inMemoryStore), Collections.singletonMap(storeName, changelogTopic), @@ -583,8 +588,8 @@ public class StreamTaskTest { Collections.<String, String>emptyMap(), Collections.<StateStore>emptyList()); - return new StreamTask(taskId00, applicationId, partitions, - topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector); + return new StreamTask(taskId00, applicationId, Utils.mkSet(partition1), + topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector); } private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... recs) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 5b44260..7abe4dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -47,6 +47,7 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.Arrays; @@ -140,7 +141,7 @@ public class StreamThreadTest { setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); - setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); + setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); } }; @@ -344,6 +345,7 @@ public class StreamThreadTest { .persistent() .build() ); + builder.addSource("source", TOPIC); final StreamsConfig config = new StreamsConfig(configProps()); final MockClientSupplier mockClientSupplier = new MockClientSupplier(); mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1))); @@ -683,7 +685,7 @@ public class StreamThreadTest { @Test public void shouldInjectSharedProducerForAllTasksUsingClientSupplierWhenEosDisabled() { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic"); final StreamsConfig config = new StreamsConfig(configProps()); final MockClientSupplier clientSupplier = new MockClientSupplier(); final StreamThread thread = new StreamThread( @@ -717,7 +719,7 @@ public class StreamThreadTest { @Test public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic"); final Properties properties = configProps(); properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final StreamsConfig config = new StreamsConfig(properties); @@ -756,7 +758,7 @@ public class StreamThreadTest { @Test public void shouldCloseAllTaskProducers() { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic"); final Properties properties = configProps(); properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); final StreamsConfig config = new StreamsConfig(properties); @@ -790,7 +792,7 @@ public class StreamThreadTest { @Test public void shouldCloseThreadProducer() { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic"); final StreamsConfig config = new StreamsConfig(configProps()); final MockClientSupplier clientSupplier = new MockClientSupplier(); final StreamThread thread = new StreamThread( @@ -993,6 +995,13 @@ public class StreamThreadTest { } }); + StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); + Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); + updatedTopicsField.setAccessible(true); + Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates); + updatedTopics.add(t1.topic()); + builder.updateSubscriptions(subscriptionUpdates, null); + // should create task for id 0_0 with a single partition thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); thread.rebalanceListener.onPartitionsAssigned(task00Partitions); @@ -1002,6 +1011,8 @@ public class StreamThreadTest { // update assignment for the task 0_0 so it now has 2 partitions task00Partitions.add(new TopicPartition("t2", 0)); + updatedTopics.add("t2"); + thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); thread.rebalanceListener.onPartitionsAssigned(task00Partitions); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 9a60197..b4598fd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -221,9 +221,9 @@ public class KeyValueStoreTestDriver<K, V> { props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass()); + props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, serdes.keySerde().getClass()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serdes.valueSerde().getClass()); props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class); context = new MockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index dec3718..f8b17b2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -68,11 +68,12 @@ public class StreamThreadStateStoreProviderTest { private StateDirectory stateDirectory; private File stateDir; private boolean storesAvailable; + private final String topicName = "topic"; @Before public void before() throws IOException { final TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("the-source", "the-source"); + builder.addSource("the-source", topicName); builder.addProcessor("the-processor", new MockProcessorSupplier(), "the-source"); builder.addStateStore(Stores.create("kv-store") .withStringKeys() @@ -188,7 +189,7 @@ public class StreamThreadStateStoreProviderTest { final ProcessorTopology topology, final TaskId taskId) { return new StreamTask(taskId, applicationId, Collections - .singletonList(new TopicPartition("topic", taskId.partition)), topology, + .singletonList(new TopicPartition(topicName, taskId.partition)), topology, clientSupplier.consumer, new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000), streamsConfig, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), new NoOpRecordCollector()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index 88d1ccc..c04b3d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -55,8 +55,8 @@ public class BrokerCompatibilityTest { streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility"); streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString()); streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsProperties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsProperties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); final int timeout = 6000; streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), timeout); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 1e97e11..5d46ce0 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -92,7 +92,7 @@ import java.io.IOException; * StringDeserializer strDeserializer = new StringDeserializer(); * Properties props = new Properties(); * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); - * props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); + * props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); * props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); * props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName()); * props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java index 1e5c1f8..30ec90a 100644 --- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java @@ -39,8 +39,8 @@ public class StreamsTestUtils { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); - streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, keySerdeClassName); - streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
