This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 36ac8f3 DRILL-7940: Fix Kafka key with Avro schema not displayed correctly (#2242)
36ac8f3 is described below
commit 36ac8f32a318c5bcbe27a62d04910ab16389b2cf
Author: Thinking Chen <[email protected]>
AuthorDate: Fri Jun 4 09:09:31 2021 +0800
DRILL-7940: Fix Kafka key with Avro schema not displayed correctly (#2242)
---
.../store/kafka/decoders/AvroMessageReader.java | 15 ++++++++-
.../exec/store/kafka/KafkaMessageGenerator.java | 36 ++++++++++++++++------
.../drill/exec/store/kafka/KafkaQueriesTest.java | 8 +++++
3 files changed, 48 insertions(+), 11 deletions(-)
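
The bug: AvroMessageReader builds its consumer with ByteArrayDeserializer, so record.key() is a byte[], and the old code wrote record.key().toString() into kafkaMsgKey. toString() on a Java array returns its identity hash, not its contents, so Avro-encoded keys displayed as garbage. A minimal, self-contained illustration of the pitfall (demo class only, not Drill code; the sample bytes are made up):

public class ByteArrayKeyDemo {
  public static void main(String[] args) {
    // Roughly what a Confluent-Avro-serialized key looks like on the wire:
    // magic byte 0x00, a 4-byte schema id, then the Avro-encoded payload.
    byte[] key = {0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 'a'};
    System.out.println(key.toString());   // "[B@..." -- array identity, what the old code printed
    System.out.println(new String(key));  // raw bytes as chars -- readable only for plain string keys
  }
}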
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
index 3846b92..8179226 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.store.avro.AvroColumnConverterFactory;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.MetaDataField;
import org.apache.drill.exec.store.kafka.ReadOptions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -51,6 +52,7 @@ public class AvroMessageReader implements MessageReader {
private KafkaAvroDeserializer deserializer;
private ColumnConverter converter;
private ResultSetLoader loader;
+ private boolean deserializeKey;
@Override
public void init(SchemaNegotiator negotiator, ReadOptions readOptions, KafkaStoragePlugin plugin) {
@@ -62,6 +64,9 @@ public class AvroMessageReader implements MessageReader {
loader = negotiator.build();
AvroColumnConverterFactory factory = new AvroColumnConverterFactory(providedSchema);
converter = factory.getRootConverter(providedSchema, new TupleSchema(), loader.writer());
+
+ String keyDeserializer = kafkaConsumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ deserializeKey = keyDeserializer != null && keyDeserializer.equals(KafkaAvroDeserializer.class.getName());
}
@Override
@@ -85,10 +90,18 @@ public class AvroMessageReader implements MessageReader {
writeValue(rowWriter, MetaDataField.KAFKA_PARTITION_ID, record.partition());
writeValue(rowWriter, MetaDataField.KAFKA_OFFSET, record.offset());
writeValue(rowWriter, MetaDataField.KAFKA_TIMESTAMP, record.timestamp());
- writeValue(rowWriter, MetaDataField.KAFKA_MSG_KEY, record.key() != null ? record.key().toString() : null);
+ writeValue(rowWriter, MetaDataField.KAFKA_MSG_KEY, record.key() != null ? getKeyValue((byte[]) record.key()) : null);
rowWriter.save();
}
+ private Object getKeyValue(byte[] keyValue) {
+ if (deserializeKey) {
+ return deserializer.deserialize(null, keyValue).toString();
+ } else {
+ return new String(keyValue);
+ }
+ }
+
private <T> void writeValue(RowSetLoader rowWriter, MetaDataField metaDataField, T value) {
if (rowWriter.tupleSchema().column(metaDataField.getFieldName()) == null) {
ColumnMetadata colSchema = MetadataUtils.newScalar(metaDataField.getFieldName(), metaDataField.getFieldType(), TypeProtos.DataMode.OPTIONAL);
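
The new deserializeKey flag keys off the consumer's configured key.deserializer: only when it is Confluent's KafkaAvroDeserializer are key bytes decoded as Avro; otherwise the key is rendered with new String(bytes). A standalone sketch of that check (demo class name invented; only kafka-clients is assumed on the classpath):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KeyDeserializerCheckDemo {
  public static void main(String[] args) {
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    // Same decision the reader now makes in init():
    String keyDeserializer = consumerProps.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
    boolean deserializeKey = keyDeserializer != null
        && keyDeserializer.equals("io.confluent.kafka.serializers.KafkaAvroDeserializer");
    System.out.println("decode key as Avro? " + deserializeKey);  // true
  }
}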
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index 807c84e..f4e4730 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
@@ -67,6 +68,7 @@ public class KafkaMessageGenerator {
}
public void populateAvroMsgIntoKafka(String topic, int numMsg) {
+ producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
try (KafkaProducer<Object, GenericRecord> producer = new KafkaProducer<>(producerProperties)) {
Schema.Parser parser = new Schema.Parser();
String userSchema = "{\"type\":\"record\"," +
@@ -79,28 +81,42 @@
"{\"name\":\"key6\",\"type\":{\"type\":\"record\",\"name\":\"myrecord6\",\"fields\":[" +
"{\"name\":\"key61\",\"type\":\"double\"}," +
"{\"name\":\"key62\",\"type\":\"double\"}]}}]}";
- Schema schema = parser.parse(userSchema);
- GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+ Schema valueSchema = parser.parse(userSchema);
+ GenericRecordBuilder valueBuilder = new GenericRecordBuilder(valueSchema);
+
+ String key1Schema = "{\"type\":\"record\"," +
+ "\"name\":\"key1record\"," +
+ "\"fields\":[" +
+ "{\"name\":\"key1\",\"type\":\"string\"}]}";
+ Schema keySchema = parser.parse(key1Schema);
+ GenericRecordBuilder keyBuilder = new GenericRecordBuilder(keySchema);
+
Random rand = new Random();
for (int i = 0; i < numMsg; ++i) {
- builder.set("key1", UUID.randomUUID().toString());
- builder.set("key2", rand.nextInt());
- builder.set("key3", rand.nextBoolean());
+ // value record
+ String key1 = UUID.randomUUID().toString();
+ valueBuilder.set("key1", key1);
+ valueBuilder.set("key2", rand.nextInt());
+ valueBuilder.set("key3", rand.nextBoolean());
List<Integer> list = Lists.newArrayList();
list.add(rand.nextInt(100));
list.add(rand.nextInt(100));
list.add(rand.nextInt(100));
- builder.set("key5", list);
+ valueBuilder.set("key5", list);
- GenericRecordBuilder innerBuilder = new GenericRecordBuilder(schema.getField("key6").schema());
+ GenericRecordBuilder innerBuilder = new GenericRecordBuilder(valueSchema.getField("key6").schema());
innerBuilder.set("key61", rand.nextDouble());
innerBuilder.set("key62", rand.nextDouble());
- builder.set("key6", innerBuilder.build());
+ valueBuilder.set("key6", innerBuilder.build());
+
+ Record producerRecord = valueBuilder.build();
- Record producerRecord = builder.build();
+ // key record
+ keyBuilder.set("key1", key1);
+ Record keyRecord = keyBuilder.build();
- ProducerRecord<Object, GenericRecord> record = new ProducerRecord<>(topic, producerRecord);
+ ProducerRecord<Object, GenericRecord> record = new ProducerRecord<>(topic, keyRecord, producerRecord);
producer.send(record);
}
}
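
The generator now writes each message with an Avro key record alongside the value record. The key record itself can be built with plain Avro, no broker needed; a hedged sketch using the same key1record schema as the test (demo class name invented):

import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

public class KeyRecordDemo {
  public static void main(String[] args) {
    Schema keySchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"key1record\","
            + "\"fields\":[{\"name\":\"key1\",\"type\":\"string\"}]}");
    GenericData.Record keyRecord = new GenericRecordBuilder(keySchema)
        .set("key1", UUID.randomUUID().toString())
        .build();
    // A GenericRecord's toString() is its JSON form -- the value kafkaMsgKey should now display:
    System.out.println(keyRecord);  // {"key1": "..."}
  }
}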
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index d690e70..e04012c 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -17,12 +17,14 @@
*/
package org.apache.drill.exec.store.kafka;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -71,6 +73,12 @@ public class KafkaQueriesTest extends KafkaTestBase {
try {
client.alterSession(ExecConstants.KAFKA_RECORD_READER, "org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
+
+ KafkaStoragePluginConfig config = (KafkaStoragePluginConfig) cluster.drillbit().getContext()
+ .getStorage().getStoredConfig(KafkaStoragePluginConfig.NAME);
+ config.getKafkaConsumerProps().put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+     KafkaAvroDeserializer.class.getName());
+
String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.AVRO_TOPIC);
runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG);
} finally {
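
End to end, the fix means an Avro key now survives the serialize/deserialize round trip instead of printing as an array identity string. A hedged, self-contained sketch using Confluent's MockSchemaRegistryClient test helper (constructor signatures and the mock client vary across kafka-avro-serializer versions; the demo class name, topic, and registry URL are placeholders):

import java.util.Collections;
import java.util.Map;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

public class AvroKeyRoundTripDemo {
  public static void main(String[] args) {
    // Shared in-memory registry stands in for a real Schema Registry; the URL is unused.
    MockSchemaRegistryClient registry = new MockSchemaRegistryClient();
    Map<String, String> props = Collections.singletonMap("schema.registry.url", "mock://placeholder");
    KafkaAvroSerializer serializer = new KafkaAvroSerializer(registry, props);
    KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(registry, props);

    Schema keySchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"key1record\","
            + "\"fields\":[{\"name\":\"key1\",\"type\":\"string\"}]}");
    GenericData.Record key = new GenericRecordBuilder(keySchema).set("key1", "abc").build();

    byte[] wire = serializer.serialize("drill-avro-topic", key);
    System.out.println(wire.toString());                                     // "[B@..." -- the pre-fix display
    System.out.println(deserializer.deserialize("drill-avro-topic", wire));  // {"key1": "abc"} -- post-fix
  }
}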