cgivre commented on a change in pull request #2239:
URL: https://github.com/apache/drill/pull/2239#discussion_r640210763



##########
File path: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
##########
@@ -55,61 +47,78 @@
 public class JsonMessageReader implements MessageReader {
 
   private static final Logger logger = 
LoggerFactory.getLogger(JsonMessageReader.class);
-  private JsonReader jsonReader;
-  private VectorContainerWriter writer;
-  private ObjectMapper objectMapper;
+
+  private final SingleElementIterator<InputStream> stream = new 
SingleElementIterator<>();
+
+  private KafkaJsonLoader kafkaJsonLoader;
+  private ResultSetLoader resultSetLoader;
+  private SchemaNegotiator negotiator;
+  private ReadOptions readOptions;
+  private Properties kafkaConsumerProps;
 
   @Override
-  public void init(DrillBuf buf, List<SchemaPath> columns, 
VectorContainerWriter writer, ReadOptions readOptions) {
-    // set skipOuterList to false as it doesn't applicable for JSON records 
and it's only applicable for JSON files.
-    this.jsonReader = new JsonReader.Builder(buf)
-      .schemaPathColumns(columns)
-      .allTextMode(readOptions.isAllTextMode())
-      .readNumbersAsDouble(readOptions.isReadNumbersAsDouble())
-      .enableNanInf(readOptions.isAllowNanInf())
-      .enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar())
-      .build();
-    jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords());
-    this.writer = writer;
-    this.objectMapper = BaseJsonProcessor.getDefaultMapper()
-      .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, 
readOptions.isAllowNanInf())
-      .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, 
readOptions.isAllowEscapeAnyChar());
+  public void init(SchemaNegotiator negotiator, ReadOptions readOptions, 
KafkaStoragePlugin plugin) {
+    this.negotiator = negotiator;
+    this.resultSetLoader = negotiator.build();
+    this.readOptions = readOptions;
+    this.kafkaConsumerProps = plugin.getConfig().getKafkaConsumerProps();
   }
 
   @Override
-  public boolean readMessage(ConsumerRecord<?, ?> record) {
+  public void readMessage(ConsumerRecord<?, ?> record) {
     byte[] recordArray = (byte[]) record.value();
-    String data = new String(recordArray, Charsets.UTF_8);
     try {
-      JsonNode jsonNode = objectMapper.readTree(data);
-      if (jsonNode != null && jsonNode.isObject()) {
-        ObjectNode objectNode = (ObjectNode) jsonNode;
-        objectNode.put(KAFKA_TOPIC.getFieldName(), record.topic());
-        objectNode.put(KAFKA_PARTITION_ID.getFieldName(), record.partition());
-        objectNode.put(KAFKA_OFFSET.getFieldName(), record.offset());
-        objectNode.put(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
-        objectNode.put(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? 
record.key().toString() : null);
-      } else {
-        throw new IOException("Unsupported node type: " + (jsonNode == null ? 
"NO CONTENT" : jsonNode.getNodeType()));
+      parseAndWrite(record, recordArray);
+    } catch (TokenIterator.RecoverableJsonException e) {
+      if (!readOptions.isSkipInvalidRecords()) {
+        throw e;

Review comment:
       Do you think we should throw a `UserException` here with an explanation 
and `errorContext`?

##########
File path: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
 
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
 
-  private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
   private final ReadOptions readOptions;
   private final KafkaStoragePlugin plugin;
   private final KafkaPartitionScanSpec subScanSpec;
+  private final int maxRecords;
 
-  private VectorContainerWriter writer;
   private MessageReader messageReader;
-
   private long currentOffset;
   private MessageIterator msgItr;
-  private int currentMessageCount;
 
-  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, 
List<SchemaPath> projectedColumns,
-      FragmentContext context, KafkaStoragePlugin plugin) {
-    setColumns(projectedColumns);
-    this.readOptions = new ReadOptions(context.getOptions());
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager 
options, KafkaStoragePlugin plugin, int maxRecords) {
+    this.readOptions = new ReadOptions(options);
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
+    this.maxRecords = maxRecords;
   }
 
   @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> 
projectedColumns) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (isStarQuery()) {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    } else {
-      transformed.addAll(projectedColumns);
-    }
-    return transformed;
-  }
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new 
ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("topic_name", subScanSpec.getTopicName());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
 
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) {
-    this.writer = new VectorContainerWriter(output, 
readOptions.isEnableUnionType());
     messageReader = 
MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
-    messageReader.init(context.getManagedBuffer(), 
Lists.newArrayList(getColumns()), writer, readOptions);
+    messageReader.init(negotiator, readOptions, plugin);
     msgItr = new MessageIterator(messageReader.getConsumer(plugin), 
subScanSpec, readOptions.getPollTimeOut());
+
+    return true;
   }
 
   /**
    * KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
    * take care of polling multiple times for this given batch next invocation
    */
   @Override
-  public int next() {
-    writer.allocate();
-    writer.reset();
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : 
null;
-    currentMessageCount = 0;
-
-    try {
-      while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
-        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
-        currentOffset = consumerRecord.offset();
-        writer.setPosition(currentMessageCount);
-        boolean status = messageReader.readMessage(consumerRecord);
-        // increment record count only if message was read successfully
-        if (status) {
-          if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
-            break;
-          }
-        }
+  public boolean next() {
+    RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();

Review comment:
       @vvysotskyi 
   Does this line belong in the `setup()` method?  In the Splunk plugin for 
example:
   
   
https://github.com/apache/drill/blob/bc53c8e6c4a24b7cbce2112690a7d3e77e55fe41/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java#L122-L129
   
   Then, in each iteration of the `next()` method, it calls `rowWriter.start()`. 
Or does this work a little differently?
   

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
##########
@@ -47,6 +47,13 @@
    */
   public boolean skipMalformedRecords;
 
+  /**
+   * This property works only when {@link #skipMalformedRecords} enabled.
+   * If true, {@link TokenIterator.RecoverableJsonException} will be populated 
for the case of
+   * malformed empty document, so it will be possible to handle this exception 
by caller.
+   */
+  public boolean skipMalformedDocument;

Review comment:
       Is there a system option for this? Also, is this documented anywhere?

##########
File path: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
 
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
 
-  private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
   private final ReadOptions readOptions;
   private final KafkaStoragePlugin plugin;
   private final KafkaPartitionScanSpec subScanSpec;
+  private final int maxRecords;
 
-  private VectorContainerWriter writer;
   private MessageReader messageReader;
-
   private long currentOffset;
   private MessageIterator msgItr;
-  private int currentMessageCount;
 
-  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, 
List<SchemaPath> projectedColumns,
-      FragmentContext context, KafkaStoragePlugin plugin) {
-    setColumns(projectedColumns);
-    this.readOptions = new ReadOptions(context.getOptions());
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager 
options, KafkaStoragePlugin plugin, int maxRecords) {
+    this.readOptions = new ReadOptions(options);
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
+    this.maxRecords = maxRecords;
   }
 
   @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> 
projectedColumns) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (isStarQuery()) {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    } else {
-      transformed.addAll(projectedColumns);
-    }
-    return transformed;
-  }
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new 
ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("topic_name", subScanSpec.getTopicName());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
 
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) {
-    this.writer = new VectorContainerWriter(output, 
readOptions.isEnableUnionType());
     messageReader = 
MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
-    messageReader.init(context.getManagedBuffer(), 
Lists.newArrayList(getColumns()), writer, readOptions);
+    messageReader.init(negotiator, readOptions, plugin);
     msgItr = new MessageIterator(messageReader.getConsumer(plugin), 
subScanSpec, readOptions.getPollTimeOut());
+
+    return true;
   }
 
   /**
    * KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
    * take care of polling multiple times for this given batch next invocation
    */
   @Override
-  public int next() {
-    writer.allocate();
-    writer.reset();
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : 
null;
-    currentMessageCount = 0;
-
-    try {
-      while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
-        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
-        currentOffset = consumerRecord.offset();
-        writer.setPosition(currentMessageCount);
-        boolean status = messageReader.readMessage(consumerRecord);
-        // increment record count only if message was read successfully
-        if (status) {
-          if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
-            break;
-          }
-        }
+  public boolean next() {
+    RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();
+    while (!rowWriter.isFull()) {
+      if (!nextLine(rowWriter)) {
+        return false;
       }
+    }
+    return messageReader.endBatch();
+  }
 
-      if (currentMessageCount > 0) {
-        messageReader.ensureAtLeastOneField();
-      }
-      writer.setValueCount(currentMessageCount);
-      if (watch != null) {
-        logger.debug("Took {} ms to process {} records.", 
watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
-      }
-      logger.debug("Last offset consumed for {}:{} is {}", 
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
-          currentOffset);
-      return currentMessageCount;
-    } catch (Exception e) {
-      String msg = "Failure while reading messages from kafka. Record reader 
was at record: " + (currentMessageCount + 1);
-      throw UserException.dataReadError(e)
-        .message(msg)
-        .addContext(e.getMessage())
-        .build(logger);
+  private boolean nextLine(RowSetLoader rowWriter) {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
+    if (currentOffset >= subScanSpec.getEndOffset() || !msgItr.hasNext()) {
+      return false;
     }
+    ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
+    currentOffset = consumerRecord.offset();
+    messageReader.readMessage(consumerRecord);
+    return true;
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     logger.debug("Last offset processed for {}:{} is - {}", 
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
         currentOffset);
     logger.debug("Total time to fetch messages from {}:{} is - {} 
milliseconds", subScanSpec.getTopicName(),
         subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
     plugin.registerToClose(msgItr);
-    messageReader.close();
+    try {
+      messageReader.close();
+    } catch (IOException e) {
+      logger.warn("Error closing Kafka message reader: {}", e.getMessage(), e);
+    }
   }
 
   @Override
   public String toString() {
     return "KafkaRecordReader[readOptions=" + readOptions

Review comment:
       Should we update this, here and elsewhere, to use the 
`PlanStringBuilder`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to