cgivre commented on a change in pull request #2239:
URL: https://github.com/apache/drill/pull/2239#discussion_r640787712
##########
File path:
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
private static final Logger logger =
LoggerFactory.getLogger(KafkaRecordReader.class);
- private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
private final ReadOptions readOptions;
private final KafkaStoragePlugin plugin;
private final KafkaPartitionScanSpec subScanSpec;
+ private final int maxRecords;
- private VectorContainerWriter writer;
private MessageReader messageReader;
-
private long currentOffset;
private MessageIterator msgItr;
- private int currentMessageCount;
- public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec,
List<SchemaPath> projectedColumns,
- FragmentContext context, KafkaStoragePlugin plugin) {
- setColumns(projectedColumns);
- this.readOptions = new ReadOptions(context.getOptions());
+ public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager
options, KafkaStoragePlugin plugin, int maxRecords) {
+ this.readOptions = new ReadOptions(options);
this.plugin = plugin;
this.subScanSpec = subScanSpec;
+ this.maxRecords = maxRecords;
}
@Override
- protected Collection<SchemaPath> transformColumns(Collection<SchemaPath>
projectedColumns) {
- Set<SchemaPath> transformed = new LinkedHashSet<>();
- if (isStarQuery()) {
- transformed.add(SchemaPath.STAR_COLUMN);
- } else {
- transformed.addAll(projectedColumns);
- }
- return transformed;
- }
+ public boolean open(SchemaNegotiator negotiator) {
+ CustomErrorContext errorContext = new
ChildErrorContext(negotiator.parentErrorContext()) {
+ @Override
+ public void addContext(UserException.Builder builder) {
+ super.addContext(builder);
+ builder.addContext("topic_name", subScanSpec.getTopicName());
+ }
+ };
+ negotiator.setErrorContext(errorContext);
- @Override
- public void setup(OperatorContext context, OutputMutator output) {
- this.writer = new VectorContainerWriter(output,
readOptions.isEnableUnionType());
messageReader =
MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
- messageReader.init(context.getManagedBuffer(),
Lists.newArrayList(getColumns()), writer, readOptions);
+ messageReader.init(negotiator, readOptions, plugin);
msgItr = new MessageIterator(messageReader.getConsumer(plugin),
subScanSpec, readOptions.getPollTimeOut());
+
+ return true;
}
/**
* KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
* take care of polling multiple times for this given batch next invocation
*/
@Override
- public int next() {
- writer.allocate();
- writer.reset();
- Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() :
null;
- currentMessageCount = 0;
-
- try {
- while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
- ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
- currentOffset = consumerRecord.offset();
- writer.setPosition(currentMessageCount);
- boolean status = messageReader.readMessage(consumerRecord);
- // increment record count only if message was read successfully
- if (status) {
- if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
- break;
- }
- }
+ public boolean next() {
+ RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();
Review comment:
Got it. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]