[ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394724#comment-16394724
 ] 

ASF GitHub Bot commented on KAFKA-6592:
---------------------------------------

huxihx closed pull request #4620: KAFKA-6592: ConsoleConsumer should support 
specifying inner deserializers
URL: https://github.com/apache/kafka/pull/4620
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7d2d3710157..91eaf31aa0e 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -330,6 +330,14 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("deserializer for values")
       .ofType(classOf[String])
+    val innerKeyDeserializerOpt = 
parser.accepts("key.deserializer.inner.class")
+      .withRequiredArg
+      .describedAs("inner deserializer for key")
+      .ofType(classOf[String])
+    val innerValueDeserializerOpt = 
parser.accepts("value.deserializer.inner.class")
+      .withRequiredArg
+      .describedAs("inner deserializer for values")
+      .ofType(classOf[String])
     val enableSystestEventsLoggingOpt = parser.accepts("enable-systest-events",
                                                        "Log lifecycle events 
of the consumer in addition to logging consumed " +
                                                        "messages. (This is 
specific for system tests.)")
@@ -374,6 +382,8 @@ object ConsoleConsumer extends Logging {
     val bootstrapServer = options.valueOf(bootstrapServerOpt)
     val keyDeserializer = options.valueOf(keyDeserializerOpt)
     val valueDeserializer = options.valueOf(valueDeserializerOpt)
+    val innerKeyDeserializer = options.valueOf(innerKeyDeserializerOpt)
+    val innerValueDeserializer = options.valueOf(innerValueDeserializerOpt)
     val isolationLevel = options.valueOf(isolationLevelOpt).toString
     val formatter: MessageFormatter = 
messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
 
@@ -383,6 +393,12 @@ object ConsoleConsumer extends Logging {
     if (valueDeserializer != null && !valueDeserializer.isEmpty) {
       
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializer)
     }
+    if (innerKeyDeserializer != null && !innerKeyDeserializer.isEmpty) {
+      formatterArgs.setProperty("key.deserializer.inner.class", 
innerKeyDeserializer)
+    }
+    if (innerValueDeserializer != null && !innerValueDeserializer.isEmpty) {
+      formatterArgs.setProperty("value.deserializer.inner.class", 
innerValueDeserializer)
+    }
     formatter.init(formatterArgs)
 
     if (useOldConsumer) {
@@ -523,11 +539,19 @@ class DefaultMessageFormatter extends MessageFormatter {
     if (props.containsKey("line.separator"))
       lineSeparator = 
props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
     // Note that `toString` will be called on the instance returned by 
`Deserializer.deserialize`
-    if (props.containsKey("key.deserializer"))
+    if (props.containsKey("key.deserializer")) {
       keyDeserializer = 
Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+      // Instantiate the inner key class if `key.deserializer.inner.class` is 
specified
+      if (props.containsKey("key.deserializer.inner.class"))
+        keyDeserializer.get.configure(Map("key.deserializer.inner.class" -> 
props.get("key.deserializer.inner.class")).asJava, true)
+    }
     // Note that `toString` will be called on the instance returned by 
`Deserializer.deserialize`
-    if (props.containsKey("value.deserializer"))
+    if (props.containsKey("value.deserializer")) {
       valueDeserializer = 
Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
+      // Instantiate the inner value class if `value.deserializer.inner.class` 
is specified
+      if (props.containsKey("value.deserializer.inner.class"))
+        valueDeserializer.get.configure(Map("value.deserializer.inner.class" 
-> props.get("value.deserializer.inner.class")).asJava, false)
+    }
   }
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], 
output: PrintStream) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NullPointerException thrown when executing ConsoleConsumer with deserializer 
> set to `WindowedDeserializer`
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6592
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6592
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, tools
>    Affects Versions: 1.0.0
>            Reporter: huxihx
>            Assignee: huxihx
>            Priority: Minor
>
> When reading a Streams app's output topic with the WindowedDeserializer 
> deserializer using kafka-console-consumer.sh, a NullPointerException was 
> thrown because the inner deserializer was not initialized: there is no place 
> in ConsoleConsumer to set this class.
> The complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to