[ https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438962#comment-16438962 ]

ASF GitHub Bot commented on KAFKA-6592:
---------------------------------------

guozhangwang closed pull request #4864: KAFKA-6592: Follow-up
URL: https://github.com/apache/kafka/pull/4864
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
index 99551f718a9..ac2865e9bb8 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
@@ -31,6 +31,9 @@
     public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
     public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
 
+    public boolean isKey;
+    public Map<String, ?> configs;
+
     public static void resetStaticVariables() {
         initCount = new AtomicInteger(0);
         closeCount = new AtomicInteger(0);
@@ -44,6 +47,8 @@ public MockDeserializer() {
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
+        this.configs = configs;
+        this.isKey = isKey;
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 9df4fb42070..5139324ec17 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -19,8 +19,9 @@ package kafka.tools
 
 import java.io.PrintStream
 import java.nio.charset.StandardCharsets
+import java.util
 import java.util.concurrent.CountDownLatch
-import java.util.{Locale, Properties, Random}
+import java.util.{Locale, Map, Properties, Random}
 
 import com.typesafe.scalalogging.LazyLogging
 import joptsimple._
@@ -46,11 +47,6 @@ import scala.collection.JavaConverters._
 object ConsoleConsumer extends Logging {
 
   var messageCount = 0
-  // Keep same names with StreamConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
-  // and StreamConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
-  // visible for testing
-  private[tools] val innerKeySerdeName = "default.windowed.key.serde.inner"
-  private[tools] val innerValueSerdeName = "default.windowed.value.serde.inner"
 
   private val shutdownLatch = new CountDownLatch(1)
 
@@ -306,8 +302,8 @@ object ConsoleConsumer extends Logging {
         "\tline.separator=<line.separator>\n" +
         "\tkey.deserializer=<key.deserializer>\n" +
         "\tvalue.deserializer=<value.deserializer>\n" +
-        "\tdefault.windowed.key.serde.inner=<windowed.key.serde.inner>\n" +
-        "\tdefault.windowed.value.serde.inner=<windowed.value.serde.inner>")
+        "\nUsers can also pass in customized properties for their formatter; more specifically, users " +
+        "can pass in properties keyed with \'key.deserializer.\' and \'value.deserializer.\' prefixes to configure their deserializers.")
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
@@ -344,18 +340,6 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("deserializer for values")
       .ofType(classOf[String])
-    val innerKeyDeserializerOpt = parser.accepts(innerKeySerdeName,
-      "inner serde for key when windowed deserialzier is used; would be ignored otherwise. " +
-        "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
-      .withRequiredArg
-      .describedAs("inner serde for key")
-      .ofType(classOf[String])
-    val innerValueDeserializerOpt = parser.accepts(innerValueSerdeName,
-      "inner serde for value when windowed deserialzier is used; would be ignored otherwise. " +
-        "For example: org.apache.kafka.common.serialization.Serdes\\$StringSerde")
-      .withRequiredArg
-      .describedAs("inner serde for values")
-      .ofType(classOf[String])
     val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
                                                        "Log lifecycle events of the consumer in addition to logging consumed " +
                                                        "messages. (This is specific for system tests.)")
@@ -400,8 +384,6 @@ object ConsoleConsumer extends Logging {
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
-    val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt)
-    val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt)
     val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
 
@@ -411,12 +393,6 @@ object ConsoleConsumer extends Logging {
     if (valueDeserializer != null && !valueDeserializer.isEmpty) {
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
     }
-    if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) {
-      formatterArgs.setProperty(innerKeySerdeName, innerKeyDeserializer)
-    }
-    if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) {
-      formatterArgs.setProperty(innerValueSerdeName, innerValueDeserializer)
-    }
 
     formatter.init(formatterArgs)
 
@@ -560,15 +536,29 @@ class DefaultMessageFormatter extends MessageFormatter {
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
     if (props.containsKey("key.deserializer")) {
       keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
-      keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, true)
+      keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(stripWithPrefix("key.deserializer.", props)).asJava, true)
     }
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
     if (props.containsKey("value.deserializer")) {
       valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
-      valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(props).asJava, false)
+      valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(stripWithPrefix("value.deserializer.", props)).asJava, false)
     }
   }
 
+  def stripWithPrefix(prefix: String, props: Properties): Properties = {
+    val newProps = new Properties()
+    import scala.collection.JavaConversions._
+    for (entry <- props) {
+      val key: String = entry._1
+      val value: String = entry._2
+
+      if (key.startsWith(prefix) && key.length > prefix.length)
+        newProps.put(key.substring(prefix.length), value)
+    }
+
+    newProps
+  }
+
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
 
     def writeSeparator(columnSeparator: Boolean): Unit = {
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index f5195c3cf76..6f465557d7b 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -25,7 +25,7 @@ import kafka.utils.{Exit, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.test.MockDeserializer
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{Before, Test}
@@ -545,15 +545,17 @@ class ConsoleConsumerTest {
       "--bootstrap-server", "localhost:9092",
       "--topic", "test",
       "--property", "print.key=true",
-      "--property", "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer",
-      "--" + ConsoleConsumer.innerKeySerdeName, "org.apache.kafka.common.serialization.Serdes$StringSerde",
-      "--property", "my-test1=abc"
+      "--property", "key.deserializer=org.apache.kafka.test.MockDeserializer",
+      "--property", "key.deserializer.my-props=abc"
     )
     val config = new ConsoleConsumer.ConsumerConfig(args)
     assertTrue(config.formatter.isInstanceOf[DefaultMessageFormatter])
+    assertTrue(config.formatterArgs.containsKey("key.deserializer.my-props"))
     val formatter = config.formatter.asInstanceOf[DefaultMessageFormatter]
-    assertTrue(formatter.keyDeserializer.get.isInstanceOf[ByteArrayDeserializer])
-    assertTrue(config.formatterArgs.containsKey("my-test1"))
-    assertTrue(config.formatterArgs.containsKey(ConsoleConsumer.innerKeySerdeName))
+    assertTrue(formatter.keyDeserializer.get.isInstanceOf[MockDeserializer])
+    assertEquals(1, formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.size)
+    assertEquals("abc", formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].configs.get("my-props"))
+    assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
   }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index fc673d0258e..52b9ee80915 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -781,7 +781,7 @@ private void startStreams() {
                 "--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
                 "--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
                 "--property", "key.separator=" + keySeparator,
-                "--" + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, Serdes.serdeFrom(innerClass).getClass().getName()
+                "--property", "key.deserializer." + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "=" + Serdes.serdeFrom(innerClass).getClass().getName()
             };
 
             ConsoleConsumer.messageCount_$eq(0); //reset the message count
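
For reference, the core of the change is the prefix-stripping step in DefaultMessageFormatter: any formatter property keyed "key.deserializer.<config>" (or "value.deserializer.<config>") is forwarded to the corresponding deserializer's configure() call with the prefix removed, and everything else is dropped. Below is a minimal standalone sketch of that behaviour (not part of the patch; the object name and the "my-props" property are illustrative only, the MockDeserializer value matches the test above):

{code:scala}
import java.util.Properties
import scala.collection.JavaConverters._

// Standalone illustration of the prefix-stripping idea from the patch above.
object PrefixStrippingSketch {
  // Same behaviour as DefaultMessageFormatter.stripWithPrefix in the diff:
  // keep only entries starting with `prefix`, with the prefix removed.
  def stripWithPrefix(prefix: String, props: Properties): Properties = {
    val stripped = new Properties()
    props.asScala.foreach { case (key, value) =>
      if (key.startsWith(prefix) && key.length > prefix.length)
        stripped.put(key.substring(prefix.length), value)
    }
    stripped
  }

  def main(args: Array[String]): Unit = {
    val formatterProps = new Properties()
    formatterProps.setProperty("key.deserializer", "org.apache.kafka.test.MockDeserializer")
    formatterProps.setProperty("key.deserializer.my-props", "abc")

    // Only the prefixed entry survives, minus the prefix: prints {my-props=abc}
    println(stripWithPrefix("key.deserializer.", formatterProps))
  }
}
{code}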


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NullPointerException thrown when executing ConsoleConsumer with deserializer 
> set to `WindowedDeserializer`
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6592
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6592
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, tools
>    Affects Versions: 1.0.0
>            Reporter: huxihx
>            Assignee: huxihx
>            Priority: Minor
>
> When reading a streams app's output topic with the WindowedDeserializer using 
> kafka-console-consumer.sh, a NullPointerException is thrown because the inner 
> deserializer is never initialized: ConsoleConsumer provides no way to set it.
> The complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}
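
With the follow-up above merged, the inner serde no longer needs a dedicated console-consumer option; it can be supplied as a prefixed formatter property. Below is a hedged sketch of the resulting argument list, mirroring the KStreamAggregationIntegrationTest change in the PR. The topic name is made up, and it assumes a windowed deserializer whose configure() reads StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS from the map it receives (TimeWindowedDeserializer is used here as an example of such a class):

{code:scala}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer

// Sketch only, not from the PR. "windowed-output" is a hypothetical topic, and the
// windowed deserializer is assumed to build its inner deserializer from the
// DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS entry passed to configure(), which
// DefaultMessageFormatter now forwards because of the "key.deserializer." prefix.
object WindowedConsoleConsumerArgsSketch {
  val args: Array[String] = Array(
    "--bootstrap-server", "localhost:9092",
    "--topic", "windowed-output",
    "--property", "print.key=true",
    "--property", "key.deserializer=" + classOf[TimeWindowedDeserializer[_]].getName,
    "--property", "key.deserializer." + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS +
      "=" + Serdes.String().getClass.getName
  )
}
{code}

Passing the same strings to kafka-console-consumer.sh on the command line should have the same effect, since ConsoleConsumer's ConsumerConfig parses the formatter properties identically (as the test in the PR does).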



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
