[ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16435886#comment-16435886
 ] 

ASF GitHub Bot commented on KAFKA-6592:
---------------------------------------

guozhangwang closed pull request #4797: KAFKA-6592: ConsoleConsumer should 
support WindowedSerdes
URL: https://github.com/apache/kafka/pull/4797
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 24fa583f1bb..9df4fb42070 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
Deserializer}
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.JavaConversions
 import scala.collection.JavaConverters._
 
 /**
@@ -45,6 +46,11 @@ import scala.collection.JavaConverters._
 object ConsoleConsumer extends Logging {
 
   var messageCount = 0
+  // Keep same names with StreamConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
+  // and StreamConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
+  // visible for testing
+  private[tools] val innerKeySerdeName = "default.windowed.key.serde.inner"
+  private[tools] val innerValueSerdeName = "default.windowed.value.serde.inner"
 
   private val shutdownLatch = new CountDownLatch(1)
 
@@ -291,7 +297,17 @@ object ConsoleConsumer extends Logging {
       .describedAs("class")
       .ofType(classOf[String])
       .defaultsTo(classOf[DefaultMessageFormatter].getName)
-    val messageFormatterArgOpt = parser.accepts("property", "The properties to 
initialize the message formatter.")
+    val messageFormatterArgOpt = parser.accepts("property",
+      "The properties to initialize the message formatter. Default properties 
include:\n" +
+        "\tprint.timestamp=true|false\n" +
+        "\tprint.key=true|false\n" +
+        "\tprint.value=true|false\n" +
+        "\tkey.separator=<key.separator>\n" +
+        "\tline.separator=<line.separator>\n" +
+        "\tkey.deserializer=<key.deserializer>\n" +
+        "\tvalue.deserializer=<value.deserializer>\n" +
+        "\tdefault.windowed.key.serde.inner=<windowed.key.serde.inner>\n" +
+        "\tdefault.windowed.value.serde.inner=<windowed.value.serde.inner>")
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
@@ -328,6 +344,18 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("deserializer for values")
       .ofType(classOf[String])
+    val innerKeyDeserializerOpt = parser.accepts(innerKeySerdeName,
+      "inner serde for key when windowed deserialzier is used; would be 
ignored otherwise. " +
+        "For example: 
org.apache.kafka.common.serialization.Serdes\\$StringSerde")
+      .withRequiredArg
+      .describedAs("inner serde for key")
+      .ofType(classOf[String])
+    val innerValueDeserializerOpt = parser.accepts(innerValueSerdeName,
+      "inner serde for value when windowed deserialzier is used; would be 
ignored otherwise. " +
+        "For example: 
org.apache.kafka.common.serialization.Serdes\\$StringSerde")
+      .withRequiredArg
+      .describedAs("inner serde for values")
+      .ofType(classOf[String])
     val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
                                                        "Log lifecycle events 
of the consumer in addition to logging consumed " +
                                                        "messages. (This is 
specific for system tests.)")
@@ -372,6 +400,8 @@ object ConsoleConsumer extends Logging {
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
+    val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt)
+    val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt)
     val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = 
messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
 
@@ -381,6 +411,13 @@ object ConsoleConsumer extends Logging {
     if (valueDeserializer != null && !valueDeserializer.isEmpty) {
       
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer)
     }
+    if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) {
+      formatterArgs.setProperty(innerKeySerdeName, innerKeyDeserializer)
+    }
+    if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) {
+      formatterArgs.setProperty(innerValueSerdeName, innerValueDeserializer)
+    }
+
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {
@@ -521,11 +558,15 @@ class DefaultMessageFormatter extends MessageFormatter {
     if (props.containsKey("line.separator"))
       lineSeparator = 
props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
     // Note that `toString` will be called on the instance returned by 
`Deserializer.deserialize`
-    if (props.containsKey("key.deserializer"))
+    if (props.containsKey("key.deserializer")) {
       keyDeserializer = 
Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+      
keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava,
 true)
+    }
     // Note that `toString` will be called on the instance returned by 
`Deserializer.deserialize`
-    if (props.containsKey("value.deserializer"))
+    if (props.containsKey("value.deserializer")) {
       valueDeserializer = 
Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+      
valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava,
 false)
+    }
   }
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], 
output: PrintStream) {
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 9ae8b966ac0..f5195c3cf76 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -25,6 +25,7 @@ import kafka.utils.{Exit, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, 
OffsetResetStrategy}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{Before, Test}
@@ -537,4 +538,22 @@ class ConsoleConsumerTest {
 
     Exit.resetExitProcedure()
   }
+
+  @Test
+  def testCustomPropertyShouldBePassedToConfigureMethod(): Unit = {
+    val args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--property", "print.key=true",
+      "--property", 
"key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer",
+      "--" + ConsoleConsumer.innerKeySerdeName, 
"org.apache.kafka.common.serialization.Serdes$StringSerde",
+      "--property", "my-test1=abc"
+    )
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+    assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
+    val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
+    
assertTrue(formatter.keyDeserializer.get.isInstanceOf[ByteArrayDeserializer])
+    assertTrue(config.formatterArgs.containsKey("my-test1"))
+    
assertTrue(config.formatterArgs.containsKey(ConsoleConsumer.innerKeySerdeName))
+  }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 4527c19b471..fc673d0258e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.integration;
 
+import kafka.tools.ConsoleConsumer;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -43,10 +45,14 @@
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
 import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -60,14 +66,18 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -75,6 +85,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @Category({IntegrationTest.class})
 public class KStreamAggregationIntegrationTest {
@@ -205,32 +216,36 @@ public void shouldReduceWindowed() throws Exception {
         produceMessages(secondBatchTimestamp);
         produceMessages(secondBatchTimestamp);
 
+        Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class);
         groupedStream
                 .windowedBy(TimeWindows.of(500L))
                 .reduce(reducer)
-                .toStream(new KeyValueMapper<Windowed<String>, String, 
String>() {
-                    @Override
-                    public String apply(final Windowed<String> windowedKey, 
final String value) {
-                        return windowedKey.key() + "@" + 
windowedKey.window().start();
-                    }
-                })
-            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
-
+                .toStream()
+                .to(outputTopic, Produced.with(windowedSerde, 
Serdes.String()));
 
         startStreams();
 
-        final List<KeyValue<String, String>> windowedOutput = receiveMessages(
-            new StringDeserializer(),
+        final List<KeyValue<Windowed<String>, String>> windowedOutput = 
receiveMessages(
+            new TimeWindowedDeserializer<String>(),
             new StringDeserializer(),
+            String.class,
             15);
 
-        final Comparator<KeyValue<String, String>>
+        // read from ConsoleConsumer
+        String resultFromConsoleConsumer = 
readWindowedKeyedMessagesViaConsoleConsumer(
+                new TimeWindowedDeserializer<String>(),
+                new StringDeserializer(),
+                String.class,
+                15);
+
+        final Comparator<KeyValue<Windowed<String>, String>>
             comparator =
-            new Comparator<KeyValue<String, String>>() {
+            new Comparator<KeyValue<Windowed<String>, String>>() {
                 @Override
-                public int compare(final KeyValue<String, String> o1,
-                                   final KeyValue<String, String> o2) {
-                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                public int compare(final KeyValue<Windowed<String>, String> o1,
+                                   final KeyValue<Windowed<String>, String> 
o2) {
+                    final int keyComparison = 
o1.key.key().compareTo(o2.key.key());
+                    return keyComparison == 0 ? o1.value.compareTo(o2.value) : 
keyComparison;
                 }
             };
 
@@ -238,25 +253,36 @@ public int compare(final KeyValue<String, String> o1,
         final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
         final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
 
-        assertThat(windowedOutput, is(
-            Arrays.asList(
-                new KeyValue<>("A@" + firstBatchWindow, "A"),
-                new KeyValue<>("A@" + secondBatchWindow, "A"),
-                new KeyValue<>("A@" + secondBatchWindow, "A:A"),
-                new KeyValue<>("B@" + firstBatchWindow, "B"),
-                new KeyValue<>("B@" + secondBatchWindow, "B"),
-                new KeyValue<>("B@" + secondBatchWindow, "B:B"),
-                new KeyValue<>("C@" + firstBatchWindow, "C"),
-                new KeyValue<>("C@" + secondBatchWindow, "C"),
-                new KeyValue<>("C@" + secondBatchWindow, "C:C"),
-                new KeyValue<>("D@" + firstBatchWindow, "D"),
-                new KeyValue<>("D@" + secondBatchWindow, "D"),
-                new KeyValue<>("D@" + secondBatchWindow, "D:D"),
-                new KeyValue<>("E@" + firstBatchWindow, "E"),
-                new KeyValue<>("E@" + secondBatchWindow, "E"),
-                new KeyValue<>("E@" + secondBatchWindow, "E:E")
-            )
-        ));
+        List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
+                new KeyValue<>(new Windowed<>("A", new 
TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"),
+                new KeyValue<>(new Windowed<>("A", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"),
+                new KeyValue<>(new Windowed<>("A", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"),
+                new KeyValue<>(new Windowed<>("B", new 
TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B"),
+                new KeyValue<>(new Windowed<>("B", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B"),
+                new KeyValue<>(new Windowed<>("B", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B"),
+                new KeyValue<>(new Windowed<>("C", new 
TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C"),
+                new KeyValue<>(new Windowed<>("C", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C"),
+                new KeyValue<>(new Windowed<>("C", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C"),
+                new KeyValue<>(new Windowed<>("D", new 
TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D"),
+                new KeyValue<>(new Windowed<>("D", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D"),
+                new KeyValue<>(new Windowed<>("D", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D"),
+                new KeyValue<>(new Windowed<>("E", new 
TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E"),
+                new KeyValue<>(new Windowed<>("E", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E"),
+                new KeyValue<>(new Windowed<>("E", new 
TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E")
+        );
+        assertThat(windowedOutput, is(expectResult));
+
+        Set<String> expectResultString = new HashSet<>(expectResult.size());
+        for (KeyValue<Windowed<String>, String> eachRecord: expectResult) {
+            expectResultString.add(eachRecord.toString());
+        }
+
+        // check every message is contained in the expect result
+        String[] allRecords = resultFromConsoleConsumer.split("\n");
+        for (String record: allRecords) {
+            record = "KeyValue(" + record + ")";
+            assertTrue(expectResultString.contains(record));
+        }
     }
 
     @SuppressWarnings("deprecation")
@@ -309,34 +335,39 @@ public void shouldAggregateWindowed() throws Exception {
         produceMessages(secondTimestamp);
         produceMessages(secondTimestamp);
 
+        Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class);
         groupedStream.windowedBy(TimeWindows.of(500L))
                 .aggregate(
                         initializer,
                         aggregator,
                         Materialized.<String, Integer, WindowStore<Bytes, 
byte[]>>with(null, Serdes.Integer())
                 )
-                .toStream(new KeyValueMapper<Windowed<String>, Integer, 
String>() {
-                    @Override
-                    public String apply(final Windowed<String> windowedKey, 
final Integer value) {
-                        return windowedKey.key() + "@" + 
windowedKey.window().start();
-                    }
-                })
-                .to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Integer()));
+                .toStream()
+                .to(outputTopic, Produced.with(windowedSerde, 
Serdes.Integer()));
 
         startStreams();
 
-        final List<KeyValue<String, Integer>> windowedMessages = 
receiveMessages(
-            new StringDeserializer(),
+        final List<KeyValue<Windowed<String>, Integer>> windowedMessages = 
receiveMessages(
+            new TimeWindowedDeserializer<String>(),
             new IntegerDeserializer(),
+            String.class,
             15);
 
-        final Comparator<KeyValue<String, Integer>>
+        // read from ConsoleConsumer
+        String resultFromConsoleConsumer = 
readWindowedKeyedMessagesViaConsoleConsumer(
+                new TimeWindowedDeserializer<String>(),
+                new IntegerDeserializer(),
+                String.class,
+                15);
+
+        final Comparator<KeyValue<Windowed<String>, Integer>>
             comparator =
-            new Comparator<KeyValue<String, Integer>>() {
+            new Comparator<KeyValue<Windowed<String>, Integer>>() {
                 @Override
-                public int compare(final KeyValue<String, Integer> o1,
-                                   final KeyValue<String, Integer> o2) {
-                    return KStreamAggregationIntegrationTest.compare(o1, o2);
+                public int compare(final KeyValue<Windowed<String>, Integer> 
o1,
+                                   final KeyValue<Windowed<String>, Integer> 
o2) {
+                    final int keyComparison = 
o1.key.key().compareTo(o2.key.key());
+                    return keyComparison == 0 ? o1.value.compareTo(o2.value) : 
keyComparison;
                 }
             };
 
@@ -345,24 +376,37 @@ public int compare(final KeyValue<String, Integer> o1,
         final long firstWindow = firstTimestamp / 500 * 500;
         final long secondWindow = secondTimestamp / 500 * 500;
 
-        assertThat(windowedMessages, is(
-            Arrays.asList(
-                new KeyValue<>("A@" + firstWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 1),
-                new KeyValue<>("A@" + secondWindow, 2),
-                new KeyValue<>("B@" + firstWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 1),
-                new KeyValue<>("B@" + secondWindow, 2),
-                new KeyValue<>("C@" + firstWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 1),
-                new KeyValue<>("C@" + secondWindow, 2),
-                new KeyValue<>("D@" + firstWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 1),
-                new KeyValue<>("D@" + secondWindow, 2),
-                new KeyValue<>("E@" + firstWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 1),
-                new KeyValue<>("E@" + secondWindow, 2)
-            )));
+        List<KeyValue<Windowed<String>, Integer>> expectResult = Arrays.asList(
+                new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, 
Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("A", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("A", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+                new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, 
Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("B", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("B", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+                new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, 
Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("C", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("C", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+                new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, 
Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("D", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("D", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 2),
+                new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, 
Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("E", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 1),
+                new KeyValue<>(new Windowed<>("E", new 
TimeWindow(secondWindow, Long.MAX_VALUE)), 2));
+
+        assertThat(windowedMessages, is(expectResult));
+
+        Set<String> expectResultString = new HashSet<>(expectResult.size());
+        for (KeyValue<Windowed<String>, Integer> eachRecord: expectResult) {
+            expectResultString.add(eachRecord.toString());
+        }
+
+        // check every message is contained in the expect result
+        String[] allRecords = resultFromConsoleConsumer.split("\n");
+        for (String record: allRecords) {
+            record = "KeyValue(" + record + ")";
+            assertTrue(expectResultString.contains(record));
+        }
+
     }
 
     private void shouldCountHelper() throws Exception {
@@ -685,26 +729,66 @@ private void startStreams() {
         kafkaStreams.start();
     }
 
-
     private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
                                                             keyDeserializer,
                                                         final Deserializer<V>
                                                             valueDeserializer,
                                                         final int numMessages)
         throws InterruptedException {
+        return receiveMessages(keyDeserializer, valueDeserializer, null, 
numMessages);
+    }
+
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
+                                                                
keyDeserializer,
+                                                        final Deserializer<V>
+                                                                
valueDeserializer,
+                                                        final Class innerClass,
+                                                        final int numMessages) 
throws InterruptedException {
         final Properties consumerProperties = new Properties();
-        consumerProperties
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"kgroupedstream-test-" + testNo);
         
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializer.getClass().getName());
         
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer.getClass().getName());
+        if (keyDeserializer instanceof TimeWindowedDeserializer || 
keyDeserializer instanceof SessionWindowedDeserializer) {
+            
consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
+                    Serdes.serdeFrom(innerClass).getClass().getName());
+        }
         return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            consumerProperties,
-            outputTopic,
-            numMessages,
-            60 * 1000);
-
+                consumerProperties,
+                outputTopic,
+                numMessages,
+                60 * 1000);
     }
 
+    private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final 
Deserializer<K> keyDeserializer,
+                                                  final Deserializer<V> 
valueDeserializer,
+                                                  final Class innerClass,
+                                                  final int numMessages) {
+        ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
+        PrintStream originalStream = System.out;
+        try (PrintStream newStream = new PrintStream(newConsole)) {
+            System.setOut(newStream);
+
+            String keySeparator = ", ";
+            // manually construct the console consumer argument array
+            String[] args = new String[] {
+                "--bootstrap-server", CLUSTER.bootstrapServers(),
+                "--from-beginning",
+                "--property", "print.key=true",
+                "--topic", outputTopic,
+                "--max-messages", String.valueOf(numMessages),
+                "--property", "key.deserializer=" + 
keyDeserializer.getClass().getName(),
+                "--property", "value.deserializer=" + 
valueDeserializer.getClass().getName(),
+                "--property", "key.separator=" + keySeparator,
+                "--" + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, 
Serdes.serdeFrom(innerClass).getClass().getName()
+            };
+
+            ConsoleConsumer.messageCount_$eq(0); //reset the message count
+            ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args));
+            newStream.flush();
+            System.setOut(originalStream);
+            return newConsole.toString();
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NullPointerException thrown when executing ConsoleCosumer with deserializer 
> set to `WindowedDeserializer`
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6592
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6592
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, tools
>    Affects Versions: 1.0.0
>            Reporter: huxihx
>            Assignee: huxihx
>            Priority: Minor
>
> When reading streams app's output topic with WindowedDeserializer deserilizer 
> using kafka-console-consumer.sh, NullPointerException was thrown due to the 
> fact that the inner deserializer was not initialized since there is no place 
> in ConsoleConsumer to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to