This is an automated email from the ASF dual-hosted git repository. vitalii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 98e5de3b5af862779244bac8329852b3c9a901df Author: Bohdan Kazydub <[email protected]> AuthorDate: Fri Aug 31 18:20:34 2018 +0300 DRILL-6724: Dump operator context to logs when error occurs during query execution closes #1455 --- .../drill/common/exceptions/UserException.java | 4 +- .../common/exceptions/UserExceptionContext.java | 19 ++- .../store/mapr/db/json/MaprDBJsonRecordReader.java | 17 ++- .../drill/exec/store/hbase/HBaseRecordReader.java | 5 + .../store/hive/readers/HiveAbstractReader.java | 15 ++ .../drill/exec/store/jdbc/JdbcRecordReader.java | 8 +- .../drill/exec/store/kafka/KafkaRecordReader.java | 26 +++- .../store/kafka/decoders/JsonMessageReader.java | 4 + .../drill/exec/store/kudu/KuduRecordReader.java | 11 ++ .../apache/drill/exec/store/kudu/KuduWriter.java | 5 + .../drill/exec/store/mongo/MongoRecordReader.java | 5 + .../drill/exec/physical/config/HashAggregate.java | 8 ++ .../physical/config/OrderedPartitionSender.java | 11 ++ .../drill/exec/physical/config/PartitionLimit.java | 5 + .../apache/drill/exec/physical/config/Sort.java | 7 + .../apache/drill/exec/physical/config/TopN.java | 7 + .../drill/exec/physical/config/WindowPOP.java | 16 +++ .../drill/exec/physical/impl/BaseRootExec.java | 23 +++ .../apache/drill/exec/physical/impl/RootExec.java | 6 + .../apache/drill/exec/physical/impl/ScanBatch.java | 30 +++- .../drill/exec/physical/impl/TopN/TopNBatch.java | 9 +- .../exec/physical/impl/WriterRecordBatch.java | 6 + .../exec/physical/impl/aggregate/HashAggBatch.java | 9 ++ .../physical/impl/aggregate/HashAggTemplate.java | 9 ++ .../impl/aggregate/SpilledRecordbatch.java | 35 ++++- .../physical/impl/aggregate/StreamingAggBatch.java | 6 + .../impl/aggregate/StreamingAggTemplate.java | 10 ++ .../physical/impl/filter/FilterRecordBatch.java | 8 ++ .../exec/physical/impl/filter/FilterTemplate2.java | 7 + .../exec/physical/impl/filter/FilterTemplate4.java | 6 + .../impl/filter/RuntimeFilterRecordBatch.java | 9 +- 
.../physical/impl/flatten/FlattenRecordBatch.java | 6 + .../physical/impl/flatten/FlattenTemplate.java | 11 ++ .../exec/physical/impl/join/HashJoinBatch.java | 6 + .../physical/impl/join/HashJoinProbeTemplate.java | 13 ++ .../exec/physical/impl/join/LateralJoinBatch.java | 8 ++ .../exec/physical/impl/join/MergeJoinBatch.java | 7 + .../physical/impl/join/NestedLoopJoinBatch.java | 8 ++ .../exec/physical/impl/limit/LimitRecordBatch.java | 74 +++++----- .../impl/limit/PartitionLimitRecordBatch.java | 10 +- .../impl/mergereceiver/MergingRecordBatch.java | 8 ++ .../OrderedPartitionRecordBatch.java | 6 + .../impl/producer/ProducerConsumerBatch.java | 5 + .../physical/impl/project/ProjectRecordBatch.java | 6 + .../physical/impl/project/ProjectorTemplate.java | 6 + .../impl/protocol/OperatorRecordBatch.java | 18 ++- .../drill/exec/physical/impl/sort/SortBatch.java | 4 + .../exec/physical/impl/sort/SortTemplate.java | 4 + .../impl/svremover/RemovingRecordBatch.java | 5 + .../exec/physical/impl/trace/TraceRecordBatch.java | 6 +- .../physical/impl/union/UnionAllRecordBatch.java | 5 + .../exec/physical/impl/unnest/UnnestImpl.java | 10 ++ .../physical/impl/unnest/UnnestRecordBatch.java | 7 +- .../unorderedreceiver/UnorderedReceiverBatch.java | 36 +++-- .../validate/IteratorValidatorBatchIterator.java | 13 +- .../physical/impl/window/FrameSupportTemplate.java | 11 ++ .../impl/window/NoFrameSupportTemplate.java | 11 ++ .../exec/physical/impl/window/WindowDataBatch.java | 5 + .../impl/window/WindowFrameRecordBatch.java | 7 + .../physical/impl/xsort/ExternalSortBatch.java | 4 + .../exec/physical/impl/xsort/MSortTemplate.java | 9 ++ .../impl/xsort/SingleBatchSorterTemplate.java | 5 + .../impl/xsort/managed/ExternalSortBatch.java | 7 + .../physical/impl/xsort/managed/SortConfig.java | 9 ++ .../exec/physical/impl/xsort/managed/SortImpl.java | 8 ++ .../drill/exec/record/AbstractRecordBatch.java | 50 +++++-- .../record/AbstractTableFunctionRecordBatch.java | 1 - 
.../org/apache/drill/exec/record/RecordBatch.java | 15 ++ .../drill/exec/record/RecordBatchLoader.java | 8 ++ .../apache/drill/exec/record/RecordIterator.java | 14 ++ .../apache/drill/exec/record/SchemalessBatch.java | 10 ++ .../drill/exec/record/SimpleRecordBatch.java | 13 ++ .../exec/record/selection/SelectionVector4.java | 9 ++ .../drill/exec/store/AbstractRecordReader.java | 1 - .../org/apache/drill/exec/store/RecordReader.java | 4 +- .../apache/drill/exec/store/StorageStrategy.java | 7 +- .../drill/exec/store/avro/AvroRecordReader.java | 15 +- .../drill/exec/store/bson/BsonRecordReader.java | 25 ++-- .../drill/exec/store/dfs/easy/EasyWriter.java | 8 ++ .../exec/store/easy/json/JSONRecordReader.java | 46 +++--- .../drill/exec/store/easy/json/JsonProcessor.java | 16 +-- .../store/easy/json/reader/BaseJsonProcessor.java | 48 ++++--- .../store/easy/json/reader/CountingJsonReader.java | 14 +- .../sequencefile/SequenceFileRecordReader.java | 17 ++- .../text/compliant/CompliantTextRecordReader.java | 9 +- .../exec/store/easy/text/compliant/TextReader.java | 8 ++ .../exec/store/httpd/HttpdLogFormatPlugin.java | 8 ++ .../drill/exec/store/image/ImageRecordReader.java | 7 +- .../drill/exec/store/log/LogRecordReader.java | 9 +- .../drill/exec/store/parquet/ParquetWriter.java | 7 + .../parquet/columnreaders/ParquetRecordReader.java | 10 ++ .../exec/store/parquet2/DrillParquetReader.java | 5 + .../drill/exec/store/pcap/PcapRecordReader.java | 5 + .../exec/store/text/DrillTextRecordReader.java | 9 ++ .../drill/exec/vector/complex/fn/JsonReader.java | 62 +++----- .../drill/exec/work/fragment/FragmentExecutor.java | 4 + .../parquet/hadoop/ColumnChunkIncReadStore.java | 5 + .../java/org/apache/drill/TestOperatorDump.java | 159 +++++++++++++++++++++ .../drill/exec/physical/impl/MockRecordBatch.java | 9 ++ .../drill/exec/physical/impl/SimpleRootExec.java | 5 + .../physical/impl/unnest/MockLateralJoinBatch.java | 9 ++ .../drill/exec/work/filter/BloomFilterTest.java | 9 ++ 
.../java/org/apache/drill/test/LogFixture.java | 12 +- 103 files changed, 1165 insertions(+), 211 deletions(-) diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java index 56cb935..3a3ca5a 100644 --- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java +++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java @@ -771,8 +771,10 @@ public class UserException extends DrillRuntimeException { * @return generated user error message */ private String generateMessage(boolean includeErrorIdAndIdentity) { + boolean seeLogsMessage = errorType == DrillPBError.ErrorType.INTERNAL_ERROR + || errorType == DrillPBError.ErrorType.SYSTEM; return errorType + " ERROR: " + super.getMessage() + "\n\n" + - context.generateContextMessage(includeErrorIdAndIdentity); + context.generateContextMessage(includeErrorIdAndIdentity, seeLogsMessage); } } diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java index fa2c437..271462a 100644 --- a/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java +++ b/common/src/main/java/org/apache/drill/common/exceptions/UserExceptionContext.java @@ -29,6 +29,8 @@ import org.apache.drill.exec.proto.CoordinationProtos; */ class UserExceptionContext { + private static final String NEW_LINE = System.lineSeparator(); + private final String errorId; private final List<String> contextList; @@ -133,17 +135,26 @@ class UserExceptionContext { * generate a context message * @return string containing all context information concatenated */ - String generateContextMessage(boolean includeErrorIdAndIdentity) { + String generateContextMessage(boolean includeErrorIdAndIdentity, boolean includeSeeLogsMessage) { StringBuilder sb = new StringBuilder(); for (String context : 
contextList) { - sb.append(context).append("\n"); + sb.append(context) + .append(NEW_LINE); + } + + if (includeSeeLogsMessage) { + sb.append(NEW_LINE) + .append("Please, refer to logs for more information.") + .append(NEW_LINE); } if (includeErrorIdAndIdentity) { // add identification infos - sb.append("\n[Error Id: "); - sb.append(errorId).append(" "); + sb.append(NEW_LINE) + .append("[Error Id: ") + .append(errorId) + .append(" "); if (endpoint != null) { sb.append("on ") .append(endpoint.getAddress()) diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java index 3c7ca8e..b68f574 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java @@ -81,6 +81,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { private final String tableName; private OperatorContext operatorContext; private VectorContainerWriter vectorWriter; + private DBDocumentReaderBase reader; private DrillBuf buffer; @@ -195,7 +196,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { vectorWriter.reset(); int recordCount = 0; - DBDocumentReaderBase reader = null; + reader = null; while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) { vectorWriter.setPosition(recordCount); @@ -526,4 +527,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { table.close(); } } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("MaprDBJsonRecordReader[Table=") + .append(table.getPath()); + if (reader != null) { + sb.append(", Document ID=") + .append(IdCodec.asString(reader.getId())); + } + sb.append(", reader=") + .append(reader) + .append(']'); + return sb.toString(); + } } diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index 86038c4..2db1d02 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -338,4 +338,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas return rowCount < TARGET_RECORD_COUNT && operatorContext.getAllocator().getAllocatedMemory() < MAX_ALLOCATED_MEMORY_PER_BATCH; } + + @Override + public String toString() { + return "HBaseRecordReader[Table=" + hbaseTableName.getNamespaceAsString() + "]"; + } } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java index 354a61e..ba1cd30 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveAbstractReader.java @@ -430,4 +430,19 @@ public abstract class HiveAbstractReader extends AbstractRecordReader { } } + @Override + public String toString() { + long position = -1; + try { + if (reader != null) { + position = reader.getPos(); + } + } catch (IOException e) { + logger.trace("Unable to obtain reader position: " + e.getMessage()); + } + return getClass().getSimpleName() + "[Database=" + table.getDbName() + + ", Table=" + table.getTableName() + + ", Position=" + position + + "]"; + } } diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java index 1b6e211..cd732a6 100755 --- 
a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java +++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordReader.java @@ -279,6 +279,13 @@ class JdbcRecordReader extends AbstractRecordReader { AutoCloseables.close(resultSet, statement, connection); } + @Override + public String toString() { + return "JdbcRecordReader[sql=" + sql + + ", Plugin=" + storagePluginName + + "]"; + } + private abstract class Copier<T extends ValueVector.Mutator> { protected final int columnIndex; protected final ResultSet result; @@ -478,5 +485,4 @@ class JdbcRecordReader extends AbstractRecordReader { } } - } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java index d715ada..9559c3d 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java @@ -60,6 +60,7 @@ public class KafkaRecordReader extends AbstractRecordReader { private final boolean enableAllTextMode; private final boolean readNumbersAsDouble; private final String kafkaMsgReader; + private int currentMessageCount; public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context, KafkaStoragePlugin plugin) { @@ -105,27 +106,27 @@ public class KafkaRecordReader extends AbstractRecordReader { writer.allocate(); writer.reset(); Stopwatch watch = Stopwatch.createStarted(); - int messageCount = 0; + currentMessageCount = 0; try { while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) { ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next(); currentOffset = consumerRecord.offset(); - writer.setPosition(messageCount); + writer.setPosition(currentMessageCount); 
messageReader.readMessage(consumerRecord); - if (++messageCount >= DEFAULT_MESSAGES_PER_BATCH) { + if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) { break; } } messageReader.ensureAtLeastOneField(); - writer.setValueCount(messageCount); - logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), messageCount); + writer.setValueCount(currentMessageCount); + logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount); logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(), currentOffset); - return messageCount; + return currentMessageCount; } catch (Exception e) { - String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (messageCount + 1); + String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1); throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger); } } @@ -139,4 +140,15 @@ public class KafkaRecordReader extends AbstractRecordReader { messageReader.close(); } + @Override + public String toString() { + return "KafkaRecordReader[messageReader=" + messageReader + + ", kafkaPollTimeOut=" + kafkaPollTimeOut + + ", currentOffset=" + currentOffset + + ", enableAllTextMode=" + enableAllTextMode + + ", readNumbersAsDouble=" + readNumbersAsDouble + + ", kafkaMsgReader=" + kafkaMsgReader + + ", currentMessageCount=" + currentMessageCount + + "]"; + } } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java index a62357d..40e9e12 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java 
@@ -105,4 +105,8 @@ public class JsonMessageReader implements MessageReader { } } + @Override + public String toString() { + return "JsonMessageReader[jsonReader=" + jsonReader + "]"; + } } diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java index 845738c..976b16d 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduRecordReader.java @@ -78,6 +78,9 @@ public class KuduRecordReader extends AbstractRecordReader { private OutputMutator output; private OperatorContext context; + private String lastColumnName; + private Type lastColumnType; + private static class ProjectedColumnInfo { int index; ValueVector vv; @@ -176,6 +179,8 @@ public class KuduRecordReader extends AbstractRecordReader { final String name = col.getName(); final Type kuduType = col.getType(); + lastColumnName = name; + lastColumnType = kuduType; MinorType minorType = TYPES.get(kuduType); if (minorType == null) { logger.warn("Ignoring column that is unsupported.", UserException @@ -326,4 +331,10 @@ public class KuduRecordReader extends AbstractRecordReader { public void close() { } + @Override + public String toString() { + return "KuduRecordReader[Column=" + lastColumnName + + ", Type=" + lastColumnType + + "]"; + } } diff --git a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java index d0fa158..7611576 100644 --- a/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java +++ b/contrib/storage-kudu/src/main/java/org/apache/drill/exec/store/kudu/KuduWriter.java @@ -77,4 +77,9 @@ public class KuduWriter extends AbstractWriter { public KuduStoragePlugin getPlugin() { return plugin; } + + 
@Override + public String toString() { + return "KuduWriter[name=" + name + ", storageStrategy=" + getStorageStrategy() + "]"; + } } diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java index a79e39a..f5d1f2e 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java @@ -216,4 +216,9 @@ public class MongoRecordReader extends AbstractRecordReader { public void close() { } + @Override + public String toString() { + Object reader = isBsonRecordReader ? bsonReader : jsonReader; + return "MongoRecordReader[reader=" + reader + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java index da988de..521aba1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java @@ -106,4 +106,12 @@ public class HashAggregate extends AbstractSingle { return queryContext == null || 1 < (int) queryContext.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR); } + + @Override + public String toString() { + return "HashAggregate[groupByExprs=" + groupByExprs + + ", aggrExprs=" + aggrExprs + + ", cardinality=" + cardinality + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java index d28d563..320bc6d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java @@ -106,4 +106,15 @@ public class OrderedPartitionSender extends AbstractSender { public int getOperatorType() { return CoreOperatorType.ORDERED_PARTITION_SENDER_VALUE; } + + @Override + public String toString() { + return "OrderedPartitionSender[orderings=" + orderings + + ", ref=" + ref + + ", sendingWidth=" + sendingWidth + + ", recordsToSample=" + recordsToSample + + ", samplingFactor=" + samplingFactor + + ", completionFactor=" + completionFactor + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java index 29f8bb2..4ea710d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionLimit.java @@ -59,4 +59,9 @@ public class PartitionLimit extends Limit { public int getOperatorType() { return CoreOperatorType.PARTITION_LIMIT_VALUE; } + + @Override + public String toString() { + return "PartitionLimit[partitionColumn=" + partitionColumn + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java index 85ef7da..5d65c39 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java @@ -71,4 +71,11 @@ public class Sort extends AbstractSingle{ public int getOperatorType() { return CoreOperatorType.OLD_SORT_VALUE; } + + @Override + public String toString() { + return "Sort[orderings=" + orderings + + ", reverse=" + reverse + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java index 54d1b4d..d2a87d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java @@ -69,4 +69,11 @@ public class TopN extends Sort { return CoreOperatorType.TOP_N_SORT_VALUE; } + @Override + public String toString() { + return "TopN[orderings=" + orderings + + ", reverse=" + reverse + + ", limit=" + limit + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java index 543c09f..3ddaa7f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java @@ -95,6 +95,17 @@ public class WindowPOP extends AbstractSingle { return frameUnitsRows; } + @Override + public String toString() { + return "WindowPOP[withins=" + withins + + ", aggregations=" + aggregations + + ", orderings=" + orderings + + ", frameUnitsRows=" + frameUnitsRows + + ", start=" + start + + ", end=" + end + + "]"; + } + @JsonTypeName("windowBound") public static class Bound { private final boolean unbounded; @@ -117,6 +128,11 @@ public class WindowPOP extends AbstractSingle { public long getOffset() { return offset; } + + @Override + public String toString() { + return "Bound[unbounded=" + unbounded + ", offset=" + offset + "]"; + } } public static Bound newBound(RexWindowBound windowBound) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index e148278..9142a2a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.impl; +import java.util.LinkedList; import java.util.List; import org.apache.drill.common.DeferredException; @@ -124,6 +125,28 @@ public abstract class BaseRootExec implements RootExec { } @Override + public void dumpBatches() { + final int numberOfBatchesToDump = 2; + logger.error("Batch dump started: dumping last {} failed batches", numberOfBatchesToDump); + // As batches are stored in a 'flat' List there is a need to filter out the failed batch + // and a few of its parent (actual number of batches is set by a constant defined above) + List<CloseableRecordBatch> failedBatchStack = new LinkedList<>(); + for (int i = operators.size() - 1; i >= 0; i--) { + CloseableRecordBatch batch = operators.get(i); + if (batch.hasFailed()) { + failedBatchStack.add(0, batch); + if (failedBatchStack.size() == numberOfBatchesToDump) { + break; + } + } + } + for (CloseableRecordBatch batch : failedBatchStack) { + batch.dump(); + } + logger.error("Batch dump completed."); + } + + @Override public void close() throws Exception { // We want to account for the time spent waiting here as Wait time in the operator profile try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java index df0f89b..34f2131 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java @@ -44,4 +44,10 @@ public interface RootExec extends AutoCloseable { * @param handle The handle pointing to the downstream receiver that does not need anymore data. */ void receivingFragmentFinished(FragmentHandle handle); + + /** + * Dump failed batches' state preceded by its parent's state to logs. 
Invoked when there is a + * failure during fragment execution. + */ + void dumpBatches(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index cd24a4c..dc8dd0f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -83,6 +83,10 @@ public class ScanBatch implements CloseableRecordBatch { private final List<Map<String, String>> implicitColumnList; private String currentReaderClassName; private final RecordBatchStatsContext batchStatsContext; + // Represents last outcome of next(). If an Exception is thrown + // during the method's execution a value IterOutcome.STOP will be assigned. + private IterOutcome lastOutcome; + /** * * @param context @@ -160,7 +164,8 @@ public class ScanBatch implements CloseableRecordBatch { @Override public IterOutcome next() { if (done) { - return IterOutcome.NONE; + lastOutcome = IterOutcome.NONE; + return lastOutcome; } oContext.getStats().startProcessing(); try { @@ -168,7 +173,8 @@ public class ScanBatch implements CloseableRecordBatch { if (currentReader == null && !getNextReaderIfHas()) { releaseAssets(); // All data has been read. Release resource. done = true; - return IterOutcome.NONE; + lastOutcome = IterOutcome.NONE; + return lastOutcome; } injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class); currentReader.allocate(mutator.fieldVectorMap()); @@ -191,7 +197,8 @@ public class ScanBatch implements CloseableRecordBatch { // This could happen when data sources have a non-trivial schema with 0 row. container.buildSchema(SelectionVectorMode.NONE); schema = container.getSchema(); - return IterOutcome.OK_NEW_SCHEMA; + lastOutcome = IterOutcome.OK_NEW_SCHEMA; + return lastOutcome; } // Handle case of same schema. 
@@ -199,11 +206,13 @@ public class ScanBatch implements CloseableRecordBatch { continue; // Skip to next loop iteration if reader returns 0 row and has same schema. } else { // return OK if recordCount > 0 && ! isNewSchema - return IterOutcome.OK; + lastOutcome = IterOutcome.OK; + return lastOutcome; } } } catch (OutOfMemoryException ex) { clearFieldVectorMap(); + lastOutcome = IterOutcome.STOP; throw UserException.memoryError(ex).build(logger); } catch (ExecutionSetupException e) { if (currentReader != null) { @@ -213,12 +222,15 @@ public class ScanBatch implements CloseableRecordBatch { logger.error("Close failed for reader " + currentReaderClassName, e2); } } + lastOutcome = IterOutcome.STOP; throw UserException.internalError(e) .addContext("Setup failed for", currentReaderClassName) .build(logger); } catch (UserException ex) { + lastOutcome = IterOutcome.STOP; throw ex; } catch (Exception ex) { + lastOutcome = IterOutcome.STOP; throw UserException.internalError(ex).build(logger); } finally { oContext.getStats().stopProcessing(); @@ -559,4 +571,14 @@ public class ScanBatch implements CloseableRecordBatch { return true; } + + @Override + public boolean hasFailed() { + return lastOutcome == IterOutcome.STOP; + } + + @Override + public void dump() { + logger.error("ScanBatch[container={}, currentReader={}, schema={}]", container, currentReader, schema); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index ba4b94a..22dfdf0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -74,6 +74,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; /** * Operator Batch which implements the TopN functionality. It is more efficient than (sort + limit) since unlike sort @@ -185,7 +186,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { // Reset the TopN state for next iteration resetTopNState(); - try{ + try { boolean incomingHasSv2 = false; switch (incoming.getSchema().getSelectionVectorMode()) { case NONE: { @@ -693,4 +694,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { return sv4; } } + + @Override + public void dump() { + logger.error("TopNBatch[container={}, config={}, schema={}, sv4={}, countSincePurge={}, " + + "batchCount={}, recordCount={}]", container, config, schema, sv4, countSincePurge, batchCount, recordCount); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 65d0c54..3a8485a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -209,4 +209,10 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { closeWriter(); super.close(); } + + @Override + public void dump() { + logger.error("WriterRecordBatch[container={}, popConfig={}, counter={}, fragmentUniqueId={}, schema={}]", + container, popConfig, counter, fragmentUniqueId, schema); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 9de9aae..80d25ed 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -500,4 +501,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { wasKilled = true; incoming.kill(sendUpstream); } + + @Override + public void dump() { + logger.error("HashAggBatch[container={}, aggregator={}, groupByOutFieldIds={}, aggrOutFieldIds={}, " + + "incomingSchema={}, wasKilled={}, numGroupByExprs={}, numAggrExprs={}, popConfig={}]", + container, aggregator, Arrays.toString(groupByOutFieldIds), Arrays.toString(aggrOutFieldIds), incomingSchema, + wasKilled, numGroupByExprs, numAggrExprs, popConfig); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index e8ae30e..f6dd3da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import javax.inject.Named; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.drill.common.exceptions.RetryAfterSpillException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ExpressionPosition; @@ -1603,6 +1604,14 @@ public abstract class HashAggTemplate implements HashAggregator { } } + @Override + public String toString() { + // The fields are excluded because they are passed from HashAggBatch + String[] excludedFields = new String[] { + "baseHashTable", "incoming", "outgoing", "context", 
"oContext", "allocator", "htables", "newIncoming"}; + return ReflectionToStringBuilder.toStringExclude(this, excludedFields); + } + // Code-generated methods (implemented in HashAggBatch) public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java index 7ebce2b..822a810 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.spill.SpillSet; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.SimpleRecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -40,6 +41,9 @@ import java.util.Iterator; * A class to replace "incoming" - instead scanning a spilled partition file */ public class SpilledRecordbatch implements CloseableRecordBatch { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class); + private VectorContainer container; private InputStream spillStream; private int spilledBatches; @@ -49,6 +53,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch { private String spillFile; VectorAccessibleSerializable vas; private IterOutcome initialOutcome; + // Represents last outcome of next(). If an Exception is thrown + // during the method's execution a value IterOutcome.STOP will be assigned. 
+ private IterOutcome lastOutcome; public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) { this.context = context; @@ -66,6 +73,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch { } initialOutcome = next(); // initialize the container + lastOutcome = initialOutcome; } @Override @@ -126,14 +134,19 @@ public class SpilledRecordbatch implements CloseableRecordBatch { @Override public IterOutcome next() { - if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; } + if (!context.getExecutorState().shouldContinue()) { + lastOutcome = IterOutcome.STOP; + return lastOutcome; + } if ( spilledBatches <= 0 ) { // no more batches to read in this partition this.close(); - return IterOutcome.NONE; + lastOutcome = IterOutcome.NONE; + return lastOutcome; } if ( spillStream == null ) { + lastOutcome = IterOutcome.STOP; throw new IllegalStateException("Spill stream was null"); } @@ -152,11 +165,16 @@ public class SpilledRecordbatch implements CloseableRecordBatch { container = vas.get(); } } catch (IOException e) { + lastOutcome = IterOutcome.STOP; throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger); + } catch (Exception e) { + lastOutcome = IterOutcome.STOP; + throw e; } spilledBatches--; // one less batch to read - return IterOutcome.OK; + lastOutcome = IterOutcome.OK; + return lastOutcome; } /** @@ -164,6 +182,17 @@ public class SpilledRecordbatch implements CloseableRecordBatch { */ public IterOutcome getInitialOutcome() { return initialOutcome; } + @Override + public void dump() { + logger.error("SpilledRecordbatch[container={}, spilledBatches={}, schema={}, spillFile={}, spillSet={}]", + container, spilledBatches, schema, spillFile, spillSet); + } + + @Override + public boolean hasFailed() { + return lastOutcome == IterOutcome.STOP; + } + /** * Note: ignoring any IO errors 
(e.g. file not found) */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index c42f9bf..2b9b317 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -648,4 +648,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { protected void killIncoming(boolean sendUpstream) { incoming.kill(sendUpstream); } + + @Override + public void dump() { + logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]", + container, popConfig, aggregator, incomingSchema); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index f30616b..4bde7ab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -498,6 +498,16 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { public void cleanup() { } + @Override + public String toString() { + return "StreamingAggTemplate[underlyingIndex=" + underlyingIndex + + ", previousIndex=" + previousIndex + + ", currentIndex=" + currentIndex + + ", addedRecordCount=" + addedRecordCount + + ", outputCount=" + outputCount + + "]"; + } + public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2); public abstract boolean 
isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index b8d4e76..179e6c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -43,6 +43,9 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class); + private SelectionVector2 sv2; private SelectionVector4 sv4; private Filterer filter; @@ -196,4 +199,9 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> { throw new SchemaChangeException("Failure while attempting to load generated class", e); } } + + @Override + public void dump() { + logger.error("FilterRecordBatch[container={}, selectionVector2={}, filter={}, popConfig={}]", container, sv2, filter, popConfig); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index 7b0183b..483a9ed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -119,4 +119,11 @@ public abstract class FilterTemplate2 implements Filterer { @Named("outIndex") int outIndex) throws SchemaChangeException; + @Override + public String toString() { + return "FilterTemplate2[outgoingSelectionVector=" + 
outgoingSelectionVector + + ", incomingSelectionVector=" + incomingSelectionVector + + ", svMode=" + svMode + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java index e09ed75..d85d6f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java @@ -64,4 +64,10 @@ public abstract class FilterTemplate4 implements Filterer { public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + @Override + public String toString() { + return "FilterTemplate4[outgoingSelectionVector=" + outgoingSelectionVector + + ", incomingSelectionVector=" + incomingSelectionVector + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java index 7faaaa5..bc21580 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java @@ -247,4 +247,11 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF } } } -} \ No newline at end of file + + @Override + public void dump() { + logger.error("RuntimeFilterRecordBatch[container={}, selectionVector={}, toFilterFields={}, " + + "originalRecordCount={}, batchSchema={}]", + container, sv2, toFilterFields, originalRecordCount, incoming.getSchema()); + } +} diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 1623319..86ddcd1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -532,4 +532,10 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { updateStats(); super.close(); } + + @Override + public void dump() { + logger.error("FlattenRecordBatch[hasRemainder={}, remainderIndex={}, recordCount={}, flattener={}, container={}]", + hasRemainder, remainderIndex, recordCount, flattener, container); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java index e59abac..fe38244 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java @@ -175,4 +175,15 @@ public abstract class FlattenTemplate implements Flattener { @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) throws SchemaChangeException; + + @Override + public String toString() { + return "FlattenTemplate[svMode=" + svMode + + ", fieldToFlatten=" + fieldToFlatten + + ", valueIndex=" + valueIndex + + ", outputLimit=" + outputLimit + + ", innerValueIndex=" + innerValueIndex + + ", currentInnerValueIndex=" + currentInnerValueIndex + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 368bb5d..89ab8d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -1276,4 +1276,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { return hj; } + @Override + public void dump() { + logger.error("HashJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, hashJoinProbe={}," + + " rightExpr={}, canSpill={}, buildSchema={}, probeSchema={}]", container, left, right, leftUpstream, rightUpstream, + joinType, hashJoinProbe, rightExpr, canSpill, buildSchema, probeSchema); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 639f757..71abeda 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -432,4 +432,17 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? 
ProbeState.PROJECT_RIGHT : ProbeState.DONE; // else we're done } + + @Override + public String toString() { + return "HashJoinProbeTemplate[container=" + container + + ", probeSchema=" + probeSchema + + ", joinType=" + joinType + + ", recordsToProcess=" + recordsToProcess + + ", recordsProcessed=" + recordsProcessed + + ", outputRecords=" + outputRecords + + ", probeState=" + probeState + + ", unmatchedBuildIndexes=" + unmatchedBuildIndexes + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 1aaf5e2..242687f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -1205,4 +1205,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } } } + + @Override + public void dump() { + logger.error("LateralJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, leftSchema={}, " + + "rightSchema={}, outputIndex={}, leftJoinIndex={}, rightJoinIndex={}, hasRemainderForLeftJoin={}]", + container, left, right, leftUpstream, rightUpstream, leftSchema, rightSchema, outputIndex, + leftJoinIndex, rightJoinIndex, hasRemainderForLeftJoin); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 72f776a..d502c4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -547,4 +547,11 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { } return materializedExpr; } + + @Override + public void dump() { + 
logger.error("MergeJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, joinType={}, leftIterator={}," + + " rightIterator={}, joinStatus={}]", + container, left, right, leftUpstream, rightUpstream, joinType, leftIterator, rightIterator, status); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index e2f93ec..6b0c749 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -459,4 +459,12 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi public int getRecordCount() { return outputRecords; } + + @Override + public void dump() { + logger.error("NestedLoopJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, " + + "leftSchema={}, rightSchema={}, outputRecords={}, rightContainer={}, rightCounts={}]", + container, left, right, leftUpstream, rightUpstream, + leftSchema, rightSchema, outputRecords, rightContainer, rightCounts); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 784d955..bb49187 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -37,7 +37,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { - // private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class); private SelectionVector2 outgoingSv; private SelectionVector2 incomingSv; @@ -58,46 +58,46 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { @Override public IterOutcome innerNext() { if (!first && !needMoreRecords(numberOfRecords)) { - outgoingSv.setRecordCount(0); - incoming.kill(true); + outgoingSv.setRecordCount(0); + incoming.kill(true); - IterOutcome upStream = next(incoming); + IterOutcome upStream = next(incoming); + if (upStream == IterOutcome.OUT_OF_MEMORY) { + return upStream; + } + + while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) { + // Clear the memory for the incoming batch + for (VectorWrapper<?> wrapper : incoming) { + wrapper.getValueVector().clear(); + } + // clear memory for incoming sv (if any) + if (incomingSv != null) { + incomingSv.clear(); + } + upStream = next(incoming); if (upStream == IterOutcome.OUT_OF_MEMORY) { return upStream; } - - while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) { - // Clear the memory for the incoming batch - for (VectorWrapper<?> wrapper : incoming) { - wrapper.getValueVector().clear(); - } - // clear memory for incoming sv (if any) - if (incomingSv != null) { - incomingSv.clear(); - } - upStream = next(incoming); - if (upStream == IterOutcome.OUT_OF_MEMORY) { - return upStream; - } + } + // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT. + if (upStream == EMIT) { + // Clear the memory for the incoming batch + for (VectorWrapper<?> wrapper : incoming) { + wrapper.getValueVector().clear(); } - // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT. 
- if (upStream == EMIT) { - // Clear the memory for the incoming batch - for (VectorWrapper<?> wrapper : incoming) { - wrapper.getValueVector().clear(); - } - - // clear memory for incoming sv (if any) - if (incomingSv != null) { - incomingSv.clear(); - } - - refreshLimitState(); - return upStream; + + // clear memory for incoming sv (if any) + if (incomingSv != null) { + incomingSv.clear(); } - // other leaf operator behave as before. - return NONE; + + refreshLimitState(); + return upStream; } + // other leaf operator behave as before. + return NONE; + } return super.innerNext(); } @@ -266,4 +266,10 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset; first = true; } + + @Override + public void dump() { + logger.error("LimitRecordBatch[container={}, offset={}, numberOfRecords={}, incomingSV={}, outgoingSV={}]", + container, recordStartOffset, numberOfRecords, incomingSv, outgoingSv); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java index fe1660f..ed7b265 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/PartitionLimitRecordBatch.java @@ -40,7 +40,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; * implicit column for rowId for each row. 
*/ public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<PartitionLimit> { - // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionLimitRecordBatch.class); private SelectionVector2 outgoingSv; private SelectionVector2 incomingSv; @@ -250,4 +250,12 @@ public class PartitionLimitRecordBatch extends AbstractSingleRecordBatch<Partiti numberOfRecords = (popConfig.getLast() == null) ? Integer.MIN_VALUE : Math.max(0, popConfig.getLast()) - recordStartOffset; } + + @Override + public void dump() { + logger.error("PartitionLimitRecordBatch[container={}, popConfig={}, incomingSV={}, outgoingSV={}," + + " recordStartOffset={}, numberOfRecords={}, partitionId={}, unionTypeEnabled={}, state={}]", + container, popConfig, incomingSv, outgoingSv, recordStartOffset, numberOfRecords, + partitionId, unionTypeEnabled, state); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 791b24a..12ee668 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.mergereceiver; import java.io.IOException; +import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -827,4 +828,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> super.close(); } + @Override + public void dump() { + logger.error("MergingRecordBatch[container={}, outgoingPosition={}, incomingBatches={}, batchOffsets={}, " + + "tempBatchHolder={}, inputCounts={}, outputCounts={}]", + container,
outgoingPosition, Arrays.toString(incomingBatches), Arrays.toString(batchOffsets), + Arrays.toString(tempBatchHolder), Arrays.toString(inputCounts), Arrays.toString(outputCounts)); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index a3aa11b..63a0121 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -646,4 +646,10 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } } + @Override + public void dump() { + logger.error("OrderedPartitionRecordBatch[container={}, popConfig={}, partitionVectors={}, partitions={}, " + + "recordsSampled={}, recordCount={}]", + container, popConfig, partitionVectors, partitions, recordsSampled, recordCount); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index bbcb758..9385400 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -247,4 +247,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer> } } + @Override + public void dump() { + logger.error("ProducerConsumerBatch[container={}, recordCount={}, schema={}, stop={}]", + container, recordCount, schema, stop); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 4d55f00..8ea15d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -898,4 +898,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { wasNone = true; return IterOutcome.OK_NEW_SCHEMA; } + + @Override + public void dump() { + logger.error("ProjectRecordBatch[projector={}, hasRemainder={}, remainderIndex={}, recordCount={}, container={}]", + projector, hasRemainder, remainderIndex, recordCount, container); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java index 02ccd4b..2f1aa02 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java @@ -110,4 +110,10 @@ public abstract class ProjectorTemplate implements Projector { @Named("outIndex") int outIndex) throws SchemaChangeException; + @Override + public String toString() { + return "Projector[vector2=" + vector2 + + ", selectionVectorMode=" + svMode + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java index 620f150..e0beab1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java @@ -55,6 +55,7 @@ public class OperatorRecordBatch implements CloseableRecordBatch { private final 
OperatorDriver driver; private final BatchAccessor batchAccessor; + private IterOutcome lastOutcome; public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) { OperatorContext opContext = context.newOperatorContext(config); @@ -143,7 +144,12 @@ public class OperatorRecordBatch implements CloseableRecordBatch { public IterOutcome next() { try { driver.operatorContext().getStats().startProcessing(); - return driver.next(); + lastOutcome = driver.next(); + return lastOutcome; + } catch (Exception e) { + // mark batch as failed + lastOutcome = IterOutcome.STOP; + throw e; } finally { driver.operatorContext().getStats().stopProcessing(); } @@ -158,4 +164,14 @@ public class OperatorRecordBatch implements CloseableRecordBatch { public VectorContainer getContainer() { return batchAccessor.getOutgoingContainer(); } + + @Override + public boolean hasFailed() { + return lastOutcome == IterOutcome.STOP; + } + + @Override + public void dump() { + logger.error("OperatorRecordBatch[batchAccessor={}, lastOutcome={}]", batchAccessor, lastOutcome); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index fc49d43..fcbb10e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -213,4 +213,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> { incoming.kill(sendUpstream); } + @Override + public void dump() { + logger.error("SortBatch[popConfig={}, container={}, sorter={}, schema={}]", popConfig, container, sorter, schema); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java index da476cc..b3c6a7f 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java @@ -70,4 +70,8 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing); public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex); + @Override + public String toString() { + return "SortTemplate[vector4=" + vector4 + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 1471d5e..a8c3622 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -92,4 +92,9 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect public WritableBatch getWritableBatch() { return WritableBatch.get(this); } + + @Override + public void dump() { + logger.error("RemovingRecordBatch[container={}, state={}, copier={}]", container, state, copier); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index 50cb26b..d56f848 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -39,7 +39,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -/* TraceRecordBatch contains 
value vectors which are exactly the same +/** TraceRecordBatch contains value vectors which are exactly the same * as the incoming record batch's value vectors. If the incoming * record batch has a selection vector (type 2) then TraceRecordBatch * will also contain a selection vector. @@ -171,4 +171,8 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { super.close(); } + @Override + public void dump() { + logger.error("TraceRecordBatch[filename={}, logLocation={}, selectionVector={}]", getFileName(), logLocation, sv); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 7e16d6a..e83fddf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -440,4 +440,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); } + @Override + public void dump() { + logger.error("UnionAllRecordBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, " + + "recordCount={}]", container, left, right, leftUpstream, rightUpstream, recordCount); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java index 285481f..508999f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java @@ -191,4 +191,14 @@ public class UnnestImpl implements Unnest { transfers = null; } } + + @Override + public String toString() { + return "UnnestImpl[svMode=" + svMode + 
+ ", outputLimit=" + outputLimit + + ", valueIndex=" + valueIndex + + ", innerValueIndex=" + innerValueIndex + + ", runningInnerValueIndex=" + runningInnerValueIndex + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index 5f63967..1c8336d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -241,7 +241,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO } } - @Override + @Override public VectorContainer getOutgoingContainer() { return this.container; } @@ -446,4 +446,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO super.close(); } + @Override + public void dump() { + logger.error("UnnestRecordBatch[container={}, unnest={}, hasRemainder={}, remainderIndex={}, " + + "unnestFieldMetadata={}]", container, unnest, hasRemainder, remainderIndex, unnestFieldMetadata); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 8cdc0a1..a6ebef0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -64,6 +64,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { private boolean first = true; private final UnorderedReceiver config; private final OperatorContext oContext; + // Represents last outcome of next(). 
If an Exception is thrown + // during the method's execution a value IterOutcome.STOP will be assigned. + private IterOutcome lastOutcome; public enum Metric implements MetricDef { BYTES_RECEIVED, @@ -156,7 +159,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { public IterOutcome next() { batchLoader.resetRecordCount(); stats.startProcessing(); - try{ + try { RawFragmentBatch batch; try { stats.startWait(); @@ -174,15 +177,17 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { first = false; if (batch == null) { + lastOutcome = IterOutcome.NONE; batchLoader.zero(); if (!context.getExecutorState().shouldContinue()) { - return IterOutcome.STOP; + lastOutcome = IterOutcome.STOP; } - return IterOutcome.NONE; + return lastOutcome; } if (context.getAllocator().isOverLimit()) { - return IterOutcome.OUT_OF_MEMORY; + lastOutcome = IterOutcome.OUT_OF_MEMORY; + return lastOutcome; } final RecordBatchDef rbd = batch.getHeader().getDef(); @@ -195,14 +200,19 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { if(schemaChanged) { this.schema = batchLoader.getSchema(); stats.batchReceived(0, rbd.getRecordCount(), true); - return IterOutcome.OK_NEW_SCHEMA; + lastOutcome = IterOutcome.OK_NEW_SCHEMA; } else { stats.batchReceived(0, rbd.getRecordCount(), false); - return IterOutcome.OK; + lastOutcome = IterOutcome.OK; } - } catch(SchemaChangeException | IOException ex) { + return lastOutcome; + } catch (SchemaChangeException | IOException ex) { context.getExecutorState().fail(ex); - return IterOutcome.STOP; + lastOutcome = IterOutcome.STOP; + return lastOutcome; + } catch (Exception e) { + lastOutcome = IterOutcome.STOP; + throw e; } finally { stats.stopProcessing(); } @@ -270,4 +280,14 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { } } } + + @Override + public void dump() { + logger.error("UnorderedReceiverBatch[batchLoader={}, schema={}]", batchLoader, schema); + } + + @Override + public 
boolean hasFailed() { + return lastOutcome == IterOutcome.STOP; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 88f4c7d..1ea3895 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -322,8 +322,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch { } return batchState; - } - catch (RuntimeException | Error e) { + } catch (RuntimeException | Error e) { exceptionState = e; logger.trace("[#{}, on {}]: incoming next() exception: ({} ->) {}", instNum, batchTypeName, prevBatchState, exceptionState, @@ -366,4 +365,14 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch { public RecordBatch getIncoming() { return incoming; } + @Override + public boolean hasFailed() { + return exceptionState != null || batchState == STOP; + } + + @Override + public void dump() { + logger.error("IteratorValidatorBatchIterator[container={}, instNum={}, batchTypeName={}, lastSchema={}, " + + "lastNewSchema={}]", getContainer(), instNum, batchTypeName, lastSchema, lastNewSchema); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java index 5cd6de7..1e477ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/FrameSupportTemplate.java @@ -295,6 +295,17 @@ public abstract class FrameSupportTemplate implements WindowFramer { internal.clear(); } + @Override + public String 
toString() { + return "FrameSupportTemplate[internal=" + internal + + ", outputCount=" + outputCount + + ", current=" + current + + ", frameLastRow=" + frameLastRow + + ", remainingRows=" + remainingRows + + ", partialPartition=" + partialPartition + + "]"; + } + /** * called once for each peer row of the current frame. * @param index of row to aggregate diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java index 55c27c1..cc7a04d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java @@ -298,6 +298,17 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { internal.clear(); } + @Override + public String toString() { + return "FrameSupportTemplate[internal=" + internal + + ", outputCount=" + outputCount + + ", current=" + current + + ", requireFullPartition=" + requireFullPartition + + ", partition=" + partition + + "]"; + } + + /** * called once for each row after we evaluate all peer rows. 
Used to write a value in the row * diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java index 7d98724..a9baea9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java @@ -106,4 +106,9 @@ public class WindowDataBatch implements VectorAccessible { public void clear() { container.clear(); } + + @Override + public String toString() { + return "WindowDataBatch[container=" + container + ", recordCount=" + recordCount + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index a372a3c..59e84ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.window; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; @@ -426,4 +427,10 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { public int getRecordCount() { return framers[0].getOutputCount(); } + + @Override + public void dump() { + logger.error("WindowFrameRecordBatch[container={}, popConfig={}, framers={}, schema={}]", + container, popConfig, Arrays.toString(framers), schema); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 0edf974..262a241 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -824,4 +824,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { incoming.kill(sendUpstream); } + @Override + public void dump() { + logger.error("ExternalSortBatch[schema={}, sorter={}, mSorter={}, container={}]", schema, sorter, mSorter, container); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 5ebec50..9b10d43 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -206,4 +206,13 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex) throws SchemaChangeException; + + @Override + public String toString() { + return "MSortTemplate[vector4=" + vector4 + + ", aux=" + aux + + ", runStarts=" + runStarts + + ", desiredRecordBatchCount=" + desiredRecordBatchCount + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java index de783df..57d2ec3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java @@ -83,4 +83,9 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In public abstract int doEval(@Named("leftIndex") char leftIndex, @Named("rightIndex") char 
rightIndex) throws SchemaChangeException; + + @Override + public String toString() { + return "SinglebatchSorterTemplate[vector2=" + vector2 + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java index 7db4d3b..8fc4a74 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java @@ -698,4 +698,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder); return new SortImpl(oContext, sortConfig, spilledRuns, outputWrapperContainer); } + + @Override + public void dump() { + logger.error("ExternalSortBatch[schema={}, sortState={}, sortConfig={}, outputWrapperContainer={}, " + + "outputSV4={}, container={}]", + schema, sortState, sortConfig, outputWrapperContainer, outputSV4, container); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java index e592ccb..fbcf1ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java @@ -137,6 +137,15 @@ public class SortConfig { mergeBatchSize, mSortBatchSize); } + @Override + public String toString() { + return "SortConfig[spillFileSize=" + spillFileSize + + ", spillBatchSize=" + spillBatchSize + + ", mergeBatchSize=" + mergeBatchSize + + ", mSortBatchSize=" + mSortBatchSize + + "]"; + } + public long maxMemory() { return maxMemory; } public int mergeLimit() { return mergeLimit; } public long 
spillFileSize() { return spillFileSize; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java index ac30e94..79294cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java @@ -592,4 +592,12 @@ public class SortImpl { throw ex; } } + + @Override + public String toString() { + return "SortImpl[config=" + config + + ", outputBatch=" + outputBatch + + ", sizer=" + sizer + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index c38de2d..362ea29 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -47,6 +47,10 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements protected final boolean unionTypeEnabled; protected BatchState state; + // Represents last outcome of next(). If an Exception is thrown + // during the method's execution a value IterOutcome.STOP will be assigned. 
+ private IterOutcome lastOutcome; + protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException { this(popConfig, context, true, context.newOperatorContext(popConfig)); } @@ -113,7 +117,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } public final IterOutcome next(final int inputIndex, final RecordBatch b){ - IterOutcome next = null; + IterOutcome next; stats.stopProcessing(); try{ if (!context.getExecutorState().shouldContinue()) { @@ -132,15 +136,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } switch(next) { - case OK_NEW_SCHEMA: - stats.batchReceived(inputIndex, b.getRecordCount(), true); - break; - case OK: - case EMIT: - stats.batchReceived(inputIndex, b.getRecordCount(), false); - break; - default: - break; + case OK_NEW_SCHEMA: + stats.batchReceived(inputIndex, b.getRecordCount(), true); + break; + case OK: + case EMIT: + stats.batchReceived(inputIndex, b.getRecordCount(), false); + break; + default: + break; } return next; @@ -155,27 +159,38 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements buildSchema(); switch (state) { case DONE: - return IterOutcome.NONE; + lastOutcome = IterOutcome.NONE; + break; case OUT_OF_MEMORY: // because we don't support schema changes, it is safe to fail the query right away context.getExecutorState().fail(UserException.memoryError() .build(logger)); // FALL-THROUGH case STOP: - return IterOutcome.STOP; + lastOutcome = IterOutcome.STOP; + break; default: state = BatchState.FIRST; - return IterOutcome.OK_NEW_SCHEMA; + lastOutcome = IterOutcome.OK_NEW_SCHEMA; + break; } + break; } case DONE: { - return IterOutcome.NONE; + lastOutcome = IterOutcome.NONE; + break; } default: - return innerNext(); + lastOutcome = innerNext(); + break; } + return lastOutcome; } catch (final SchemaChangeException e) { + lastOutcome = IterOutcome.STOP; throw new DrillRuntimeException(e); + } 
catch (Exception e) { + lastOutcome = IterOutcome.STOP; + throw e; } finally { stats.stopProcessing(); } @@ -245,6 +260,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements return container; } + @Override + public boolean hasFailed() { + return lastOutcome == IterOutcome.STOP; + } + public RecordBatchStatsContext getRecordBatchStatsContext() { return batchStatsContext; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java index cb3a61c..dda4ef5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractTableFunctionRecordBatch.java @@ -59,6 +59,5 @@ public abstract class AbstractTableFunctionRecordBatch<T extends PhysicalOperato setIncoming(incoming.getIncoming()); lateral = incoming; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index c65827c..7473c8c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -320,4 +320,19 @@ public interface RecordBatch extends VectorAccessible { * buffers. */ WritableBatch getWritableBatch(); + + /** + * Perform dump of this batch's state to logs. + */ + void dump(); + + /** + * Use this method to see if the batch has failed. Currently used when logging {@code RecordBatch}'s + * state using {@link #dump()} method. 
+ * + * @return {@code true} if either {@link org.apache.drill.exec.record.RecordBatch.IterOutcome#STOP} + * was returned by its or child's {@link #next()} invocation or there was an {@code Exception} thrown + * during execution of the batch; {@code false} otherwise + */ + boolean hasFailed(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index e841f2f..696d6db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -292,4 +292,12 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp container.clear(); resetRecordCount(); } + + @Override + public String toString() { + return "RecordBatchLoader[container=" + container + + ", valueCount=" + valueCount + + ", schema=" + schema + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java index 47b11a6..04cf32e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java @@ -367,4 +367,18 @@ public class RecordIterator implements VectorAccessible { clear(); clearInflightBatches(); } + + @Override + public String toString() { + return "RecordIterator[outerPosition=" + outerPosition + + ", innerPosition=" + innerPosition + + ", innerRecordCount=" + innerRecordCount + + ", totalRecordCount=" + totalRecordCount + + ", startBatchPosition=" + startBatchPosition + + ", markedInnerPosition" + markedInnerPosition + + ", markedOuterPosition=" + markedOuterPosition + + ", lastOutcome=" + lastOutcome + + ", inputIndex=" + inputIndex + + "]"; + } } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java index 9dfa129..e4278ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemalessBatch.java @@ -107,4 +107,14 @@ public class SchemalessBatch implements CloseableRecordBatch { @Override public VectorContainer getContainer() { return null; } + + @Override + public boolean hasFailed() { + return false; + } + + @Override + public void dump() { + logger.error("SchemalessBatch[]"); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java index 4063e55..c588f25 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java @@ -28,6 +28,9 @@ import java.util.Iterator; * Wrap a VectorContainer into a record batch. 
*/ public class SimpleRecordBatch implements RecordBatch { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRecordBatch.class); + private VectorContainer container; private FragmentContext context; @@ -99,4 +102,14 @@ public class SimpleRecordBatch implements RecordBatch { public VectorContainer getContainer() { return container; } + + @Override + public void dump() { + logger.error("SimpleRecordBatch[container=" + container + "]"); + } + + @Override + public boolean hasFailed() { + return false; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java index a0b47ed..4f4f88d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java @@ -161,4 +161,13 @@ public class SelectionVector4 implements AutoCloseable { public void close() { clear(); } + + @Override + public String toString() { + return "SelectionVector4[data=" + data + + ", recordCount=" + recordCount + + ", start=" + start + + ", length=" + length + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java index 7accdc4..9314da6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java @@ -104,5 +104,4 @@ public abstract class AbstractRecordReader implements RecordReader { protected List<SchemaPath> getDefaultColumnsToRead() { return GroupScan.ALL_COLUMNS; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java index 
a0dda01..edd91d1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java @@ -26,8 +26,8 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.vector.ValueVector; public interface RecordReader extends AutoCloseable { - public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; - public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; + long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024; + long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000; /** * Configure the RecordReader with the provided schema and the record batch that should be written to. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java index 68c3c36..31c0103 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageStrategy.java @@ -222,4 +222,9 @@ public class StorageStrategy { fs.deleteOnExit(path); } } -} \ No newline at end of file + + @Override + public String toString() { + return "StorageStrategy[umask=" + umask + ", deleteOnExist=" + deleteOnExit + "]"; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java index 6945fff..7668130 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java @@ -66,7 +66,7 @@ import org.joda.time.DateTimeConstants; /** * A RecordReader implementation for Avro data files. 
* - * @see RecordReader + * @see org.apache.drill.exec.store.RecordReader */ public class AvroRecordReader extends AbstractRecordReader { @@ -398,4 +398,17 @@ public class AvroRecordReader extends AbstractRecordReader { } } } + + @Override + public String toString() { + long currentPosition = -1L; + try { + currentPosition = reader.tell(); + } catch (IOException e) { + logger.trace("Unable to obtain reader position: " + e.getMessage()); + } + return "AvroRecordReader[File=" + hadoop + + ", Position=" + currentPosition + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java index 5d9e105..2580010 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/bson/BsonRecordReader.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.expr.holders.BigIntHolder; @@ -54,6 +53,8 @@ public class BsonRecordReader { private final boolean readNumbersAsDouble; protected DrillBuf workBuf; private String currentFieldName; + // Used for error context + private BsonReader reader; public BsonRecordReader(DrillBuf managedBuf, boolean allTextMode, boolean readNumbersAsDouble) { this(managedBuf, GroupScan.ALL_COLUMNS, readNumbersAsDouble); @@ -67,6 +68,7 @@ public class BsonRecordReader { } public void write(ComplexWriter writer, BsonReader reader) throws IOException { + this.reader = reader; reader.readStartDocument(); BsonType readBsonType = reader.getCurrentBsonType(); switch (readBsonType) { @@ -364,17 +366,20 @@ public class BsonRecordReader { } } - public 
UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String field, - String msg, Object... args) { - return null; - } - - public UserException.Builder getExceptionWithContext(Throwable exception, String field, String msg, Object... args) { - return null; - } - private void ensure(final int length) { workBuf = workBuf.reallocIfNeeded(length); } + @Override + public String toString() { + StringBuilder sb = new StringBuilder("BsonRecordReader["); + if (reader != null) { + sb.append("Name=") + .append(reader.getCurrentName()) + .append(", Type=") + .append(reader.getCurrentBsonType()); + } + sb.append(']'); + return sb.toString(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java index 7a4e4a2..379e2c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java @@ -103,4 +103,12 @@ public class EasyWriter extends AbstractWriter { public int getOperatorType() { return formatPlugin.getWriterOperatorType(); } + + @Override + public String toString() { + return "EasyWriter[location=" + location + + ", storageStrategy=" + getStorageStrategy() + + ", partitionColumns=" + partitionColumns + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index 4b8bbf8..62ace66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -104,7 +104,7 @@ public class JSONRecordReader extends AbstractRecordReader { "One of inputPath or embeddedContent must be set but not both." 
); - if(inputPath != null) { + if (inputPath != null) { this.hadoopPath = new Path(inputPath); } else { this.embeddedContent = embeddedContent; @@ -126,9 +126,11 @@ public class JSONRecordReader extends AbstractRecordReader { public String toString() { return super.toString() + "[hadoopPath = " + hadoopPath + + ", currentRecord=" + currentRecordNumberInFile() + + ", jsonReader=" + jsonReader + ", recordCount = " + recordCount + ", parseErrorCount = " + parseErrorCount - + ", runningRecordCount = " + runningRecordCount + ", ...]"; + + ", runningRecordCount = " + runningRecordCount + ", ...]"; } @Override @@ -162,9 +164,9 @@ public class JSONRecordReader extends AbstractRecordReader { } private void setupParser() throws IOException { - if(hadoopPath != null){ + if (hadoopPath != null) { jsonReader.setSource(stream); - }else{ + } else { jsonReader.setSource(embeddedContent); } jsonReader.setIgnoreJSONParseErrors(skipMalformedJSONRecords); @@ -182,7 +184,7 @@ public class JSONRecordReader extends AbstractRecordReader { } UserException.Builder exceptionBuilder = UserException.dataReadError(e) - .message("%s - %s", suffix, message); + .message("%s - %s", suffix, message); if (columnNr > 0) { exceptionBuilder.pushContext("Column ", columnNr); } @@ -205,36 +207,32 @@ public class JSONRecordReader extends AbstractRecordReader { writer.reset(); recordCount = 0; parseErrorCount = 0; - if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){ + if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) { return recordCount; } - outside: while(recordCount < DEFAULT_ROWS_PER_BATCH){ - try{ + while (recordCount < DEFAULT_ROWS_PER_BATCH) { + try { writer.setPosition(recordCount); write = jsonReader.write(writer); - if(write == ReadState.WRITE_SUCCEED){ + if (write == ReadState.WRITE_SUCCEED) { recordCount++; - } - else if(write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){ - if(skipMalformedJSONRecords == false){ - handleAndRaise("Error parsing JSON", 
new Exception(hadoopPath.getName() + " : line nos :" + (recordCount+1))); + } else if (write == ReadState.JSON_RECORD_PARSE_ERROR || write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) { + if (!skipMalformedJSONRecords) { + handleAndRaise("Error parsing JSON", new Exception()); } ++parseErrorCount; - if(printSkippedMalformedJSONRecordLineNumber){ - logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount+parseErrorCount)); + if (printSkippedMalformedJSONRecordLineNumber) { + logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount + parseErrorCount)); } - if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){ - break outside; + if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) { + break; } + } else { + break; } - else{ - break outside; - } + } catch (IOException ex) { + handleAndRaise("Error parsing JSON", ex); } - catch(IOException ex) - { - handleAndRaise("Error parsing JSON", ex); - } } // Skip empty json file with 0 row. // Only when data source has > 0 row, ensure the batch has one field. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java index fba80e5..adab033 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java @@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode; public interface JsonProcessor { - public static enum ReadState { + enum ReadState { END_OF_STREAM, JSON_RECORD_PARSE_ERROR, JSON_RECORD_PARSE_EOF_ERROR, @@ -41,17 +41,11 @@ public interface JsonProcessor { void ensureAtLeastOneField(BaseWriter.ComplexWriter writer); - public UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, - String field, - String msg, - Object... 
args); + UserException.Builder getExceptionWithContext(UserException.Builder exceptionBuilder, String message); - public UserException.Builder getExceptionWithContext(Throwable exception, - String field, - String msg, - Object... args); + UserException.Builder getExceptionWithContext(Throwable exception, String message); - public boolean ignoreJSONParseError(); + boolean ignoreJSONParseError(); - public void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors); + void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java index aaa74ae..48a1464 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java @@ -24,6 +24,7 @@ import java.io.InputStream; import org.apache.drill.exec.store.easy.json.JsonProcessor; +import com.fasterxml.jackson.core.JsonLocation; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.JsonNode; @@ -49,8 +50,12 @@ public abstract class BaseJsonProcessor implements JsonProcessor { protected JsonParser parser; protected DrillBuf workBuf; - protected JsonToken lastSeenJsonToken = null; - boolean ignoreJSONParseErrors = false; // default False + protected JsonToken lastSeenJsonToken; + boolean ignoreJSONParseErrors; + /** + * The name of the current field being parsed. For Error messages. + */ + protected String currentFieldName; /** * @@ -90,26 +95,31 @@ public abstract class BaseJsonProcessor implements JsonProcessor { } @Override - public UserException.Builder getExceptionWithContext( - UserException.Builder exceptionBuilder, String field, String msg, - Object... 
args) { - if (msg != null) { - exceptionBuilder.message(msg, args); - } - if (field != null) { - exceptionBuilder.pushContext("Field ", field); + public String toString() { + JsonLocation location = parser.getCurrentLocation(); + return getClass().getSimpleName() + "[Line=" + location.getLineNr() + + ", Column=" + (location.getColumnNr() + 1) + + ", Field=" + getCurrentField() + + "]"; + } + + @Override + public UserException.Builder getExceptionWithContext(UserException.Builder builder, String message) { + builder.message(message); + JsonLocation location = parser.getCurrentLocation(); + builder.addContext("Line", location.getLineNr()) + .addContext("Column", location.getColumnNr() + 1); + String fieldName = getCurrentField(); + if (fieldName != null) { + builder.addContext("Field", fieldName); } - exceptionBuilder.pushContext("Column ", - parser.getCurrentLocation().getColumnNr() + 1).pushContext("Line ", - parser.getCurrentLocation().getLineNr()); - return exceptionBuilder; + return builder; } @Override - public UserException.Builder getExceptionWithContext(Throwable e, - String field, String msg, Object... 
args) { + public UserException.Builder getExceptionWithContext(Throwable e, String message) { UserException.Builder exceptionBuilder = UserException.dataReadError(e); - return getExceptionWithContext(exceptionBuilder, field, msg, args); + return getExceptionWithContext(exceptionBuilder, message); } /* @@ -138,4 +148,8 @@ public abstract class BaseJsonProcessor implements JsonProcessor { } return JsonExceptionProcessingState.PROC_SUCCEED; } + + protected String getCurrentField() { + return currentFieldName; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java index a5e9f1a..0f92ec5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java @@ -23,8 +23,6 @@ import com.fasterxml.jackson.core.JsonToken; import io.netty.buffer.DrillBuf; -import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState; -import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState; import org.apache.drill.exec.vector.complex.writer.BaseWriter; public class CountingJsonReader extends BaseJsonProcessor { @@ -41,18 +39,14 @@ public class CountingJsonReader extends BaseJsonProcessor { token = parser.nextToken(); } lastSeenJsonToken = null; + if (token == JsonToken.FIELD_NAME) { + currentFieldName = parser.getText(); + } if (!parser.hasCurrentToken()) { return ReadState.END_OF_STREAM; } else if (token != JsonToken.START_OBJECT) { throw new com.fasterxml.jackson.core.JsonParseException( - parser, - String - .format( - "Cannot read from the middle of a record. Current token was %s ", - token)); - // throw new - // IllegalStateException(String.format("Cannot read from the middle of a record. 
Current token was %s", - // token)); + parser, String.format("Cannot read from the middle of a record. Current token was %s ", token)); } writer.rootAsMap().bit("count").writeBit(1); parser.skipChildren(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java index 3d88b1a..549df82 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java @@ -163,4 +163,19 @@ public class SequenceFileRecordReader extends AbstractRecordReader { logger.warn("Exception closing reader: {}", e); } } -} \ No newline at end of file + + @Override + public String toString() { + long position = -1L; + try { + if (reader != null) { + position = reader.getPos(); + } + } catch (IOException e) { + logger.trace("Unable to obtain reader position: " + e.getMessage()); + } + return "SequenceFileRecordReader[File=" + split.getPath() + + ", Position=" + position + + "]"; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java index 9a1d486..7aa9b04 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java @@ -216,7 +216,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader { } catch (IOException | TextParsingException e) { throw UserException.dataReadError(e) .addContext("Failure while reading file %s. 
Happened at or shortly before byte position %d.", - split.getPath(), reader.getPos()) + split.getPath(), reader.getPos()) .build(logger); } } @@ -248,4 +248,11 @@ public class CompliantTextRecordReader extends AbstractRecordReader { logger.warn("Exception while closing stream.", e); } } + + @Override + public String toString() { + return "CompliantTextRecordReader[File=" + split.getPath() + + ", reader=" + reader + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java index a181d42..7a9ed46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java @@ -500,4 +500,12 @@ final class TextReader { input.close(); } + @Override + public String toString() { + return "TextReader[Line=" + context.currentLine() + + ", Column=" + context.currentChar() + + ", Record=" + context.currentRecord() + + ", Byte pos=" + getPos() + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java index 18437df..f43bb88 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java @@ -242,6 +242,14 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatPlugin. 
} } + @Override + public String toString() { + return "HttpdLogRecordReader[Path=" + work.getPath() + + ", Start=" + work.getStart() + + ", Length=" + work.getLength() + + ", Line=" + lineNumber.get() + + "]"; + } } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java index 91f8b99..2a4b4fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java @@ -261,7 +261,7 @@ public class ImageRecordReader extends AbstractRecordReader { } private void processXmpDirectory(final MapWriter writer, final XmpDirectory directory) { - HashSet<String> listItems = new HashSet(); + HashSet<String> listItems = new HashSet<>(); XMPMeta xmpMeta = directory.getXMPMeta(); if (xmpMeta != null) { try { @@ -490,4 +490,9 @@ public class ImageRecordReader extends AbstractRecordReader { metadataStream.close(); } } + + @Override + public String toString() { + return "ImageRecordReader[Path=" + hadoopPath.toUri().getPath() + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java index 56bc1cc..e5d1dc4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java @@ -761,4 +761,11 @@ public class LogRecordReader extends AbstractRecordReader { reader = null; } } -} \ No newline at end of file + + @Override + public String toString() { + return "LogRecordReader[File=" + fileWork.getPath() + + ", Line=" + rowIndex + + "]"; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java index 1cd393b..aea3218 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java @@ -121,4 +121,11 @@ public class ParquetWriter extends AbstractWriter { return CoreOperatorType.PARQUET_WRITER_VALUE; } + @Override + public String toString() { + return "ParquetWriter[location=" + location + + ", storageStrategy=" + getStorageStrategy() + + ", partitionColumns=" + partitionColumns + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 2ada17d..17cf8c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -348,4 +348,14 @@ public class ParquetRecordReader extends AbstractRecordReader { return (int) Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount()); } } + + @Override + public String toString() { + return "ParquetRecordReader[File=" + hadoopPath.toUri() + + ", Row group index=" + rowGroupIndex + + ", Records in row group=" + footer.getBlocks().get(rowGroupIndex).getRowCount() + + ", Total records read=" + (readState != null ? 
readState.recordsRead() : -1) + + ", Metadata" + footer + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 0c69d6d..7108ca6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -344,4 +344,9 @@ public class DrillParquetReader extends AbstractRecordReader { this.type = type; } } + + @Override + public String toString() { + return "DrillParquetReader[pageReadStore=" + pageReadStore + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java index 794687f..d688f3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java @@ -396,4 +396,9 @@ public class PcapRecordReader extends AbstractRecordReader { .setSafe(count, value, 0, value.remaining()); } } + + @Override + public String toString() { + return "PcapRecordReader[File=" + pathToFile.toUri() + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index d2cd9a8..0db17fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -232,4 +232,13 @@ public class DrillTextRecordReader extends AbstractRecordReader { logger.warn("Exception closing reader: {}", e); } } + + @Override + public String toString() { + return "DrillTextRecordReader[File=" + split.getPath() + 
+ ", Record=" + (totalRecordsRead + 1) + + ", Start=" + split.getStart() + + ", Length=" + split.getLength() + + "]"; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index 2810c04..aaa9806 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -70,10 +70,6 @@ public class JsonReader extends BaseJsonProcessor { * outer list. */ private boolean inOuterList; - /** - * The name of the current field being parsed. For Error messages. - */ - private String currentFieldName; private FieldSelection selection; @@ -214,10 +210,9 @@ public class JsonReader extends BaseJsonProcessor { case WRITE_SUCCEED: break; default: - throw getExceptionWithContext(UserException.dataReadError(), - currentFieldName, null).message( - "Failure while reading JSON. (Got an invalid read state %s )", - readState.toString()).build(logger); + throw getExceptionWithContext(UserException.dataReadError(), null).message( + "Failure while reading JSON. (Got an invalid read state %s )", readState.toString()) + .build(logger); } } catch (com.fasterxml.jackson.core.JsonParseException ex) { if (ignoreJSONParseError()) { @@ -236,13 +231,10 @@ public class JsonReader extends BaseJsonProcessor { private void confirmLast() throws IOException { parser.nextToken(); if (!parser.isClosed()) { - throw getExceptionWithContext(UserException.dataReadError(), - currentFieldName, null) - .message( - "Drill attempted to unwrap a toplevel list " - + "in your document. However, it appears that there is trailing content after this top level list. Drill only " - + "supports querying a set of distinct maps or a single json array with multiple inner maps.") - .build(logger); + String message = "Drill attempted to unwrap a toplevel list in your document. 
" + + "However, it appears that there is trailing content after this top level list. Drill only " + + "supports querying a set of distinct maps or a single json array with multiple inner maps."; + throw getExceptionWithContext(UserException.dataReadError(), message).build(logger); } } @@ -255,11 +247,9 @@ public class JsonReader extends BaseJsonProcessor { break; case START_ARRAY: if (inOuterList) { - throw getExceptionWithContext(UserException.dataReadError(), - currentFieldName, null) - .message( - "The top level of your document must either be a single array of maps or a set " - + "of white space delimited maps.").build(logger); + String message = "The top level of your document must either be a single array of maps or a set " + + "of white space delimited maps."; + throw getExceptionWithContext(UserException.dataReadError(), message).build(logger); } if (skipOuterList) { @@ -268,11 +258,9 @@ public class JsonReader extends BaseJsonProcessor { inOuterList = true; writeDataSwitch(writer.rootAsMap()); } else { - throw getExceptionWithContext(UserException.dataReadError(), - currentFieldName, null) - .message( - "The top level of your document must either be a single array of maps or a set " - + "of white space delimited maps.").build(logger); + String message = "The top level of your document must either be a single array of maps or a set " + + "of white space delimited maps."; + throw getExceptionWithContext(UserException.dataReadError(), message).build(logger); } } else { @@ -285,17 +273,14 @@ public class JsonReader extends BaseJsonProcessor { confirmLast(); return ReadState.END_OF_STREAM; } else { - throw getExceptionWithContext(UserException.dataReadError(), - currentFieldName, null).message( - "Failure while parsing JSON. Ran across unexpected %s.", - JsonToken.END_ARRAY).build(logger); + throw getExceptionWithContext(UserException.dataReadError(), null).message( + "Failure while parsing JSON. 
Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger); } case NOT_AVAILABLE: return ReadState.END_OF_STREAM; default: - throw getExceptionWithContext(UserException.dataReadError(), - currentFieldName, null) + throw getExceptionWithContext(UserException.dataReadError(), null) .message( "Failure while parsing JSON. Found token of [%s]. Drill currently only supports parsing " + "json strings that contain either lists or maps. The root object cannot be a scalar.", @@ -412,9 +397,9 @@ public class JsonReader extends BaseJsonProcessor { break; default: - throw getExceptionWithContext(UserException.dataReadError(), - currentFieldName, null).message("Unexpected token %s", - parser.getCurrentToken()).build(logger); + throw getExceptionWithContext(UserException.dataReadError(), null) + .message("Unexpected token %s", parser.getCurrentToken()) + .build(logger); } } @@ -478,8 +463,7 @@ public class JsonReader extends BaseJsonProcessor { break; default: - throw getExceptionWithContext(UserException.dataReadError(), - currentFieldName, null).message("Unexpected token %s", + throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s", parser.getCurrentToken()).build(logger); } } @@ -591,8 +575,7 @@ public class JsonReader extends BaseJsonProcessor { .build(logger); } } catch (Exception e) { - throw getExceptionWithContext(e, this.currentFieldName, null).build( - logger); + throw getExceptionWithContext(e, null).build(logger); } } list.endList(); @@ -637,8 +620,7 @@ public class JsonReader extends BaseJsonProcessor { handleString(parser, list); break; default: - throw getExceptionWithContext(UserException.dataReadError(), - currentFieldName, null).message("Unexpected token %s", + throw getExceptionWithContext(UserException.dataReadError(), null).message("Unexpected token %s", parser.getCurrentToken()).build(logger); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 7cb07eb..a9e9e62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -207,6 +207,10 @@ public class FragmentExecutor implements Runnable { } private void cleanup(FragmentState state) { + if (fragmentState.get() == FragmentState.FAILED) { + root.dumpBatches(); + } + closeOutResources(); updateState(state); diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java index 89731ff..4bb1a22 100644 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java @@ -269,4 +269,9 @@ public class ColumnChunkIncReadStore implements PageReadStore { public long getRowCount() { return rowCount; } + + @Override + public String toString() { + return "ColumnChunkIncReadStore[File=" + path.toUri() + "]"; + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java new file mode 100644 index 0000000..18ba61c --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/TestOperatorDump.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.ConsoleAppender; +import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch; +import org.apache.drill.exec.testing.Controls; +import org.apache.drill.exec.testing.ControlsInjectionUtil; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.ClusterTest; +import org.apache.drill.test.LogFixture; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertTrue; + +public class TestOperatorDump extends ClusterTest { + + private static final String ENTRY_DUMP_COMPLETED = "Batch dump completed"; + private static final String ENTRY_DUMP_STARTED = "Batch dump started"; + + private LogFixture logFixture; + private EventAwareContextAppender appender; + + @BeforeClass + public static void setupFiles() { + dirTestWatcher.copyResourceToRoot(Paths.get("multilevel")); + } + + @Before + public void setup() throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); + 
appender = new EventAwareContextAppender(); + logFixture = LogFixture.builder() + .toConsole(appender, LogFixture.DEFAULT_CONSOLE_FORMAT) + .build(); + startCluster(builder); + } + + @After + public void tearDown(){ + logFixture.close(); + } + + @Test(expected = UserRemoteException.class) + public void testScanBatchChecked() throws Exception { + String exceptionDesc = "next-allocate"; + final String controls = Controls.newBuilder() + .addException(ScanBatch.class, exceptionDesc, OutOfMemoryException.class, 0, 1) + .build(); + ControlsInjectionUtil.setControls(client.client(), controls); + String query = "select * from dfs.`multilevel/parquet` limit 100"; + try { + client.queryBuilder().sql(query).run(); + } catch (UserRemoteException e) { + assertTrue(e.getMessage().contains(exceptionDesc)); + + String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED}; + validateContainsEntries(expectedEntries, ScanBatch.class.getName()); + throw e; + } + } + + @Test(expected = UserRemoteException.class) + public void testExternalSortUnchecked() throws Exception { + Class<?> siteClass = org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class; + final String controls = Controls.newBuilder() + .addException(siteClass, ExternalSortBatch.INTERRUPTION_AFTER_SORT, RuntimeException.class) + .build(); + ControlsInjectionUtil.setControls(client.client(), controls); + String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name"; + try { + client.queryBuilder().sql(query).run(); + } catch (UserRemoteException e) { + assertTrue(e.getMessage().contains(ExternalSortBatch.INTERRUPTION_AFTER_SORT)); + + String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED}; + validateContainsEntries(expectedEntries, ExternalSortBatch.class.getName()); + throw e; + } + } + + private void validateContainsEntries(String[] entries, String expectedClassName) { + if (entries == null) { + entries = new String[0]; + } + List<String> 
messages = appender.getMessages(); + List<String> entryList = new ArrayList<>(entries.length); + Collections.addAll(entryList, entries); + Iterator<String> it = entryList.iterator(); + while (it.hasNext()) { + String entry = it.next(); + for (String message : messages) { + if (message.contains(entry)) { + it.remove(); + break; + } + } + } + assertTrue(String.format("Entries %s were not found in logs.", entryList), entryList.isEmpty()); + + Set<String> loggerNames = appender.getLoggerNames(); + assertTrue(String.format("Entry for class %s was not found", expectedClassName), + loggerNames.contains(expectedClassName)); + } + + // ConsoleAppender which stores logged events + private static class EventAwareContextAppender extends ConsoleAppender<ILoggingEvent> { + + private List<ILoggingEvent> events = new ArrayList<>(); + + @Override + protected void append(ILoggingEvent e) { + events.add(e); + } + + List<String> getMessages() { + return events.stream() + .map(ILoggingEvent::getMessage) + .collect(Collectors.toList()); + } + + Set<String> getLoggerNames() { + return events.stream() + .map(ILoggingEvent::getLoggerName) + .collect(Collectors.toSet()); + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java index 34d735e..94e0c0e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java @@ -288,6 +288,15 @@ public class MockRecordBatch implements CloseableRecordBatch { this.limitWithUnnest = limitWithUnnest; } + @Override + public boolean hasFailed() { + return false; + } + + @Override + public void dump() { + } + public static class Builder { private final List<RowSet> rowSets = new ArrayList<>(); private final List<IterOutcome> iterOutcomes = new ArrayList<>(); diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java index 6d5b666..a176646 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java @@ -106,6 +106,11 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> { } @Override + public void dumpBatches() { + screenRoot.dumpBatches(); + } + + @Override public void close() throws Exception { screenRoot.close(); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java index c7105f9..aefa28a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java @@ -162,6 +162,15 @@ public class MockLateralJoinBatch implements LateralContract, CloseableRecordBat } + @Override + public boolean hasFailed() { + return false; + } + + @Override + public void dump() { + } + @Override public int getRecordCount() { return 0; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java index 6f1e6e0..c05cdfd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/filter/BloomFilterTest.java @@ -121,6 +121,15 @@ public class BloomFilterTest { public Iterator<VectorWrapper<?>> iterator() { return null; } + + @Override + public void dump() { + } + + @Override + public boolean hasFailed() { + return false; + } } diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java index b54b0b0..b62a188 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/LogFixture.java @@ -86,6 +86,7 @@ public class LogFixture implements AutoCloseable { private String consoleFormat = DEFAULT_CONSOLE_FORMAT; private boolean logToConsole; private List<LogSpec> loggers = new ArrayList<>(); + private ConsoleAppender<ILoggingEvent> appender; /** * Send all enabled logging to the console (if not already configured.) Some @@ -102,6 +103,11 @@ public class LogFixture implements AutoCloseable { return this; } + public LogFixtureBuilder toConsole(ConsoleAppender<ILoggingEvent> appender, String format) { + this.appender = appender; + return toConsole(format); + } + /** * Send logging to the console using the defined format. * @@ -195,7 +201,7 @@ public class LogFixture implements AutoCloseable { private void setupConsole(LogFixtureBuilder builder) { drillLogger = (Logger)LoggerFactory.getLogger(DRILL_PACKAGE_NAME); - if (drillLogger.getAppender("STDOUT") != null) { + if (builder.appender == null && drillLogger.getAppender("STDOUT") != null) { return; } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); @@ -204,10 +210,10 @@ public class LogFixture implements AutoCloseable { ple.setContext(lc); ple.start(); - appender = new ConsoleAppender<>( ); + appender = builder.appender == null ? new ConsoleAppender<>() : builder.appender; appender.setContext(lc); appender.setName("Console"); - appender.setEncoder( ple ); + appender.setEncoder(ple); appender.start(); Logger root = (Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
