This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 93a39cd9b0790129bb6888cbfefc100efd8a6bc8 Author: Paul Rogers <[email protected]> AuthorDate: Fri Nov 29 18:58:59 2019 -0800 DRILL-7324: Final set of "batch count" fixes Final set of fixes for batch count/record count issues. Enables vector checking for all operators. closes #1912 --- .../apache/drill/exec/physical/impl/ScanBatch.java | 1 - .../drill/exec/physical/impl/TopN/TopNBatch.java | 6 +- .../exec/physical/impl/aggregate/HashAggBatch.java | 2 +- .../physical/impl/aggregate/StreamingAggBatch.java | 4 +- .../exec/physical/impl/filter/FilterTemplate2.java | 7 +- .../exec/physical/impl/join/HashJoinBatch.java | 5 +- .../physical/impl/join/NestedLoopJoinBatch.java | 8 +- .../impl/metadata/MetadataControllerBatch.java | 33 ++--- .../RangePartitionRecordBatch.java | 1 - .../impl/statistics/StatisticsMergeBatch.java | 52 +++---- .../impl/svremover/RemovingRecordBatch.java | 7 +- .../physical/impl/union/UnionAllRecordBatch.java | 7 +- .../physical/impl/unnest/UnnestRecordBatch.java | 1 + .../impl/unpivot/UnpivotMapsRecordBatch.java | 4 +- .../physical/impl/validate/BatchValidator.java | 114 +-------------- .../impl/window/WindowFrameRecordBatch.java | 14 +- .../apache/drill/exec/record/VectorAccessible.java | 2 - .../apache/drill/exec/record/VectorContainer.java | 6 + .../exec/store/easy/json/JSONRecordReader.java | 22 +-- .../exec/vector/complex/writer/TestJsonReader.java | 159 +++++++++++++-------- 20 files changed, 177 insertions(+), 278 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 3e658cb..f464b27 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -575,7 +575,6 @@ public class ScanBatch implements CloseableRecordBatch { } } - @Override public Iterator<VectorWrapper<?>> iterator() { return container.iterator(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 5ae6e76..baef314 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -559,12 +559,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { // Transfers count number of records from hyperBatch to simple container final int copiedRecords = copier.copyRecords(0, count); assert copiedRecords == count; - for (VectorWrapper<?> v : newContainer) { - ValueVector.Mutator m = v.getValueVector().getMutator(); - m.setValueCount(count); - } newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE); - newContainer.setRecordCount(count); + newContainer.setValueCount(count); // Store all the batches containing limit number of records batchBuilder.add(newBatch); } while (queueSv4.next()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 45c670b..38fb14e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -269,7 +269,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { for (VectorWrapper<?> w : container) { AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 0); } - container.setValueCount(0); + container.setEmpty(); if (incoming.getRecordCount() > 0) { hashAggMemoryManager.update(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index c3b504a..586fa32 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -186,9 +186,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { if (!createAggregator()) { state = BatchState.DONE; } - for (VectorWrapper<?> w : container) { - w.getValueVector().allocateNew(); - } + container.allocateNew(); if (complexWriters != null) { container.buildSchema(SelectionVectorMode.NONE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index c189367..e3b6070 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -66,12 +66,7 @@ public abstract class FilterTemplate2 implements Filterer { if (recordCount == 0) { outgoingSelectionVector.setRecordCount(0); outgoingSelectionVector.setBatchActualRecordCount(0); - - // Must allocate vectors, then set count to zero. Allocation - // is needed since offset vectors must contain at least one - // item (the required value of 0 in index location 0.) - outgoing.getContainer().allocateNew(); - outgoing.getContainer().setValueCount(0); + outgoing.getContainer().setEmpty(); return; } if (! outgoingSelectionVector.allocateNewSafe(recordCount)) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index eab38ec..cded844 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -77,6 +77,7 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.util.record.RecordBatchStats; @@ -703,9 +704,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) { batch.kill(true); while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) { - for (VectorWrapper<?> wrapper : batch) { - wrapper.getValueVector().clear(); - } + VectorAccessibleUtilities.clear(batch); upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index 6b7edd2..c84f954 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -87,10 +87,10 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi private int outputRecords; // We accumulate all the batches on the right side in a hyper container. - private ExpandableHyperContainer rightContainer = new ExpandableHyperContainer(); + private final ExpandableHyperContainer rightContainer = new ExpandableHyperContainer(); // Record count of the individual batches in the right hyper container - private LinkedList<Integer> rightCounts = new LinkedList<>(); + private final LinkedList<Integer> rightCounts = new LinkedList<>(); // Generator mapping for the right side @@ -372,9 +372,7 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi if (leftUpstream != IterOutcome.NONE) { leftSchema = left.getSchema(); - for (final VectorWrapper<?> vw : left) { - container.addOrGet(vw.getField()); - } + container.copySchemaFrom(left); } if (rightUpstream != IterOutcome.NONE) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java index 9ccae49..ab82769 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java @@ -17,6 +17,17 @@ */ package org.apache.drill.exec.physical.impl.metadata; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.commons.lang3.StringUtils; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; @@ -24,6 +35,7 @@ import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.metastore.ColumnNamesOptions; import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils; +import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils; import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MetadataControllerPOP; @@ -32,7 +44,6 @@ import org.apache.drill.exec.physical.rowSet.RowSetReader; import org.apache.drill.exec.planner.common.DrillStatsTable; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.WriterPrel; -import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils; import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; @@ -80,17 +91,6 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; - /** * Terminal operator for producing ANALYZE statement. This operator is responsible for converting * obtained metadata, fetching absent metadata from the Metastore and storing resulting metadata into the Metastore. @@ -109,9 +109,9 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC private boolean firstLeft = true; private boolean firstRight = true; - private boolean finished = false; - private boolean finishedRight = false; - private int recordCount = 0; + private boolean finished; + private boolean finishedRight; + private int recordCount; protected MetadataControllerBatch(MetadataControllerPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { @@ -129,13 +129,10 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC protected boolean setupNewSchema() { container.clear(); - container.addOrGet(MetastoreAnalyzeConstants.OK_FIELD_NAME, Types.required(TypeProtos.MinorType.BIT), null); container.addOrGet(MetastoreAnalyzeConstants.SUMMARY_FIELD_NAME, Types.required(TypeProtos.MinorType.VARCHAR), null); - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); container.setEmpty(); - return true; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java index 11d307b..7a61489 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java @@ -184,5 +184,4 @@ public class RangePartitionRecordBatch extends AbstractSingleRecordBatch<RangePa logger.error("RangePartitionRecordBatch[container={}, numPartitions={}, recordCount={}, partitionIdVector={}]", container, numPartitions, recordCount, partitionIdVector); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java index 15962ad..921c92b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TimeZone; + import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.ValueExpressions; @@ -46,11 +47,12 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.metastore.statistics.Statistic; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * * Example input and output: - * Schema of incoming batch: + * Schema of incoming batch:<pre> * "columns" : MAP - Column names * "region_id" : VARCHAR * "sales_city" : VARCHAR @@ -65,7 +67,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; * "sales_city" : BIGINT - nonnullstatcount(sales_city) * "cnt" : BIGINT - nonnullstatcount(cnt) * .... another map for next stats function .... - * Schema of outgoing batch: + * </pre>Schema of outgoing batch:<pre> * "schema" : BIGINT - Schema number. For each schema change this number is incremented. * "computed" : DATE - What time is it computed? * "columns" : MAP - Column names @@ -82,17 +84,19 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; * "sales_city" : BIGINT - nonnullstatcount(sales_city) * "cnt" : BIGINT - nonnullstatcount(cnt) * .... another map for next stats function .... + * </pre> */ + public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMerge> { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatisticsMergeBatch.class); - private Map<String, String> functions; + private static final Logger logger = LoggerFactory.getLogger(StatisticsMergeBatch.class); + + private final Map<String, String> functions; private boolean first = true; - private boolean finished = false; - private int schema = 0; - private int recordCount = 0; - private List<String> columnsList = null; + private boolean finished; + private int schema; + private List<String> columnsList; private double samplePercent = 100.0; - private List<MergedStatistic> mergedStatisticList = null; + private final List<MergedStatistic> mergedStatisticList; public StatisticsMergeBatch(StatisticsMerge popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { @@ -115,20 +119,6 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe } /* - * Adds the `name` column value vector in the `parent` map vector. These `name` columns are - * table columns for which statistics will be computed. - */ - private ValueVector addMapVector(String name, MapVector parent, LogicalExpression expr) - throws SchemaChangeException { - LogicalExpression mle = PhysicalOperatorUtil.materializeExpression(expr, incoming, context); - Class<? extends ValueVector> vvc = - TypeHelper.getValueVectorClass(mle.getMajorType().getMinorType(), - mle.getMajorType().getMode()); - ValueVector vector = parent.addOrGet(name, mle.getMajorType(), vvc); - return vector; - } - - /* * Identify the list of fields within a map which are generated by StatisticsMerge. Perform * basic sanity check i.e. all maps have the same number of columns and those columns are * the same in each map @@ -229,8 +219,7 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe } } } - container.setRecordCount(0); - recordCount = 0; + container.setEmpty(); container.buildSchema(incoming.getSchema().getSelectionVectorMode()); } @@ -238,7 +227,7 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe * Determines the MajorType based on the incoming value vector. Please look at the * comments above the class definition which describes the incoming/outgoing batch schema */ - private void addVectorToOutgoingContainer(String outStatName, VectorWrapper vw) + private void addVectorToOutgoingContainer(String outStatName, VectorWrapper<?> vw) throws SchemaChangeException { // Input map vector MapVector inputVector = (MapVector) vw.getValueVector(); @@ -306,9 +295,8 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe } } } - ++recordCount; // Populate the number of records (1) inside the outgoing batch. - container.setRecordCount(1); + container.setValueCount(1); return IterOutcome.OK; } @@ -343,9 +331,7 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe } @Override - public void dump() { - - } + public void dump() { } @Override public IterOutcome innerNext() { @@ -404,6 +390,6 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe @Override public int getRecordCount() { - return recordCount; + return container.getRecordCount(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 4471248..a9584bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -25,13 +25,16 @@ import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.WritableBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVectorRemover>{ - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemovingRecordBatch.class); + private static final Logger logger = LoggerFactory.getLogger(RemovingRecordBatch.class); private Copier copier; - public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { + public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, + RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context, incoming); logger.debug("Created."); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 25dae80..ed2b66e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.union; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -68,8 +69,8 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { private final SchemaChangeCallBack callBack = new SchemaChangeCallBack(); private UnionAller unionall; - private final List<TransferPair> transfers = Lists.newArrayList(); - private final List<ValueVector> allocationVectors = Lists.newArrayList(); + private final List<TransferPair> transfers = new ArrayList<>(); + private final List<ValueVector> allocationVectors = new ArrayList<>(); private int recordCount; private UnionInputIterator unionInputIterator; @@ -341,7 +342,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { } private class UnionInputIterator implements Iterator<Pair<IterOutcome, BatchStatusWrappper>> { - private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>(); + private final Stack<BatchStatusWrappper> batchStatusStack = new Stack<>(); UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome rightOutCome, RecordBatch right) { if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index 1715c99..85eceea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -295,6 +295,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO remainderIndex = 0; logger.debug("IterOutcome: EMIT."); } + rowIdVector.getMutator().setValueCount(outputRecords); container.setValueCount(outputRecords); memoryManager.updateOutgoingStats(outputRecords); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java index 99bd6d1..72a337a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java @@ -64,8 +64,8 @@ import org.slf4j.LoggerFactory; * "sales_city" : BIGINT - nonnullstatcount(sales_city) * "cnt" : BIGINT - nonnullstatcount(cnt) * .... another map for next stats function .... - * - * Schema of output: + * </pre> + * Schema of output: <pre> * "schema" : BIGINT - Schema number. For each schema change this number is incremented. * "computed" : BIGINT - What time is this computed? * "column" : column name diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java index 8793a65..e1ffd7a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java @@ -17,41 +17,7 @@ */ package org.apache.drill.exec.physical.impl.validate; -import java.util.IdentityHashMap; -import java.util.Map; - import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.physical.impl.ScanBatch; -import org.apache.drill.exec.physical.impl.WriterRecordBatch; -import org.apache.drill.exec.physical.impl.TopN.TopNBatch; -import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch; -import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch; -import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch; -import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch; -import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch; -import org.apache.drill.exec.physical.impl.join.HashJoinBatch; -import org.apache.drill.exec.physical.impl.join.MergeJoinBatch; -import org.apache.drill.exec.physical.impl.join.NestedLoopJoinBatch; -import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch; -import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch; -import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; -import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch; -import org.apache.drill.exec.physical.impl.metadata.MetadataHashAggBatch; -import org.apache.drill.exec.physical.impl.metadata.MetadataStreamAggBatch; -import org.apache.drill.exec.physical.impl.metadata.MetadataControllerBatch; -import org.apache.drill.exec.physical.impl.metadata.MetadataHandlerBatch; -import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; -import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch; -import org.apache.drill.exec.physical.impl.rangepartitioner.RangePartitionRecordBatch; -import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch; -import org.apache.drill.exec.physical.impl.trace.TraceRecordBatch; -import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch; -import org.apache.drill.exec.physical.impl.unnest.UnnestRecordBatch; -import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch; -import org.apache.drill.exec.physical.impl.unpivot.UnpivotMapsRecordBatch; -import org.apache.drill.exec.physical.impl.window.WindowFrameRecordBatch; -import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch; -import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.SimpleVectorWrapper; import org.apache.drill.exec.record.VectorAccessible; @@ -200,89 +166,13 @@ public class BatchValidator { } } - private enum CheckMode { - /** No checking. */ - NONE, - /** Check only batch, container counts. */ - COUNTS, - /** Check vector value counts. */ - VECTORS - }; - - private static final Map<Class<? extends CloseableRecordBatch>, CheckMode> checkRules = buildRules(); - private final ErrorReporter errorReporter; public BatchValidator(ErrorReporter errorReporter) { this.errorReporter = errorReporter; } - /** - * At present, most operators will not pass the checks here. The following - * table identifies those that should be checked, and the degree of check. - * Over time, this table should include all operators, and thus become - * unnecessary. - */ - private static Map<Class<? extends CloseableRecordBatch>, CheckMode> buildRules() { - Map<Class<? extends CloseableRecordBatch>, CheckMode> rules = new IdentityHashMap<>(); - rules.put(OperatorRecordBatch.class, CheckMode.VECTORS); - rules.put(ScanBatch.class, CheckMode.VECTORS); - rules.put(ProjectRecordBatch.class, CheckMode.VECTORS); - rules.put(FilterRecordBatch.class, CheckMode.VECTORS); - rules.put(PartitionLimitRecordBatch.class, CheckMode.VECTORS); - rules.put(UnnestRecordBatch.class, CheckMode.VECTORS); - rules.put(HashAggBatch.class, CheckMode.VECTORS); - rules.put(RemovingRecordBatch.class, CheckMode.VECTORS); - rules.put(StreamingAggBatch.class, CheckMode.VECTORS); - rules.put(RuntimeFilterRecordBatch.class, CheckMode.VECTORS); - rules.put(FlattenRecordBatch.class, CheckMode.VECTORS); - rules.put(MergeJoinBatch.class, CheckMode.VECTORS); - rules.put(NestedLoopJoinBatch.class, CheckMode.VECTORS); - rules.put(LimitRecordBatch.class, CheckMode.VECTORS); - rules.put(MergingRecordBatch.class, CheckMode.VECTORS); - rules.put(OrderedPartitionRecordBatch.class, CheckMode.VECTORS); - rules.put(RangePartitionRecordBatch.class, CheckMode.VECTORS); - rules.put(TraceRecordBatch.class, CheckMode.VECTORS); - rules.put(UnionAllRecordBatch.class, CheckMode.VECTORS); - rules.put(UnorderedReceiverBatch.class, CheckMode.VECTORS); - rules.put(UnpivotMapsRecordBatch.class, CheckMode.VECTORS); - rules.put(WindowFrameRecordBatch.class, CheckMode.VECTORS); - rules.put(TopNBatch.class, CheckMode.VECTORS); - rules.put(HashJoinBatch.class, CheckMode.VECTORS); - rules.put(ExternalSortBatch.class, CheckMode.VECTORS); - rules.put(WriterRecordBatch.class, CheckMode.VECTORS); - rules.put(MetadataStreamAggBatch.class, CheckMode.VECTORS); - rules.put(MetadataHashAggBatch.class, CheckMode.VECTORS); - rules.put(MetadataHandlerBatch.class, CheckMode.VECTORS); - rules.put(MetadataControllerBatch.class, CheckMode.VECTORS); - return rules; - } - - private static CheckMode lookup(Object subject) { - CheckMode checkMode = checkRules.get(subject.getClass()); - return checkMode == null ? CheckMode.NONE : checkMode; - } - public static boolean validate(RecordBatch batch) { - // This is a handy place to trace batches as they flow up - // the DAG. Works best for single-threaded runs with few records. - // System.out.println(batch.getClass().getSimpleName()); - // RowSetFormatter.print(batch); - - CheckMode checkMode = lookup(batch); - - // If no rule, don't check this batch. - - if (checkMode == CheckMode.NONE) { - - // As work proceeds, might want to log those batches not checked. - // For now, there are too many. - - return true; - } - - // All batches that do any checks will at least check counts. - ErrorReporter reporter = errorReporter(batch); int rowCount = batch.getRecordCount(); int valueCount = rowCount; @@ -340,9 +230,7 @@ public class BatchValidator { break; } } - if (checkMode == CheckMode.VECTORS) { - new BatchValidator(reporter).validateBatch(batch, valueCount); - } + new BatchValidator(reporter).validateBatch(batch, valueCount); return reporter.errorCount() == 0; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index 6ed004f..07a5c76 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.window; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -47,7 +48,6 @@ import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,7 +64,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { private List<WindowDataBatch> batches; private WindowFramer[] framers; - private final List<WindowFunction> functions = Lists.newArrayList(); + private final List<WindowFunction> functions = new ArrayList<>(); private boolean noMoreBatches; // true when downstream returns NONE private BatchSchema schema; @@ -75,7 +75,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context); this.incoming = incoming; - batches = Lists.newArrayList(); + batches = new ArrayList<>(); } /** @@ -260,17 +260,15 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { logger.trace("creating framer(s)"); - List<LogicalExpression> keyExprs = Lists.newArrayList(); - List<LogicalExpression> orderExprs = Lists.newArrayList(); + List<LogicalExpression> keyExprs = new ArrayList<>(); + List<LogicalExpression> orderExprs = new ArrayList<>(); boolean requireFullPartition = false; boolean useDefaultFrame = false; // at least one window function uses the DefaultFrameTemplate boolean useCustomFrame = false; // at least one window function uses the CustomFrameTemplate // all existing vectors will be transferred to the outgoing container in framer.doWork() - for (VectorWrapper<?> wrapper : batch) { - container.addOrGet(wrapper.getField()); - } + container.copySchemaFrom(batch); // add aggregation vectors to the container, and materialize corresponding expressions for (NamedExpression ne : popConfig.getAggregations()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java index f51f521..03e8ffa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java @@ -21,10 +21,8 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -// TODO javadoc public interface VectorAccessible extends Iterable<VectorWrapper<?>> { // TODO are these <?> related in any way? Should they be the same one? - // TODO javadoc VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds); /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 1cfc61d..3796e5a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -553,4 +553,10 @@ public class VectorContainer implements VectorAccessible { // in the offset vectors that need it. setValueCount(0); } + + public void copySchemaFrom(VectorAccessible other) { + for (VectorWrapper<?> wrapper : other) { + addOrGet(wrapper.getField()); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index da42b27..0ab4181 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -79,8 +79,8 @@ public class JSONRecordReader extends AbstractRecordReader { * @param columns pathnames of columns/subfields to read * @throws OutOfMemoryException */ - public JSONRecordReader(final FragmentContext fragmentContext, final Path inputPath, final DrillFileSystem fileSystem, - final List<SchemaPath> columns) throws OutOfMemoryException { + public JSONRecordReader(FragmentContext fragmentContext, Path inputPath, DrillFileSystem fileSystem, + List<SchemaPath> columns) throws OutOfMemoryException { this(fragmentContext, inputPath, null, fileSystem, columns); } @@ -137,15 +137,15 @@ public class JSONRecordReader extends AbstractRecordReader { } @Override - public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { + public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { try{ if (hadoopPath != null) { - this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath); + stream = fileSystem.openPossiblyCompressedStream(hadoopPath); } - this.writer = new VectorContainerWriter(output, unionEnabled); + writer = new VectorContainerWriter(output, unionEnabled); if (isSkipQuery()) { - this.jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer(), enableNanInf, enableEscapeAnyChar); + jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer(), enableNanInf, enableEscapeAnyChar); } else { this.jsonReader = new JsonReader.Builder(fragmentContext.getManagedBuffer()) .schemaPathColumns(ImmutableList.copyOf(getColumns())) @@ -157,7 +157,7 @@ public class JSONRecordReader extends AbstractRecordReader { .build(); } setupParser(); - } catch (final Exception e){ + } catch (Exception e){ handleAndRaise("Failure reading JSON file", e); } } @@ -182,7 +182,7 @@ public class JSONRecordReader extends AbstractRecordReader { int columnNr = -1; if (e instanceof JsonParseException) { - final JsonParseException ex = (JsonParseException) e; + JsonParseException ex = (JsonParseException) e; message = ex.getOriginalMessage(); columnNr = ex.getLocation().getColumnNr(); } @@ -226,7 +226,8 @@ public class JSONRecordReader extends AbstractRecordReader { } ++parseErrorCount; if (printSkippedMalformedJSONRecordLineNumber) { - logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : line nos :" + (recordCount + parseErrorCount)); + logger.debug("Error parsing JSON in {}: line: {}", + hadoopPath.getName(), recordCount + parseErrorCount); } if (write == ReadState.JSON_RECORD_PARSE_EOF_ERROR) { break; @@ -254,8 +255,9 @@ public class JSONRecordReader extends AbstractRecordReader { @Override public void close() throws Exception { - if(stream != null) { + if (stream != null) { stream.close(); + stream = null; } } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java index 79aa1d3..04bc67d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java @@ -81,6 +81,9 @@ public class TestJsonReader extends BaseTestQuery { @Test public void schemaChange() throws Exception { + // Verifies that the schema change does not cause a + // crash. A pretty minimal test. + // TODO: Verify actual results. test("select b from dfs.`vector/complex/writer/schemaChange/`"); } @@ -267,12 +270,15 @@ public class TestJsonReader extends BaseTestQuery { @Test public void testAllTextMode() throws Exception { - test("alter system set `store.json.all_text_mode` = true"); - String[] queries = {"select * from cp.`store/json/schema_change_int_to_string.json`"}; - long[] rowCounts = {3}; - String filename = "/store/json/schema_change_int_to_string.json"; - runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts); - test("alter system set `store.json.all_text_mode` = false"); + try { + alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true); + String[] queries = {"select * from cp.`store/json/schema_change_int_to_string.json`"}; + long[] rowCounts = {3}; + String filename = "/store/json/schema_change_int_to_string.json"; + runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts); + } finally { + resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE); + } } @Test @@ -293,58 +299,87 @@ public class TestJsonReader extends BaseTestQuery { @Test public void testNullWhereListExpected() throws Exception { - test("alter system set `store.json.all_text_mode` = true"); - String[] queries = {"select * from cp.`store/json/null_where_list_expected.json`"}; - long[] rowCounts = {3}; - String filename = "/store/json/null_where_list_expected.json"; - runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts); - test("alter system set `store.json.all_text_mode` = false"); + try { + alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true); + String[] queries = {"select * from cp.`store/json/null_where_list_expected.json`"}; + long[] rowCounts = {3}; + String filename = "/store/json/null_where_list_expected.json"; + runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts); + } + finally { + resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE); + } } @Test public void testNullWhereMapExpected() throws Exception { - test("alter system set `store.json.all_text_mode` = true"); - String[] queries = {"select * from cp.`store/json/null_where_map_expected.json`"}; - long[] rowCounts = {3}; - String filename = "/store/json/null_where_map_expected.json"; - runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts); - test("alter system set `store.json.all_text_mode` = false"); + try { + alterSession(ExecConstants.JSON_ALL_TEXT_MODE, true); + String[] queries = {"select * from cp.`store/json/null_where_map_expected.json`"}; + long[] rowCounts = {3}; + String filename = "/store/json/null_where_map_expected.json"; + runTestsOnFile(filename, UserBitShared.QueryType.SQL, queries, rowCounts); + } + finally { + resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE); + } } @Test public void ensureProjectionPushdown() throws Exception { - // Tests to make sure that we are correctly eliminating schema changing columns. If completes, means that the projection pushdown was successful. - test("alter system set `store.json.all_text_mode` = false; " - + "select t.field_1, t.field_3.inner_1, t.field_3.inner_2, t.field_4.inner_1 " - + "from cp.`store/json/schema_change_int_to_string.json` t"); + try { + // Tests to make sure that we are correctly eliminating schema changing + // columns. If completes, means that the projection pushdown was + // successful. + test("alter system set `store.json.all_text_mode` = false; " + + "select t.field_1, t.field_3.inner_1, t.field_3.inner_2, t.field_4.inner_1 " + + "from cp.`store/json/schema_change_int_to_string.json` t"); + } finally { + resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE); + } } - // The project pushdown rule is correctly adding the projected columns to the scan, however it is not removing - // the redundant project operator after the scan, this tests runs a physical plan generated from one of the tests to - // ensure that the project is filtering out the correct data in the scan alone + // The project pushdown rule is correctly adding the projected columns to the + // scan, however it is not removing the redundant project operator after the + // scan, this tests runs a physical plan generated from one of the tests to + // ensure that the project is filtering out the correct data in the scan alone. @Test public void testProjectPushdown() throws Exception { - String[] queries = {Files.asCharSource(DrillFileUtils.getResourceAsFile("/store/json/project_pushdown_json_physical_plan.json"), Charsets.UTF_8).read()}; - long[] rowCounts = {3}; - String filename = "/store/json/schema_change_int_to_string.json"; - test("alter system set `store.json.all_text_mode` = false"); - runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts); - - List<QueryDataBatch> results = testPhysicalWithResults(queries[0]); - assertEquals(1, results.size()); - // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`" - - RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator()); - QueryDataBatch batch = results.get(0); - assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); - - // this used to be five. It is now three. This is because the plan doesn't have a project. - // Scanners are not responsible for projecting non-existent columns (as long as they project one column) - assertEquals(3, batchLoader.getSchema().getFieldCount()); - testExistentColumns(batchLoader); - - batch.release(); - batchLoader.clear(); + try { + String[] queries = {Files.asCharSource(DrillFileUtils.getResourceAsFile( + "/store/json/project_pushdown_json_physical_plan.json"), Charsets.UTF_8).read()}; + String filename = "/store/json/schema_change_int_to_string.json"; + alterSession(ExecConstants.JSON_ALL_TEXT_MODE, false); + long[] rowCounts = {3}; + runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts); + + List<QueryDataBatch> results = testPhysicalWithResults(queries[0]); + assertEquals(1, results.size()); + // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`" + + RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator()); + QueryDataBatch batch = results.get(0); + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); + + // this used to be five. It is now four. This is because the plan doesn't + // have a project. Scanners are not responsible for projecting non-existent + // columns (as long as they project one column) + // + // That said, the JSON format plugin does claim it can do project + // push-down, which means it will ensure columns for any column + // mentioned in the project list, in a form consistent with the schema + // path. In this case, `non_existent`.`nested`.`field` appears in + // the query. But, even more oddly, the missing field is inserted only + // if all text mode is true, omitted if all text mode is false. + // Seems overly complex. + assertEquals(3, batchLoader.getSchema().getFieldCount()); + testExistentColumns(batchLoader); + + batch.release(); + batchLoader.clear(); + } finally { + resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE); + } } @Test @@ -360,32 +395,32 @@ public class TestJsonReader extends BaseTestQuery { private void testExistentColumns(RecordBatchLoader batchLoader) throws SchemaChangeException { VectorWrapper<?> vw = batchLoader.getValueAccessorById( - RepeatedBigIntVector.class, // - batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds() // + RepeatedBigIntVector.class, + batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds() ); assertEquals("[1]", vw.getValueVector().getAccessor().getObject(0).toString()); assertEquals("[5]", vw.getValueVector().getAccessor().getObject(1).toString()); assertEquals("[5,10,15]", vw.getValueVector().getAccessor().getObject(2).toString()); vw = batchLoader.getValueAccessorById( - IntVector.class, // - batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_1")).getFieldIds() // + IntVector.class, + batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_1")).getFieldIds() ); assertNull(vw.getValueVector().getAccessor().getObject(0)); assertEquals(2l, vw.getValueVector().getAccessor().getObject(1)); assertEquals(5l, vw.getValueVector().getAccessor().getObject(2)); vw = batchLoader.getValueAccessorById( - IntVector.class, // - batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_2")).getFieldIds() // + IntVector.class, + batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_3", "inner_2")).getFieldIds() ); assertNull(vw.getValueVector().getAccessor().getObject(0)); assertNull(vw.getValueVector().getAccessor().getObject(1)); assertEquals(3l, vw.getValueVector().getAccessor().getObject(2)); vw = batchLoader.getValueAccessorById( - RepeatedBigIntVector.class, // - batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_4", "inner_1")).getFieldIds() // + RepeatedBigIntVector.class, + batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_4", "inner_1")).getFieldIds() ); assertEquals("[]", vw.getValueVector().getAccessor().getObject(0).toString()); assertEquals("[1,2,3]", vw.getValueVector().getAccessor().getObject(1).toString()); @@ -440,7 +475,7 @@ public class TestJsonReader extends BaseTestQuery { ) ).go(); } finally { - testNoResult("alter session set `exec.enable_union_type` = false"); + resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY); } } @@ -457,7 +492,7 @@ public class TestJsonReader extends BaseTestQuery { .baselineValues(13L, "BIGINT") .go(); } finally { - testNoResult("alter session set `exec.enable_union_type` = false"); + resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY); } } @@ -477,7 +512,7 @@ public class TestJsonReader extends BaseTestQuery { .baselineValues(3L) .go(); } finally { - testNoResult("alter session set `exec.enable_union_type` = false"); + resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY); } } @@ -495,7 +530,7 @@ public class TestJsonReader extends BaseTestQuery { .baselineValues(9L) .go(); } finally { - testNoResult("alter session set `exec.enable_union_type` = false"); + resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY); } } @@ -512,7 +547,7 @@ public class TestJsonReader extends BaseTestQuery { .baselineValues(11.0) .go(); } finally { - testNoResult("alter session set `exec.enable_union_type` = false"); + resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY); } } @@ -536,7 +571,7 @@ public class TestJsonReader extends BaseTestQuery { .baselineValues(20000L) .go(); } finally { - testNoResult("alter session set `exec.enable_union_type` = false"); + resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY); } } @@ -565,7 +600,7 @@ public class TestJsonReader extends BaseTestQuery { .baselineValues(20000L) .go(); } finally { - testNoResult("alter session set `exec.enable_union_type` = false"); + resetSessionOption(ExecConstants.ENABLE_UNION_TYPE_KEY); } } @@ -628,7 +663,7 @@ public class TestJsonReader extends BaseTestQuery { .go(); } finally { - testNoResult("alter session set `store.json.all_text_mode` = false"); + resetSessionOption(ExecConstants.JSON_ALL_TEXT_MODE); } }
