DRILL-5657: Size-aware vector writer structure - Vector and accessor layer - Row Set layer - Tuple and column models - Revised write-time metadata - "Result set loader" layer
this closes #914 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/40de8ca4 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/40de8ca4 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/40de8ca4 Branch: refs/heads/master Commit: 40de8ca4f47533fa6593d1266403868ae1a2119f Parents: eb0c403 Author: Paul Rogers <[email protected]> Authored: Thu Aug 17 22:41:30 2017 -0700 Committer: Paul Rogers <[email protected]> Committed: Wed Dec 20 21:17:48 2017 -0800 ---------------------------------------------------------------------- .../exec/physical/rowSet/ResultSetLoader.java | 204 +++ .../exec/physical/rowSet/ResultVectorCache.java | 33 + .../exec/physical/rowSet/RowSetLoader.java | 153 +++ .../exec/physical/rowSet/impl/ColumnState.java | 358 +++++ .../physical/rowSet/impl/NullProjectionSet.java | 41 + .../rowSet/impl/NullResultVectorCacheImpl.java | 41 + .../physical/rowSet/impl/NullVectorState.java | 52 + .../rowSet/impl/NullableVectorState.java | 108 ++ .../physical/rowSet/impl/OptionBuilder.java | 134 ++ .../rowSet/impl/PrimitiveColumnState.java | 105 ++ .../physical/rowSet/impl/ProjectionSet.java | 48 + .../physical/rowSet/impl/ProjectionSetImpl.java | 136 ++ .../rowSet/impl/RepeatedVectorState.java | 168 +++ .../rowSet/impl/ResultSetLoaderImpl.java | 775 +++++++++++ .../rowSet/impl/ResultVectorCacheImpl.java | 186 +++ .../physical/rowSet/impl/RowSetLoaderImpl.java | 98 ++ .../physical/rowSet/impl/SingleVectorState.java | 274 ++++ .../exec/physical/rowSet/impl/TupleState.java | 388 ++++++ .../rowSet/impl/VectorContainerBuilder.java | 257 ++++ .../exec/physical/rowSet/impl/VectorState.java | 102 ++ .../physical/rowSet/impl/WriterIndexImpl.java | 100 ++ .../exec/physical/rowSet/impl/package-info.java | 304 +++++ .../physical/rowSet/model/BaseTupleModel.java | 117 ++ .../physical/rowSet/model/ContainerVisitor.java | 115 ++ .../physical/rowSet/model/MetadataProvider.java | 93 ++ 
.../exec/physical/rowSet/model/ReaderIndex.java | 53 + .../physical/rowSet/model/SchemaInference.java | 61 + .../exec/physical/rowSet/model/TupleModel.java | 117 ++ .../rowSet/model/hyper/BaseReaderBuilder.java | 149 +++ .../rowSet/model/hyper/package-info.java | 30 + .../physical/rowSet/model/package-info.java | 68 + .../rowSet/model/single/BaseReaderBuilder.java | 89 ++ .../rowSet/model/single/BaseWriterBuilder.java | 72 + .../model/single/BuildVectorsFromMetadata.java | 97 ++ .../rowSet/model/single/VectorAllocator.java | 112 ++ .../rowSet/model/single/package-info.java | 28 + .../exec/physical/rowSet/package-info.java | 193 +++ .../apache/drill/exec/record/BatchSchema.java | 42 +- .../apache/drill/exec/record/RecordBatch.java | 3 +- .../apache/drill/exec/record/TupleSchema.java | 534 ++++++++ .../exec/record/selection/SelectionVector2.java | 20 +- .../exec/cache/TestBatchSerialization.java | 22 +- .../exec/physical/impl/TopN/TopNBatchTest.java | 26 +- .../impl/validate/TestBatchValidator.java | 64 +- .../physical/impl/xsort/TestExternalSort.java | 12 +- .../impl/xsort/managed/SortTestUtilities.java | 8 +- .../physical/impl/xsort/managed/TestCopier.java | 146 +- .../impl/xsort/managed/TestShortArrays.java | 8 +- .../impl/xsort/managed/TestSortImpl.java | 46 +- .../physical/impl/xsort/managed/TestSorter.java | 38 +- .../rowSet/impl/TestResultSetLoaderLimits.java | 224 ++++ .../impl/TestResultSetLoaderMapArray.java | 481 +++++++ .../rowSet/impl/TestResultSetLoaderMaps.java | 810 +++++++++++ .../impl/TestResultSetLoaderOmittedValues.java | 379 ++++++ .../impl/TestResultSetLoaderOverflow.java | 680 ++++++++++ .../impl/TestResultSetLoaderProjection.java | 470 +++++++ .../impl/TestResultSetLoaderProtocol.java | 586 ++++++++ .../rowSet/impl/TestResultSetLoaderTorture.java | 453 +++++++ .../rowSet/impl/TestResultSetSchemaChange.java | 245 ++++ .../drill/exec/record/TestTupleSchema.java | 509 +++++++ .../drill/exec/record/TestVectorContainer.java | 127 -- 
.../exec/record/vector/TestValueVector.java | 12 + .../apache/drill/exec/sql/TestInfoSchema.java | 2 +- .../exec/store/easy/text/compliant/TestCsv.java | 6 +- .../java/org/apache/drill/test/ExampleTest.java | 4 +- .../org/apache/drill/test/OperatorFixture.java | 30 +- .../org/apache/drill/test/QueryBuilder.java | 12 +- .../apache/drill/test/QueryRowSetIterator.java | 2 +- .../drill/test/rowSet/AbstractRowSet.java | 109 +- .../drill/test/rowSet/AbstractSingleRowSet.java | 182 +-- .../apache/drill/test/rowSet/DirectRowSet.java | 171 +-- .../drill/test/rowSet/HyperRowSetImpl.java | 245 +--- .../drill/test/rowSet/IndirectRowSet.java | 38 +- .../org/apache/drill/test/rowSet/RowSet.java | 81 +- .../apache/drill/test/rowSet/RowSetBuilder.java | 32 +- .../drill/test/rowSet/RowSetComparison.java | 124 +- .../apache/drill/test/rowSet/RowSetPrinter.java | 30 +- .../apache/drill/test/rowSet/RowSetReader.java | 54 + .../drill/test/rowSet/RowSetReaderImpl.java | 76 ++ .../apache/drill/test/rowSet/RowSetSchema.java | 304 ----- .../drill/test/rowSet/RowSetUtilities.java | 101 +- .../apache/drill/test/rowSet/RowSetWriter.java | 119 ++ .../drill/test/rowSet/RowSetWriterImpl.java | 155 +++ .../apache/drill/test/rowSet/SchemaBuilder.java | 87 +- .../drill/test/rowSet/file/JsonFileBuilder.java | 35 +- .../drill/test/rowSet/test/DummyWriterTest.java | 169 +++ .../drill/test/rowSet/test/PerformanceTool.java | 296 ++++ .../drill/test/rowSet/test/RowSetTest.java | 858 +++++++----- .../drill/test/rowSet/test/TestFillEmpties.java | 241 ++++ .../test/rowSet/test/TestFixedWidthWriter.java | 444 ++++++ .../rowSet/test/TestOffsetVectorWriter.java | 425 ++++++ .../test/rowSet/test/TestScalarAccessors.java | 1266 ++++++++++++++++++ .../rowSet/test/TestVariableWidthWriter.java | 418 ++++++ .../drill/test/rowSet/test/VectorPrinter.java | 72 + .../apache/drill/vector/TestFillEmpties.java | 55 +- .../apache/drill/vector/TestVectorLimits.java | 487 ------- exec/jdbc-all/pom.xml | 2 +- 
.../src/main/java/io/netty/buffer/DrillBuf.java | 70 +- .../netty/buffer/PooledByteBufAllocatorL.java | 62 +- .../drill/exec/memory/AllocationManager.java | 89 +- .../main/codegen/templates/ColumnAccessors.java | 383 +++--- .../codegen/templates/FixedValueVectors.java | 293 +--- .../codegen/templates/NullableValueVectors.java | 91 +- .../codegen/templates/RepeatedValueVectors.java | 71 +- .../src/main/codegen/templates/UnionVector.java | 44 +- .../templates/VariableLengthVectors.java | 216 +-- .../drill/exec/record/ColumnMetadata.java | 114 ++ .../drill/exec/record/MaterializedField.java | 41 +- .../apache/drill/exec/record/TupleMetadata.java | 88 ++ .../drill/exec/record/TupleNameSpace.java | 89 ++ .../drill/exec/vector/AllocationHelper.java | 2 +- .../drill/exec/vector/BaseDataValueVector.java | 16 + .../org/apache/drill/exec/vector/BitVector.java | 52 +- .../drill/exec/vector/FixedWidthVector.java | 7 +- .../apache/drill/exec/vector/ObjectVector.java | 26 +- .../drill/exec/vector/UntypedNullVector.java | 59 +- .../apache/drill/exec/vector/ValueVector.java | 53 +- .../drill/exec/vector/VariableWidthVector.java | 4 +- .../apache/drill/exec/vector/VectorUtils.java | 63 - .../apache/drill/exec/vector/ZeroVector.java | 6 +- .../exec/vector/accessor/AccessorUtilities.java | 125 -- .../drill/exec/vector/accessor/ArrayReader.java | 108 +- .../drill/exec/vector/accessor/ArrayWriter.java | 60 +- .../exec/vector/accessor/ColumnAccessor.java | 40 - .../exec/vector/accessor/ColumnReader.java | 64 - .../exec/vector/accessor/ColumnReaderIndex.java | 28 + .../exec/vector/accessor/ColumnWriter.java | 45 - .../exec/vector/accessor/ColumnWriterIndex.java | 76 ++ .../exec/vector/accessor/ObjectReader.java | 60 + .../drill/exec/vector/accessor/ObjectType.java | 28 + .../exec/vector/accessor/ObjectWriter.java | 101 ++ .../vector/accessor/ScalarElementReader.java | 65 + .../exec/vector/accessor/ScalarReader.java | 75 ++ .../exec/vector/accessor/ScalarWriter.java | 71 +- 
.../exec/vector/accessor/TupleAccessor.java | 71 - .../drill/exec/vector/accessor/TupleReader.java | 36 +- .../drill/exec/vector/accessor/TupleWriter.java | 154 ++- .../drill/exec/vector/accessor/ValueType.java | 31 + .../accessor/impl/AbstractArrayReader.java | 128 -- .../accessor/impl/AbstractArrayWriter.java | 127 -- .../accessor/impl/AbstractColumnAccessor.java | 43 - .../accessor/impl/AbstractColumnReader.java | 126 -- .../accessor/impl/AbstractColumnWriter.java | 87 -- .../accessor/impl/AbstractTupleAccessor.java | 38 - .../vector/accessor/impl/AccessorUtilities.java | 53 + .../accessor/impl/ColumnAccessorFactory.java | 122 -- .../accessor/impl/HierarchicalFormatter.java | 38 + .../accessor/impl/HierarchicalPrinter.java | 238 ++++ .../vector/accessor/impl/TupleReaderImpl.java | 151 --- .../vector/accessor/impl/TupleWriterImpl.java | 162 --- .../exec/vector/accessor/package-info.java | 79 +- .../accessor/reader/AbstractArrayReader.java | 188 +++ .../accessor/reader/AbstractObjectReader.java | 52 + .../accessor/reader/AbstractTupleReader.java | 189 +++ .../accessor/reader/BaseElementReader.java | 187 +++ .../accessor/reader/BaseScalarReader.java | 189 +++ .../accessor/reader/ColumnReaderFactory.java | 109 ++ .../accessor/reader/ElementReaderIndex.java | 24 + .../reader/FixedWidthElementReaderIndex.java | 38 + .../exec/vector/accessor/reader/MapReader.java | 43 + .../accessor/reader/ObjectArrayReader.java | 159 +++ .../accessor/reader/ScalarArrayReader.java | 102 ++ .../vector/accessor/reader/VectorAccessor.java | 26 + .../vector/accessor/reader/package-info.java | 26 + .../accessor/writer/AbstractArrayWriter.java | 348 +++++ .../writer/AbstractFixedWidthWriter.java | 258 ++++ .../accessor/writer/AbstractObjectWriter.java | 72 + .../accessor/writer/AbstractScalarWriter.java | 126 ++ .../accessor/writer/AbstractTupleWriter.java | 450 +++++++ .../accessor/writer/BaseScalarWriter.java | 272 ++++ .../accessor/writer/BaseVarWidthWriter.java | 157 +++ 
.../accessor/writer/ColumnWriterFactory.java | 196 +++ .../exec/vector/accessor/writer/MapWriter.java | 155 +++ .../accessor/writer/NullableScalarWriter.java | 190 +++ .../accessor/writer/ObjectArrayWriter.java | 143 ++ .../accessor/writer/OffsetVectorWriter.java | 283 ++++ .../accessor/writer/ScalarArrayWriter.java | 229 ++++ .../vector/accessor/writer/WriterEvents.java | 127 ++ .../accessor/writer/dummy/DummyArrayWriter.java | 96 ++ .../writer/dummy/DummyScalarWriter.java | 89 ++ .../accessor/writer/dummy/package-info.java | 54 + .../vector/accessor/writer/package-info.java | 151 +++ .../exec/vector/complex/AbstractMapVector.java | 13 +- .../vector/complex/BaseRepeatedValueVector.java | 13 +- .../drill/exec/vector/complex/ListVector.java | 4 +- .../drill/exec/vector/complex/MapVector.java | 24 +- .../exec/vector/complex/RepeatedListVector.java | 20 +- .../exec/vector/complex/RepeatedMapVector.java | 21 +- 188 files changed, 22717 insertions(+), 4811 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java new file mode 100644 index 0000000..a4b260b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultSetLoader.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet; + +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.BaseValueVector; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; + +/** + * Builds a result set (series of zero or more row sets) based on a defined + * schema which may + * evolve (expand) over time. Automatically rolls "overflow" rows over + * when a batch fills. + * <p> + * Many of the methods in this interface verify that the loader is + * in the proper state. For example, an exception is thrown if the caller + * attempts to save a row before starting a batch. However, the per-column + * write methods are checked only through assertions that should be enabled + * during testing, but will be disabled during production. + * + * @see {@link VectorContainerWriter}, the class which this class + * replaces + */ + +public interface ResultSetLoader { + + public static final int DEFAULT_ROW_COUNT = BaseValueVector.INITIAL_VALUE_ALLOCATION; + + /** + * Current schema version. The version increments by one each time + * a column is added. + * @return the current schema version + */ + + int schemaVersion(); + + /** + * Adjust the number of rows to produce in the next batch. Takes + * effect after the next call to {@link #startBatch()}. 
+ * + * @param count target batch row count + */ + + void setTargetRowCount(int count); + + /** + * The number of rows produced by this loader (as configured in the loader + * options.) + * + * @return the target row count for batches that this loader produces + */ + + int targetRowCount(); + + /** + * The largest vector size produced by this loader (as specified by + * the value vector limit.) + * + * @return the largest vector size. Attempting to extend a vector beyond + * this limit causes automatic vector overflow and terminates the + * in-flight batch, even if the batch has not yet reached the target + * row count + */ + + int targetVectorSize(); + + /** + * Total number of batches created. Includes the current batch if + * the row count in this batch is non-zero. + * @return the number of batches produced including the current + * one + */ + + int batchCount(); + + /** + * Total number of rows loaded for all previous batches and the + * current batch. + * @return total row count + */ + + int totalRowCount(); + + /** + * Start a new row batch. Valid only when first started, or after the + * previous batch has been harvested. + */ + + void startBatch(); + + /** + * Writer for the top-level tuple (the entire row). Valid only when + * the mutator is actively writing a batch (after <tt>startBatch()</tt> + * but before <tt>harvest()</tt>.) + * + * @return writer for the top-level columns + */ + + RowSetLoader writer(); + boolean writeable(); + + /** + * Load a row using column values passed as variable-length arguments. Expects + * map values to be represented as an array. + * A schema of (a:int, b:map(c:varchar)) would be + * set as <br><tt>setRow(10, new Object[] {"foo"});</tt><br> + * Values of arrays can be expressed as a Java + * array. A schema of (a:int, b:int[]) can be set as<br> + * <tt>setRow(10, new int[] {100, 200});</tt><br>. + * Primarily for testing, too slow for production code. 
+ * <p> + * If the row consists of a single map or list, then the one value will be an + * <tt>Object</tt> array, creating an ambiguity. Use <tt>writer().set(0, value);</tt> + * in this case. + * + * @param values column values in column index order + * @return this loader + */ + + ResultSetLoader setRow(Object...values); + + /** + * Return the output container, primarily to obtain the schema + * and set of vectors. Depending on when this is called, the + * data may or may not be populated: call + * {@link #harvest()} to obtain the container for a batch. + * <p> + * This method is useful when the schema is known and fixed. + * After declaring the schema, call this method to get the container + * that holds the vectors for use in planning projection, etc. + * <p> + * If the result set schema changes, then a call to this method will + * return the latest schema. But, if the schema changes during the + * overflow row, then this method will not see those changes until + * after harvesting the current batch. (This avoids the appearance + * of phantom columns in the output since the new column won't + * appear until the next batch.) + * <p> + * Never count on the data in the container; it may be empty, half + * written, or inconsistent. Always call + * {@link #harvest()} to obtain the container for a batch. + * + * @return the output container including schema and value + * vectors + */ + + VectorContainer outputContainer(); + + /** + * Harvest the current row batch, and reset the mutator + * to the start of the next row batch (which may already contain + * an overflow row). 
+ * <p> + * The schema of the returned container is defined as: + * <ul> + * <li>The schema as passed in via the loader options, plus</li> + * <li>Columns added dynamically during write, minus</li> + * <li>Any columns not included in the project list, minus</li> + * <li>Any columns added in the overflow row.</li> + * </ul> + * That is, column order is as defined by the initial schema and column + * additions. In particular, the schema order is <b>not</b> defined by + * the projection list. (Another mechanism is required to reorder columns + * for the actual projection.) + * + * @return the row batch to send downstream + */ + + VectorContainer harvest(); + + /** + * The schema of the harvested batch. Valid until the start of the + * next batch. + * + * @return the extended schema of the harvested batch which includes + * any allocation hints used when creating the batch + */ + + TupleMetadata harvestSchema(); + + /** + * Called after all rows are returned, whether because no more data is + * available, or the caller wishes to cancel the current row batch + * and complete. + */ + + void close(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java new file mode 100644 index 0000000..6e32b5d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/ResultVectorCache.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet; + +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Interface for a cache that implements "vector persistence" across + * multiple result set loaders. Allows a single scan operator to offer + * the same set of vectors even when data is read by a set of readers. + */ + +public interface ResultVectorCache { + BufferAllocator allocator(); + ValueVector addOrGet(MaterializedField colSchema); +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java new file mode 100644 index 0000000..070e9a9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/RowSetLoader.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet; + +import org.apache.drill.exec.vector.accessor.TupleWriter; + +/** + * Interface for writing values to a row set. Only available for newly-created + * single row sets. + * <p> + * Typical usage: + * + * <pre><code> + * void writeABatch() { + * RowSetLoader writer = ... + * while (! writer.isFull()) { + * writer.start(); + * writer.scalar(0).setInt(10); + * writer.scalar(1).setString("foo"); + * ... + * writer.save(); + * } + * }</code></pre> + * Alternative usage: + * + * <pre><code> + * void writeABatch() { + * RowSetLoader writer = ... + * while (writer.start()) { + * writer.scalar(0).setInt(10); + * writer.scalar(1).setString("foo"); + * ... + * writer.save(); + * } + * }</code></pre> + * + * The above writes until the batch is full, based on size or vector overflow. + * That is, the details of vector overflow are hidden from the code that calls + * the writer. + */ + +public interface RowSetLoader extends TupleWriter { + + ResultSetLoader loader(); + + /** + * Write a row of values, given by Java objects. Object type must match + * expected column type. Stops writing, and returns false, if any value causes + * vector overflow. 
Value format: + * <ul> + * <li>For scalars, the value as a suitable Java type (int or Integer, say, + * for <tt>INTEGER</tt> values.)</li> + * <li>For scalar arrays, an array of a suitable Java primitive type for + * scalars. For example, <tt>int[]</tt> for an <tt>INTEGER</tt> column.</li> + * <li>For a Map, an <tt>Object</tt> array with values encoded as above. + * (In fact, the list here is the same as the map format.)</li> + * <li>For a list (repeated map, list of list), an <tt>Object</tt> array with + * values encoded as above. (So, for a repeated map, an outer <tt>Object</tt> + * map encodes the array, an inner one encodes the map members.)</li> + * </ul> + * + * @param values + * variable-length argument list of column values + */ + + RowSetLoader addRow(Object... values); + + /** + * Indicates that no more rows fit into the current row batch and that the row + * batch should be harvested and sent downstream. Any overflow row is + * automatically saved for the next cycle. The value is undefined when a batch + * is not active. + * <p> + * Will be false on the first row, and all subsequent rows until either the + * maximum number of rows are written, or a vector overflows. After that, will + * return true. The method returns true as soon as any column writer + * overflows even in the middle of a row write. That is, this writer does not + * automatically handle overflow rows because that added complexity is seldom + * needed for tests. + * + * @return true if the batch is full and no more rows can be written, + * false if another row can be written + */ + + boolean isFull(); + + /** + * The number of rows in the current row set. Does not count any overflow row + * saved for the next batch. + * + * @return number of rows to be sent downstream + */ + + int rowCount(); + + /** + * The index of the current row. Same as the row count except in an overflow + * row in which case the row index will revert to zero as soon as any vector + * overflows. 
Note: this means that the index can change between columns in a + * single row. Applications usually don't use this index directly; rely on the + * writers to write to the proper location. + * + * @return the current write index + */ + + int rowIndex(); + + /** + * Prepare a new row for writing. Call this before each row. + * <p> + * Handles a very special case: that of discarding the last row written. + * A reader can read a row into vectors, then "sniff" the row to check, + * for example, against a filter. If the row is not wanted, simply omit + * the call to <tt>save()</tt> and the next call to <tt>start()</tt> will + * discard the unsaved row. + * <p> + * Note that the vectors still contain values in the + * discarded position; just the various pointers are unset. If + * the batch ends before the discarded values are overwritten, the + * discarded values just exist at the end of the vector. Since vectors + * start with garbage contents, the discarded values are simply a different + * kind of garbage. But, if the client writes a new row, then the new + * row overwrites the discarded row. This works because we only change + * the tail part of a vector; never the internals. + * + * @return true if another row can be added, false if the batch is full + */ + + boolean start(); + + /** + * Saves the current row and moves to the next row. Failing to call this + * method effectively abandons the in-flight row; something that may be useful + * to recover from partially-written rows that turn out to contain errors. + * Done automatically if using <tt>setRow()</tt>. 
+ */ + + void save(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java new file mode 100644 index 0000000..f3626d9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.impl; + +import java.util.ArrayList; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState; +import org.apache.drill.exec.physical.rowSet.impl.TupleState.MapState; +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; +import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory; +import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector; + +/** + * Represents the write-time state for a column including the writer and the (optional) + * backing vector. Implements per-column operations such as vector overflow. If a column + * is a (possibly repeated) map, then the column state will hold a tuple state. + * <p> + * If a column is not projected, then the writer exists (to make life easier for the + * reader), but there will be no vector backing the writer. + * <p> + * Different columns need different kinds of vectors: a data vector, possibly an offset + * vector, or even a non-existent vector. The {@link VectorState} class abstracts out + * these differences. 
+ */ + +public abstract class ColumnState { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnState.class); + /** Base state for map columns: forwards rollover, batch start, harvest and close to the nested {@link MapState}. */ + public static abstract class BaseMapColumnState extends ColumnState { + protected final MapState mapState; + + public BaseMapColumnState(ResultSetLoaderImpl resultSetLoader, + AbstractObjectWriter writer, VectorState vectorState, + ProjectionSet projectionSet) { + super(resultSetLoader, writer, vectorState); + mapState = new MapState(resultSetLoader, this, projectionSet); + } + + @Override + public void rollover() { + super.rollover(); + mapState.rollover(); + } + + @Override + public void startBatch() { + super.startBatch(); + mapState.startBatch(); + } + + @Override + public void harvestWithLookAhead() { + super.harvestWithLookAhead(); + mapState.harvestWithLookAhead(); + } + + @Override + public void close() { + super.close(); + mapState.close(); + } + + public MapState mapState() { return mapState; } + } + /** Map column whose writer comes from ColumnWriterFactory.buildMap(); it is given a {@link NullVectorState}, and its members share its cardinality. */ + public static class MapColumnState extends BaseMapColumnState { + + public MapColumnState(ResultSetLoaderImpl resultSetLoader, + ColumnMetadata columnSchema, + ProjectionSet projectionSet) { + super(resultSetLoader, + ColumnWriterFactory.buildMap(columnSchema, null, + new ArrayList<AbstractObjectWriter>()), + new NullVectorState(), + projectionSet); + } + + @Override + public void updateCardinality(int cardinality) { + super.updateCardinality(cardinality); + mapState.updateCardinality(cardinality); + } + } + /** Map array column: tracks an offset vector, and sizes its members at the outer cardinality times the schema's expected element count. */ + public static class MapArrayColumnState extends BaseMapColumnState { + + public MapArrayColumnState(ResultSetLoaderImpl resultSetLoader, + AbstractObjectWriter writer, + VectorState vectorState, + ProjectionSet projectionSet) { + super(resultSetLoader, writer, + vectorState, + projectionSet); + } + + @SuppressWarnings("resource") + public static MapArrayColumnState build(ResultSetLoaderImpl resultSetLoader, + ColumnMetadata columnSchema, + ProjectionSet projectionSet) { + + // Create the map's offset 
vector. + + UInt4Vector offsetVector = new UInt4Vector( + BaseRepeatedValueVector.OFFSETS_FIELD, + resultSetLoader.allocator()); + + // Create the writer using the offset vector + + AbstractObjectWriter writer = ColumnWriterFactory.buildMapArray( + columnSchema, offsetVector, + new ArrayList<AbstractObjectWriter>()); + + // Wrap the offset vector in a vector state + + VectorState vectorState = new OffsetVectorState( + ((AbstractArrayWriter) writer.array()).offsetWriter(), + offsetVector, + (AbstractObjectWriter) writer.array().entry()); + + // Assemble it all into the column state. + + return new MapArrayColumnState(resultSetLoader, + writer, vectorState, projectionSet); + } + + @Override + public void updateCardinality(int cardinality) { + super.updateCardinality(cardinality); + int childCardinality = cardinality * schema().expectedElementCount(); + mapState.updateCardinality(childCardinality); + } + } + + /** + * Columns move through various lifecycle states as identified by this + * enum. (Yes, sorry that the term "state" is used in two different ways + * here: the variables for a column and the point within the column + * lifecycle.) + */ + + protected enum State { + + /** + * Column is in the normal state of writing with no overflow + * in effect. + */ + + NORMAL, + + /** + * Like NORMAL, but means that the data has overflowed and the + * column's data for the current row appears in the new, + * overflow batch. For a client that omits some columns, written + * columns will be in OVERFLOW state, unwritten columns in + * NORMAL state. + */ + + OVERFLOW, + + /** + * Indicates that the column has data saved + * in the overflow batch. + */ + + LOOK_AHEAD, + + /** + * Like LOOK_AHEAD, but indicates the special case that the column + * was added after overflow, so there is no vector for the column + * in the harvested batch. 
+ */ + + NEW_LOOK_AHEAD + } + + protected final ResultSetLoaderImpl resultSetLoader; + protected final int addVersion; + protected final VectorState vectorState; + protected State state; + protected AbstractObjectWriter writer; + + /** + * Cardinality of the value itself. If this is an array, + * then this is the number of arrays. A separate number, + * the inner cardinality, is computed as the outer cardinality + * times the expected array count (from metadata.) The inner + * cardinality is the total number of array items in the + * vector. + */ + + protected int outerCardinality; + + public ColumnState(ResultSetLoaderImpl resultSetLoader, + AbstractObjectWriter writer, VectorState vectorState) { + this.resultSetLoader = resultSetLoader; + this.vectorState = vectorState; + this.addVersion = resultSetLoader.bumpVersion(); + state = resultSetLoader.hasOverflow() ? + State.NEW_LOOK_AHEAD : State.NORMAL; + this.writer = writer; + } + + public AbstractObjectWriter writer() { return writer; } + public ColumnMetadata schema() { return writer.schema(); } + + public ValueVector vector() { return vectorState.vector(); } + /** Allocates the vector for the current outer cardinality and tallies the allocation with the loader. */ + public void allocateVectors() { + assert outerCardinality != 0; + resultSetLoader.tallyAllocations( + vectorState.allocate(outerCardinality)); + } + + /** + * Prepare the column for a new row batch. In the NORMAL state, allocate + * the vector. After overflow on the previous batch, restore the look-ahead + * buffer to the active vector so we start writing where we left off. + */ + + public void startBatch() { + switch (state) { + case NORMAL: + resultSetLoader.tallyAllocations(vectorState.allocate(outerCardinality)); + break; + + case NEW_LOOK_AHEAD: + + // Column is new, was not exchanged with backup vector + + break; + + case LOOK_AHEAD: + + // Restore the look-ahead values to the main vector. + + vectorState.startBatchWithLookAhead(); + break; + + default: + throw new IllegalStateException("Unexpected state: " + state); + } + + // In all cases, we are back to normal writing. 
+ + state = State.NORMAL; + } + + /** + * A column within the row batch overflowed. Prepare to absorb the rest of the + * in-flight row by rolling values over to a new vector, saving the complete + * vector for later. This column could have a value for the overflow row, or + * for some previous row, depending on exactly when and where the overflow + * occurs. + */ + + public void rollover() { + assert state == State.NORMAL; + + // If the source index is 0, then we could not fit this one + // value in the original vector. Nothing will be accomplished by + // trying again with an overflow vector. Just fail. + // + // Note that this is a judgment call. It is possible to allow the + // vector to double beyond the limit, but that will require a bit + // of thought to get right -- and, of course, completely defeats + // the purpose of limiting vector size to avoid memory fragmentation... + + if (resultSetLoader.writerIndex().vectorIndex() == 0) { + throw UserException + .memoryError("A single column value is larger than the maximum allowed size of 16 MB") + .build(logger); + } + + // Otherwise, do the roll-over to a look-ahead vector. + + vectorState.rollover(outerCardinality); + + // Remember that we did this overflow processing. + + state = State.OVERFLOW; + } + + /** + * Writing of a row batch is complete. Prepare the vector for harvesting + * to send downstream. If this batch encountered overflow, set aside the + * look-ahead vector and put the full vector buffer back into the active + * vector. + */ + + public void harvestWithLookAhead() { + switch (state) { + case NEW_LOOK_AHEAD: + + // If added after overflow, no data to save from the complete + // batch: the vector does not appear in the completed batch. + + break; + + case OVERFLOW: + + // Otherwise, restore the original, full buffer and + // last write position. + + vectorState.harvestWithLookAhead(); + + // Remember that we have look-ahead values stashed away in the + // backup vector. 
+ + state = State.LOOK_AHEAD; + break; + + default: + throw new IllegalStateException("Unexpected state: " + state); + } + } + /** Resets the vector state. */ + public void close() { + vectorState.reset(); + } + /** Sets the outer cardinality used when allocating or rolling over the vector. */ + public void updateCardinality(int cardinality) { + outerCardinality = cardinality; + } + /** Writes this column's fields, then its vector state, to the given formatter. */ + public void dump(HierarchicalFormatter format) { + format + .startObject(this) + .attribute("addVersion", addVersion) + .attribute("state", state) + .attributeIdentity("writer", writer) + .attribute("vectorState") + ; + vectorState.dump(format); + format.endObject(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java new file mode 100644 index 0000000..2fcc813 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullProjectionSet.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.impl; + +/** + * Uniform projection for a tuple: every column is projected (the wildcard, + * SELECT *, case) or no column is, as given by the constructor flag. Maps + * within the tuple receive the same all-or-nothing projection. + */ + +public class NullProjectionSet implements ProjectionSet { + + private boolean allProjected; + + public NullProjectionSet(boolean allProjected) { + this.allProjected = allProjected; + } + + @Override + public boolean isProjected(String colName) { return allProjected; } + + @Override + public ProjectionSet mapProjection(String colName) { + return new NullProjectionSet(allProjected); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java new file mode 100644 index 0000000..930dc30 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullResultVectorCacheImpl.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.vector.ValueVector; +/** "Null" vector cache: caches nothing and asks TypeHelper for a new vector on every request. */ +public class NullResultVectorCacheImpl implements ResultVectorCache { + + private final BufferAllocator allocator; + + public NullResultVectorCacheImpl(BufferAllocator allocator) { + this.allocator = allocator; + } + + @Override + public BufferAllocator allocator() { return allocator; } + + @Override + public ValueVector addOrGet(MaterializedField colSchema) { + return TypeHelper.getNewVector(colSchema, allocator, null); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java new file mode 100644 index 0000000..8372758 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullVectorState.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; + +/** + * Do-nothing vector state for a column (such as a map, or a primitive created + * without a vector) which has no actual vector associated with it. + */ + +public class NullVectorState implements VectorState { + + @Override public int allocate(int cardinality) { return 0; } + @Override public void rollover(int cardinality) { } + @Override public void harvestWithLookAhead() { } + @Override public void startBatchWithLookAhead() { } + @Override public void reset() { } + @Override public ValueVector vector() { return null; } + /** Do-nothing state that still reports a vector supplied by the caller. */ + public static class UnmanagedVectorState extends NullVectorState { + ValueVector vector; + + public UnmanagedVectorState(ValueVector vector) { + this.vector = vector; + } + + @Override + public ValueVector vector() { return vector; } + } + + @Override + public void dump(HierarchicalFormatter format) { + format.startObject(this).endObject(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java new file mode 100644 index 0000000..bf91032 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState; +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.vector.FixedWidthVector; +import org.apache.drill.exec.vector.NullableVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; +import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter; +import org.apache.drill.exec.vector.accessor.writer.NullableScalarWriter; +/** Vector state for a nullable vector: pairs a state for the bits vector with one for the values vector and forwards each operation to both. */ +public class NullableVectorState implements VectorState { + /** State for the bits vector, which is allocated through the FixedWidthVector interface. */ + public static class BitsVectorState extends ValuesVectorState { + + public BitsVectorState(ColumnMetadata schema, AbstractScalarWriter writer, ValueVector mainVector) { + super(schema, writer, mainVector); + } + + @Override + public int allocateVector(ValueVector vector, int cardinality) { + ((FixedWidthVector) vector).allocateNew(cardinality); + 
return vector.getBufferSize(); + } + } + + private final ColumnMetadata schema; + private final NullableScalarWriter writer; + private final NullableVector vector; + private final ValuesVectorState bitsState; + private final ValuesVectorState valuesState; + + public NullableVectorState(AbstractObjectWriter writer, NullableVector vector) { + this.schema = writer.schema(); + this.vector = vector; + + this.writer = (NullableScalarWriter) writer.scalar(); + bitsState = new BitsVectorState(schema, this.writer.bitsWriter(), vector.getBitsVector()); + valuesState = new ValuesVectorState(schema, this.writer.baseWriter(), vector.getValuesVector()); + } + + @Override + public int allocate(int cardinality) { + return bitsState.allocate(cardinality) + + valuesState.allocate(cardinality); + } + + @Override + public void rollover(int cardinality) { + bitsState.rollover(cardinality); + valuesState.rollover(cardinality); + } + + @Override + public void harvestWithLookAhead() { + bitsState.harvestWithLookAhead(); + valuesState.harvestWithLookAhead(); + } + + @Override + public void startBatchWithLookAhead() { + bitsState.startBatchWithLookAhead(); + valuesState.startBatchWithLookAhead(); + } + + @Override + public void reset() { + bitsState.reset(); + valuesState.reset(); + } + + @Override + public ValueVector vector() { return vector; } + + @Override + public void dump(HierarchicalFormatter format) { + format + .startObject(this) + .attribute("schema", schema) + .attributeIdentity("writer", writer) + .attributeIdentity("vector", vector) + .attribute("bitsState"); + bitsState.dump(format); + format + .attribute("valuesState"); + valuesState.dump(format); + format + .endObject(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java new file mode 100644 index 0000000..a743052 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/OptionBuilder.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import java.util.Collection; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl.ResultSetOptions; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.vector.BaseValueVector; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Builder for the options for the row set loader. Reasonable defaults + * are provided for all options; use these options for test code or + * for clients that don't need special settings. 
+ */ + +public class OptionBuilder { + protected int vectorSizeLimit; + protected int rowCountLimit; + protected Collection<SchemaPath> projection; + protected ResultVectorCache vectorCache; + protected TupleMetadata schema; + protected long maxBatchSize; + + public OptionBuilder() { + ResultSetOptions options = new ResultSetOptions(); + vectorSizeLimit = options.vectorSizeLimit; + rowCountLimit = options.rowCountLimit; + maxBatchSize = options.maxBatchSize; + } + + /** + * Specify the maximum number of rows per batch. Defaults to + * {@link BaseValueVector#INITIAL_VALUE_ALLOCATION}. Batches end either + * when this limit is reached, or when a vector overflows, whichever + * occurs first. The limit is clamped to at least 1 and at most + * {@link ValueVector#MAX_ROW_COUNT}. + * + * @param limit the row count limit + * @return this builder + */ + + public OptionBuilder setRowCountLimit(int limit) { + rowCountLimit = Math.max(1, + Math.min(limit, ValueVector.MAX_ROW_COUNT)); + return this; + } + /** Sets the maximum batch size, given in bytes; returns this builder. */ + public OptionBuilder setBatchSizeLimit(int bytes) { + maxBatchSize = bytes; + return this; + } + + /** + * Record (batch) readers often read a subset of available table columns, + * but want to use a writer schema that includes all columns for ease of + * writing. (For example, a CSV reader must read all columns, even if the user + * wants a subset. The unwanted columns are simply discarded.) + * <p> + * This option provides a projection list, as {@link SchemaPath} entries, for + * those columns which are to be projected. Only those columns will be + * backed by value vectors; non-projected columns will be backed by "null" + * writers that discard all values. + * + * @param projection the list of projected columns + * @return this builder + */ + + // NOTE(review): the old TODO (use SchemaPath, not strings) looks done. 
+ + public OptionBuilder setProjection(Collection<SchemaPath> projection) { + this.projection = projection; + return this; + } + + /** + * Downstream operators require "vector persistence": the same vector + * must represent the same column in every batch. For the scan operator, + * which creates multiple readers, this can be a challenge. The vector + * cache provides a transparent mechanism to enable vector persistence + * by returning the same vector for a set of independent readers. By + * default, the code uses a "null" cache which creates a new vector on + * each request. If a true cache is needed, the caller must provide one + * here. + */ + + public OptionBuilder setVectorCache(ResultVectorCache vectorCache) { + this.vectorCache = vectorCache; + return this; + } + + /** + * Clients can use the row set builder in several ways: + * <ul> + * <li>Provide the schema up front, when known, by using this method to + * provide the schema.</li> + * <li>Discover the schema on the fly, adding columns during the write + * operation. Leave this method unset to start with an empty schema.</li> + * <li>A combination of the above.</li> + * </ul> + * @param schema the initial schema for the loader + * @return this builder + */ + + public OptionBuilder setSchema(TupleMetadata schema) { + this.schema = schema; + return this; + } + + // TODO: No setter for vector length yet: is hard-coded + // at present in the value vector. 
+ /** Builds the options object from the values held by this builder. */ + public ResultSetOptions build() { + return new ResultSetOptions(this); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java new file mode 100644 index 0000000..c97ec18 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/PrimitiveColumnState.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.impl; + +import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState; +import org.apache.drill.exec.vector.NullableVector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; +import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter; +import org.apache.drill.exec.vector.complex.RepeatedValueVector; + +/** + * Primitive (non-map) column state. Handles all three cardinalities + * (plain, nullable, array). Column metadata is hosted on the writer. + */ + +public class PrimitiveColumnState extends ColumnState implements ColumnWriterListener { + + public PrimitiveColumnState(ResultSetLoaderImpl resultSetLoader, + AbstractObjectWriter colWriter, + VectorState vectorState) { + super(resultSetLoader, colWriter, vectorState); + writer.bindListener(this); + } + /** Creates a state backed by a ValuesVectorState, or by a NullVectorState when the vector is null. */ + public static PrimitiveColumnState newPrimitive( + ResultSetLoaderImpl resultSetLoader, + ValueVector vector, + AbstractObjectWriter writer) { + VectorState vectorState; + if (vector == null) { + vectorState = new NullVectorState(); + } else { + vectorState = new ValuesVectorState( + writer.schema(), + (AbstractScalarWriter) writer.scalar(), + vector); + } + return new PrimitiveColumnState(resultSetLoader, writer, + vectorState); + } + /** Creates a state backed by a NullableVectorState, or by a NullVectorState when the vector is null. */ + public static PrimitiveColumnState newNullablePrimitive( + ResultSetLoaderImpl resultSetLoader, + ValueVector vector, + AbstractObjectWriter writer) { + VectorState vectorState; + if (vector == null) { + vectorState = new NullVectorState(); + } else { + vectorState = new NullableVectorState( + writer, + (NullableVector) vector); + } + return new PrimitiveColumnState(resultSetLoader, writer, + vectorState); + } + /** Creates a state backed by a RepeatedVectorState, or by a NullVectorState when the vector is null. */ + public static 
PrimitiveColumnState newPrimitiveArray( + ResultSetLoaderImpl resultSetLoader, + ValueVector vector, + AbstractObjectWriter writer) { + VectorState vectorState; + if (vector == null) { + vectorState = new NullVectorState(); + } else { + vectorState = new RepeatedVectorState(writer, (RepeatedValueVector) vector); + } + return new PrimitiveColumnState(resultSetLoader, writer, + vectorState); + } + + @Override + public void overflowed(ScalarWriter writer) { + resultSetLoader.overflowed(); + } + + @Override + public void dump(HierarchicalFormatter format) { + // NOTE(review): empty override hides ColumnState.dump() output; confirm intended. + } + + @Override + public boolean canExpand(ScalarWriter writer, int delta) { + return resultSetLoader.canExpand(delta); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java new file mode 100644 index 0000000..9ea118f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSet.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +/** + * Represents the set of columns projected for a tuple (row or map.) + * The projected columns might themselves be maps, so returns a + * projection set for such columns. + * <p> + * Three cases exist: + * <ul> + * <li>Project all ({@link NullProjectionSet}): used for a tuple when + * all columns are projected. Example: the root tuple (the row) in + * a <tt>SELECT *</tt> query.</li> + * <li>Project none (also {@link NullProjectionSet}): used when no + * columns are projected from a tuple, such as when a map itself is + * not projected, so none of its member columns are projected.</li> + * <li>Project some ({@link ProjectionSetImpl}): used in the + * <tt>SELECT a, c, e</tt> case in which the query identifies which + * columns to project (implicitly leaving out others, such as b and + * d in our example.)</li> + * </ul> + * <p> + * The result is that each tuple (row and map) has an associated + * projection set which the code can query to determine if a newly + * added column is wanted (and so should have a backing vector) or + * is unwanted (and can just receive a dummy writer.) 
+ */ + +interface ProjectionSet { + boolean isProjected(String colName); + ProjectionSet mapProjection(String colName); +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java new file mode 100644 index 0000000..e17f486 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ProjectionSetImpl.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.impl; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.PathSegment.NameSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.map.CaseInsensitiveMap; + +/** + * Represents an explicit projection at some tuple level. + * <p> + * A column is projected if it is explicitly listed in the selection list. + * <p> + * If a column is a map, then the projection for the map's columns is based on + * two rules: + * <ol> + * <li>If the projection list includes at least one explicit mention of a map + * member, then include only those columns explicitly listed.</li> + * <li>If the projection at the parent level lists only the map column itself + * (which the projection can't know is a map), then assume this implies all + * columns, as if the entry were "map.*".</li> + * </ol> + * <p> + * Examples:<br> + * <code>m</code><br> + * If m turns out to be a map, project all members of m.<br> + * <code>m.a</code><br> + * Column m must be a map. Project only column a.<br> + * <code>m, m.a</code><br> + * Tricky case. We interpret this as projecting only the "a" element of map m. + * <p> + * The projection set is built from a list of columns, represented as + * {@link SchemaPath} objects, provided by the physical plan. The structure of + * <tt>SchemaPath</tt> is a bit awkward: + * <p> + * <ul> + * <li><tt>SchemaPath</tt> is a wrapper for a column which directly holds the + * <tt>NameSegment</tt> for the top-level column.</li> + * <li><tt>NameSegment</tt> holds a name. This can be a top name such as + * `a`, or parts of a compound name such as `a`.`b`. 
Each <tt>NameSegment</tt> + * has a "child" that points to the optional following part of the name.</li> + * <li><tt>PathSegment</tt> is the base class for the parts of a name.</li> + * <li><tt>ArraySegment</tt> is the other kind of name part and represents + * an array index such as the "[1]" in `columns`[1].</li> + * </ul> + * The parser here consumes only names; this mechanism does not consider + * array indexes. As a result, there may be multiple projected columns that + * map to the same projection here: `columns`[1] and `columns`[2] both map to + * the name `columns`, for example. + */ + +public class ProjectionSetImpl implements ProjectionSet { + + Set<String> projection = new HashSet<>(); + Map<String, ProjectionSetImpl> mapProjections = CaseInsensitiveMap + .newHashMap(); + + @Override + public boolean isProjected(String colName) { + return projection.contains(colName.toLowerCase()); + } + + @Override + public ProjectionSet mapProjection(String colName) { + ProjectionSet mapProj = mapProjections.get(colName.toLowerCase()); + if (mapProj != null) { + return mapProj; + } + + // No explicit information for the map. Members inherit the + // same projection as the map itself. + + return new NullProjectionSet(isProjected(colName)); + } + + /** + * Parse a projection list. The list should consist of a list of column + * names; any wildcards should have been processed by the caller. An + * empty or null list means everything is projected (that is, an + * empty list here is equivalent to a wildcard in the SELECT + * statement.) 
+ * + * @param projList the projected columns; null or empty projects everything + * @return the projection set built from the list + */ + public static ProjectionSet parse(Collection<SchemaPath> projList) { + if (projList == null || projList.isEmpty()) { + return new NullProjectionSet(true); + } + ProjectionSetImpl projSet = new ProjectionSetImpl(); + for (SchemaPath col : projList) { + projSet.addSegment(col.getRootSegment()); + } + return projSet; + } + /** Records the segment's lower-cased name as projected, then recurses into a child name segment, if any. */ + private void addSegment(NameSegment rootSegment) { + String rootKey = rootSegment.getPath().toLowerCase(); + projection.add(rootKey); + PathSegment child = rootSegment.getChild(); + if (child == null) { + return; + } + if (child.isArray()) { + // Ignore the [x] array suffix. + return; + } + ProjectionSetImpl map = mapProjections.get(rootKey); + if (map == null) { + map = new ProjectionSetImpl(); + mapProjections.put(rootKey, map); + } + map.addSegment((NameSegment) child); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java new file mode 100644 index 0000000..98b6beb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.rowSet.impl;

import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.ValuesVectorState;
import org.apache.drill.exec.record.ColumnMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;

/**
 * Vector state for a repeated scalar (scalar array) vector. Holds one state
 * object for the offsets vector and one for the data (values) vector, and
 * applies allocation, roll-over, harvest and reset to both.
 */

public class RepeatedVectorState implements VectorState {
  private final ColumnMetadata schema;
  private final AbstractArrayWriter arrayWriter;
  private final RepeatedValueVector vector;
  private final OffsetVectorState offsetsState;
  private final ValuesVectorState valuesState;

  /**
   * Build the state for one repeated vector.
   *
   * @param writer the object writer for the column; its array writer and
   * schema are used here
   * @param vector the repeated vector the writer writes to
   */
  public RepeatedVectorState(AbstractObjectWriter writer, RepeatedValueVector vector) {
    schema = writer.schema();
    this.vector = vector;
    arrayWriter = (AbstractArrayWriter) writer.array();

    // Values side: pair the array's scalar (element) writer with the
    // data vector held inside the repeated vector.

    final AbstractScalarWriter elementWriter =
        (AbstractScalarWriter) arrayWriter.scalar();
    valuesState = new ValuesVectorState(schema, elementWriter,
        vector.getDataVector());

    // Offsets side: pair the array's offset writer with the offset
    // vector held inside the repeated vector.

    offsetsState = new OffsetVectorState(
        arrayWriter.offsetWriter(),
        vector.getOffsetVector(),
        (AbstractObjectWriter) arrayWriter.entry());
  }

  @Override
  public ValueVector vector() {
    return vector;
  }

  /**
   * Allocate the offsets vector for the given row count and the values
   * vector for the corresponding element count.
   *
   * @param cardinality the number of rows (arrays) to allocate for
   * @return the sum of the values returned by the two allocations
   */
  @Override
  public int allocate(int cardinality) {
    final int offsetsResult = offsetsState.allocate(cardinality);
    final int valuesResult = valuesState.allocate(childCardinality(cardinality));
    return offsetsResult + valuesResult;
  }

  /**
   * Convert a row count into an element count using the expected number of
   * elements per array recorded in the column schema.
   */
  private int childCardinality(int cardinality) {
    final int elementsPerRow = schema.expectedElementCount();
    return cardinality * elementsPerRow;
  }

  /**
   * Roll over both halves of the array column: the element values and the
   * offsets that point to those values. The cardinality provided applies to
   * the offset vector; the cardinality for the values vector is derived from
   * it.
   * <p>
   * Data structure:
   * <p><pre><code>
   * RepeatedVectorState (this class)
   * +- OffsetVectorState
   * .  +- OffsetVectorWriter (A)
   * .  +- Offset vector (B)
   * .  +- Backup (e.g. look-ahead) offset vector
   * +- ValuesVectorState
   * .  +- Scalar (element) writer (C)
   * .  +- Data (elements) vector (D)
   * .  +- Backup elements vector
   * +- Array Writer
   * .  +- ColumnWriterIndex (for array as a whole)
   * .  +- OffsetVectorWriter (A)
   * .  .  +- Offset vector (B)
   * .  +- ArrayElementWriterIndex
   * .  +- ScalarWriter (C)
   * .  .  +- ArrayElementWriterIndex
   * .  .  +- Scalar vector (D)
   * </code></pre>
   * <p>
   * The first two groups of objects point into the writer objects in the
   * last group. Letters in parentheses show the connections.
   * <p>
   * To perform the roll-over, we must:
   * <ul>
   * <li>Copy values from the current vectors to a set of new, look-ahead
   * vectors.</li>
   * <li>Swap buffers between the main and "backup" vectors, effectively
   * moving the "full" batch to the sidelines, putting the look-ahead vectors
   * into play in order to finish writing the current row.</li>
   * <li>Update the writers to point to the look-ahead buffers, including
   * the initial set of data copied into those vectors.</li>
   * <li>Update the vector indexes to point to the next write positions
   * after the values copied during roll-over.</li>
   * </ul>
   *
   * @param cardinality the number of outer elements to create in the look-ahead
   * vector
   */

  @Override
  public void rollover(int cardinality) {

    // Values are rolled over first, then the offsets. The index
    // presented to the caller is that of the data vector: the next
    // position in the data vector to be set into the data vector
    // writer index.

    final int elementCardinality = childCardinality(cardinality);
    valuesState.rollover(elementCardinality);
    offsetsState.rollover(cardinality);
  }

  /** Forward the harvest-with-look-ahead step to offsets, then values. */
  @Override
  public void harvestWithLookAhead() {
    offsetsState.harvestWithLookAhead();
    valuesState.harvestWithLookAhead();
  }

  /** Forward the start-batch-with-look-ahead step to offsets, then values. */
  @Override
  public void startBatchWithLookAhead() {
    offsetsState.startBatchWithLookAhead();
    valuesState.startBatchWithLookAhead();
  }

  /** Reset the offsets state, then the values state. */
  @Override
  public void reset() {
    offsetsState.reset();
    valuesState.reset();
  }

  /**
   * Write this object, its schema, the identities of its writer and vector,
   * and the two nested vector states to the formatter.
   *
   * @param format the formatter that receives the dump
   */
  @Override
  public void dump(HierarchicalFormatter format) {
    format.startObject(this)
        .attribute("schema", schema)
        .attributeIdentity("writer", arrayWriter)
        .attributeIdentity("vector", vector)
        .attribute("offsetsState");
    offsetsState.dump(format);

    format.attribute("valuesState");
    valuesState.dump(format);

    format.endObject();
  }
}
