This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new f73496d  DRILL-5940: Add support for Avro messages with schema registry for Kafka (#2239)
f73496d is described below

commit f73496dfb6228c75f745cb670da17b24ce7bef34
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Fri May 28 09:24:40 2021 +0300

    DRILL-5940: Add support for Avro messages with schema registry for Kafka (#2239)
---
 contrib/storage-kafka/pom.xml                      |  21 +++
 .../drill/exec/store/kafka/KafkaRecordReader.java  | 132 ++++++--------
 .../exec/store/kafka/KafkaScanBatchCreator.java    |  44 +++--
 .../drill/exec/store/kafka/MetaDataField.java      |  20 ++-
 .../store/kafka/decoders/AvroMessageReader.java    | 125 ++++++++++++++
 .../store/kafka/decoders/JsonMessageReader.java    | 192 ++++++++++++---------
 .../KafkaJsonLoader.java}                          |  30 ++--
 .../exec/store/kafka/decoders/MessageReader.java   |  19 +-
 .../exec/store/kafka/KafkaMessageGenerator.java    |  37 ++--
 .../drill/exec/store/kafka/KafkaQueriesTest.java   |  25 +++
 .../drill/exec/store/kafka/KafkaTestBase.java      |   4 +
 .../drill/exec/store/kafka/TestKafkaSuit.java      |   4 +
 .../store/easy/json/loader/JsonLoaderImpl.java     |  25 +--
 .../easy/json/parser/JsonStructureOptions.java     |   7 +
 .../easy/json/parser/JsonStructureParser.java      |  76 ++++----
 .../exec/store/easy/json/parser/TokenIterator.java |  97 +++++++++--
 exec/jdbc-all/pom.xml                              |   2 +
 pom.xml                                            |  10 ++
 18 files changed, 590 insertions(+), 280 deletions(-)

diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index d626d40..a9008b0 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -35,6 +35,21 @@
     <kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite>
   </properties>
 
+  <repositories>
+    <repository>
+      <id>confluent</id>
+      <name>Confluent</name>
+      <url>https://packages.confluent.io/maven/</url>
+      <releases>
+        <enabled>true</enabled>
+        <checksumPolicy>fail</checksumPolicy>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
@@ -78,6 +93,12 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-avro-serializer</artifactId>
+      <version>6.1.1</version>
+    </dependency>
+
     <!-- Test dependencies -->
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index 5218c3b..551b62f 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -17,70 +17,57 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
 
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
 
-  private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
   private final ReadOptions readOptions;
   private final KafkaStoragePlugin plugin;
   private final KafkaPartitionScanSpec subScanSpec;
+  private final int maxRecords;
 
-  private VectorContainerWriter writer;
   private MessageReader messageReader;
-
   private long currentOffset;
   private MessageIterator msgItr;
-  private int currentMessageCount;
 
-  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, 
List<SchemaPath> projectedColumns,
-      FragmentContext context, KafkaStoragePlugin plugin) {
-    setColumns(projectedColumns);
-    this.readOptions = new ReadOptions(context.getOptions());
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager 
options, KafkaStoragePlugin plugin, int maxRecords) {
+    this.readOptions = new ReadOptions(options);
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
+    this.maxRecords = maxRecords;
   }
 
   @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> 
projectedColumns) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (isStarQuery()) {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    } else {
-      transformed.addAll(projectedColumns);
-    }
-    return transformed;
-  }
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new 
ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("topic_name", subScanSpec.getTopicName());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
 
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) {
-    this.writer = new VectorContainerWriter(output, 
readOptions.isEnableUnionType());
     messageReader = 
MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
-    messageReader.init(context.getManagedBuffer(), 
Lists.newArrayList(getColumns()), writer, readOptions);
+    messageReader.init(negotiator, readOptions, plugin);
     msgItr = new MessageIterator(messageReader.getConsumer(plugin), 
subScanSpec, readOptions.getPollTimeOut());
+
+    return true;
   }
 
   /**
@@ -88,60 +75,49 @@ public class KafkaRecordReader extends AbstractRecordReader {
    * take care of polling multiple times for this given batch next invocation
    */
   @Override
-  public int next() {
-    writer.allocate();
-    writer.reset();
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : 
null;
-    currentMessageCount = 0;
-
-    try {
-      while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
-        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
-        currentOffset = consumerRecord.offset();
-        writer.setPosition(currentMessageCount);
-        boolean status = messageReader.readMessage(consumerRecord);
-        // increment record count only if message was read successfully
-        if (status) {
-          if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
-            break;
-          }
-        }
+  public boolean next() {
+    RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();
+    while (!rowWriter.isFull()) {
+      if (!nextLine(rowWriter)) {
+        return false;
       }
+    }
+    return messageReader.endBatch();
+  }
 
-      if (currentMessageCount > 0) {
-        messageReader.ensureAtLeastOneField();
-      }
-      writer.setValueCount(currentMessageCount);
-      if (watch != null) {
-        logger.debug("Took {} ms to process {} records.", 
watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
-      }
-      logger.debug("Last offset consumed for {}:{} is {}", 
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
-          currentOffset);
-      return currentMessageCount;
-    } catch (Exception e) {
-      String msg = "Failure while reading messages from kafka. Record reader 
was at record: " + (currentMessageCount + 1);
-      throw UserException.dataReadError(e)
-        .message(msg)
-        .addContext(e.getMessage())
-        .build(logger);
+  private boolean nextLine(RowSetLoader rowWriter) {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
+    if (currentOffset >= subScanSpec.getEndOffset() || !msgItr.hasNext()) {
+      return false;
     }
+    ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
+    currentOffset = consumerRecord.offset();
+    messageReader.readMessage(consumerRecord);
+    return true;
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     logger.debug("Last offset processed for {}:{} is - {}", 
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
         currentOffset);
     logger.debug("Total time to fetch messages from {}:{} is - {} 
milliseconds", subScanSpec.getTopicName(),
         subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
     plugin.registerToClose(msgItr);
-    messageReader.close();
+    try {
+      messageReader.close();
+    } catch (IOException e) {
+      logger.warn("Error closing Kafka message reader: {}", e.getMessage(), e);
+    }
   }
 
   @Override
   public String toString() {
-    return "KafkaRecordReader[readOptions=" + readOptions
-        + ", currentOffset=" + currentOffset
-        + ", currentMessageCount=" + currentMessageCount
-        + "]";
+    return new PlanStringBuilder(this)
+        .field("readOptions", readOptions)
+        .field("currentOffset", currentOffset)
+        .toString();
   }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 8083e33..7d2ebcc 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -21,33 +21,51 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.scan.framework.BasicScanFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.drill.exec.server.options.OptionManager;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
-  private static final Logger logger = 
LoggerFactory.getLogger(KafkaScanBatchCreator.class);
 
   @Override
   public CloseableRecordBatch getBatch(ExecutorFragmentContext context, 
KafkaSubScan subScan, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    List<SchemaPath> columns = subScan.getColumns() != null ? 
subScan.getColumns() : GroupScan.ALL_COLUMNS;
+    try {
+      ManagedScanFramework.ScanFrameworkBuilder builder = 
createBuilder(context.getOptions(), subScan);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      // Rethrow user exceptions directly
+      throw e;
+    } catch (Throwable e) {
+      // Wrap all others
+      throw new ExecutionSetupException(e);
+    }
+  }
 
-    List<RecordReader> readers = subScan.getPartitionSubScanSpecList().stream()
-      .map(scanSpec -> new KafkaRecordReader(scanSpec, columns, context, 
subScan.getKafkaStoragePlugin()))
-      .collect(Collectors.toList());
+  private ManagedScanFramework.ScanFrameworkBuilder 
createBuilder(OptionManager options,
+      KafkaSubScan subScan) {
+    ManagedScanFramework.ScanFrameworkBuilder builder = new 
ManagedScanFramework.ScanFrameworkBuilder();
+    builder.projection(subScan.getColumns());
+    builder.setUserName(subScan.getUserName());
 
-    logger.debug("Number of record readers initialized : {}", readers.size());
-    return new ScanBatch(subScan, context, readers);
+    List<ManagedReader<SchemaNegotiator>> readers = 
subScan.getPartitionSubScanSpecList().stream()
+        .map(scanSpec -> new KafkaRecordReader(scanSpec, options, 
subScan.getKafkaStoragePlugin(), -1))
+        .collect(Collectors.toList());
+    ManagedScanFramework.ReaderFactory readerFactory = new 
BasicScanFactory(readers.iterator());
+    builder.setReaderFactory(readerFactory);
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
   }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
index af3e163..89c8d79 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
@@ -17,25 +17,33 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import org.apache.drill.common.types.TypeProtos;
+
 /**
  * MetaData fields provide additional information about each message.
  * It is expected that one should not modify the fieldName of each constant as 
it breaks the compatibility.
  */
 public enum MetaDataField {
 
-  KAFKA_TOPIC("kafkaTopic"),
-  KAFKA_PARTITION_ID("kafkaPartitionId"),
-  KAFKA_OFFSET("kafkaMsgOffset"),
-  KAFKA_TIMESTAMP("kafkaMsgTimestamp"),
-  KAFKA_MSG_KEY("kafkaMsgKey");
+  KAFKA_TOPIC("kafkaTopic", TypeProtos.MinorType.VARCHAR),
+  KAFKA_PARTITION_ID("kafkaPartitionId", TypeProtos.MinorType.BIGINT),
+  KAFKA_OFFSET("kafkaMsgOffset", TypeProtos.MinorType.BIGINT),
+  KAFKA_TIMESTAMP("kafkaMsgTimestamp", TypeProtos.MinorType.BIGINT),
+  KAFKA_MSG_KEY("kafkaMsgKey", TypeProtos.MinorType.VARCHAR);
 
   private final String fieldName;
+  private final TypeProtos.MinorType fieldType;
 
-  MetaDataField(final String fieldName) {
+  MetaDataField(final String fieldName, TypeProtos.MinorType type) {
     this.fieldName = fieldName;
+    this.fieldType = type;
   }
 
   public String getFieldName() {
     return fieldName;
   }
+
+  public TypeProtos.MinorType getFieldType() {
+    return fieldType;
+  }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
new file mode 100644
index 0000000..3846b92
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.avro.AvroColumnConverterFactory;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.MetaDataField;
+import org.apache.drill.exec.store.kafka.ReadOptions;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class AvroMessageReader implements MessageReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(AvroMessageReader.class);
+
+  private KafkaAvroDeserializer deserializer;
+  private ColumnConverter converter;
+  private ResultSetLoader loader;
+
+  @Override
+  public void init(SchemaNegotiator negotiator, ReadOptions readOptions, 
KafkaStoragePlugin plugin) {
+    Properties kafkaConsumerProps = plugin.getConfig().getKafkaConsumerProps();
+    Map<String, Object> propertiesMap = kafkaConsumerProps.entrySet().stream()
+        .collect(Collectors.toMap(e -> e.getKey().toString(), 
Map.Entry::getValue));
+    deserializer = new KafkaAvroDeserializer(null, propertiesMap);
+    TupleMetadata providedSchema = negotiator.providedSchema();
+    loader = negotiator.build();
+    AvroColumnConverterFactory factory = new 
AvroColumnConverterFactory(providedSchema);
+    converter = factory.getRootConverter(providedSchema, new TupleSchema(), 
loader.writer());
+  }
+
+  @Override
+  public void readMessage(ConsumerRecord<?, ?> record) {
+    RowSetLoader rowWriter = loader.writer();
+    byte[] recordArray = (byte[]) record.value();
+    GenericRecord genericRecord = (GenericRecord) 
deserializer.deserialize(null, recordArray);
+
+    Schema schema = genericRecord.getSchema();
+
+    if (Schema.Type.RECORD != schema.getType()) {
+      throw UserException.dataReadError()
+          .message("Root object must be record type. Found: %s", 
schema.getType())
+          .addContext("Reader", this)
+          .build(logger);
+    }
+
+    rowWriter.start();
+    converter.convert(genericRecord);
+    writeValue(rowWriter, MetaDataField.KAFKA_TOPIC, record.topic());
+    writeValue(rowWriter, MetaDataField.KAFKA_PARTITION_ID, 
record.partition());
+    writeValue(rowWriter, MetaDataField.KAFKA_OFFSET, record.offset());
+    writeValue(rowWriter, MetaDataField.KAFKA_TIMESTAMP, record.timestamp());
+    writeValue(rowWriter, MetaDataField.KAFKA_MSG_KEY, record.key() != null ? 
record.key().toString() : null);
+    rowWriter.save();
+  }
+
+  private <T> void writeValue(RowSetLoader rowWriter, MetaDataField 
metaDataField, T value) {
+    if (rowWriter.tupleSchema().column(metaDataField.getFieldName()) == null) {
+      ColumnMetadata colSchema = 
MetadataUtils.newScalar(metaDataField.getFieldName(), 
metaDataField.getFieldType(), TypeProtos.DataMode.OPTIONAL);
+      rowWriter.addColumn(colSchema);
+    }
+    rowWriter.column(metaDataField.getFieldName()).setObject(value);
+  }
+
+  @Override
+  public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) {
+    return new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer());
+  }
+
+  @Override
+  public ResultSetLoader getResultSetLoader() {
+    return loader;
+  }
+
+  @Override
+  public boolean endBatch() {
+    return loader.hasRows();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      deserializer.close();
+      loader.close();
+    } catch (Exception e) {
+      logger.warn("Error while closing AvroMessageReader: {}", e.getMessage());
+    }
+  }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index eb503aa..a9aee5a 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -17,36 +17,30 @@
  */
 package org.apache.drill.exec.store.kafka.decoders;
 
-import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_MSG_KEY;
-import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_OFFSET;
-import static 
org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_PARTITION_ID;
-import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TIMESTAMP;
-import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TOPIC;
-
-import java.io.IOException;
-import java.util.List;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.store.easy.json.JsonProcessor;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
+import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.MetaDataField;
 import org.apache.drill.exec.store.kafka.ReadOptions;
-import org.apache.drill.exec.vector.complex.fn.JsonReader;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-
-import io.netty.buffer.DrillBuf;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.StringJoiner;
 
 /**
  * MessageReader class which will convert ConsumerRecord into JSON and writes 
to
@@ -55,61 +49,82 @@ import io.netty.buffer.DrillBuf;
 public class JsonMessageReader implements MessageReader {
 
   private static final Logger logger = 
LoggerFactory.getLogger(JsonMessageReader.class);
-  private JsonReader jsonReader;
-  private VectorContainerWriter writer;
-  private ObjectMapper objectMapper;
+
+  private final SingleElementIterator<InputStream> stream = new 
SingleElementIterator<>();
+
+  private KafkaJsonLoader kafkaJsonLoader;
+  private ResultSetLoader resultSetLoader;
+  private SchemaNegotiator negotiator;
+  private ReadOptions readOptions;
+  private Properties kafkaConsumerProps;
 
   @Override
-  public void init(DrillBuf buf, List<SchemaPath> columns, 
VectorContainerWriter writer, ReadOptions readOptions) {
-    // set skipOuterList to false as it doesn't applicable for JSON records 
and it's only applicable for JSON files.
-    this.jsonReader = new JsonReader.Builder(buf)
-      .schemaPathColumns(columns)
-      .allTextMode(readOptions.isAllTextMode())
-      .readNumbersAsDouble(readOptions.isReadNumbersAsDouble())
-      .enableNanInf(readOptions.isAllowNanInf())
-      .enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar())
-      .build();
-    jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords());
-    this.writer = writer;
-    this.objectMapper = BaseJsonProcessor.getDefaultMapper()
-      .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, 
readOptions.isAllowNanInf())
-      .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, 
readOptions.isAllowEscapeAnyChar());
+  public void init(SchemaNegotiator negotiator, ReadOptions readOptions, 
KafkaStoragePlugin plugin) {
+    this.negotiator = negotiator;
+    this.resultSetLoader = negotiator.build();
+    this.readOptions = readOptions;
+    this.kafkaConsumerProps = plugin.getConfig().getKafkaConsumerProps();
   }
 
   @Override
-  public boolean readMessage(ConsumerRecord<?, ?> record) {
+  public void readMessage(ConsumerRecord<?, ?> record) {
     byte[] recordArray = (byte[]) record.value();
-    String data = new String(recordArray, Charsets.UTF_8);
     try {
-      JsonNode jsonNode = objectMapper.readTree(data);
-      if (jsonNode != null && jsonNode.isObject()) {
-        ObjectNode objectNode = (ObjectNode) jsonNode;
-        objectNode.put(KAFKA_TOPIC.getFieldName(), record.topic());
-        objectNode.put(KAFKA_PARTITION_ID.getFieldName(), record.partition());
-        objectNode.put(KAFKA_OFFSET.getFieldName(), record.offset());
-        objectNode.put(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
-        objectNode.put(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? 
record.key().toString() : null);
-      } else {
-        throw new IOException("Unsupported node type: " + (jsonNode == null ? 
"NO CONTENT" : jsonNode.getNodeType()));
+      parseAndWrite(record, recordArray);
+    } catch (TokenIterator.RecoverableJsonException e) {
+      if (!readOptions.isSkipInvalidRecords()) {
+        throw UserException.dataReadError(e)
+            .message(String.format("Error happened when parsing invalid 
record. " +
+                "Please set `%s` option to 'true' to skip invalid records.", 
ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS))
+            .addContext(resultSetLoader.errorContext())
+            .build(logger);
       }
-      jsonReader.setSource(jsonNode);
-      return convertJsonReadState(jsonReader.write(writer));
-    } catch (IOException | IllegalArgumentException e) {
-      String message = String.format("JSON record %s: %s", data, 
e.getMessage());
-      if (jsonReader.ignoreJSONParseError()) {
-        logger.debug("Skipping {}", message, e);
-        return false;
-      }
-      throw UserException.dataReadError(e)
-        .message("Failed to read " + message)
-        .addContext("MessageReader", JsonMessageReader.class.getName())
-        .build(logger);
     }
   }
 
+  private void parseAndWrite(ConsumerRecord<?, ?> record, byte[] recordArray) {
+    stream.setValue(new ByteArrayInputStream(recordArray));
+    if (kafkaJsonLoader == null) {
+      JsonLoaderOptions jsonLoaderOptions = new JsonLoaderOptions();
+      jsonLoaderOptions.allTextMode = readOptions.isAllTextMode();
+      jsonLoaderOptions.readNumbersAsDouble = 
readOptions.isReadNumbersAsDouble();
+      jsonLoaderOptions.skipMalformedRecords = 
readOptions.isSkipInvalidRecords();
+      jsonLoaderOptions.allowNanInf = readOptions.isAllowNanInf();
+      jsonLoaderOptions.enableEscapeAnyChar = 
readOptions.isAllowEscapeAnyChar();
+      jsonLoaderOptions.skipMalformedDocument = 
readOptions.isSkipInvalidRecords();
+
+      kafkaJsonLoader = (KafkaJsonLoader) new 
KafkaJsonLoader.KafkaJsonLoaderBuilder()
+          .resultSetLoader(resultSetLoader)
+          .standardOptions(negotiator.queryOptions())
+          .options(jsonLoaderOptions)
+          .errorContext(negotiator.parentErrorContext())
+          .fromStream(() -> stream)
+          .build();
+    }
+
+    RowSetLoader rowWriter = resultSetLoader.writer();
+    rowWriter.start();
+    if (kafkaJsonLoader.parser().next()) {
+      writeValue(rowWriter, MetaDataField.KAFKA_TOPIC, record.topic());
+      writeValue(rowWriter, MetaDataField.KAFKA_PARTITION_ID, 
record.partition());
+      writeValue(rowWriter, MetaDataField.KAFKA_OFFSET, record.offset());
+      writeValue(rowWriter, MetaDataField.KAFKA_TIMESTAMP, record.timestamp());
+      writeValue(rowWriter, MetaDataField.KAFKA_MSG_KEY, record.key() != null 
? record.key().toString() : null);
+      rowWriter.save();
+    }
+  }
+
+  private <T> void writeValue(RowSetLoader rowWriter, MetaDataField 
metaDataField, T value) {
+    if (rowWriter.tupleSchema().column(metaDataField.getFieldName()) == null) {
+      ColumnMetadata colSchema = 
MetadataUtils.newScalar(metaDataField.getFieldName(), 
metaDataField.getFieldType(), TypeProtos.DataMode.OPTIONAL);
+      rowWriter.addColumn(colSchema);
+    }
+    rowWriter.column(metaDataField.getFieldName()).setObject(value);
+  }
+
   @Override
-  public void ensureAtLeastOneField() {
-    jsonReader.ensureAtLeastOneField(writer);
+  public ResultSetLoader getResultSetLoader() {
+    return resultSetLoader;
   }
 
   @Override
@@ -119,10 +134,16 @@ public class JsonMessageReader implements MessageReader {
   }
 
   @Override
+  public boolean endBatch() {
+    kafkaJsonLoader.endBatch();
+    return resultSetLoader.hasRows();
+  }
+
+  @Override
   public void close() {
-    this.writer.clear();
     try {
-      this.writer.close();
+      kafkaJsonLoader.close();
+      resultSetLoader.close();
     } catch (Exception e) {
       logger.warn("Error while closing JsonMessageReader: {}", e.getMessage());
     }
@@ -130,26 +151,29 @@ public class JsonMessageReader implements MessageReader {
 
   @Override
   public String toString() {
-    return "JsonMessageReader[jsonReader=" + jsonReader + "]";
+    return new StringJoiner(", ", JsonMessageReader.class.getSimpleName() + 
"[", "]")
+        .add("kafkaJsonLoader=" + kafkaJsonLoader)
+        .add("resultSetLoader=" + resultSetLoader)
+        .toString();
   }
 
-  /**
-   * Converts {@link JsonProcessor.ReadState} into true / false result.
-   *
-   * @param jsonReadState JSON reader read state
-   * @return true if read was successful, false otherwise
-   * @throws IllegalArgumentException if unexpected read state was encountered
-   */
-  private boolean convertJsonReadState(JsonProcessor.ReadState jsonReadState) {
-    switch (jsonReadState) {
-      case WRITE_SUCCEED:
-      case END_OF_STREAM:
-        return true;
-      case JSON_RECORD_PARSE_ERROR:
-      case JSON_RECORD_PARSE_EOF_ERROR:
-        return false;
-      default:
-        throw new IllegalArgumentException("Unexpected JSON read state: " + 
jsonReadState);
+  /**
+   * Iterator that holds at most one value at a time. A value is supplied with
+   * {@link #setValue(Object)} and is consumed by the next call to {@link #next()};
+   * after that {@link #hasNext()} returns {@code false} until a new value is set.
+   * A {@code null} value is treated as "no value".
+   *
+   * @param <T> type of the held value
+   */
+  public static class SingleElementIterator<T> implements Iterator<T> {
+    private T value;
+
+    @Override
+    public boolean hasNext() {
+      return value != null;
+    }
+
+    /**
+     * Returns the held value and clears it.
+     *
+     * @return the value most recently set
+     * @throws java.util.NoSuchElementException if no value is currently held,
+     *         as required by the {@link Iterator} contract
+     */
+    @Override
+    public T next() {
+      if (value == null) {
+        throw new java.util.NoSuchElementException("No value is available");
+      }
+      T current = value;
+      value = null;
+      return current;
+    }
+
+    /**
+     * Sets the value to be returned by the next call to {@link #next()},
+     * replacing any value that has not been consumed yet.
+     *
+     * @param value the value to hold; {@code null} leaves the iterator empty
+     */
+    public void setValue(T value) {
+      this.value = value;
     }
   }
 }
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/KafkaJsonLoader.java
similarity index 57%
copy from 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
copy to 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/KafkaJsonLoader.java
index af3e163..c09fa52 100644
--- 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/KafkaJsonLoader.java
@@ -15,27 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.store.kafka;
+package org.apache.drill.exec.store.kafka.decoders;
 
-/**
- * MetaData fields provide additional information about each message.
- * It is expected that one should not modify the fieldName of each constant as 
it breaks the compatibility.
- */
-public enum MetaDataField {
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
 
-  KAFKA_TOPIC("kafkaTopic"),
-  KAFKA_PARTITION_ID("kafkaPartitionId"),
-  KAFKA_OFFSET("kafkaMsgOffset"),
-  KAFKA_TIMESTAMP("kafkaMsgTimestamp"),
-  KAFKA_MSG_KEY("kafkaMsgKey");
+public class KafkaJsonLoader extends JsonLoaderImpl {
 
-  private final String fieldName;
+  protected KafkaJsonLoader(KafkaJsonLoaderBuilder builder) {
+    super(builder);
+  }
 
-  MetaDataField(final String fieldName) {
-    this.fieldName = fieldName;
+  @Override
+  public void endBatch() {
+    super.endBatch();
   }
 
-  public String getFieldName() {
-    return fieldName;
+  public static class KafkaJsonLoaderBuilder extends JsonLoaderBuilder {
+    @Override
+    public KafkaJsonLoader build() {
+      return new KafkaJsonLoader(this);
+    }
   }
 }
diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
index f925fce..2e78780 100644
--- 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
@@ -17,17 +17,14 @@
  */
 package org.apache.drill.exec.store.kafka.decoders;
 
-import java.io.Closeable;
-import java.util.List;
-
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
 import org.apache.drill.exec.store.kafka.ReadOptions;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
-import io.netty.buffer.DrillBuf;
+import java.io.Closeable;
 
 /**
  * MessageReader interface provides mechanism to handle various Kafka Message
@@ -35,11 +32,13 @@ import io.netty.buffer.DrillBuf;
  */
 public interface MessageReader extends Closeable {
 
-  void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter 
writer, ReadOptions readOptions);
+  void init(SchemaNegotiator negotiator, ReadOptions readOptions, 
KafkaStoragePlugin plugin);
 
-  boolean readMessage(ConsumerRecord<?, ?> message);
-
-  void ensureAtLeastOneField();
+  void readMessage(ConsumerRecord<?, ?> message);
 
   KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin);
+
+  ResultSetLoader getResultSetLoader();
+
+  boolean endBatch();
 }
diff --git 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index 745edb5..807c84e 100644
--- 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -17,15 +17,14 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericRecord;
@@ -40,8 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-import org.apache.drill.shaded.guava.com.google.common.io.Resources;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonPrimitive;
@@ -49,7 +46,10 @@ import com.google.gson.JsonPrimitive;
 public class KafkaMessageGenerator {
 
   private static final Logger logger = 
LoggerFactory.getLogger(KafkaMessageGenerator.class);
-  private Properties producerProperties = new Properties();
+
+  public static final String SCHEMA_REGISTRY_URL = "mock://testurl";
+
+  private final Properties producerProperties = new Properties();
 
   public KafkaMessageGenerator (final String broker, Class<?> valueSerializer) 
{
     producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
@@ -63,12 +63,23 @@ public class KafkaMessageGenerator {
     producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
     producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
valueSerializer);
     producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 
//So that retries do not cause duplicates
+    
producerProperties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
 SCHEMA_REGISTRY_URL);
   }
 
-  public void populateAvroMsgIntoKafka(String topic, int numMsg) throws 
IOException {
-    try (KafkaProducer<String, GenericRecord> producer = new 
KafkaProducer<>(producerProperties)) {
+  public void populateAvroMsgIntoKafka(String topic, int numMsg) {
+    try (KafkaProducer<Object, GenericRecord> producer = new 
KafkaProducer<>(producerProperties)) {
       Schema.Parser parser = new Schema.Parser();
-      Schema schema = 
parser.parse(Resources.getResource("drill-avro-test.avsc").openStream());
+      String userSchema = "{\"type\":\"record\"," +
+          "\"name\":\"myrecord\"," +
+          "\"fields\":[" +
+          "{\"name\":\"key1\",\"type\":\"string\"}," +
+          "{\"name\":\"key2\",\"type\":\"int\"}," +
+          "{\"name\":\"key3\",\"type\":\"boolean\"}," +
+          
"{\"name\":\"key5\",\"type\":{\"type\":\"array\",\"items\":\"int\"}}," +
+          
"{\"name\":\"key6\",\"type\":{\"type\":\"record\",\"name\":\"myrecord6\",\"fields\":["
 +
+          "{\"name\":\"key61\",\"type\":\"double\"}," +
+          "{\"name\":\"key62\",\"type\":\"double\"}]}}]}";
+      Schema schema = parser.parse(userSchema);
       GenericRecordBuilder builder = new GenericRecordBuilder(schema);
       Random rand = new Random();
       for (int i = 0; i < numMsg; ++i) {
@@ -82,14 +93,14 @@ public class KafkaMessageGenerator {
         list.add(rand.nextInt(100));
         builder.set("key5", list);
 
-        Map<String, Double> map = Maps.newHashMap();
-        map.put("key61", rand.nextDouble());
-        map.put("key62", rand.nextDouble());
-        builder.set("key6", map);
+        GenericRecordBuilder innerBuilder = new 
GenericRecordBuilder(schema.getField("key6").schema());
+        innerBuilder.set("key61", rand.nextDouble());
+        innerBuilder.set("key62", rand.nextDouble());
+        builder.set("key6", innerBuilder.build());
 
         Record producerRecord = builder.build();
 
-        ProducerRecord<String, GenericRecord> record = new 
ProducerRecord<>(topic, producerRecord);
+        ProducerRecord<Object, GenericRecord> record = new 
ProducerRecord<>(topic, producerRecord);
         producer.send(record);
       }
     }
diff --git 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index 9e89171..d690e70 100644
--- 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -67,6 +67,18 @@ public class KafkaQueriesTest extends KafkaTestBase {
   }
 
   @Test
+  public void testAvroResultCount() {
+    try {
+      // Switch the session's Kafka record reader option to the Avro message reader.
+      client.alterSession(ExecConstants.KAFKA_RECORD_READER,
+          "org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
+      String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, 
TestQueryConstants.AVRO_TOPIC);
+      // The expected count is TestKafkaSuit.NUM_JSON_MSG.
+      runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG);
+    } finally {
+      // Always restore the record reader option, even if the query fails.
+      client.resetSession(ExecConstants.KAFKA_RECORD_READER);
+    }
+  }
+
+  @Test
   public void testPartitionMinOffset() throws Exception {
     // following kafka.tools.GetOffsetShell for earliest as -2
     Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(-2);
@@ -144,6 +156,19 @@ public class KafkaQueriesTest extends KafkaTestBase {
   }
 
   @Test
+  public void testPhysicalPlanSubmissionAvro() throws Exception {
+    try {
+      // Switch the session's Kafka record reader option to the Avro message reader.
+      client.alterSession(ExecConstants.KAFKA_RECORD_READER,
+          "org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
+      String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, 
TestQueryConstants.AVRO_TOPIC);
+      // Obtain the plan as JSON and submit it back as a physical plan.
+      String plan = queryBuilder().sql(query).explainJson();
+      queryBuilder().physical(plan).run();
+    } finally {
+      // Always restore the record reader option, even if the query fails.
+      client.resetSession(ExecConstants.KAFKA_RECORD_READER);
+    }
+  }
+
+  @Test
   public void testOneMessageTopic() throws Exception {
     String topicName = "topicWithOneMessage";
     TestKafkaSuit.createTopicHelper(topicName, 1);
diff --git 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
index c233b1f..c426b06 100644
--- 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
+++ 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.kafka;
 
 import java.util.Map;
 
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
@@ -34,6 +35,8 @@ import org.junit.BeforeClass;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 
+import static 
org.apache.drill.exec.store.kafka.KafkaMessageGenerator.SCHEMA_REGISTRY_URL;
+
 public class KafkaTestBase extends ClusterTest {
   protected static KafkaStoragePluginConfig storagePluginConfig;
 
@@ -52,6 +55,7 @@ public class KafkaTestBase extends ClusterTest {
     Map<String, String> kafkaConsumerProps = Maps.newHashMap();
     kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
embeddedKafkaCluster.getKafkaBrokerList());
     kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
"drill-test-consumer");
+    
kafkaConsumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
 SCHEMA_REGISTRY_URL);
     storagePluginConfig = new KafkaStoragePluginConfig(kafkaConsumerProps);
     storagePluginConfig.setEnabled(true);
     pluginRegistry.put(KafkaStoragePluginConfig.NAME, storagePluginConfig);
diff --git 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index 7bb099d..73384d2 100644
--- 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import kafka.zk.KafkaZkClient;
 import org.apache.drill.categories.KafkaStorageTest;
 import org.apache.drill.categories.SlowTest;
@@ -85,8 +86,11 @@ public class TestKafkaSuit extends BaseTest {
             "kafka.server", "SessionExpireListener",
             Option.<String>empty(), Option.<ZKClientConfig>empty());
         createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
+        createTopicHelper(TestQueryConstants.AVRO_TOPIC, 1);
         KafkaMessageGenerator generator = new 
KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), 
StringSerializer.class);
+        KafkaMessageGenerator avroGenerator = new 
KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), 
KafkaAvroSerializer.class);
         generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, 
NUM_JSON_MSG);
+        avroGenerator.populateAvroMsgIntoKafka(TestQueryConstants.AVRO_TOPIC, 
NUM_JSON_MSG);
       }
       initCount.incrementAndGet();
       runningSuite = true;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index f1e934f..97cd200 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.CustomErrorContext;
@@ -141,7 +142,7 @@ public class JsonLoaderImpl implements JsonLoader, 
ErrorFactory {
     private TupleMetadata providedSchema;
     private JsonLoaderOptions options;
     private CustomErrorContext errorContext;
-    private InputStream stream;
+    private Iterable<InputStream> streams;
     private Reader reader;
     private String dataPath;
     private MessageParser messageParser;
@@ -171,8 +172,13 @@ public class JsonLoaderImpl implements JsonLoader, 
ErrorFactory {
       return this;
     }
 
-    public JsonLoaderBuilder fromStream(InputStream stream) {
-      this.stream = stream;
+    public JsonLoaderBuilder fromStream(InputStream... stream) {
+      this.streams = Arrays.asList(stream);
+      return this;
+    }
+
+    public JsonLoaderBuilder fromStream(Iterable<InputStream> streams) {
+      this.streams = streams;
       return this;
     }
 
@@ -226,7 +232,7 @@ public class JsonLoaderImpl implements JsonLoader, 
ErrorFactory {
   // case. Usually just one or two fields have deferred nulls.
   private final List<NullTypeMarker> nullStates = new ArrayList<>();
 
-  private JsonLoaderImpl(JsonLoaderBuilder builder) {
+  protected JsonLoaderImpl(JsonLoaderBuilder builder) {
     this.rsLoader = builder.rsLoader;
     this.options = builder.options;
     this.errorContext = builder. errorContext;
@@ -236,7 +242,7 @@ public class JsonLoaderImpl implements JsonLoader, 
ErrorFactory {
 
   private JsonStructureParser buildParser(JsonLoaderBuilder builder) {
     return new JsonStructureParserBuilder()
-            .fromStream(builder.stream)
+            .fromStream(builder.streams)
             .fromReader(builder.reader)
             .options(builder.options)
             .parserFactory(parser ->
@@ -307,15 +313,12 @@ public class JsonLoaderImpl implements JsonLoader, 
ErrorFactory {
    * Bottom line: the user is responsible for not giving Drill
    * ambiguous data that would require Drill to predict the future.
    */
-  private void endBatch() {
+  protected void endBatch() {
 
     // Make a copy. Forcing resolution will remove the
     // element from the original list.
-    List<NullTypeMarker> copy = new ArrayList<>();
-    copy.addAll(nullStates);
-    for (NullTypeMarker marker : copy) {
-      marker.forceResolution();
-    }
+    List<NullTypeMarker> copy = new ArrayList<>(nullStates);
+    copy.forEach(NullTypeMarker::forceResolution);
     assert nullStates.isEmpty();
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
index f4a3277..ebaae46 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
@@ -47,6 +47,13 @@ public class JsonStructureOptions {
    */
   public boolean skipMalformedRecords;
 
+  /**
+   * This property works only when {@link #skipMalformedRecords} is enabled.
+   * If true, {@link TokenIterator.RecoverableJsonException} will be propagated
+   * for the case of a malformed empty document, so that the caller can handle
+   * this exception.
+   */
+  public boolean skipMalformedDocument;
+
   public boolean enableEscapeAnyChar;
 
   public JsonStructureOptions() { }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
index ded872c..2f71aa8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
@@ -20,8 +20,10 @@ package org.apache.drill.exec.store.easy.json.parser;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.util.Arrays;
 import java.util.function.Function;
 
+import org.apache.commons.collections4.IterableUtils;
 import 
org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
 import 
org.apache.drill.exec.store.easy.json.parser.RootParser.EmbeddedArrayParser;
 import 
org.apache.drill.exec.store.easy.json.parser.RootParser.EmbeddedObjectParser;
@@ -74,7 +76,7 @@ public class JsonStructureParser {
   protected static final Logger logger = 
LoggerFactory.getLogger(JsonStructureParser.class);
 
   public static class JsonStructureParserBuilder {
-    private InputStream stream;
+    private Iterable<InputStream> streams;
     private Reader reader;
     private JsonStructureOptions options;
     private Function<JsonStructureParser, ObjectParser> parserFactory;
@@ -98,8 +100,13 @@ public class JsonStructureParser {
       return this;
     }
 
-    public JsonStructureParserBuilder fromStream(InputStream stream) {
-      this.stream = stream;
+    public JsonStructureParserBuilder fromStream(InputStream... stream) {
+      this.streams = Arrays.asList(stream);
+      return this;
+    }
+
+    public JsonStructureParserBuilder fromStream(Iterable<InputStream> 
streams) {
+      this.streams = streams;
       return this;
     }
 
@@ -130,7 +137,6 @@ public class JsonStructureParser {
     }
   }
 
-  private final JsonParser parser;
   private final JsonStructureOptions options;
   private final ErrorFactory errorFactory;
   private final TokenIterator tokenizer;
@@ -141,37 +147,23 @@ public class JsonStructureParser {
   /**
    * Constructor for the structure parser.
    *
-   * @param stream the source of JSON text
-   * @param options configuration options for the Jackson JSON parser
-   * and this structure parser
-   * @param rootListener listener for the top-level objects in the
-   * JSON stream
-   * @param errorFactory factory for errors thrown for various
-   * conditions
+   * @param builder builder supplying the JSON source, the options, the parser factory and the error factory
    */
   private JsonStructureParser(JsonStructureParserBuilder builder) {
     this.options = Preconditions.checkNotNull(builder.options);
     this.errorFactory = Preconditions.checkNotNull(builder.errorFactory);
-    try {
-      ObjectMapper mapper = new ObjectMapper()
-          .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
-          .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
-          .configure(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS.mappedFeature(),
-              options.allowNanInf)
-          
.configure(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER.mappedFeature(),
-              options.enableEscapeAnyChar);
-
-      if (builder.stream != null) {
-        parser = mapper.getFactory().createParser(builder.stream);
-      } else {
-        parser = 
mapper.getFactory().createParser(Preconditions.checkNotNull(builder.reader));
-      }
-    } catch (JsonParseException e) {
-      throw errorFactory().parseError("Failed to create the JSON parser", e);
-    } catch (IOException e) {
-      throw errorFactory().ioException(e);
-    }
-    tokenizer = new TokenIterator(parser, options, errorFactory());
+    ObjectMapper mapper = new ObjectMapper()
+        .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
+        .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
+        .configure(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS.mappedFeature(),
+            options.allowNanInf)
+        
.configure(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER.mappedFeature(),
+            options.enableEscapeAnyChar);
+
+    boolean isStream = !IterableUtils.isEmpty(builder.streams);
+    Function<InputStream, JsonParser> parserFunction = stream -> 
getJsonParser(builder, mapper, stream, isStream);
+
+    tokenizer = new TokenIterator(builder.streams, parserFunction, options, 
errorFactory());
     fieldFactory = new FieldParserFactory(this,
         Preconditions.checkNotNull(builder.parserFactory));
 
@@ -185,6 +177,20 @@ public class JsonStructureParser {
     }
   }
 
+  /**
+   * Creates a Jackson parser for one JSON source.
+   *
+   * @param builder builder whose reader is used when the source is not a stream;
+   *                the reader must not be null in that case
+   * @param mapper object mapper whose factory creates the parser
+   * @param stream input stream to parse, used only when {@code isStream} is true
+   * @param isStream whether the parser is created over {@code stream} rather than the reader
+   * @return the created parser
+   */
+  private JsonParser getJsonParser(JsonStructureParserBuilder builder, ObjectMapper mapper, InputStream stream, boolean isStream) {
+    try {
+      if (!isStream) {
+        return mapper.getFactory().createParser(Preconditions.checkNotNull(builder.reader));
+      }
+      return mapper.getFactory().createParser(stream);
+    } catch (JsonParseException parseFailure) {
+      throw errorFactory().parseError("Failed to create the JSON parser", parseFailure);
+    } catch (IOException ioFailure) {
+      throw errorFactory().ioException(ioFailure);
+    }
+  }
+
   public JsonStructureOptions options() { return options; }
   public ErrorFactory errorFactory() { return errorFactory; }
   public FieldParserFactory fieldFactory() { return fieldFactory; }
@@ -263,7 +269,7 @@ public class JsonStructureParser {
    *
    * @return {@code true}  if another record can be read, {@code false}
    * if EOF.
-   * @throws UserException if the error is unrecoverable
+   * @throws org.apache.drill.common.exceptions.UserException if the error is 
unrecoverable
    * @see <a 
href="https://issues.apache.org/jira/browse/DRILL-4653";>DRILL-4653</a>
    * @see <a 
href="https://issues.apache.org/jira/browse/DRILL-5953";>DRILL-5953</a>
    */
@@ -273,12 +279,12 @@ public class JsonStructureParser {
     while (true) {
       while (true) {
         try {
-          if (parser.isClosed()) {
+          if (tokenizer.getParser().isClosed()) {
             throw errorFactory().unrecoverableError();
           }
           JsonToken token = tokenizer.next();
           if (token == null) {
-            if (firstAttempt) {
+            if (firstAttempt && !options().skipMalformedDocument) {
               throw errorFactory().unrecoverableError();
             }
             return false;
@@ -331,7 +337,7 @@ public class JsonStructureParser {
           errorRecoveryCount);
     }
     try {
-      parser.close();
+      tokenizer.close();
     } catch (IOException e) {
       logger.warn("Ignored failure when closing JSON source", e);
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/TokenIterator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/TokenIterator.java
index a7d6a34..38774c6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/TokenIterator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/TokenIterator.java
@@ -18,7 +18,11 @@
 package org.apache.drill.exec.store.easy.json.parser;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.function.Function;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
 
 import com.fasterxml.jackson.core.JsonLocation;
@@ -37,16 +41,24 @@ public class TokenIterator {
   public static class RecoverableJsonException extends RuntimeException {
   }
 
-  private final JsonParser parser;
+  private final ParserManager parserManager;
   private final JsonStructureOptions options;
   private final ErrorFactory errorFactory;
   private final JsonToken[] lookahead = new JsonToken[MAX_LOOKAHEAD];
   private int count;
 
-  public TokenIterator(JsonParser parser, JsonStructureOptions options, 
ErrorFactory errorFactory) {
-    this.parser = parser;
+  public TokenIterator(Iterable<InputStream> streams, Function<InputStream, 
JsonParser> parserFunction, JsonStructureOptions options, ErrorFactory 
errorFactory) {
     this.options = options;
     this.errorFactory = errorFactory;
+    this.parserManager = new ParserManager(streams, parserFunction);
+  }
+
+  /**
+   * Returns the parser currently in use. If the parser manager holds no
+   * parser, it is first asked to move to the next one.
+   *
+   * @return the current parser, or {@code null} if the parser manager
+   *         still has none after advancing
+   */
+  public JsonParser getParser() {
+    if (parserManager.getParser() == null) {
+      parserManager.nextParser();
+    }
+    return parserManager.getParser();
+  }
 
   public ErrorFactory errorFactory() { return errorFactory; }
@@ -56,7 +68,7 @@ public class TokenIterator {
       return lookahead[--count];
     }
     try {
-      return parser.nextToken();
+      return getNextToken();
     } catch (JsonParseException e) {
       if (options.skipMalformedRecords) {
         throw new RecoverableJsonException();
@@ -68,11 +80,23 @@ public class TokenIterator {
     }
   }
 
+  /**
+   * Reads the next token from the current parser. When the current parser
+   * reports end of input ({@code null} token), moves on to the next parser
+   * and reads its first token instead.
+   *
+   * @return the next token, or {@code null} when no parser is available
+   *         or the newly selected parser has no tokens
+   * @throws IOException if the underlying parser fails to read
+   */
+  private JsonToken getNextToken() throws IOException {
+    JsonParser parser = getParser();
+    if (parser == null) {
+      // All sources are exhausted: report end of input instead of failing
+      // with a NullPointerException, matching the null checks in
+      // lineNumber(), columnNumber() and token().
+      return null;
+    }
+    JsonToken jsonToken = parser.nextToken();
+    if (jsonToken == null) {
+      parserManager.nextParser();
+      parser = getParser();
+      if (parser != null) {
+        jsonToken = parser.nextToken();
+      }
+    }
+    return jsonToken;
+  }
+
   public String context() {
-    JsonLocation location = parser.getCurrentLocation();
+    JsonLocation location = getParser().getCurrentLocation();
     String token;
     try {
-      token = parser.getText();
+      token = getParser().getText();
     } catch (IOException e) {
       token = "<unknown>";
     }
@@ -88,16 +112,19 @@ public class TokenIterator {
   }
 
   public int lineNumber() {
-    return parser.getCurrentLocation().getLineNr();
+    JsonParser parser = getParser();
+    return parser != null ? parser.getCurrentLocation().getLineNr() : 0;
   }
 
   public int columnNumber() {
-    return parser.getCurrentLocation().getColumnNr();
+    JsonParser parser = getParser();
+    return parser != null ? parser.getCurrentLocation().getColumnNr() : 0;
   }
 
   public String token() {
     try {
-      return parser.getText();
+      JsonParser parser = getParser();
+      return parser != null ? getParser().getText() : null;
     } catch (IOException e) {
       return null;
     }
@@ -127,7 +154,7 @@ public class TokenIterator {
 
   public String textValue() {
     try {
-      return parser.getText();
+      return getParser().getText();
     } catch (JsonParseException e) {
       throw errorFactory.syntaxError(e);
     } catch (IOException e) {
@@ -137,7 +164,7 @@ public class TokenIterator {
 
   public long longValue() {
     try {
-      return parser.getLongValue();
+      return getParser().getLongValue();
     } catch (JsonParseException e) {
       throw errorFactory.syntaxError(e);
     } catch (IOException e) {
@@ -149,7 +176,7 @@ public class TokenIterator {
 
   public String stringValue() {
     try {
-      return parser.getValueAsString();
+      return getParser().getValueAsString();
     } catch (JsonParseException e) {
       throw errorFactory.syntaxError(e);
     } catch (IOException e) {
@@ -161,7 +188,7 @@ public class TokenIterator {
 
   public double doubleValue() {
     try {
-      return parser.getValueAsDouble();
+      return getParser().getValueAsDouble();
     } catch (JsonParseException e) {
       throw errorFactory.syntaxError(e);
     } catch (IOException e) {
@@ -173,7 +200,7 @@ public class TokenIterator {
 
   public byte[] binaryValue() {
     try {
-      return parser.getBinaryValue();
+      return getParser().getBinaryValue();
     } catch (JsonParseException e) {
       throw errorFactory.syntaxError(e);
     } catch (IOException e) {
@@ -186,4 +213,46 @@ public class TokenIterator {
   public RuntimeException invalidValue(JsonToken token) {
     return errorFactory.structureError("Unexpected JSON value: " + 
token.name());
   }
+
+  /**
+   * Holds the parser for the input stream currently being read and creates
+   * a parser for the following stream on demand.
+   */
+  public static class ParserManager {
+    private final Function<InputStream, JsonParser> parserFunction;
+    private final Iterator<InputStream> parsersIterator;
+    private JsonParser currentParser;
+
+    /**
+     * Creates the manager and selects the first parser, if any stream is available.
+     *
+     * @param parsers input streams to read, in order
+     * @param parserFunction creates a parser for a given input stream
+     */
+    public ParserManager(Iterable<InputStream> parsers, Function<InputStream, JsonParser> parserFunction) {
+      this.parsersIterator = parsers.iterator();
+      this.parserFunction = parserFunction;
+      this.nextParser();
+    }
+
+    /**
+     * @return the current parser, or {@code null} if there is none
+     */
+    public JsonParser getParser() {
+      return currentParser;
+    }
+
+    /**
+     * Closes the current parser, if any, and replaces it with a parser for
+     * the next input stream, or with {@code null} when no stream is left.
+     *
+     * @return this manager
+     * @throws DrillRuntimeException if closing the current parser fails
+     */
+    public ParserManager nextParser() {
+      // Close the parser being left behind in both cases. Previously it was
+      // dropped unclosed when no further stream was available, and close()
+      // could no longer reach it.
+      try {
+        if (currentParser != null) {
+          currentParser.close();
+        }
+      } catch (IOException e) {
+        throw new DrillRuntimeException(e);
+      }
+      if (parsersIterator.hasNext()) {
+        currentParser = parserFunction.apply(parsersIterator.next());
+      } else {
+        currentParser = null;
+      }
+      return this;
+    }
+
+    /**
+     * Closes the current parser, if any.
+     *
+     * @throws IOException if the parser fails to close
+     */
+    public void close() throws IOException {
+      if (currentParser != null) {
+        currentParser.close();
+      }
+    }
+  }
+
+  public void close() throws IOException {
+    parserManager.close();
+  }
 }
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index c865756..59a23b9 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -336,6 +336,7 @@
               <exclude>javax.xml.stream:stax-api</exclude>
               <exclude>javax.activation:activation</exclude>
               <exclude>commons-cli:commons-cli</exclude>
+              <exclude>org.apache.commons:commons-collections4</exclude>
               <exclude>commons-io:commons-io</exclude>
               <exclude>commons-beanutils:commons-beanutils-core:jar:*</exclude>
               <exclude>commons-beanutils:commons-beanutils:jar:*</exclude>
@@ -622,6 +623,7 @@
                     <exclude>javax.xml.stream:stax-api</exclude>
                     <exclude>javax.activation:activation</exclude>
                     <exclude>commons-cli:commons-cli</exclude>
+                    <exclude>org.apache.commons:commons-collections4</exclude>
                     <exclude>commons-io:commons-io</exclude>
                     <exclude>commons-beanutils:commons-beanutils-core:jar:*</exclude>
                     <exclude>commons-beanutils:commons-beanutils:jar:*</exclude>
diff --git a/pom.xml b/pom.xml
index 312bc62..3648921 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
     <findbugs.version>3.0.0</findbugs.version>
     <netty.tcnative.classifier />
     <commons.io.version>2.7</commons.io.version>
+    <commons.collections.version>4.4</commons.collections.version>
     <hamcrest.core.version>1.3</hamcrest.core.version>
     <curator.version>5.1.0</curator.version>
     <wiremock.standalone.version>2.23.2</wiremock.standalone.version>
@@ -1085,6 +1086,10 @@
       <artifactId>commons-io</artifactId>
       <version>${commons.io.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+    </dependency>
 
     <!-- Test Dependencies -->
     <dependency>
@@ -1827,6 +1832,11 @@
         <version>${zookeeper.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-collections4</artifactId>
+        <version>${commons.collections.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-client</artifactId>
         <version>${curator.version}</version>

Reply via email to