Repository: kafka
Updated Branches:
  refs/heads/trunk a1c8e7d94 -> 9198467eb


http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index e9c4ef9..3add508 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -126,7 +126,7 @@ public class StreamTaskTest {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:2171");
                 
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, 
baseDir.getCanonicalPath());
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         });
     }
@@ -365,8 +365,8 @@ public class StreamTaskTest {
 
         task.close();
 
-        task  = new StreamTask(taskId00, applicationId, partitions,
-                                                     topology, consumer, 
changelogReader, config, streamsMetrics, stateDirectory, testCache, time, 
recordCollector);
+        task  = new StreamTask(taskId00, applicationId, 
Utils.mkSet(partition1),
+                topology, consumer, changelogReader, config, streamsMetrics, 
stateDirectory, testCache, time, recordCollector);
         final int offset = 20;
         task.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 
recordKey, recordValue)));
@@ -445,8 +445,13 @@ public class StreamTaskTest {
                 return true;
             }
         };
+        Map<String, SourceNode> sourceByTopics =  new HashMap() { {
+                put(partition1.topic(), source1);
+                put(partition2.topic(), source2);
+            }
+        };
         final ProcessorTopology topology = new 
ProcessorTopology(Collections.<ProcessorNode>emptyList(),
-                                                                 
Collections.<String, SourceNode>emptyMap(),
+                                                                 
sourceByTopics,
                                                                  
Collections.<String, SinkNode>emptyMap(),
                                                                  
Collections.<StateStore>singletonList(inMemoryStore),
                                                                  
Collections.singletonMap(storeName, changelogTopic),
@@ -583,8 +588,8 @@ public class StreamTaskTest {
                                                                  
Collections.<String, String>emptyMap(),
                                                                  
Collections.<StateStore>emptyList());
 
-        return new StreamTask(taskId00, applicationId, partitions,
-                              topology, consumer, changelogReader, config, 
streamsMetrics, stateDirectory, testCache, time, recordCollector);
+        return new StreamTask(taskId00, applicationId, Utils.mkSet(partition1),
+                topology, consumer, changelogReader, config, streamsMetrics, 
stateDirectory, testCache, time, recordCollector);
     }
 
     private Iterable<ConsumerRecord<byte[], byte[]>> records(final 
ConsumerRecord<byte[], byte[]>... recs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5b44260..7abe4dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -47,6 +47,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.Arrays;
@@ -140,7 +141,7 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:2171");
                 
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
             }
         };
@@ -344,6 +345,7 @@ public class StreamThreadTest {
                 .persistent()
                 .build()
         );
+        builder.addSource("source", TOPIC);
         final StreamsConfig config = new StreamsConfig(configProps());
         final MockClientSupplier mockClientSupplier = new MockClientSupplier();
         mockClientSupplier.consumer.assign(Arrays.asList(new 
TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
@@ -683,7 +685,7 @@ public class StreamThreadTest {
 
     @Test
     public void 
shouldInjectSharedProducerForAllTasksUsingClientSupplierWhenEosDisabled() {
-        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
+        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
         final StreamsConfig config = new StreamsConfig(configProps());
         final MockClientSupplier clientSupplier = new MockClientSupplier();
         final StreamThread thread = new StreamThread(
@@ -717,7 +719,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() {
-        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
+        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
         final Properties properties = configProps();
         properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
         final StreamsConfig config = new StreamsConfig(properties);
@@ -756,7 +758,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseAllTaskProducers() {
-        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
+        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
         final Properties properties = configProps();
         properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
         final StreamsConfig config = new StreamsConfig(properties);
@@ -790,7 +792,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseThreadProducer() {
-        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
+        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic");
         final StreamsConfig config = new StreamsConfig(configProps());
         final MockClientSupplier clientSupplier = new MockClientSupplier();
         final StreamThread thread = new StreamThread(
@@ -993,6 +995,13 @@ public class StreamThreadTest {
             }
         });
 
+        StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new 
StreamPartitionAssignor.SubscriptionUpdates();
+        Field updatedTopicsField  = 
subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions");
+        updatedTopicsField.setAccessible(true);
+        Set<String> updatedTopics = (Set<String>) 
updatedTopicsField.get(subscriptionUpdates);
+        updatedTopics.add(t1.topic());
+        builder.updateSubscriptions(subscriptionUpdates, null);
+
         // should create task for id 0_0 with a single partition
         
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
@@ -1002,6 +1011,8 @@ public class StreamThreadTest {
 
         // update assignment for the task 0_0 so it now has 2 partitions
         task00Partitions.add(new TopicPartition("t2", 0));
+        updatedTopics.add("t2");
+
         
thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
         thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 9a60197..b4598fd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -221,9 +221,9 @@ public class KeyValueStoreTestDriver<K, V> {
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
MockTimestampExtractor.class);
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
serdes.keySerde().getClass());
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
serdes.valueSerde().getClass());
+        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
MockTimestampExtractor.class);
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
serdes.keySerde().getClass());
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
serdes.valueSerde().getClass());
         props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, 
RocksDBKeyValueStoreTest.TheRocksDbConfigSetter.class);
 
         context = new MockProcessorContext(stateDir, serdes.keySerde(), 
serdes.valueSerde(), recordCollector, null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index dec3718..f8b17b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -68,11 +68,12 @@ public class StreamThreadStateStoreProviderTest {
     private StateDirectory stateDirectory;
     private File stateDir;
     private boolean storesAvailable;
+    private final String topicName = "topic";
 
     @Before
     public void before() throws IOException {
         final TopologyBuilder builder = new TopologyBuilder();
-        builder.addSource("the-source", "the-source");
+        builder.addSource("the-source", topicName);
         builder.addProcessor("the-processor", new MockProcessorSupplier(), 
"the-source");
         builder.addStateStore(Stores.create("kv-store")
                                   .withStringKeys()
@@ -188,7 +189,7 @@ public class StreamThreadStateStoreProviderTest {
                                          final ProcessorTopology topology,
                                          final TaskId taskId) {
         return new StreamTask(taskId, applicationId, Collections
-                .singletonList(new TopicPartition("topic", taskId.partition)), 
topology,
+                .singletonList(new TopicPartition(topicName, 
taskId.partition)), topology,
                               clientSupplier.consumer,
                               new 
StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000),
                               streamsConfig, new MockStreamsMetrics(new 
Metrics()), stateDirectory, null, new MockTime(), new NoOpRecordCollector()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 88d1ccc..c04b3d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -55,8 +55,8 @@ public class BrokerCompatibilityTest {
         streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"kafka-streams-system-test-broker-compatibility");
         streamsProperties.put(StreamsConfig.STATE_DIR_CONFIG, 
stateDir.toString());
         streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        streamsProperties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        streamsProperties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         final int timeout = 6000;
         
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
 timeout);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 1e97e11..5d46ce0 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -92,7 +92,7 @@ import java.io.IOException;
  * StringDeserializer strDeserializer = new StringDeserializer();
  * Properties props = new Properties();
  * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
- * props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
CustomTimestampExtractor.class.getName());
+ * props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
CustomTimestampExtractor.class.getName());
  * props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, 
strSerializer.getClass().getName());
  * props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
strDeserializer.getClass().getName());
  * props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
strSerializer.getClass().getName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/9198467e/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 1e5c1f8..30ec90a 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -39,8 +39,8 @@ public class StreamsTestUtils {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
         streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 
"1000");
-        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
keySerdeClassName);
-        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
valueSerdeClassName);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
keySerdeClassName);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
valueSerdeClassName);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Reply via email to