This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 36ac8f3  DRILL-7940: Fix Kafka key with Avro schema not being displayed correctly (#2242)
36ac8f3 is described below

commit 36ac8f32a318c5bcbe27a62d04910ab16389b2cf
Author: Thinking Chen <[email protected]>
AuthorDate: Fri Jun 4 09:09:31 2021 +0800

    DRILL-7940: Fix Kafka key with Avro schema not being displayed correctly (#2242)
---
 .../store/kafka/decoders/AvroMessageReader.java    | 15 ++++++++-
 .../exec/store/kafka/KafkaMessageGenerator.java    | 36 ++++++++++++++++------
 .../drill/exec/store/kafka/KafkaQueriesTest.java   |  8 +++++
 3 files changed, 48 insertions(+), 11 deletions(-)
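
In brief: AvroMessageReader now checks which key deserializer the consumer is
configured with, and Avro-decodes the message key only when it is the Confluent
KafkaAvroDeserializer; otherwise the raw key bytes are rendered as a string. A
minimal standalone sketch of that decision, using only names visible in the
patch below (the surrounding class and the kafkaConsumerProps lookup are
simplified here, not taken verbatim from Drill):

    import java.util.Properties;

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class KeyDecodingSketch {
      public static void main(String[] args) {
        // Stand-in for the plugin's consumer properties.
        Properties kafkaConsumerProps = new Properties();
        kafkaConsumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            KafkaAvroDeserializer.class.getName());

        // Mirrors the check added to init(): Avro-decode keys only when the
        // Confluent Avro key deserializer is configured.
        String keyDeserializer =
            kafkaConsumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        boolean deserializeKey = keyDeserializer != null
            && keyDeserializer.equals(KafkaAvroDeserializer.class.getName());

        System.out.println("deserializeKey = " + deserializeKey); // prints true
      }
    }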

diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
index 3846b92..8179226 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.store.avro.AvroColumnConverterFactory;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
 import org.apache.drill.exec.store.kafka.MetaDataField;
 import org.apache.drill.exec.store.kafka.ReadOptions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -51,6 +52,7 @@ public class AvroMessageReader implements MessageReader {
   private KafkaAvroDeserializer deserializer;
   private ColumnConverter converter;
   private ResultSetLoader loader;
+  private boolean deserializeKey;
 
   @Override
   public void init(SchemaNegotiator negotiator, ReadOptions readOptions, KafkaStoragePlugin plugin) {
@@ -62,6 +64,9 @@ public class AvroMessageReader implements MessageReader {
     loader = negotiator.build();
     AvroColumnConverterFactory factory = new AvroColumnConverterFactory(providedSchema);
     converter = factory.getRootConverter(providedSchema, new TupleSchema(), loader.writer());
+
+    String keyDeserializer = kafkaConsumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+    deserializeKey = keyDeserializer != null && keyDeserializer.equals(KafkaAvroDeserializer.class.getName());
   }
 
   @Override
@@ -85,10 +90,18 @@ public class AvroMessageReader implements MessageReader {
     writeValue(rowWriter, MetaDataField.KAFKA_PARTITION_ID, record.partition());
     writeValue(rowWriter, MetaDataField.KAFKA_OFFSET, record.offset());
     writeValue(rowWriter, MetaDataField.KAFKA_TIMESTAMP, record.timestamp());
-    writeValue(rowWriter, MetaDataField.KAFKA_MSG_KEY, record.key() != null ? record.key().toString() : null);
+    writeValue(rowWriter, MetaDataField.KAFKA_MSG_KEY, record.key() != null ? getKeyValue((byte[]) record.key()) : null);
     rowWriter.save();
   }
 
+  private Object getKeyValue(byte[] keyValue) {
+    if (deserializeKey) {
+      return deserializer.deserialize(null, keyValue).toString();
+    } else {
+      return new String(keyValue);
+    }
+  }
+
   private <T> void writeValue(RowSetLoader rowWriter, MetaDataField metaDataField, T value) {
     if (rowWriter.tupleSchema().column(metaDataField.getFieldName()) == null) {
       ColumnMetadata colSchema = MetadataUtils.newScalar(metaDataField.getFieldName(), metaDataField.getFieldType(), TypeProtos.DataMode.OPTIONAL);
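
For the Avro branch of getKeyValue() to be taken, the plugin's Kafka consumer
properties must name KafkaAvroDeserializer as the key deserializer, as the test
change further below does; otherwise the key bytes fall back to new String(...).
A sketch of that configuration against a plain Properties object (the bootstrap
address and schema-registry URL are assumed placeholder values, not part of
this commit):

    import java.util.Properties;

    import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class AvroKeyConsumerProps {
      // Builds consumer properties under which the new deserializeKey flag is true.
      public static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed address
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8081");                                          // assumed registry URL
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            KafkaAvroDeserializer.class.getName());
        return props;
      }
    }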
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index 807c84e..f4e4730 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericRecord;
@@ -67,6 +68,7 @@ public class KafkaMessageGenerator {
   }
 
   public void populateAvroMsgIntoKafka(String topic, int numMsg) {
+    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
     try (KafkaProducer<Object, GenericRecord> producer = new KafkaProducer<>(producerProperties)) {
       Schema.Parser parser = new Schema.Parser();
       String userSchema = "{\"type\":\"record\"," +
@@ -79,28 +81,42 @@
          "{\"name\":\"key6\",\"type\":{\"type\":\"record\",\"name\":\"myrecord6\",\"fields\":[" +
           "{\"name\":\"key61\",\"type\":\"double\"}," +
           "{\"name\":\"key62\",\"type\":\"double\"}]}}]}";
-      Schema schema = parser.parse(userSchema);
-      GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+      Schema valueSchema = parser.parse(userSchema);
+      GenericRecordBuilder valueBuilder = new GenericRecordBuilder(valueSchema);
+
+      String key1Schema = "{\"type\":\"record\"," +
+              "\"name\":\"key1record\"," +
+              "\"fields\":[" +
+              "{\"name\":\"key1\",\"type\":\"string\"}]}\"";
+      Schema keySchema = parser.parse(key1Schema);
+      GenericRecordBuilder keyBuilder = new GenericRecordBuilder(keySchema);
+
       Random rand = new Random();
       for (int i = 0; i < numMsg; ++i) {
-        builder.set("key1", UUID.randomUUID().toString());
-        builder.set("key2", rand.nextInt());
-        builder.set("key3", rand.nextBoolean());
+        // value record
+        String key1 = UUID.randomUUID().toString();
+        valueBuilder.set("key1", key1);
+        valueBuilder.set("key2", rand.nextInt());
+        valueBuilder.set("key3", rand.nextBoolean());
 
         List<Integer> list = Lists.newArrayList();
         list.add(rand.nextInt(100));
         list.add(rand.nextInt(100));
         list.add(rand.nextInt(100));
-        builder.set("key5", list);
+        valueBuilder.set("key5", list);
 
-        GenericRecordBuilder innerBuilder = new GenericRecordBuilder(schema.getField("key6").schema());
+        GenericRecordBuilder innerBuilder = new GenericRecordBuilder(valueSchema.getField("key6").schema());
         innerBuilder.set("key61", rand.nextDouble());
         innerBuilder.set("key62", rand.nextDouble());
-        builder.set("key6", innerBuilder.build());
+        valueBuilder.set("key6", innerBuilder.build());
+
+        Record producerRecord = valueBuilder.build();
 
-        Record producerRecord = builder.build();
+        // key record
+        keyBuilder.set("key1", key1);
+        Record keyRecord = keyBuilder.build();
 
-        ProducerRecord<Object, GenericRecord> record = new ProducerRecord<>(topic, producerRecord);
+        ProducerRecord<Object, GenericRecord> record = new ProducerRecord<>(topic, keyRecord, producerRecord);
         producer.send(record);
       }
     }
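
On the producer side, the generator now registers KafkaAvroSerializer for keys
and sends key/value pairs via ProducerRecord(topic, key, value). A condensed
sketch of a keyed send, assuming producerProperties already carries bootstrap,
schema-registry, and value-serializer settings as in the generator above (the
schema literal matches key1record; class and method names here are illustrative):

    import java.util.Properties;
    import java.util.UUID;

    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData.Record;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyedAvroSendSketch {
      static void sendOne(Properties producerProperties, String topic) {
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        Schema keySchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"key1record\",\"fields\":[" +
            "{\"name\":\"key1\",\"type\":\"string\"}]}");
        Record key = new GenericRecordBuilder(keySchema)
            .set("key1", UUID.randomUUID().toString())
            .build();
        try (KafkaProducer<Object, Record> producer = new KafkaProducer<>(producerProperties)) {
          // The key record rides alongside the value; for brevity it doubles
          // as the value here.
          producer.send(new ProducerRecord<>(topic, key, key));
        }
      }
    }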
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index d690e70..e04012c 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -17,12 +17,14 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import org.apache.drill.categories.KafkaStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -71,6 +73,12 @@ public class KafkaQueriesTest extends KafkaTestBase {
     try {
       client.alterSession(ExecConstants.KAFKA_RECORD_READER,
           "org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
+
+      KafkaStoragePluginConfig config = (KafkaStoragePluginConfig) cluster.drillbit().getContext()
+              .getStorage().getStoredConfig(KafkaStoragePluginConfig.NAME);
+      config.getKafkaConsumerProps().put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+              KafkaAvroDeserializer.class.getName());
+
       String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.AVRO_TOPIC);
       runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG);
     } finally {
