vvysotskyi commented on a change in pull request #2239:
URL: https://github.com/apache/drill/pull/2239#discussion_r640779940
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
##########
@@ -47,6 +47,13 @@
*/
public boolean skipMalformedRecords;
+ /**
+ * This property works only when {@link #skipMalformedRecords} enabled.
+ * If true, {@link TokenIterator.RecoverableJsonException} will be populated
for the case of
+ * malformed empty document, so it will be possible to handle this exception
by caller.
+ */
+ public boolean skipMalformedDocument;
Review comment:
There is no option for this. It is for internal usage only.
##########
File path:
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
##########
@@ -55,61 +47,78 @@
public class JsonMessageReader implements MessageReader {
private static final Logger logger =
LoggerFactory.getLogger(JsonMessageReader.class);
- private JsonReader jsonReader;
- private VectorContainerWriter writer;
- private ObjectMapper objectMapper;
+
+ private final SingleElementIterator<InputStream> stream = new
SingleElementIterator<>();
+
+ private KafkaJsonLoader kafkaJsonLoader;
+ private ResultSetLoader resultSetLoader;
+ private SchemaNegotiator negotiator;
+ private ReadOptions readOptions;
+ private Properties kafkaConsumerProps;
@Override
- public void init(DrillBuf buf, List<SchemaPath> columns,
VectorContainerWriter writer, ReadOptions readOptions) {
- // set skipOuterList to false as it doesn't applicable for JSON records
and it's only applicable for JSON files.
- this.jsonReader = new JsonReader.Builder(buf)
- .schemaPathColumns(columns)
- .allTextMode(readOptions.isAllTextMode())
- .readNumbersAsDouble(readOptions.isReadNumbersAsDouble())
- .enableNanInf(readOptions.isAllowNanInf())
- .enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar())
- .build();
- jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords());
- this.writer = writer;
- this.objectMapper = BaseJsonProcessor.getDefaultMapper()
- .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS,
readOptions.isAllowNanInf())
- .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
readOptions.isAllowEscapeAnyChar());
+ public void init(SchemaNegotiator negotiator, ReadOptions readOptions,
KafkaStoragePlugin plugin) {
+ this.negotiator = negotiator;
+ this.resultSetLoader = negotiator.build();
+ this.readOptions = readOptions;
+ this.kafkaConsumerProps = plugin.getConfig().getKafkaConsumerProps();
}
@Override
- public boolean readMessage(ConsumerRecord<?, ?> record) {
+ public void readMessage(ConsumerRecord<?, ?> record) {
byte[] recordArray = (byte[]) record.value();
- String data = new String(recordArray, Charsets.UTF_8);
try {
- JsonNode jsonNode = objectMapper.readTree(data);
- if (jsonNode != null && jsonNode.isObject()) {
- ObjectNode objectNode = (ObjectNode) jsonNode;
- objectNode.put(KAFKA_TOPIC.getFieldName(), record.topic());
- objectNode.put(KAFKA_PARTITION_ID.getFieldName(), record.partition());
- objectNode.put(KAFKA_OFFSET.getFieldName(), record.offset());
- objectNode.put(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
- objectNode.put(KAFKA_MSG_KEY.getFieldName(), record.key() != null ?
record.key().toString() : null);
- } else {
- throw new IOException("Unsupported node type: " + (jsonNode == null ?
"NO CONTENT" : jsonNode.getNodeType()));
+ parseAndWrite(record, recordArray);
+ } catch (TokenIterator.RecoverableJsonException e) {
+ if (!readOptions.isSkipInvalidRecords()) {
+ throw e;
Review comment:
Thanks, done
##########
File path:
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
private static final Logger logger =
LoggerFactory.getLogger(KafkaRecordReader.class);
- private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
private final ReadOptions readOptions;
private final KafkaStoragePlugin plugin;
private final KafkaPartitionScanSpec subScanSpec;
+ private final int maxRecords;
- private VectorContainerWriter writer;
private MessageReader messageReader;
-
private long currentOffset;
private MessageIterator msgItr;
- private int currentMessageCount;
- public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec,
List<SchemaPath> projectedColumns,
- FragmentContext context, KafkaStoragePlugin plugin) {
- setColumns(projectedColumns);
- this.readOptions = new ReadOptions(context.getOptions());
+ public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager
options, KafkaStoragePlugin plugin, int maxRecords) {
+ this.readOptions = new ReadOptions(options);
this.plugin = plugin;
this.subScanSpec = subScanSpec;
+ this.maxRecords = maxRecords;
}
@Override
- protected Collection<SchemaPath> transformColumns(Collection<SchemaPath>
projectedColumns) {
- Set<SchemaPath> transformed = new LinkedHashSet<>();
- if (isStarQuery()) {
- transformed.add(SchemaPath.STAR_COLUMN);
- } else {
- transformed.addAll(projectedColumns);
- }
- return transformed;
- }
+ public boolean open(SchemaNegotiator negotiator) {
+ CustomErrorContext errorContext = new
ChildErrorContext(negotiator.parentErrorContext()) {
+ @Override
+ public void addContext(UserException.Builder builder) {
+ super.addContext(builder);
+ builder.addContext("topic_name", subScanSpec.getTopicName());
+ }
+ };
+ negotiator.setErrorContext(errorContext);
- @Override
- public void setup(OperatorContext context, OutputMutator output) {
- this.writer = new VectorContainerWriter(output,
readOptions.isEnableUnionType());
messageReader =
MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
- messageReader.init(context.getManagedBuffer(),
Lists.newArrayList(getColumns()), writer, readOptions);
+ messageReader.init(negotiator, readOptions, plugin);
msgItr = new MessageIterator(messageReader.getConsumer(plugin),
subScanSpec, readOptions.getPollTimeOut());
+
+ return true;
}
/**
* KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
* take care of polling multiple times for this given batch next invocation
*/
@Override
- public int next() {
- writer.allocate();
- writer.reset();
- Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() :
null;
- currentMessageCount = 0;
-
- try {
- while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
- ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
- currentOffset = consumerRecord.offset();
- writer.setPosition(currentMessageCount);
- boolean status = messageReader.readMessage(consumerRecord);
- // increment record count only if message was read successfully
- if (status) {
- if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
- break;
- }
- }
+ public boolean next() {
+ RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();
Review comment:
Yes, it works slightly differently, `rowWriter.start()` is called in
`MessageReader.readMessage()` method after all checks below for batch size are
passed.
##########
File path:
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
private static final Logger logger =
LoggerFactory.getLogger(KafkaRecordReader.class);
- private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
private final ReadOptions readOptions;
private final KafkaStoragePlugin plugin;
private final KafkaPartitionScanSpec subScanSpec;
+ private final int maxRecords;
- private VectorContainerWriter writer;
private MessageReader messageReader;
-
private long currentOffset;
private MessageIterator msgItr;
- private int currentMessageCount;
- public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec,
List<SchemaPath> projectedColumns,
- FragmentContext context, KafkaStoragePlugin plugin) {
- setColumns(projectedColumns);
- this.readOptions = new ReadOptions(context.getOptions());
+ public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager
options, KafkaStoragePlugin plugin, int maxRecords) {
+ this.readOptions = new ReadOptions(options);
this.plugin = plugin;
this.subScanSpec = subScanSpec;
+ this.maxRecords = maxRecords;
}
@Override
- protected Collection<SchemaPath> transformColumns(Collection<SchemaPath>
projectedColumns) {
- Set<SchemaPath> transformed = new LinkedHashSet<>();
- if (isStarQuery()) {
- transformed.add(SchemaPath.STAR_COLUMN);
- } else {
- transformed.addAll(projectedColumns);
- }
- return transformed;
- }
+ public boolean open(SchemaNegotiator negotiator) {
+ CustomErrorContext errorContext = new
ChildErrorContext(negotiator.parentErrorContext()) {
+ @Override
+ public void addContext(UserException.Builder builder) {
+ super.addContext(builder);
+ builder.addContext("topic_name", subScanSpec.getTopicName());
+ }
+ };
+ negotiator.setErrorContext(errorContext);
- @Override
- public void setup(OperatorContext context, OutputMutator output) {
- this.writer = new VectorContainerWriter(output,
readOptions.isEnableUnionType());
messageReader =
MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
- messageReader.init(context.getManagedBuffer(),
Lists.newArrayList(getColumns()), writer, readOptions);
+ messageReader.init(negotiator, readOptions, plugin);
msgItr = new MessageIterator(messageReader.getConsumer(plugin),
subScanSpec, readOptions.getPollTimeOut());
+
+ return true;
}
/**
* KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
* take care of polling multiple times for this given batch next invocation
*/
@Override
- public int next() {
- writer.allocate();
- writer.reset();
- Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() :
null;
- currentMessageCount = 0;
-
- try {
- while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
- ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
- currentOffset = consumerRecord.offset();
- writer.setPosition(currentMessageCount);
- boolean status = messageReader.readMessage(consumerRecord);
- // increment record count only if message was read successfully
- if (status) {
- if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
- break;
- }
- }
+ public boolean next() {
+ RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();
+ while (!rowWriter.isFull()) {
+ if (!nextLine(rowWriter)) {
+ return false;
}
+ }
+ return messageReader.endBatch();
+ }
- if (currentMessageCount > 0) {
- messageReader.ensureAtLeastOneField();
- }
- writer.setValueCount(currentMessageCount);
- if (watch != null) {
- logger.debug("Took {} ms to process {} records.",
watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
- }
- logger.debug("Last offset consumed for {}:{} is {}",
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
- currentOffset);
- return currentMessageCount;
- } catch (Exception e) {
- String msg = "Failure while reading messages from kafka. Record reader
was at record: " + (currentMessageCount + 1);
- throw UserException.dataReadError(e)
- .message(msg)
- .addContext(e.getMessage())
- .build(logger);
+ private boolean nextLine(RowSetLoader rowWriter) {
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
+ if (currentOffset >= subScanSpec.getEndOffset() || !msgItr.hasNext()) {
+ return false;
}
+ ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
+ currentOffset = consumerRecord.offset();
+ messageReader.readMessage(consumerRecord);
+ return true;
}
@Override
- public void close() throws IOException {
+ public void close() {
logger.debug("Last offset processed for {}:{} is - {}",
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
currentOffset);
logger.debug("Total time to fetch messages from {}:{} is - {}
milliseconds", subScanSpec.getTopicName(),
subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
plugin.registerToClose(msgItr);
- messageReader.close();
+ try {
+ messageReader.close();
+ } catch (IOException e) {
+ logger.warn("Error closing Kafka message reader: {}", e.getMessage(), e);
+ }
}
@Override
public String toString() {
return "KafkaRecordReader[readOptions=" + readOptions
Review comment:
Thanks, used it here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]