This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
     new f73496d  DRILL-5940: Add support for Avro messages with schema registry for Kafka (#2239)
f73496d is described below
commit f73496dfb6228c75f745cb670da17b24ce7bef34
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Fri May 28 09:24:40 2021 +0300
    DRILL-5940: Add support for Avro messages with schema registry for Kafka (#2239)
---
contrib/storage-kafka/pom.xml | 21 +++
.../drill/exec/store/kafka/KafkaRecordReader.java | 132 ++++++--------
.../exec/store/kafka/KafkaScanBatchCreator.java | 44 +++--
.../drill/exec/store/kafka/MetaDataField.java | 20 ++-
.../store/kafka/decoders/AvroMessageReader.java | 125 ++++++++++++++
.../store/kafka/decoders/JsonMessageReader.java | 192 ++++++++++++---------
.../KafkaJsonLoader.java} | 30 ++--
.../exec/store/kafka/decoders/MessageReader.java | 19 +-
.../exec/store/kafka/KafkaMessageGenerator.java | 37 ++--
.../drill/exec/store/kafka/KafkaQueriesTest.java | 25 +++
.../drill/exec/store/kafka/KafkaTestBase.java | 4 +
.../drill/exec/store/kafka/TestKafkaSuit.java | 4 +
.../store/easy/json/loader/JsonLoaderImpl.java | 25 +--
.../easy/json/parser/JsonStructureOptions.java | 7 +
.../easy/json/parser/JsonStructureParser.java | 76 ++++----
.../exec/store/easy/json/parser/TokenIterator.java | 97 +++++++++--
exec/jdbc-all/pom.xml | 2 +
pom.xml | 10 ++
18 files changed, 590 insertions(+), 280 deletions(-)
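
A minimal usage sketch, modelled on the new tests in KafkaQueriesTest further down (the topic name and query text are illustrative placeholders, not part of the patch): the new decoder is selected through the existing store.kafka.record.reader session option, referenced below via ExecConstants.KAFKA_RECORD_READER.

    // Sketch only: assumes a ClusterTest-style `client`/`queryBuilder()` and a topic whose
    // messages were produced with the Confluent KafkaAvroSerializer against the schema
    // registry configured in the plugin's consumer properties (schema.registry.url).
    client.alterSession(ExecConstants.KAFKA_RECORD_READER,
        "org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
    try {
      queryBuilder()
          .sql("SELECT * FROM kafka.`avro_topic`") // `avro_topic` is a placeholder
          .run();
    } finally {
      client.resetSession(ExecConstants.KAFKA_RECORD_READER);
    }
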
diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index d626d40..a9008b0 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -35,6 +35,21 @@
<kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite>
</properties>
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <name>Confluent</name>
+ <url>https://packages.confluent.io/maven/</url>
+ <releases>
+ <enabled>true</enabled>
+ <checksumPolicy>fail</checksumPolicy>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
<dependencies>
<dependency>
<groupId>org.apache.drill.exec</groupId>
@@ -78,6 +93,12 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-avro-serializer</artifactId>
+ <version>6.1.1</version>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.drill.exec</groupId>
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index 5218c3b..551b62f 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -17,70 +17,57 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
  private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
- private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
private final ReadOptions readOptions;
private final KafkaStoragePlugin plugin;
private final KafkaPartitionScanSpec subScanSpec;
+ private final int maxRecords;
- private VectorContainerWriter writer;
private MessageReader messageReader;
-
private long currentOffset;
private MessageIterator msgItr;
- private int currentMessageCount;
-  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
-                           FragmentContext context, KafkaStoragePlugin plugin) {
-    setColumns(projectedColumns);
-    this.readOptions = new ReadOptions(context.getOptions());
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager options, KafkaStoragePlugin plugin, int maxRecords) {
+    this.readOptions = new ReadOptions(options);
+ this.readOptions = new ReadOptions(options);
this.plugin = plugin;
this.subScanSpec = subScanSpec;
+ this.maxRecords = maxRecords;
}
@Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
- Set<SchemaPath> transformed = new LinkedHashSet<>();
- if (isStarQuery()) {
- transformed.add(SchemaPath.STAR_COLUMN);
- } else {
- transformed.addAll(projectedColumns);
- }
- return transformed;
- }
+ public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+ @Override
+ public void addContext(UserException.Builder builder) {
+ super.addContext(builder);
+ builder.addContext("topic_name", subScanSpec.getTopicName());
+ }
+ };
+ negotiator.setErrorContext(errorContext);
- @Override
- public void setup(OperatorContext context, OutputMutator output) {
-    this.writer = new VectorContainerWriter(output, readOptions.isEnableUnionType());
     messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
-    messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), writer, readOptions);
+    messageReader.init(negotiator, readOptions, plugin);
     msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut());
+
+ return true;
}
/**
@@ -88,60 +75,49 @@ public class KafkaRecordReader extends AbstractRecordReader {
* take care of polling multiple times for this given batch next invocation
*/
@Override
- public int next() {
- writer.allocate();
- writer.reset();
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
- currentMessageCount = 0;
-
- try {
- while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
- ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
- currentOffset = consumerRecord.offset();
- writer.setPosition(currentMessageCount);
- boolean status = messageReader.readMessage(consumerRecord);
- // increment record count only if message was read successfully
- if (status) {
- if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
- break;
- }
- }
+ public boolean next() {
+ RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();
+ while (!rowWriter.isFull()) {
+ if (!nextLine(rowWriter)) {
+ return false;
}
+ }
+ return messageReader.endBatch();
+ }
- if (currentMessageCount > 0) {
- messageReader.ensureAtLeastOneField();
- }
- writer.setValueCount(currentMessageCount);
- if (watch != null) {
- logger.debug("Took {} ms to process {} records.",
watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
- }
- logger.debug("Last offset consumed for {}:{} is {}",
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
- currentOffset);
- return currentMessageCount;
- } catch (Exception e) {
- String msg = "Failure while reading messages from kafka. Record reader
was at record: " + (currentMessageCount + 1);
- throw UserException.dataReadError(e)
- .message(msg)
- .addContext(e.getMessage())
- .build(logger);
+ private boolean nextLine(RowSetLoader rowWriter) {
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
+ if (currentOffset >= subScanSpec.getEndOffset() || !msgItr.hasNext()) {
+ return false;
}
+ ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
+ currentOffset = consumerRecord.offset();
+ messageReader.readMessage(consumerRecord);
+ return true;
}
@Override
- public void close() throws IOException {
+ public void close() {
logger.debug("Last offset processed for {}:{} is - {}",
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
currentOffset);
logger.debug("Total time to fetch messages from {}:{} is - {}
milliseconds", subScanSpec.getTopicName(),
subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
plugin.registerToClose(msgItr);
- messageReader.close();
+ try {
+ messageReader.close();
+ } catch (IOException e) {
+ logger.warn("Error closing Kafka message reader: {}", e.getMessage(), e);
+ }
}
@Override
public String toString() {
- return "KafkaRecordReader[readOptions=" + readOptions
- + ", currentOffset=" + currentOffset
- + ", currentMessageCount=" + currentMessageCount
- + "]";
+ return new PlanStringBuilder(this)
+ .field("readOptions", readOptions)
+ .field("currentOffset", currentOffset)
+ .toString();
}
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 8083e33..7d2ebcc 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -21,33 +21,51 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
-import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.BatchCreator;
-import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.physical.impl.scan.framework.BasicScanFactory;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.RecordReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
-  private static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);
@Override
  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KafkaSubScan subScan, List<RecordBatch> children)
      throws ExecutionSetupException {
    Preconditions.checkArgument(children.isEmpty());
-    List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;
+    try {
+      ManagedScanFramework.ScanFrameworkBuilder builder = createBuilder(context.getOptions(), subScan);
+ return builder.buildScanOperator(context, subScan);
+ } catch (UserException e) {
+ // Rethrow user exceptions directly
+ throw e;
+ } catch (Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
+ }
+ }
-    List<RecordReader> readers = subScan.getPartitionSubScanSpecList().stream()
-        .map(scanSpec -> new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()))
-        .collect(Collectors.toList());
+  private ManagedScanFramework.ScanFrameworkBuilder createBuilder(OptionManager options,
+      KafkaSubScan subScan) {
+    ManagedScanFramework.ScanFrameworkBuilder builder = new ManagedScanFramework.ScanFrameworkBuilder();
+ builder.projection(subScan.getColumns());
+ builder.setUserName(subScan.getUserName());
- logger.debug("Number of record readers initialized : {}", readers.size());
- return new ScanBatch(subScan, context, readers);
+    List<ManagedReader<SchemaNegotiator>> readers = subScan.getPartitionSubScanSpecList().stream()
+        .map(scanSpec -> new KafkaRecordReader(scanSpec, options, subScan.getKafkaStoragePlugin(), -1))
+        .collect(Collectors.toList());
+    ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(readers.iterator());
+ builder.setReaderFactory(readerFactory);
+ builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+ return builder;
}
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
index af3e163..89c8d79 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
@@ -17,25 +17,33 @@
*/
package org.apache.drill.exec.store.kafka;
+import org.apache.drill.common.types.TypeProtos;
+
/**
* MetaData fields provide additional information about each message.
 * It is expected that one should not modify the fieldName of each constant as it breaks the compatibility.
*/
public enum MetaDataField {
- KAFKA_TOPIC("kafkaTopic"),
- KAFKA_PARTITION_ID("kafkaPartitionId"),
- KAFKA_OFFSET("kafkaMsgOffset"),
- KAFKA_TIMESTAMP("kafkaMsgTimestamp"),
- KAFKA_MSG_KEY("kafkaMsgKey");
+ KAFKA_TOPIC("kafkaTopic", TypeProtos.MinorType.VARCHAR),
+ KAFKA_PARTITION_ID("kafkaPartitionId", TypeProtos.MinorType.BIGINT),
+ KAFKA_OFFSET("kafkaMsgOffset", TypeProtos.MinorType.BIGINT),
+ KAFKA_TIMESTAMP("kafkaMsgTimestamp", TypeProtos.MinorType.BIGINT),
+ KAFKA_MSG_KEY("kafkaMsgKey", TypeProtos.MinorType.VARCHAR);
private final String fieldName;
+ private final TypeProtos.MinorType fieldType;
- MetaDataField(final String fieldName) {
+ MetaDataField(final String fieldName, TypeProtos.MinorType type) {
this.fieldName = fieldName;
+ this.fieldType = type;
}
public String getFieldName() {
return fieldName;
}
+
+ public TypeProtos.MinorType getFieldType() {
+ return fieldType;
+ }
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
new file mode 100644
index 0000000..3846b92
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/AvroMessageReader.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.ColumnConverter;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.avro.AvroColumnConverterFactory;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.MetaDataField;
+import org.apache.drill.exec.store.kafka.ReadOptions;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class AvroMessageReader implements MessageReader {
+  private static final Logger logger = LoggerFactory.getLogger(AvroMessageReader.class);
+
+ private KafkaAvroDeserializer deserializer;
+ private ColumnConverter converter;
+ private ResultSetLoader loader;
+
+ @Override
+  public void init(SchemaNegotiator negotiator, ReadOptions readOptions, KafkaStoragePlugin plugin) {
+    Properties kafkaConsumerProps = plugin.getConfig().getKafkaConsumerProps();
+    Map<String, Object> propertiesMap = kafkaConsumerProps.entrySet().stream()
+        .collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue));
+    deserializer = new KafkaAvroDeserializer(null, propertiesMap);
+    TupleMetadata providedSchema = negotiator.providedSchema();
+    loader = negotiator.build();
+    AvroColumnConverterFactory factory = new AvroColumnConverterFactory(providedSchema);
+    converter = factory.getRootConverter(providedSchema, new TupleSchema(), loader.writer());
+ }
+
+ @Override
+ public void readMessage(ConsumerRecord<?, ?> record) {
+ RowSetLoader rowWriter = loader.writer();
+ byte[] recordArray = (byte[]) record.value();
+    GenericRecord genericRecord = (GenericRecord) deserializer.deserialize(null, recordArray);
+
+ Schema schema = genericRecord.getSchema();
+
+ if (Schema.Type.RECORD != schema.getType()) {
+ throw UserException.dataReadError()
+ .message("Root object must be record type. Found: %s",
schema.getType())
+ .addContext("Reader", this)
+ .build(logger);
+ }
+
+ rowWriter.start();
+ converter.convert(genericRecord);
+ writeValue(rowWriter, MetaDataField.KAFKA_TOPIC, record.topic());
+    writeValue(rowWriter, MetaDataField.KAFKA_PARTITION_ID, record.partition());
+    writeValue(rowWriter, MetaDataField.KAFKA_OFFSET, record.offset());
+    writeValue(rowWriter, MetaDataField.KAFKA_TIMESTAMP, record.timestamp());
+    writeValue(rowWriter, MetaDataField.KAFKA_MSG_KEY, record.key() != null ? record.key().toString() : null);
+ rowWriter.save();
+ }
+
+  private <T> void writeValue(RowSetLoader rowWriter, MetaDataField metaDataField, T value) {
+    if (rowWriter.tupleSchema().column(metaDataField.getFieldName()) == null) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(metaDataField.getFieldName(), metaDataField.getFieldType(), TypeProtos.DataMode.OPTIONAL);
+ rowWriter.addColumn(colSchema);
+ }
+ rowWriter.column(metaDataField.getFieldName()).setObject(value);
+ }
+
+ @Override
+ public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) {
+ return new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
+ new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ }
+
+ @Override
+ public ResultSetLoader getResultSetLoader() {
+ return loader;
+ }
+
+ @Override
+ public boolean endBatch() {
+ return loader.hasRows();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ deserializer.close();
+ loader.close();
+ } catch (Exception e) {
+ logger.warn("Error while closing AvroMessageReader: {}", e.getMessage());
+ }
+ }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index eb503aa..a9aee5a 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -17,36 +17,30 @@
*/
package org.apache.drill.exec.store.kafka.decoders;
-import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_MSG_KEY;
-import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_OFFSET;
-import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_PARTITION_ID;
-import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TIMESTAMP;
-import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TOPIC;
-
-import java.io.IOException;
-import java.util.List;
-
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.store.easy.json.JsonProcessor;
-import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
+import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.MetaDataField;
import org.apache.drill.exec.store.kafka.ReadOptions;
-import org.apache.drill.exec.vector.complex.fn.JsonReader;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-
-import io.netty.buffer.DrillBuf;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.StringJoiner;
/**
 * MessageReader class which will convert ConsumerRecord into JSON and writes to
@@ -55,61 +49,82 @@ import io.netty.buffer.DrillBuf;
public class JsonMessageReader implements MessageReader {
  private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
- private JsonReader jsonReader;
- private VectorContainerWriter writer;
- private ObjectMapper objectMapper;
+
+  private final SingleElementIterator<InputStream> stream = new SingleElementIterator<>();
+
+ private KafkaJsonLoader kafkaJsonLoader;
+ private ResultSetLoader resultSetLoader;
+ private SchemaNegotiator negotiator;
+ private ReadOptions readOptions;
+ private Properties kafkaConsumerProps;
@Override
-  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions) {
-    // set skipOuterList to false as it doesn't applicable for JSON records and it's only applicable for JSON files.
- this.jsonReader = new JsonReader.Builder(buf)
- .schemaPathColumns(columns)
- .allTextMode(readOptions.isAllTextMode())
- .readNumbersAsDouble(readOptions.isReadNumbersAsDouble())
- .enableNanInf(readOptions.isAllowNanInf())
- .enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar())
- .build();
- jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords());
- this.writer = writer;
-    this.objectMapper = BaseJsonProcessor.getDefaultMapper()
-        .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, readOptions.isAllowNanInf())
-        .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, readOptions.isAllowEscapeAnyChar());
+  public void init(SchemaNegotiator negotiator, ReadOptions readOptions, KafkaStoragePlugin plugin) {
+ this.negotiator = negotiator;
+ this.resultSetLoader = negotiator.build();
+ this.readOptions = readOptions;
+ this.kafkaConsumerProps = plugin.getConfig().getKafkaConsumerProps();
}
@Override
- public boolean readMessage(ConsumerRecord<?, ?> record) {
+ public void readMessage(ConsumerRecord<?, ?> record) {
byte[] recordArray = (byte[]) record.value();
- String data = new String(recordArray, Charsets.UTF_8);
try {
- JsonNode jsonNode = objectMapper.readTree(data);
- if (jsonNode != null && jsonNode.isObject()) {
- ObjectNode objectNode = (ObjectNode) jsonNode;
- objectNode.put(KAFKA_TOPIC.getFieldName(), record.topic());
- objectNode.put(KAFKA_PARTITION_ID.getFieldName(), record.partition());
- objectNode.put(KAFKA_OFFSET.getFieldName(), record.offset());
- objectNode.put(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
-        objectNode.put(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null);
-      } else {
-        throw new IOException("Unsupported node type: " + (jsonNode == null ? "NO CONTENT" : jsonNode.getNodeType()));
+ parseAndWrite(record, recordArray);
+ } catch (TokenIterator.RecoverableJsonException e) {
+ if (!readOptions.isSkipInvalidRecords()) {
+ throw UserException.dataReadError(e)
+ .message(String.format("Error happened when parsing invalid
record. " +
+ "Please set `%s` option to 'true' to skip invalid records.",
ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS))
+ .addContext(resultSetLoader.errorContext())
+ .build(logger);
}
- jsonReader.setSource(jsonNode);
- return convertJsonReadState(jsonReader.write(writer));
- } catch (IOException | IllegalArgumentException e) {
- String message = String.format("JSON record %s: %s", data,
e.getMessage());
- if (jsonReader.ignoreJSONParseError()) {
- logger.debug("Skipping {}", message, e);
- return false;
- }
- throw UserException.dataReadError(e)
- .message("Failed to read " + message)
- .addContext("MessageReader", JsonMessageReader.class.getName())
- .build(logger);
}
}
+ private void parseAndWrite(ConsumerRecord<?, ?> record, byte[] recordArray) {
+ stream.setValue(new ByteArrayInputStream(recordArray));
+ if (kafkaJsonLoader == null) {
+ JsonLoaderOptions jsonLoaderOptions = new JsonLoaderOptions();
+ jsonLoaderOptions.allTextMode = readOptions.isAllTextMode();
+      jsonLoaderOptions.readNumbersAsDouble = readOptions.isReadNumbersAsDouble();
+      jsonLoaderOptions.skipMalformedRecords = readOptions.isSkipInvalidRecords();
+      jsonLoaderOptions.allowNanInf = readOptions.isAllowNanInf();
+      jsonLoaderOptions.enableEscapeAnyChar = readOptions.isAllowEscapeAnyChar();
+      jsonLoaderOptions.skipMalformedDocument = readOptions.isSkipInvalidRecords();
+
+      kafkaJsonLoader = (KafkaJsonLoader) new KafkaJsonLoader.KafkaJsonLoaderBuilder()
+ .resultSetLoader(resultSetLoader)
+ .standardOptions(negotiator.queryOptions())
+ .options(jsonLoaderOptions)
+ .errorContext(negotiator.parentErrorContext())
+ .fromStream(() -> stream)
+ .build();
+ }
+
+ RowSetLoader rowWriter = resultSetLoader.writer();
+ rowWriter.start();
+ if (kafkaJsonLoader.parser().next()) {
+ writeValue(rowWriter, MetaDataField.KAFKA_TOPIC, record.topic());
+      writeValue(rowWriter, MetaDataField.KAFKA_PARTITION_ID, record.partition());
+      writeValue(rowWriter, MetaDataField.KAFKA_OFFSET, record.offset());
+      writeValue(rowWriter, MetaDataField.KAFKA_TIMESTAMP, record.timestamp());
+      writeValue(rowWriter, MetaDataField.KAFKA_MSG_KEY, record.key() != null ? record.key().toString() : null);
+ rowWriter.save();
+ }
+ }
+
+  private <T> void writeValue(RowSetLoader rowWriter, MetaDataField metaDataField, T value) {
+    if (rowWriter.tupleSchema().column(metaDataField.getFieldName()) == null) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(metaDataField.getFieldName(), metaDataField.getFieldType(), TypeProtos.DataMode.OPTIONAL);
+ rowWriter.addColumn(colSchema);
+ }
+ rowWriter.column(metaDataField.getFieldName()).setObject(value);
+ }
+
@Override
- public void ensureAtLeastOneField() {
- jsonReader.ensureAtLeastOneField(writer);
+ public ResultSetLoader getResultSetLoader() {
+ return resultSetLoader;
}
@Override
@@ -119,10 +134,16 @@ public class JsonMessageReader implements MessageReader {
}
@Override
+ public boolean endBatch() {
+ kafkaJsonLoader.endBatch();
+ return resultSetLoader.hasRows();
+ }
+
+ @Override
public void close() {
- this.writer.clear();
try {
- this.writer.close();
+ kafkaJsonLoader.close();
+ resultSetLoader.close();
} catch (Exception e) {
logger.warn("Error while closing JsonMessageReader: {}", e.getMessage());
}
@@ -130,26 +151,29 @@ public class JsonMessageReader implements MessageReader {
@Override
public String toString() {
- return "JsonMessageReader[jsonReader=" + jsonReader + "]";
+ return new StringJoiner(", ", JsonMessageReader.class.getSimpleName() +
"[", "]")
+ .add("kafkaJsonLoader=" + kafkaJsonLoader)
+ .add("resultSetLoader=" + resultSetLoader)
+ .toString();
}
- /**
- * Converts {@link JsonProcessor.ReadState} into true / false result.
- *
- * @param jsonReadState JSON reader read state
- * @return true if read was successful, false otherwise
- * @throws IllegalArgumentException if unexpected read state was encountered
- */
- private boolean convertJsonReadState(JsonProcessor.ReadState jsonReadState) {
- switch (jsonReadState) {
- case WRITE_SUCCEED:
- case END_OF_STREAM:
- return true;
- case JSON_RECORD_PARSE_ERROR:
- case JSON_RECORD_PARSE_EOF_ERROR:
- return false;
- default:
- throw new IllegalArgumentException("Unexpected JSON read state: " +
jsonReadState);
+ public static class SingleElementIterator<T> implements Iterator<T> {
+ private T value;
+
+ @Override
+ public boolean hasNext() {
+ return value != null;
+ }
+
+ @Override
+ public T next() {
+ T value = this.value;
+ this.value = null;
+ return value;
+ }
+
+ public void setValue(T value) {
+ this.value = value;
}
}
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/KafkaJsonLoader.java
similarity index 57%
copy from contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
copy to contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/KafkaJsonLoader.java
index af3e163..c09fa52 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/KafkaJsonLoader.java
@@ -15,27 +15,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.kafka;
+package org.apache.drill.exec.store.kafka.decoders;
-/**
- * MetaData fields provide additional information about each message.
- * It is expected that one should not modify the fieldName of each constant as it breaks the compatibility.
- */
-public enum MetaDataField {
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
- KAFKA_TOPIC("kafkaTopic"),
- KAFKA_PARTITION_ID("kafkaPartitionId"),
- KAFKA_OFFSET("kafkaMsgOffset"),
- KAFKA_TIMESTAMP("kafkaMsgTimestamp"),
- KAFKA_MSG_KEY("kafkaMsgKey");
+public class KafkaJsonLoader extends JsonLoaderImpl {
- private final String fieldName;
+ protected KafkaJsonLoader(KafkaJsonLoaderBuilder builder) {
+ super(builder);
+ }
- MetaDataField(final String fieldName) {
- this.fieldName = fieldName;
+ @Override
+ public void endBatch() {
+ super.endBatch();
}
- public String getFieldName() {
- return fieldName;
+ public static class KafkaJsonLoaderBuilder extends JsonLoaderBuilder {
+ @Override
+ public KafkaJsonLoader build() {
+ return new KafkaJsonLoader(this);
+ }
}
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
index f925fce..2e78780 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
@@ -17,17 +17,14 @@
*/
package org.apache.drill.exec.store.kafka.decoders;
-import java.io.Closeable;
-import java.util.List;
-
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
import org.apache.drill.exec.store.kafka.ReadOptions;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import io.netty.buffer.DrillBuf;
+import java.io.Closeable;
/**
* MessageReader interface provides mechanism to handle various Kafka Message
@@ -35,11 +32,13 @@ import io.netty.buffer.DrillBuf;
*/
public interface MessageReader extends Closeable {
-  void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions);
+  void init(SchemaNegotiator negotiator, ReadOptions readOptions, KafkaStoragePlugin plugin);
- boolean readMessage(ConsumerRecord<?, ?> message);
-
- void ensureAtLeastOneField();
+ void readMessage(ConsumerRecord<?, ?> message);
KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin);
+
+ ResultSetLoader getResultSetLoader();
+
+ boolean endBatch();
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index 745edb5..807c84e 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -17,15 +17,14 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
@@ -40,8 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-import org.apache.drill.shaded.guava.com.google.common.io.Resources;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
@@ -49,7 +46,10 @@ import com.google.gson.JsonPrimitive;
public class KafkaMessageGenerator {
  private static final Logger logger = LoggerFactory.getLogger(KafkaMessageGenerator.class);
- private Properties producerProperties = new Properties();
+
+ public static final String SCHEMA_REGISTRY_URL = "mock://testurl";
+
+ private final Properties producerProperties = new Properties();
  public KafkaMessageGenerator (final String broker, Class<?> valueSerializer) {
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
@@ -63,12 +63,23 @@ public class KafkaMessageGenerator {
    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); //So that retries do not cause duplicates
+    producerProperties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
}
-  public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException {
-    try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(producerProperties)) {
+  public void populateAvroMsgIntoKafka(String topic, int numMsg) {
+    try (KafkaProducer<Object, GenericRecord> producer = new KafkaProducer<>(producerProperties)) {
       Schema.Parser parser = new Schema.Parser();
-      Schema schema = parser.parse(Resources.getResource("drill-avro-test.avsc").openStream());
+ String userSchema = "{\"type\":\"record\"," +
+ "\"name\":\"myrecord\"," +
+ "\"fields\":[" +
+ "{\"name\":\"key1\",\"type\":\"string\"}," +
+ "{\"name\":\"key2\",\"type\":\"int\"}," +
+ "{\"name\":\"key3\",\"type\":\"boolean\"}," +
+          "{\"name\":\"key5\",\"type\":{\"type\":\"array\",\"items\":\"int\"}}," +
+          "{\"name\":\"key6\",\"type\":{\"type\":\"record\",\"name\":\"myrecord6\",\"fields\":[" +
+ "{\"name\":\"key61\",\"type\":\"double\"}," +
+ "{\"name\":\"key62\",\"type\":\"double\"}]}}]}";
+ Schema schema = parser.parse(userSchema);
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
Random rand = new Random();
for (int i = 0; i < numMsg; ++i) {
@@ -82,14 +93,14 @@ public class KafkaMessageGenerator {
list.add(rand.nextInt(100));
builder.set("key5", list);
- Map<String, Double> map = Maps.newHashMap();
- map.put("key61", rand.nextDouble());
- map.put("key62", rand.nextDouble());
- builder.set("key6", map);
+        GenericRecordBuilder innerBuilder = new GenericRecordBuilder(schema.getField("key6").schema());
+ innerBuilder.set("key61", rand.nextDouble());
+ innerBuilder.set("key62", rand.nextDouble());
+ builder.set("key6", innerBuilder.build());
Record producerRecord = builder.build();
-        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, producerRecord);
+        ProducerRecord<Object, GenericRecord> record = new ProducerRecord<>(topic, producerRecord);
producer.send(record);
}
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index 9e89171..d690e70 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -67,6 +67,18 @@ public class KafkaQueriesTest extends KafkaTestBase {
}
@Test
+ public void testAvroResultCount() {
+ try {
+ client.alterSession(ExecConstants.KAFKA_RECORD_READER,
+ "org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
+      String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.AVRO_TOPIC);
+ runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG);
+ } finally {
+ client.resetSession(ExecConstants.KAFKA_RECORD_READER);
+ }
+ }
+
+ @Test
public void testPartitionMinOffset() throws Exception {
// following kafka.tools.GetOffsetShell for earliest as -2
Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(-2);
@@ -144,6 +156,19 @@ public class KafkaQueriesTest extends KafkaTestBase {
}
@Test
+ public void testPhysicalPlanSubmissionAvro() throws Exception {
+ try {
+ client.alterSession(ExecConstants.KAFKA_RECORD_READER,
+ "org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
+      String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.AVRO_TOPIC);
+ String plan = queryBuilder().sql(query).explainJson();
+ queryBuilder().physical(plan).run();
+ } finally {
+ client.resetSession(ExecConstants.KAFKA_RECORD_READER);
+ }
+ }
+
+ @Test
public void testOneMessageTopic() throws Exception {
String topicName = "topicWithOneMessage";
TestKafkaSuit.createTopicHelper(topicName, 1);
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
index c233b1f..c426b06 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.kafka;
import java.util.Map;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
@@ -34,6 +35,8 @@ import org.junit.BeforeClass;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import static org.apache.drill.exec.store.kafka.KafkaMessageGenerator.SCHEMA_REGISTRY_URL;
+
public class KafkaTestBase extends ClusterTest {
protected static KafkaStoragePluginConfig storagePluginConfig;
@@ -52,6 +55,7 @@ public class KafkaTestBase extends ClusterTest {
Map<String, String> kafkaConsumerProps = Maps.newHashMap();
    kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaCluster.getKafkaBrokerList());
    kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "drill-test-consumer");
+    kafkaConsumerProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
storagePluginConfig = new KafkaStoragePluginConfig(kafkaConsumerProps);
storagePluginConfig.setEnabled(true);
pluginRegistry.put(KafkaStoragePluginConfig.NAME, storagePluginConfig);
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index 7bb099d..73384d2 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.kafka;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
import kafka.zk.KafkaZkClient;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
@@ -85,8 +86,11 @@ public class TestKafkaSuit extends BaseTest {
"kafka.server", "SessionExpireListener",
Option.<String>empty(), Option.<ZKClientConfig>empty());
createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
+ createTopicHelper(TestQueryConstants.AVRO_TOPIC, 1);
      KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
+      KafkaMessageGenerator avroGenerator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), KafkaAvroSerializer.class);
      generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG);
+      avroGenerator.populateAvroMsgIntoKafka(TestQueryConstants.AVRO_TOPIC, NUM_JSON_MSG);
}
initCount.incrementAndGet();
runningSuite = true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index f1e934f..97cd200 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.drill.common.exceptions.CustomErrorContext;
@@ -141,7 +142,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
private TupleMetadata providedSchema;
private JsonLoaderOptions options;
private CustomErrorContext errorContext;
- private InputStream stream;
+ private Iterable<InputStream> streams;
private Reader reader;
private String dataPath;
private MessageParser messageParser;
@@ -171,8 +172,13 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
return this;
}
- public JsonLoaderBuilder fromStream(InputStream stream) {
- this.stream = stream;
+ public JsonLoaderBuilder fromStream(InputStream... stream) {
+ this.streams = Arrays.asList(stream);
+ return this;
+ }
+
+ public JsonLoaderBuilder fromStream(Iterable<InputStream> streams) {
+ this.streams = streams;
return this;
}
@@ -226,7 +232,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
// case. Usually just one or two fields have deferred nulls.
private final List<NullTypeMarker> nullStates = new ArrayList<>();
- private JsonLoaderImpl(JsonLoaderBuilder builder) {
+ protected JsonLoaderImpl(JsonLoaderBuilder builder) {
this.rsLoader = builder.rsLoader;
this.options = builder.options;
this.errorContext = builder. errorContext;
@@ -236,7 +242,7 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
private JsonStructureParser buildParser(JsonLoaderBuilder builder) {
return new JsonStructureParserBuilder()
- .fromStream(builder.stream)
+ .fromStream(builder.streams)
.fromReader(builder.reader)
.options(builder.options)
.parserFactory(parser ->
@@ -307,15 +313,12 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
* Bottom line: the user is responsible for not giving Drill
* ambiguous data that would require Drill to predict the future.
*/
- private void endBatch() {
+ protected void endBatch() {
// Make a copy. Forcing resolution will remove the
// element from the original list.
- List<NullTypeMarker> copy = new ArrayList<>();
- copy.addAll(nullStates);
- for (NullTypeMarker marker : copy) {
- marker.forceResolution();
- }
+ List<NullTypeMarker> copy = new ArrayList<>(nullStates);
+ copy.forEach(NullTypeMarker::forceResolution);
assert nullStates.isEmpty();
}
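
A rough caller-side sketch of the widened fromStream(...) builder API above (the result set loader and error context are assumed to come from the surrounding scan framework, as in KafkaJsonLoader; nothing here is part of the patch):

    // Sketch only: feeds two in-memory JSON documents through one loader via the
    // new Iterable<InputStream> overload; `rsLoader` and `errorContext` are
    // assumed to exist in the caller.
    List<InputStream> documents = Arrays.asList(
        new ByteArrayInputStream("{\"a\": 1}".getBytes(StandardCharsets.UTF_8)),
        new ByteArrayInputStream("{\"a\": 2}".getBytes(StandardCharsets.UTF_8)));
    JsonLoader loader = new JsonLoaderImpl.JsonLoaderBuilder()
        .resultSetLoader(rsLoader)
        .options(new JsonLoaderOptions())
        .errorContext(errorContext)
        .fromStream(documents)
        .build();
    while (loader.readBatch()) {
      // rows from both documents are written to rsLoader's batches
    }
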
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
index f4a3277..ebaae46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
@@ -47,6 +47,13 @@ public class JsonStructureOptions {
*/
public boolean skipMalformedRecords;
+ /**
+ * This property works only when {@link #skipMalformedRecords} enabled.
+   * If true, {@link TokenIterator.RecoverableJsonException} will be populated for the case of
+   * malformed empty document, so it will be possible to handle this exception by caller.
+ */
+ public boolean skipMalformedDocument;
+
public boolean enableEscapeAnyChar;
public JsonStructureOptions() { }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
index ded872c..2f71aa8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java
@@ -20,8 +20,10 @@ package org.apache.drill.exec.store.easy.json.parser;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
+import java.util.Arrays;
import java.util.function.Function;
+import org.apache.commons.collections4.IterableUtils;
import org.apache.drill.exec.store.easy.json.parser.MessageParser.MessageContextException;
import org.apache.drill.exec.store.easy.json.parser.RootParser.EmbeddedArrayParser;
import org.apache.drill.exec.store.easy.json.parser.RootParser.EmbeddedObjectParser;
@@ -74,7 +76,7 @@ public class JsonStructureParser {
  protected static final Logger logger = LoggerFactory.getLogger(JsonStructureParser.class);
public static class JsonStructureParserBuilder {
- private InputStream stream;
+ private Iterable<InputStream> streams;
private Reader reader;
private JsonStructureOptions options;
private Function<JsonStructureParser, ObjectParser> parserFactory;
@@ -98,8 +100,13 @@ public class JsonStructureParser {
return this;
}
- public JsonStructureParserBuilder fromStream(InputStream stream) {
- this.stream = stream;
+ public JsonStructureParserBuilder fromStream(InputStream... stream) {
+ this.streams = Arrays.asList(stream);
+ return this;
+ }
+
+    public JsonStructureParserBuilder fromStream(Iterable<InputStream> streams) {
+ this.streams = streams;
return this;
}
@@ -130,7 +137,6 @@ public class JsonStructureParser {
}
}
- private final JsonParser parser;
private final JsonStructureOptions options;
private final ErrorFactory errorFactory;
private final TokenIterator tokenizer;
@@ -141,37 +147,23 @@ public class JsonStructureParser {
/**
* Constructor for the structure parser.
*
- * @param stream the source of JSON text
- * @param options configuration options for the Jackson JSON parser
- * and this structure parser
- * @param rootListener listener for the top-level objects in the
- * JSON stream
- * @param errorFactory factory for errors thrown for various
- * conditions
+ * @param builder builder
*/
private JsonStructureParser(JsonStructureParserBuilder builder) {
this.options = Preconditions.checkNotNull(builder.options);
this.errorFactory = Preconditions.checkNotNull(builder.errorFactory);
- try {
- ObjectMapper mapper = new ObjectMapper()
- .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
- .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
- .configure(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS.mappedFeature(),
- options.allowNanInf)
-          .configure(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER.mappedFeature(),
-              options.enableEscapeAnyChar);
-
- if (builder.stream != null) {
- parser = mapper.getFactory().createParser(builder.stream);
- } else {
-        parser = mapper.getFactory().createParser(Preconditions.checkNotNull(builder.reader));
- }
- } catch (JsonParseException e) {
- throw errorFactory().parseError("Failed to create the JSON parser", e);
- } catch (IOException e) {
- throw errorFactory().ioException(e);
- }
- tokenizer = new TokenIterator(parser, options, errorFactory());
+ ObjectMapper mapper = new ObjectMapper()
+ .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
+ .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
+ .configure(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS.mappedFeature(),
+ options.allowNanInf)
+        .configure(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER.mappedFeature(),
+            options.enableEscapeAnyChar);
+
+ boolean isStream = !IterableUtils.isEmpty(builder.streams);
+    Function<InputStream, JsonParser> parserFunction = stream -> getJsonParser(builder, mapper, stream, isStream);
+
+    tokenizer = new TokenIterator(builder.streams, parserFunction, options, errorFactory());
    fieldFactory = new FieldParserFactory(this, Preconditions.checkNotNull(builder.parserFactory));
@@ -185,6 +177,20 @@ public class JsonStructureParser {
}
}
+  private JsonParser getJsonParser(JsonStructureParserBuilder builder, ObjectMapper mapper, InputStream stream, boolean isStream) {
+ try {
+ if (isStream) {
+ return mapper.getFactory().createParser(stream);
+ } else {
+        return mapper.getFactory().createParser(Preconditions.checkNotNull(builder.reader));
+ }
+ } catch (JsonParseException e) {
+ throw errorFactory().parseError("Failed to create the JSON parser", e);
+ } catch (IOException e) {
+ throw errorFactory().ioException(e);
+ }
+ }
+
public JsonStructureOptions options() { return options; }
public ErrorFactory errorFactory() { return errorFactory; }
public FieldParserFactory fieldFactory() { return fieldFactory; }
@@ -263,7 +269,7 @@ public class JsonStructureParser {
*
* @return {@code true} if another record can be read, {@code false}
* if EOF.
- * @throws UserException if the error is unrecoverable
+   * @throws org.apache.drill.common.exceptions.UserException if the error is unrecoverable
   * @see <a href="https://issues.apache.org/jira/browse/DRILL-4653">DRILL-4653</a>
   * @see <a href="https://issues.apache.org/jira/browse/DRILL-5953">DRILL-5953</a>
*/
@@ -273,12 +279,12 @@ public class JsonStructureParser {
while (true) {
while (true) {
try {
- if (parser.isClosed()) {
+ if (tokenizer.getParser().isClosed()) {
throw errorFactory().unrecoverableError();
}
JsonToken token = tokenizer.next();
if (token == null) {
- if (firstAttempt) {
+ if (firstAttempt && !options().skipMalformedDocument) {
throw errorFactory().unrecoverableError();
}
return false;
@@ -331,7 +337,7 @@ public class JsonStructureParser {
errorRecoveryCount);
}
try {
- parser.close();
+ tokenizer.close();
} catch (IOException e) {
logger.warn("Ignored failure when closing JSON source", e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/TokenIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/TokenIterator.java
index a7d6a34..38774c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/TokenIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/TokenIterator.java
@@ -18,7 +18,11 @@
package org.apache.drill.exec.store.easy.json.parser;
import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.function.Function;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
import com.fasterxml.jackson.core.JsonLocation;
@@ -37,16 +41,24 @@ public class TokenIterator {
public static class RecoverableJsonException extends RuntimeException {
}
- private final JsonParser parser;
+ private final ParserManager parserManager;
private final JsonStructureOptions options;
private final ErrorFactory errorFactory;
private final JsonToken[] lookahead = new JsonToken[MAX_LOOKAHEAD];
private int count;
-  public TokenIterator(JsonParser parser, JsonStructureOptions options, ErrorFactory errorFactory) {
-    this.parser = parser;
+  public TokenIterator(Iterable<InputStream> streams, Function<InputStream, JsonParser> parserFunction, JsonStructureOptions options, ErrorFactory errorFactory) {
this.options = options;
this.errorFactory = errorFactory;
+ this.parserManager = new ParserManager(streams, parserFunction);
+ }
+
+ public JsonParser getParser() {
+ JsonParser parser = parserManager.getParser();
+ if (parser == null) {
+ parserManager.nextParser();
+ }
+ return parserManager.getParser();
}
public ErrorFactory errorFactory() { return errorFactory; }
@@ -56,7 +68,7 @@ public class TokenIterator {
return lookahead[--count];
}
try {
- return parser.nextToken();
+ return getNextToken();
} catch (JsonParseException e) {
if (options.skipMalformedRecords) {
throw new RecoverableJsonException();
@@ -68,11 +80,23 @@ public class TokenIterator {
}
}
+ private JsonToken getNextToken() throws IOException {
+ JsonToken jsonToken = getParser().nextToken();
+ if (jsonToken == null) {
+ parserManager.nextParser();
+ JsonParser parser = getParser();
+ if (parser != null) {
+ jsonToken = parser.nextToken();
+ }
+ }
+ return jsonToken;
+ }
+
public String context() {
- JsonLocation location = parser.getCurrentLocation();
+ JsonLocation location = getParser().getCurrentLocation();
String token;
try {
- token = parser.getText();
+ token = getParser().getText();
} catch (IOException e) {
token = "<unknown>";
}
@@ -88,16 +112,19 @@ public class TokenIterator {
}
public int lineNumber() {
- return parser.getCurrentLocation().getLineNr();
+ JsonParser parser = getParser();
+ return parser != null ? parser.getCurrentLocation().getLineNr() : 0;
}
public int columnNumber() {
- return parser.getCurrentLocation().getColumnNr();
+ JsonParser parser = getParser();
+ return parser != null ? parser.getCurrentLocation().getColumnNr() : 0;
}
public String token() {
try {
- return parser.getText();
+ JsonParser parser = getParser();
+ return parser != null ? getParser().getText() : null;
} catch (IOException e) {
return null;
}
@@ -127,7 +154,7 @@ public class TokenIterator {
public String textValue() {
try {
- return parser.getText();
+ return getParser().getText();
} catch (JsonParseException e) {
throw errorFactory.syntaxError(e);
} catch (IOException e) {
@@ -137,7 +164,7 @@ public class TokenIterator {
public long longValue() {
try {
- return parser.getLongValue();
+ return getParser().getLongValue();
} catch (JsonParseException e) {
throw errorFactory.syntaxError(e);
} catch (IOException e) {
@@ -149,7 +176,7 @@ public class TokenIterator {
public String stringValue() {
try {
- return parser.getValueAsString();
+ return getParser().getValueAsString();
} catch (JsonParseException e) {
throw errorFactory.syntaxError(e);
} catch (IOException e) {
@@ -161,7 +188,7 @@ public class TokenIterator {
public double doubleValue() {
try {
- return parser.getValueAsDouble();
+ return getParser().getValueAsDouble();
} catch (JsonParseException e) {
throw errorFactory.syntaxError(e);
} catch (IOException e) {
@@ -173,7 +200,7 @@ public class TokenIterator {
public byte[] binaryValue() {
try {
- return parser.getBinaryValue();
+ return getParser().getBinaryValue();
} catch (JsonParseException e) {
throw errorFactory.syntaxError(e);
} catch (IOException e) {
@@ -186,4 +213,46 @@ public class TokenIterator {
public RuntimeException invalidValue(JsonToken token) {
return errorFactory.structureError("Unexpected JSON value: " +
token.name());
}
+
+ public static class ParserManager {
+ private final Function<InputStream, JsonParser> parserFunction;
+ private final Iterator<InputStream> parsersIterator;
+ private JsonParser currentParser;
+
+    public ParserManager(Iterable<InputStream> parsers, Function<InputStream, JsonParser> parserFunction) {
+ this.parsersIterator = parsers.iterator();
+ this.parserFunction = parserFunction;
+ this.nextParser();
+ }
+
+ public JsonParser getParser() {
+ return currentParser;
+ }
+
+ public ParserManager nextParser() {
+ if (parsersIterator.hasNext()) {
+ try {
+ if (currentParser != null) {
+ currentParser.close();
+ }
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ currentParser = parserFunction.apply(parsersIterator.next());
+ } else {
+ currentParser = null;
+ }
+ return this;
+ }
+
+ public void close() throws IOException {
+ if (currentParser != null) {
+ currentParser.close();
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ parserManager.close();
+ }
}
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index c865756..59a23b9 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -336,6 +336,7 @@
<exclude>javax.xml.stream:stax-api</exclude>
<exclude>javax.activation:activation</exclude>
<exclude>commons-cli:commons-cli</exclude>
+ <exclude>org.apache.commons:commons-collections4</exclude>
<exclude>commons-io:commons-io</exclude>
<exclude>commons-beanutils:commons-beanutils-core:jar:*</exclude>
<exclude>commons-beanutils:commons-beanutils:jar:*</exclude>
@@ -622,6 +623,7 @@
<exclude>javax.xml.stream:stax-api</exclude>
<exclude>javax.activation:activation</exclude>
<exclude>commons-cli:commons-cli</exclude>
+ <exclude>org.apache.commons:commons-collections4</exclude>
<exclude>commons-io:commons-io</exclude>
<exclude>commons-beanutils:commons-beanutils-core:jar:*</exclude>
<exclude>commons-beanutils:commons-beanutils:jar:*</exclude>
diff --git a/pom.xml b/pom.xml
index 312bc62..3648921 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
<findbugs.version>3.0.0</findbugs.version>
<netty.tcnative.classifier />
<commons.io.version>2.7</commons.io.version>
+ <commons.collections.version>4.4</commons.collections.version>
<hamcrest.core.version>1.3</hamcrest.core.version>
<curator.version>5.1.0</curator.version>
<wiremock.standalone.version>2.23.2</wiremock.standalone.version>
@@ -1085,6 +1086,10 @@
<artifactId>commons-io</artifactId>
<version>${commons.io.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ </dependency>
<!-- Test Dependencies -->
<dependency>
@@ -1827,6 +1832,11 @@
<version>${zookeeper.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>${commons.collections.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>${curator.version}</version>