http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java new file mode 100644 index 0000000..b875e7e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultSetLoaderImpl.java @@ -0,0 +1,775 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import java.util.Collection; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.physical.rowSet.impl.TupleState.RowState; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; + +/** + * Implementation of the result set loader. + * @see {@link ResultSetLoader} + */ + +public class ResultSetLoaderImpl implements ResultSetLoader { + + /** + * Read-only set of options for the result set loader. + */ + + public static class ResultSetOptions { + public final int vectorSizeLimit; + public final int rowCountLimit; + public final ResultVectorCache vectorCache; + public final Collection<SchemaPath> projection; + public final TupleMetadata schema; + public final long maxBatchSize; + + public ResultSetOptions() { + vectorSizeLimit = ValueVector.MAX_BUFFER_SIZE; + rowCountLimit = DEFAULT_ROW_COUNT; + projection = null; + vectorCache = null; + schema = null; + maxBatchSize = -1; + } + + public ResultSetOptions(OptionBuilder builder) { + this.vectorSizeLimit = builder.vectorSizeLimit; + this.rowCountLimit = builder.rowCountLimit; + this.projection = builder.projection; + this.vectorCache = builder.vectorCache; + this.schema = builder.schema; + this.maxBatchSize = builder.maxBatchSize; + } + + public void dump(HierarchicalFormatter format) { + format + .startObject(this) + .attribute("vectorSizeLimit", vectorSizeLimit) + .attribute("rowCountLimit", rowCountLimit) + .attribute("projection", projection) + .endObject(); + } + } + + private enum State { + /** + * Before the first batch. + */ + + START, + + /** + * Writing to a batch normally. 
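A minimal sketch of how the public API drives this state machine on the no-overflow path, using only methods defined in this class:

    rsLoader.startBatch();                       // START (or HARVESTED) -> ACTIVE
    while (rsLoader.writer().start()) {          // false once the batch fills
      // ... set column values ...
      rsLoader.writer().save();                  // may flip ACTIVE -> FULL_BATCH
    }
    VectorContainer batch = rsLoader.harvest();  // -> HARVESTED
    rsLoader.close();                            // -> CLOSED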
+ */ + + ACTIVE, + + /** + * Batch overflowed a vector while writing. Can continue + * to write to a temporary "overflow" batch until the + * end of the current row. + */ + + OVERFLOW, + + /** + * Temporary state to avoid batch-size related overflow while + * an overflow is in progress. + */ + + IN_OVERFLOW, + + /** + * Batch is full due to reaching the row count limit + * when saving a row. + * No more writes allowed until harvesting the current batch. + */ + + FULL_BATCH, + + /** + * Current batch was harvested: data is gone. No lookahead + * batch exists. + */ + + HARVESTED, + + /** + * Current batch was harvested and its data is gone. However, + * overflow occurred during that batch and the data exists + * in the overflow vectors. + * <p> + * This state needs special consideration. The column writer + * structure maintains its state (offsets, etc.) from the OVERFLOW + * state, but the buffers currently in the vectors are from the + * complete batch. <b>No writes can be done in this state!</b> + * The writer state does not match the data in the buffers. + * The code here does what it can to catch this state. But, if + * some client tries to write to a column writer in this state, + * bad things will happen. Doing so is invalid (the write is outside + * of a batch), so this is not a terrible restriction. + * <p> + * Said another way, the current writer state is invalid with respect + * to the active buffers, but only if the writers try to act on the + * buffers. Since the writers won't do so, this temporary state is + * fine. The correct buffers are restored once a new batch is started + * and the state moves to ACTIVE. + */ + + LOOK_AHEAD, + + /** + * Mutator is closed: no more operations are allowed. + */ + + CLOSED + } + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResultSetLoaderImpl.class); + + /** + * Options provided to this loader. + */ + + private final ResultSetOptions options; + + /** + * Allocator for vectors created by this loader. + */ + + final BufferAllocator allocator; + + /** + * Internal structure used to work with the vectors (real or dummy) used + * by this loader. + */ + + final RowState rootState; + + /** + * Top-level writer index that steps through the rows as they are written. + * When an overflow batch is in effect, indexes into that batch instead. + * Since a batch is really a tree of tuples, in which some branches of + * the tree are arrays, the root indexes here feeds into array indexes + * within the writer structure that points to the current position within + * an array column. + */ + + private final WriterIndexImpl writerIndex; + + /** + * The row-level writer for stepping through rows as they are written, + * and for accessing top-level columns. + */ + + private final RowSetLoaderImpl rootWriter; + + /** + * Vector cache for this loader. + * @see {@link OptionBuilder#setVectorCache()}. + */ + + private final ResultVectorCache vectorCache; + + /** + * Tracks the state of the row set loader. Handling vector overflow requires + * careful stepping through a variety of states as the write proceeds. + */ + + private State state = State.START; + + /** + * Track the current schema as seen by the writer. Each addition of a column + * anywhere in the schema causes the active schema version to increase by one. + * This allows very easy checks for schema changes: save the prior version number + * and compare it against the current version number. 
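For example, a consumer of this loader can detect a schema change across batches with exactly that saved-version comparison (a sketch):

    int priorVersion = rsLoader.schemaVersion();
    VectorContainer batch = rsLoader.harvest();
    if (rsLoader.schemaVersion() != priorVersion) {
      // New columns became visible; report OK_NEW_SCHEMA downstream.
    }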
+ */ + + private int activeSchemaVersion; + + /** + * Track the current schema as seen by the consumer of the batches that this + * loader produces. The harvest schema version can be behind the active schema + * version in the case in which new columns are added to the overflow row. + * Since the overflow row won't be visible to the harvested batch, that batch + * sees the schema as it existed at a prior version: the harvest schema + * version. + */ + + private int harvestSchemaVersion; + + /** + * Builds the harvest vector container that includes only the columns that + * are included in the harvest schema version. That is, it excludes columns + * added while writing the overflow row. + */ + + private VectorContainerBuilder containerBuilder; + + /** + * Counts the batches harvested (sent downstream) from this loader. Does + * not include the current, in-flight batch. + */ + + private int harvestBatchCount; + + /** + * Counts the rows included in previously-harvested batches. Does not + * include the number of rows in the current batch. + */ + + private int previousRowCount; + + /** + * Number of rows in the harvest batch. If an overflow batch is in effect, + * then this is the number of rows in the "main" batch before the overflow; + * that is the number of rows in the batch that will be harvested. If no + * overflow row is in effect, then this number is undefined (and should be + * zero.) + */ + + private int pendingRowCount; + + /** + * The number of rows per batch. Starts with the configured amount. Can be + * adjusted between batches, perhaps based on the actual observed size of + * input data. + */ + + private int targetRowCount; + + /** + * Total bytes allocated to the current batch. + */ + + protected int accumulatedBatchSize; + + protected final ProjectionSet projectionSet; + + public ResultSetLoaderImpl(BufferAllocator allocator, ResultSetOptions options) { + this.allocator = allocator; + this.options = options; + targetRowCount = options.rowCountLimit; + writerIndex = new WriterIndexImpl(this); + + if (options.vectorCache == null) { + vectorCache = new NullResultVectorCacheImpl(allocator); + } else { + vectorCache = options.vectorCache; + } + + // If projection, build the projection map. + + projectionSet = ProjectionSetImpl.parse(options.projection); + + // Build the row set model depending on whether a schema is provided. + + rootState = new RowState(this); + rootWriter = rootState.rootWriter(); + + // If no schema, columns will be added incrementally as they + // are discovered. Start with an empty model. + + if (options.schema != null) { + + // Schema provided. Populate a model (and create vectors) for the + // provided schema. The schema can be extended later, but normally + // won't be if known up front. + + logger.debug("Schema: " + options.schema.toString()); + rootState.buildSchema(options.schema); + } + } + + private void updateCardinality() { + rootState.updateCardinality(targetRowCount()); + } + + public ResultSetLoaderImpl(BufferAllocator allocator) { + this(allocator, new ResultSetOptions()); + } + + public BufferAllocator allocator() { return allocator; } + + protected int bumpVersion() { + + // Update the active schema version. We cannot update the published + // schema version at this point because a column later in this same + // row might cause overflow, and any new columns in this row will + // be hidden until a later batch. But, if we are between batches, + // then it is fine to add the column to the schema. 
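    // Worked example of the rule above: with activeSchemaVersion at 5, adding
    // a column while state == OVERFLOW bumps the active version to 6 but
    // leaves harvestSchemaVersion at 5, so the batch about to be harvested
    // hides the new column; it first becomes visible in the batch that
    // carries the overflow row.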
+ + activeSchemaVersion++; + switch (state) { + case HARVESTED: + case START: + case LOOK_AHEAD: + harvestSchemaVersion = activeSchemaVersion; + break; + default: + break; + + } + return activeSchemaVersion; + } + + @Override + public int schemaVersion() { return harvestSchemaVersion; } + + @Override + public void startBatch() { + switch (state) { + case HARVESTED: + case START: + logger.trace("Start batch"); + accumulatedBatchSize = 0; + updateCardinality(); + rootState.startBatch(); + checkInitialAllocation(); + + // The previous batch ended without overflow, so start + // a new batch, and reset the write index to 0. + + writerIndex.reset(); + rootWriter.startWrite(); + break; + + case LOOK_AHEAD: + + // A row overflowed so keep the writer index at its current value + // as it points to the second row in the overflow batch. However, + // the last write position of each writer must be restored on + // a column-by-column basis, which is done by the visitor. + + logger.trace("Start batch after overflow"); + rootState.startBatch(); + + // Note: no need to do anything with the writers; they were left + // pointing to the correct positions in the look-ahead batch. + // The above simply puts the look-ahead vectors back "under" + // the writers. + + break; + + default: + throw new IllegalStateException("Unexpected state: " + state); + } + + // Update the visible schema with any pending overflow batch + // updates. + + harvestSchemaVersion = activeSchemaVersion; + pendingRowCount = 0; + state = State.ACTIVE; + } + + @Override + public RowSetLoader writer() { + if (state == State.CLOSED) { + throw new IllegalStateException("Unexpected state: " + state); + } + return rootWriter; + } + + @Override + public ResultSetLoader setRow(Object... values) { + startRow(); + writer().setTuple(values); + saveRow(); + return this; + } + + /** + * Called before writing a new row. Implementation of + * {@link RowSetLoader#start()}. + */ + + protected void startRow() { + switch (state) { + case ACTIVE: + + // Update the visible schema with any pending overflow batch + // updates. + + harvestSchemaVersion = activeSchemaVersion; + rootWriter.startRow(); + break; + default: + throw new IllegalStateException("Unexpected state: " + state); + } + } + + /** + * Finalize the current row. Implementation of + * {@link RowSetLoader#save()}. + */ + + protected void saveRow() { + switch (state) { + case ACTIVE: + rootWriter.endArrayValue(); + rootWriter.saveRow(); + if (! writerIndex.next()) { + state = State.FULL_BATCH; + } + + // No overflow row. Advertise the schema version to the client. + + harvestSchemaVersion = activeSchemaVersion; + break; + + case OVERFLOW: + + // End the value of the look-ahead row in the look-ahead vectors. + + rootWriter.endArrayValue(); + rootWriter.saveRow(); + + // Advance the writer index relative to the look-ahead batch. + + writerIndex.next(); + + // Stay in the overflow state. Doing so will cause the writer + // to report that it is full. + // + // Also, do not change the harvest schema version. We will + // expose to the downstream operators the schema in effect + // at the start of the row. Columns added within the row won't + // appear until the next batch. 
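    // For example: a column "c" added while writing this overflow row is
    // invisible to the batch about to be harvested; both "c" and the
    // overflow row itself surface only after the next startBatch() call
    // re-publishes the active schema.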
+ + break; + + default: + throw new IllegalStateException("Unexpected state: " + state); + } + } + + /** + * Implementation of {@link RowSetLoader#isFull()} + * @return true if the batch is full (reached vector capacity or the + * row count limit), false if more rows can be added + */ + + protected boolean isFull() { + switch (state) { + case ACTIVE: + return ! writerIndex.valid(); + case OVERFLOW: + case FULL_BATCH: + return true; + default: + return false; + } + } + + @Override + public boolean writeable() { + return state == State.ACTIVE || state == State.OVERFLOW; + } + + private boolean isBatchActive() { + return state == State.ACTIVE || state == State.OVERFLOW || + state == State.FULL_BATCH ; + } + + /** + * Implementation for {#link {@link RowSetLoader#rowCount()}. + * + * @return the number of rows to be sent downstream for this + * batch. Does not include the overflow row. + */ + + protected int rowCount() { + switch (state) { + case ACTIVE: + case FULL_BATCH: + return writerIndex.size(); + case OVERFLOW: + return pendingRowCount; + default: + return 0; + } + } + + protected WriterIndexImpl writerIndex() { return writerIndex; } + + @Override + public void setTargetRowCount(int rowCount) { + targetRowCount = Math.max(1, rowCount); + } + + @Override + public int targetRowCount() { return targetRowCount; } + + @Override + public int targetVectorSize() { return options.vectorSizeLimit; } + + protected void overflowed() { + logger.trace("Vector overflow"); + + // If we see overflow when we are already handling overflow, it means + // that a single value is too large to fit into an entire vector. + // Fail the query. + // + // Note that this is a judgment call. It is possible to allow the + // vector to double beyond the limit, but that will require a bit + // of thought to get right -- and, of course, completely defeats + // the purpose of limiting vector size to avoid memory fragmentation... + // + // Individual columns handle the case in which overflow occurs on the + // first row of the main batch. This check handles the pathological case + // in which we successfully overflowed, but then another column + // overflowed during the overflow row -- that indicates that that one + // column can't fit in an empty vector. That is, this check is for a + // second-order overflow. + + if (state == State.OVERFLOW) { + throw UserException + .memoryError("A single column value is larger than the maximum allowed size of 16 MB") + .build(logger); + } + if (state != State.ACTIVE) { + throw new IllegalStateException("Unexpected state: " + state); + } + state = State.IN_OVERFLOW; + + // Preserve the number of rows in the now-complete batch. + + pendingRowCount = writerIndex.vectorIndex(); + + // Roll-over will allocate new vectors. Update with the latest + // array cardinality. + + updateCardinality(); + +// rootWriter.dump(new HierarchicalPrinter()); + + // Wrap up the completed rows into a batch. Sets + // vector value counts. The rollover data still exists so + // it can be moved, but it is now past the recorded + // end of the vectors (though, obviously, not past the + // physical end.) + + rootWriter.preRollover(); + + // Roll over vector values. + + accumulatedBatchSize = 0; + rootState.rollover(); + + // Adjust writer state to match the new vector values. This is + // surprisingly easy if we not that the current row is shifted to + // the 0 position in the new vector, so we just shift all offsets + // downward by the current row position at each repeat level. 
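    // Worked example of the shift: a VarChar array column holds offsets
    // [0, 3, 5, 9] and the row that overflowed is row 2, whose data starts
    // at offset 5. After rollover, the in-flight row's entries are rebased
    // to 0: the look-ahead offset vector begins [0, 4] (9 - 5 = 4), and the
    // row's data now sits at the front of the new buffer.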
+ + rootWriter.postRollover(); + + // The writer index is reset back to 0. Because of the above roll-over + // processing, some vectors may now already have values in the 0 slot. + // However, the vector that triggered overflow has not yet written to + // the current record, and so will now write to position 0. After the + // completion of the row, all 0-position values should be written (or + // at least those provided by the client.) + // + // For arrays, the writer might have written a set of values + // (v1, v2, v3), and v4 might have triggered the overflow. In this case, + // the array values have been moved, offset vectors adjusted, the + // element writer adjusted, so that v4 will be written to index 3 + // to produce (v1, v2, v3, v4, v5, ...) in the look-ahead vector. + + writerIndex.rollover(); + checkInitialAllocation(); + + // Remember that overflow is in effect. + + state = State.OVERFLOW; + } + + protected boolean hasOverflow() { return state == State.OVERFLOW; } + + @Override + public VectorContainer harvest() { + int rowCount; + switch (state) { + case ACTIVE: + case FULL_BATCH: + rowCount = harvestNormalBatch(); + logger.trace("Harvesting {} rows", rowCount); + break; + case OVERFLOW: + rowCount = harvestOverflowBatch(); + logger.trace("Harvesting {} rows after overflow", rowCount); + break; + default: + throw new IllegalStateException("Unexpected state: " + state); + } + + // Build the output container + + VectorContainer container = outputContainer(); + container.setRecordCount(rowCount); + + // Finalize: update counts, set state. + + harvestBatchCount++; + previousRowCount += rowCount; + return container; + } + + private int harvestNormalBatch() { + + // Wrap up the vectors: final fill-in, set value count, etc. + + rootWriter.endBatch(); + harvestSchemaVersion = activeSchemaVersion; + state = State.HARVESTED; + return writerIndex.size(); + } + + private int harvestOverflowBatch() { + rootState.harvestWithLookAhead(); + state = State.LOOK_AHEAD; + return pendingRowCount; + } + + @Override + public VectorContainer outputContainer() { + // Build the output container. + + if (containerBuilder == null) { + containerBuilder = new VectorContainerBuilder(this); + } + containerBuilder.update(harvestSchemaVersion); + return containerBuilder.container(); + } + + @Override + public TupleMetadata harvestSchema() { + return containerBuilder.schema(); + } + + @Override + public void close() { + if (state == State.CLOSED) { + return; + } + rootState.close(); + + // Do not close the vector cache; the caller owns that and + // will, presumably, reuse those vectors for another writer. + + state = State.CLOSED; + } + + @Override + public int batchCount() { + return harvestBatchCount + (rowCount() == 0 ? 0 : 1); + } + + @Override + public int totalRowCount() { + int total = previousRowCount; + if (isBatchActive()) { + total += pendingRowCount + writerIndex.size(); + } + return total; + } + + public ResultVectorCache vectorCache() { return vectorCache; } + public RowState rootState() { return rootState; } + + /** + * Return whether a vector within the current batch can expand. Limits + * are enforce only if a limit was provided in the options. 
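A sketch of the intended call protocol (the caller shown is illustrative; in practice the column writers reach this method through their listener callbacks):

    int delta = vector.getBufferSize();   // doubling adds the current size again
    if (!loader.canExpand(delta)) {
      // Do not reallocate; trigger overflow so the in-flight row rolls
      // over into a fresh look-ahead batch instead.
    }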
+ * + * @param delta increase in vector size + * @return true if the vector can expand, false if an overflow + * event should occur + */ + + public boolean canExpand(int delta) { + accumulatedBatchSize += delta; + return state == State.IN_OVERFLOW || + options.maxBatchSize <= 0 || + accumulatedBatchSize <= options.maxBatchSize; + } + + /** + * Accumulate the initial vector allocation sizes. + * + * @param allocationBytes number of bytes allocated to a vector + * in the batch setup step + */ + + public void tallyAllocations(int allocationBytes) { + accumulatedBatchSize += allocationBytes; + } + + /** + * Log and check the initial vector allocation. If a batch size + * limit is set, warn if the initial allocation exceeds the limit. + * This will occur if the target row count is incorrect for the + * data size. + */ + + private void checkInitialAllocation() { + if (options.maxBatchSize < 0) { + logger.debug("Initial vector allocation: {}, no batch limit specified", + accumulatedBatchSize); + } + else if (accumulatedBatchSize > options.maxBatchSize) { + logger.warn("Initial vector allocation: {}, but batch size limit is: {}", + accumulatedBatchSize, options.maxBatchSize); + } else { + logger.debug("Initial vector allocation: {}, batch size limit: {}", + accumulatedBatchSize, options.maxBatchSize); + } + } + + public void dump(HierarchicalFormatter format) { + format + .startObject(this) + .attribute("options"); + options.dump(format); + format + .attribute("index", writerIndex.vectorIndex()) + .attribute("state", state) + .attribute("activeSchemaVersion", activeSchemaVersion) + .attribute("harvestSchemaVersion", harvestSchemaVersion) + .attribute("pendingRowCount", pendingRowCount) + .attribute("targetRowCount", targetRowCount) + ; + format.attribute("root"); + rootState.dump(format); + format.attribute("rootWriter"); + rootWriter.dump(format); + format.endObject(); + } +}
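Pulling the pieces of ResultSetLoaderImpl together, a typical reader loop might look like the following sketch (loadRow() and sendDownstream() are hypothetical reader and consumer callbacks):

    ResultSetLoader rsLoader = new ResultSetLoaderImpl(allocator);
    RowSetLoader writer = rsLoader.writer();
    boolean more = true;
    while (more) {
      rsLoader.startBatch();
      while (writer.start()) {
        more = loadRow(writer);   // hypothetical: fills this row's columns
        if (!more) { break; }
        writer.save();
      }
      sendDownstream(rsLoader.harvest());
    }
    rsLoader.close();

Note that overflow never shows through this loop: when a vector fills mid-row, the loader silently moves the row to a look-ahead batch, writer.start() reports the batch as full, and the row reappears at the start of the next batch.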
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java new file mode 100644 index 0000000..c7288b2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ResultVectorCacheImpl.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Manages an inventory of value vectors used across row batch readers. + * Drill semantics for batches is complex. Each operator logically returns + * a batch of records on each call of the Drill Volcano iterator protocol + * <tt>next()</tt> operation. However, the batches "returned" are not + * separate objects. Instead, Drill enforces the following semantics: + * <ul> + * <li>If a <tt>next()</tt> call returns <tt>OK</tt> then the set of vectors + * in the "returned" batch must be identical to those in the prior batch. Not + * just the same type; they must be the same <tt>ValueVector</tt> objects. + * (The buffers within the vectors will be different.)</li> + * <li>If the set of vectors changes in any way (add a vector, remove a + * vector, change the type of a vector), then the <tt>next()</tt> call + * <b>must</b> return <tt>OK_NEW_SCHEMA</tt>.</ul> + * </ul> + * These rules create interesting constraints for the scan operator. + * Conceptually, each batch is distinct. But, it must share vectors. The + * {@link ResultSetLoader} class handles this by managing the set of vectors + * used by a single reader. + * <p> + * Readers are independent: each may read a distinct schema (as in JSON.) + * Yet, the Drill protocol requires minimizing spurious <tt>OK_NEW_SCHEMA</tt> + * events. As a result, two readers run by the same scan operator must + * share the same set of vectors, despite the fact that they may have + * different schemas and thus different <tt>ResultSetLoader</tt>s. 
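A sketch of the identity rule this cache enforces (MaterializedField and Types are existing Drill APIs):

    MaterializedField intCol =
        MaterializedField.create("a", Types.required(MinorType.INT));
    ValueVector v1 = cache.addOrGet(intCol);
    ValueVector v2 = cache.addOrGet(intCol);
    assert v1 == v2;  // same vector object across readers: the OK contract holds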
+ * <p> + * The purpose of this inventory is to persist vectors across readers, even + * when, say, reader B does not use a vector that reader A created. + * <p> + * The semantics supported by this class include: + * <ul> + * <li>Ability to "pre-declare" columns based on columns that appear in + * an explicit select list. This ensures that the columns are known (but + * not their types).</li> + * <li>Ability to reuse a vector across readers if the column retains the same + * name and type (minor type and mode.)</li> + * <li>Ability to flush unused vectors for readers with changing schemas + * if a schema change occurs.</li> + * <li>Support schema "hysteresis"; that is, the a "sticky" schema that + * minimizes spurious changes. Once a vector is declared, it can be included + * in all subsequent batches (provided the column is nullable or an array.)</li> + * </ul> + */ +public class ResultVectorCacheImpl implements ResultVectorCache { + + /** + * State of a projected vector. At first all we have is a name. + * Later, we'll discover the type. + */ + + private static class VectorState { + protected final String name; + protected ValueVector vector; + protected boolean touched; + + public VectorState(String name) { + this.name = name; + } + + public boolean satisfies(MaterializedField colSchema) { + if (vector == null) { + return false; + } + MaterializedField vectorSchema = vector.getField(); + return vectorSchema.getType().equals(colSchema.getType()); + } + } + + private final BufferAllocator allocator; + private final Map<String, VectorState> vectors = new HashMap<>(); + + public ResultVectorCacheImpl(BufferAllocator allocator) { + this.allocator = allocator; + } + + @Override + public BufferAllocator allocator() { return allocator; } + + public void predefine(List<String> selected) { + for (String colName : selected) { + addVector(colName); + } + } + + private VectorState addVector(String colName) { + VectorState vs = new VectorState(colName); + vectors.put(vs.name, vs); + return vs; + } + + public void newBatch() { + for (VectorState vs : vectors.values()) { + vs.touched = false; + } + } + + public void trimUnused() { + List<VectorState> unused = new ArrayList<>(); + for (VectorState vs : vectors.values()) { + if (! vs.touched) { + unused.add(vs); + } + } + if (unused.isEmpty()) { + return; + } + for (VectorState vs : unused) { + vectors.remove(vs.name); + } + } + + @Override + public ValueVector addOrGet(MaterializedField colSchema) { + VectorState vs = vectors.get(colSchema.getName()); + + // If the vector is found, and is of the right type, reuse it. + + if (vs != null && vs.satisfies(colSchema)) { + return vs.vector; + } + + // If no vector, this is a late schema. Create the vector. + + if (vs == null) { + vs = addVector(colSchema.getName()); + + // Else, if the vector changed type, close the old one. + + } else if (vs.vector != null) { + vs.vector.close(); + vs.vector = null; + } + + // Create the new vector. 
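    // For example: a cached REQUIRED INT vector for "a", followed by a
    // request for "a" as REQUIRED VARCHAR, closes and drops the INT vector
    // above, then falls through to here to build the VARCHAR replacement;
    // the next batch must then be announced as OK_NEW_SCHEMA.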
+ + vs.touched = true; + vs.vector = TypeHelper.getNewVector(colSchema, allocator, null); + return vs.vector; + } + + public MajorType getType(String name) { + VectorState vs = vectors.get(name); + if (vs == null || vs.vector == null) { + return null; + } + return vs.vector.getField().getType(); + } + + public void close() { + for (VectorState vs : vectors.values()) { + vs.vector.close(); + } + vectors.clear(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java new file mode 100644 index 0000000..ec61ae7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RowSetLoaderImpl.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import java.util.ArrayList; + +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter; + +/** + * Implementation of the row set loader. Provides row-level operations, leaving the + * result set loader to provide batch-level operations. However, all control + * operations are actually delegated to the result set loader, which handles + * the details of working with overflow rows. + */ + +public class RowSetLoaderImpl extends AbstractTupleWriter implements RowSetLoader { + + private final ResultSetLoaderImpl rsLoader; + + protected RowSetLoaderImpl(ResultSetLoaderImpl rsLoader, TupleMetadata schema) { + super(schema, new ArrayList<AbstractObjectWriter>()); + this.rsLoader = rsLoader; + bindIndex(rsLoader.writerIndex()); + } + + @Override + public ResultSetLoader loader() { return rsLoader; } + + @Override + public RowSetLoader addRow(Object...values) { + if (! start()) { + throw new IllegalStateException("Batch is full."); + } + setObject(values); + save(); + return this; + } + + @Override + public int rowIndex() { return rsLoader.writerIndex().vectorIndex(); } + + @Override + public void save() { rsLoader.saveRow(); } + + @Override + public boolean start() { + if (rsLoader.isFull()) { + + // Full batch? Return false. + + return false; + } else if (state == State.IN_ROW) { + + // Already in a row? Rewind to the start of the row.
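    // This rewind supports a convenient filtering idiom: a reader may fill
    // in a row, decide to discard it, and simply skip save(); the next
    // start() lands here and overwrites the abandoned row in place.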
+ + restartRow(); + } else { + + // Otherwise, advance to the next row. + + rsLoader.startRow(); + } + return true; + } + + public void endBatch() { + if (state == State.IN_ROW) { + restartRow(); + state = State.IN_WRITE; + } + endWrite(); + } + + @Override + public boolean isFull( ) { return rsLoader.isFull(); } + + @Override + public int rowCount() { return rsLoader.rowCount(); } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java new file mode 100644 index 0000000..f6bc5f3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.vector.FixedWidthVector; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VariableWidthVector; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; +import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter; +import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter; + +/** + * Base class for a single vector. Handles the bulk of work for that vector. + * Subclasses are specialized for offset vectors or values vectors. + * (The "single vector" name contrasts with classes that manage compound + * vectors, such as a data and offsets vector.) + */ + +public abstract class SingleVectorState implements VectorState { + + /** + * State for a scalar value vector. The vector might be for a simple (non-array) + * vector, or might be the payload part of a scalar array (repeated scalar) + * vector. + */ + + public static class ValuesVectorState extends SingleVectorState { + + private final ColumnMetadata schema; + + public ValuesVectorState(ColumnMetadata schema, AbstractScalarWriter writer, ValueVector mainVector) { + super(writer, mainVector); + this.schema = schema; + } + + @Override + public int allocateVector(ValueVector vector, int cardinality) { + if (schema.isVariableWidth()) { + + // Cap the allocated size to the maximum. 
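    // Worked example: cardinality 4096 with an expected width of 50 bytes
    // requests 4096 * 50 = 204,800 bytes. A wild estimate (say a million
    // rows at 20 KB each) would overflow an int, which is why the product
    // is computed as a long and capped at ValueVector.MAX_BUFFER_SIZE
    // before the cast.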
+ + int size = (int) Math.min(ValueVector.MAX_BUFFER_SIZE, (long) cardinality * schema.expectedWidth()); + ((VariableWidthVector) vector).allocateNew(size, cardinality); + } else { + ((FixedWidthVector) vector).allocateNew(cardinality); + } + return vector.getBufferSize(); + } + + @Override + protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) { + int newIndex = 0; + ResultSetLoaderImpl.logger.trace("Vector {} of type {}: copy {} values from {} to {}", + mainVector.getField().toString(), + mainVector.getClass().getSimpleName(), + Math.max(0, sourceEndIndex - sourceStartIndex + 1), + sourceStartIndex, newIndex); + + // Copy overflow values from the full vector to the new + // look-ahead vector. Uses vector-level operations for convenience. + // These aren't very efficient, but overflow does not happen very + // often. + + for (int src = sourceStartIndex; src <= sourceEndIndex; src++, newIndex++) { + mainVector.copyEntry(newIndex, backupVector, src); + } + } + } + + /** + * Special case for an offset vector. Offset vectors are managed like any other + * vector with respect to overflow and allocation. This means that the loader + * classes avoid the use of the RepeatedVector class methods, instead working + * with the offsets vector (here) or the values vector to allow the needed + * fine control over overflow operations. + */ + + public static class OffsetVectorState extends SingleVectorState { + + private final AbstractObjectWriter childWriter; + + public OffsetVectorState(AbstractScalarWriter writer, ValueVector mainVector, + AbstractObjectWriter childWriter) { + super(writer, mainVector); + this.childWriter = childWriter; + } + + @Override + public int allocateVector(ValueVector toAlloc, int cardinality) { + ((UInt4Vector) toAlloc).allocateNew(cardinality); + return toAlloc.getBufferSize(); + } + + public int rowStartOffset() { + return ((OffsetVectorWriter) writer).rowStartOffset(); + } + + @Override + protected void copyOverflow(int sourceStartIndex, int sourceEndIndex) { + + if (sourceStartIndex > sourceEndIndex) { + return; + } + + // This is an offset vector. The data to copy is one greater + // than the row index. + + sourceStartIndex++; + sourceEndIndex++; + + // Copy overflow values from the full vector to the new + // look-ahead vector. Since this is an offset vector, values must + // be adjusted as they move across. + // + // Indexing can be confusing. Offset vectors have values offset + // from their row by one position. The offset vector position for + // row i has the start value for row i. The offset vector position for + // i+1 has the start of the next value. The difference between the + // two is the element length. As a result, the offset vector always has + // one more value than the number of rows, and position 0 is always 0. + // + // The index passed in here is that of the row that overflowed. That + // offset vector position contains the offset of the start of the data + // for the current row. We must subtract that offset from each copied + // value to adjust the offset for the destination. 
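    // Worked example: source offsets [0, 3, 5, 9], with row 2 (start offset
    // 5) as the overflow row. The indexes arrive as 2..2 and were bumped to
    // 3..3 above; destination position 1 then receives 9 - 5 = 4. With
    // position 0 later filled as 0, the look-ahead offsets read [0, 4].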
+ + UInt4Vector.Accessor sourceAccessor = ((UInt4Vector) backupVector).getAccessor(); + UInt4Vector.Mutator destMutator = ((UInt4Vector) mainVector).getMutator(); + int offset = childWriter.events().writerIndex().rowStartIndex(); + int newIndex = 1; + ResultSetLoaderImpl.logger.trace("Offset vector: copy {} values from {} to {} with offset {}", + Math.max(0, sourceEndIndex - sourceStartIndex + 1), + sourceStartIndex, newIndex, offset); + assert offset == sourceAccessor.get(sourceStartIndex - 1); + + // Position zero is special and will be filled in by the writer + // later. + + for (int src = sourceStartIndex; src <= sourceEndIndex; src++, newIndex++) { + destMutator.set(newIndex, sourceAccessor.get(src) - offset); + } +// VectorPrinter.printOffsets((UInt4Vector) backupVector, sourceStartIndex - 1, sourceEndIndex - sourceStartIndex + 3); +// VectorPrinter.printOffsets((UInt4Vector) mainVector, 0, newIndex); + } + } + + protected final AbstractScalarWriter writer; + protected final ValueVector mainVector; + protected ValueVector backupVector; + + public SingleVectorState(AbstractScalarWriter writer, ValueVector mainVector) { + this.writer = writer; + this.mainVector = mainVector; + } + + @Override + public ValueVector vector() { return mainVector; } + + @Override + public int allocate(int cardinality) { + return allocateVector(mainVector, cardinality); + } + + protected abstract int allocateVector(ValueVector vector, int cardinality); + + /** + * A column within the row batch overflowed. Prepare to absorb the rest of + * the in-flight row by rolling values over to a new vector, saving the + * complete vector for later. This column could have a value for the overflow + * row, or for some previous row, depending on exactly when and where the + * overflow occurs. + * + * @param sourceStartIndex the index of the row that caused the overflow, the + * values of which should be copied to a new "look-ahead" vector. If the + * vector is an array, then the overflowIndex is the position of the first + * element to be moved, and multiple elements may need to move + */ + + @Override + public void rollover(int cardinality) { + + int sourceStartIndex = writer.writerIndex().rowStartIndex(); + + // Remember the last write index for the original vector. + // This tells us the end of the set of values to move, while the + // sourceStartIndex above tells us the start. + + int sourceEndIndex = writer.lastWriteIndex(); + + // Switch buffers between the backup vector and the writer's output + // vector. Done this way because writers are bound to vectors and + // we wish to keep the binding. + + if (backupVector == null) { + backupVector = TypeHelper.getNewVector(mainVector.getField(), mainVector.getAllocator(), null); + } + assert cardinality > 0; + allocateVector(backupVector, cardinality); + mainVector.exchange(backupVector); + + // Copy overflow values from the full vector to the new + // look-ahead vector. + + copyOverflow(sourceStartIndex, sourceEndIndex); + + // At this point, the writer is positioned to write to the look-ahead + // vector at the position after the copied values. The original vector + // is saved along with a last write position that is no greater than + // the retained values. + } + + protected abstract void copyOverflow(int sourceStartIndex, int sourceEndIndex); + + /** + * Exchange the data from the backup vector and the main vector, putting + * the completed buffers back into the main vectors, and stashing the + * overflow buffers away in the backup vector. 
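The exchange choreography across the overflow lifecycle, summarized (each step is the same buffer swap; object identity is preserved, so writer bindings survive):

    // rollover():                main <-> backup: full batch parked in backup;
    //                            writing continues in fresh look-ahead buffers.
    // harvestWithLookAhead():    main <-> backup: full batch restored for
    //                            harvest; look-ahead values parked in backup.
    // startBatchWithLookAhead(): main <-> backup: look-ahead values active
    //                            again; backup cleared for reuse.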
+ * Restore the main vector's last write position. + */ + + @Override + public void harvestWithLookAhead() { + mainVector.exchange(backupVector); + } + + /** + * The previous full batch has been sent downstream and the client is + * now ready to start writing to the next batch. Initialize that new batch + * with the look-ahead values saved during overflow of the previous batch. + */ + + @Override + public void startBatchWithLookAhead() { + mainVector.exchange(backupVector); + backupVector.clear(); + } + + @Override + public void reset() { + mainVector.clear(); + if (backupVector != null) { + backupVector.clear(); + } + } + + @Override + public void dump(HierarchicalFormatter format) { + format + .startObject(this) + .attributeIdentity("writer", writer) + .attributeIdentity("mainVector", mainVector) + .attributeIdentity("backupVector", backupVector) + .endObject(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java new file mode 100644 index 0000000..de41ee4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.impl; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseMapColumnState; +import org.apache.drill.exec.physical.rowSet.impl.ColumnState.MapArrayColumnState; +import org.apache.drill.exec.physical.rowSet.impl.ColumnState.MapColumnState; +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.TupleSchema; +import org.apache.drill.exec.record.TupleSchema.AbstractColumnMetadata; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.ObjectType; +import org.apache.drill.exec.vector.accessor.ObjectWriter; +import org.apache.drill.exec.vector.accessor.TupleWriter; +import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; +import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter; +import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory; + +/** + * Represents the loader state for a tuple: a row or a map. This is "state" in + * the sense of variables that are carried along with each tuple. Handles + * write-time issues such as defining new columns, allocating memory, handling + * overflow, assembling the output version of the map, and so on. Each + * row and map in the result set has a tuple state instances associated + * with it. + * <p> + * Here, by "tuple" we mean a container of vectors, each of which holds + * a variety of values. So, the "tuple" here is structural, not a specific + * set of values, but rather the collection of vectors that hold tuple + * values. + */ + +public abstract class TupleState implements TupleWriterListener { + + /** + * Handles the details of the top-level tuple, the data row itself. + * Note that by "row" we mean the set of vectors that define the + * set of rows. + */ + + public static class RowState extends TupleState { + + /** + * The row-level writer for stepping through rows as they are written, + * and for accessing top-level columns. + */ + + private final RowSetLoaderImpl writer; + + public RowState(ResultSetLoaderImpl rsLoader) { + super(rsLoader, rsLoader.projectionSet); + writer = new RowSetLoaderImpl(rsLoader, schema); + writer.bindListener(this); + } + + public RowSetLoaderImpl rootWriter() { return writer; } + + @Override + public AbstractTupleWriter writer() { return writer; } + + @Override + public int innerCardinality() { return resultSetLoader.targetRowCount();} + } + + /** + * Represents a tuple defined as a Drill map: single or repeated. Note that + * the map vector does not exist here; it is assembled only when "harvesting" + * a batch. This design supports the obscure case in which a new column + * is added during an overflow row, so exists within this abstraction, + * but is not published to the map that makes up the output. 
+ */ + + public static class MapState extends TupleState { + + protected final BaseMapColumnState mapColumnState; + protected int outerCardinality; + + public MapState(ResultSetLoaderImpl rsLoader, + BaseMapColumnState mapColumnState, + ProjectionSet projectionSet) { + super(rsLoader, projectionSet); + this.mapColumnState = mapColumnState; + mapColumnState.writer().bindListener(this); + } + + /** + * Return the tuple writer for the map. If this is a single + * map, then it is the writer itself. If this is a map array, + * then the tuple is nested inside the array. + */ + + @Override + public AbstractTupleWriter writer() { + AbstractObjectWriter objWriter = mapColumnState.writer(); + TupleWriter tupleWriter; + if (objWriter.type() == ObjectType.ARRAY) { + tupleWriter = objWriter.array().tuple(); + } else { + tupleWriter = objWriter.tuple(); + } + return (AbstractTupleWriter) tupleWriter; + } + + /** + * In order to allocate the correct-sized vectors, the map must know + * its member cardinality: the number of elements in each row. This + * is 1 for a single map, but may be any number for a map array. Then, + * this value is recursively pushed downward to compute the cardinality + * of lists of maps that contains lists of maps, and so on. + */ + + @Override + public void updateCardinality(int outerCardinality) { + this.outerCardinality = outerCardinality; + super.updateCardinality(outerCardinality); + } + + @Override + public int innerCardinality() { + return outerCardinality * mapColumnState.schema().expectedElementCount(); + } + + @Override + public void dump(HierarchicalFormatter format) { + format + .startObject(this) + .attribute("column", mapColumnState.schema().name()) + .attribute("cardinality", outerCardinality) + .endObject(); + } + } + + protected final ResultSetLoaderImpl resultSetLoader; + protected final List<ColumnState> columns = new ArrayList<>(); + protected final TupleSchema schema = new TupleSchema(); + protected final ProjectionSet projectionSet; + + protected TupleState(ResultSetLoaderImpl rsLoader, ProjectionSet projectionSet) { + this.resultSetLoader = rsLoader; + this.projectionSet = projectionSet; + } + + public abstract int innerCardinality(); + + /** + * Returns an ordered set of the columns which make up the tuple. + * Column order is the same as that defined by the map's schema, + * to allow indexed access. New columns always appear at the end + * of the list to preserve indexes. + * + * @return ordered list of column states for the columns within + * this tuple + */ + + public List<ColumnState> columns() { return columns; } + + public TupleMetadata schema() { return writer().schema(); } + + public abstract AbstractTupleWriter writer(); + + @Override + public ObjectWriter addColumn(TupleWriter tupleWriter, MaterializedField column) { + return addColumn(tupleWriter, TupleSchema.fromField(column)); + } + + @Override + public ObjectWriter addColumn(TupleWriter tupleWriter, ColumnMetadata columnSchema) { + + // Verify name is not a (possibly case insensitive) duplicate. + + TupleMetadata tupleSchema = schema(); + String colName = columnSchema.name(); + if (tupleSchema.column(colName) != null) { + throw new IllegalArgumentException("Duplicate column: " + colName); + } + + return addColumn(columnSchema); + } + + /** + * Implementation of the work to add a new column to this tuple given a + * schema description of the column. 
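From a reader's point of view, this path is reached through the tuple writer; a sketch (assuming the TupleWriter interface exposes addColumn() overloads that route to the listener methods above; the column accessors shown are illustrative):

    int colIndex = writer.addColumn(
        MaterializedField.create("comment", Types.optional(MinorType.VARCHAR)));
    writer.scalar(colIndex).setString("discovered at row 10");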
+ * + * @param columnSchema schema of the column + * @return writer for the new column + */ + + private AbstractObjectWriter addColumn(ColumnMetadata columnSchema) { + + // Indicate projection in the metadata. + + ((AbstractColumnMetadata) columnSchema).setProjected( + projectionSet.isProjected(columnSchema.name())); + + // Build the column + + ColumnState colState; + if (columnSchema.isMap()) { + colState = buildMap(columnSchema); + } else { + colState = buildPrimitive(columnSchema); + } + columns.add(colState); + colState.updateCardinality(innerCardinality()); + colState.allocateVectors(); + return colState.writer(); + } + + /** + * Build a primitive column. Check if the column is projected. If not, + * allocate a dummy writer for the column. If projected, then allocate + * a vector, a writer, and the column state which binds the two together + * and manages the column. + * + * @param columnSchema schema of the new primitive column + * @return column state for the new column + */ + + @SuppressWarnings("resource") + private ColumnState buildPrimitive(ColumnMetadata columnSchema) { + ValueVector vector; + if (columnSchema.isProjected()) { + + // Create the vector for the column. + + vector = resultSetLoader.vectorCache().addOrGet(columnSchema.schema()); + } else { + + // Column is not projected. No materialized backing for the column. + + vector = null; + } + + // Create the writer. Will be returned to the tuple writer. + + AbstractObjectWriter colWriter = ColumnWriterFactory.buildColumnWriter(columnSchema, vector); + + if (columnSchema.isArray()) { + return PrimitiveColumnState.newPrimitiveArray(resultSetLoader, vector, colWriter); + } else { + return PrimitiveColumnState.newPrimitive(resultSetLoader, vector, colWriter); + } + } + + /** + * Build a new map (single or repeated) column. No map vector is created + * here, instead we create a tuple state to hold the columns, and defer the + * map vector (or vector container) until harvest time. + * + * @param columnSchema description of the map column + * @return column state for the map column + */ + + private ColumnState buildMap(ColumnMetadata columnSchema) { + + // When dynamically adding columns, must add the (empty) + // map by itself, then add columns to the map via separate + // calls. + + assert columnSchema.isMap(); + assert columnSchema.mapSchema().size() == 0; + + // Create the writer. Will be returned to the tuple writer. + + ProjectionSet childProjection = projectionSet.mapProjection(columnSchema.name()); + if (columnSchema.isArray()) { + return MapArrayColumnState.build(resultSetLoader, + columnSchema, + childProjection); + } else { + return new MapColumnState(resultSetLoader, + columnSchema, + childProjection); + } + } + + /** + * When creating a schema up front, provide the schema of the desired tuple, + * then build vectors and writers to match. Allows up-front schema definition + * in addition to on-the-fly schema creation handled elsewhere. 
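For example, a reader with a known schema can declare it before the first batch (a sketch: SchemaBuilder stands in for whatever utility assembles the TupleMetadata, and the OptionBuilder fluent methods are assumptions):

    TupleMetadata schema = new SchemaBuilder()
        .add("id", MinorType.INT)
        .addNullable("name", MinorType.VARCHAR)
        .buildSchema();
    ResultSetLoader rsLoader = new ResultSetLoaderImpl(allocator,
        new OptionBuilder().setSchema(schema).build());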
+ * + * @param schema desired tuple schema to be materialized + */ + + public void buildSchema(TupleMetadata schema) { + for (int i = 0; i < schema.size(); i++) { + ColumnMetadata colSchema = schema.metadata(i); + AbstractObjectWriter colWriter; + if (colSchema.isMap()) { + colWriter = addColumn(colSchema.cloneEmpty()); + BaseMapColumnState mapColState = (BaseMapColumnState) columns.get(columns.size() - 1); + mapColState.mapState().buildSchema(colSchema.mapSchema()); + } else { + colWriter = addColumn(colSchema); + } + writer().addColumnWriter(colWriter); + } + } + + public void updateCardinality(int cardinality) { + for (ColumnState colState : columns) { + colState.updateCardinality(cardinality); + } + } + + /** + * A column within the row batch overflowed. Prepare to absorb the rest of the + * in-flight row by rolling values over to a new vector, saving the complete + * vector for later. This column could have a value for the overflow row, or + * for some previous row, depending on exactly when and where the overflow + * occurs. + */ + + public void rollover() { + for (ColumnState colState : columns) { + colState.rollover(); + } + } + + /** + * Writing of a row batch is complete, and an overflow occurred. Prepare the + * vector for harvesting to send downstream. Set aside the look-ahead vector + * and put the full vector buffer back into the active vector. + */ + + public void harvestWithLookAhead() { + for (ColumnState colState : columns) { + colState.harvestWithLookAhead(); + } + } + + /** + * Start a new batch by shifting the overflow buffers back into the main + * write vectors and updating the writers. + */ + + public void startBatch() { + for (ColumnState colState : columns) { + colState.startBatch(); + } + } + + /** + * Clean up state (such as backup vectors) associated with the state + * for each vector. + */ + + public void close() { + for (ColumnState colState : columns) { + colState.close(); + } + } + + public void dump(HierarchicalFormatter format) { + format + .startObject(this) + .attributeArray("columns"); + for (int i = 0; i < columns.size(); i++) { + format.element(i); + columns.get(i).dump(format); + } + format + .endArray() + .endObject(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java new file mode 100644 index 0000000..faa68cb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorContainerBuilder.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import java.util.List; + +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.rowSet.impl.ColumnState.BaseMapColumnState; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.TupleSchema; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.RepeatedMapVector; + +/** + * Builds the harvest vector container that includes only the columns that + * are included in the harvest schema version. That is, it excludes columns + * added while writing an overflow row. + * <p> + * Because a Drill row is actually a hierarchy, this class walks the internal + * hierarchy and builds a corresponding output hierarchy. + * <ul> + * <li>The root node is the row itself (vector container),</li> + * <li>Internal nodes are maps (structures),</li> + * <li>Leaf nodes are primitive vectors (which may be arrays).</li> + * </ul> + * The basic algorithm is to identify the version of the output schema, + * then add any new columns added up to that version. This object maintains + * the output container across batches, meaning that updates are incremental: + * we need only add columns that are new since the last update. For example, + * if the container was last built at schema version 5 and the target is + * version 7, only the columns added at versions 6 and 7 need be appended. + * And, those new columns will always appear directly after all existing + * columns in the row or in a map. + * <p> + * A special case occurs when columns are added in the overflow row. These + * columns <i>do not</i> appear in the output container for the main part + * of the batch; instead they appear in the <i>next</i> output container + * that includes the overflow row. + * <p> + * Since the container here may contain a subset of the internal columns, an + * interesting case occurs for maps. The maps in the output container are + * <b>not</b> the same as those used internally. Since the internal and + * external versions of a map may contain different sets of columns, the two + * map vectors must be distinct. The set of child vectors (except for child + * maps) is shared. + */ + +public class VectorContainerBuilder { + + /** + * Drill vector containers and maps are both tuples, but they irritatingly + * have completely different APIs for working with their child vectors. + * This class acts as a proxy to wrap the two APIs to provide a common + * view for the use of the container builder. + */ + + public static abstract class TupleProxy { + protected TupleMetadata schema; + + public TupleProxy(TupleMetadata schema) { + this.schema = schema; + } + + protected abstract int size(); + protected abstract ValueVector vector(int index); + protected abstract void add(ValueVector vector); + + protected TupleProxy mapProxy(int index) { + return new MapProxy( + schema.metadata(index).mapSchema(), + (AbstractMapVector) vector(index)); + } + } + + /** + * Proxy wrapper class for a vector container.
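+ * <p> + * Both this proxy and {@link MapProxy} expose the same minimal protocol, so the builder can walk either kind of tuple generically. A sketch (the <tt>proxy</tt> variable is illustrative only): + * <pre><code> + * for (int i = 0; i &lt; proxy.size(); i++) { + *   ValueVector v = proxy.vector(i); + *   // ... examine or copy the child vector + * } + * </code></pre>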
+ */ + + protected static class ContainerProxy extends TupleProxy { + + private VectorContainer container; + + protected ContainerProxy(TupleMetadata schema, VectorContainer container) { + super(schema); + this.container = container; + } + + @Override + protected int size() { + return container.getNumberOfColumns(); + } + + @Override + protected ValueVector vector(int index) { + return container.getValueVector(index).getValueVector(); + } + + @Override + protected void add(ValueVector vector) { + container.add(vector); + } + } + + /** + * Proxy wrapper class for a map vector. + */ + + protected static class MapProxy extends TupleProxy { + + private AbstractMapVector mapVector; + + protected MapProxy(TupleMetadata schema, AbstractMapVector mapVector) { + super(schema); + this.mapVector = mapVector; + } + + @Override + protected int size() { + return mapVector.size(); + } + + @Override + protected ValueVector vector(int index) { + return mapVector.getChildByOrdinal(index); + } + + @Override + protected void add(ValueVector vector) { + mapVector.putChild(vector.getField().getName(), vector); + } + } + + private final ResultSetLoaderImpl resultSetLoader; + private int outputSchemaVersion = -1; + private TupleMetadata schema; + private VectorContainer container; + + public VectorContainerBuilder(ResultSetLoaderImpl rsLoader) { + this.resultSetLoader = rsLoader; + container = new VectorContainer(rsLoader.allocator); + schema = new TupleSchema(); + } + + public void update(int targetVersion) { + if (outputSchemaVersion >= targetVersion) { + return; + } + outputSchemaVersion = targetVersion; + updateTuple(resultSetLoader.rootState(), new ContainerProxy(schema, container)); + container.buildSchema(SelectionVectorMode.NONE); + } + + public VectorContainer container() { return container; } + + public int outputSchemaVersion() { return outputSchemaVersion; } + + public BufferAllocator allocator() { + return resultSetLoader.allocator(); + } + + private void updateTuple(TupleState sourceModel, TupleProxy destProxy) { + int prevCount = destProxy.size(); + List<ColumnState> cols = sourceModel.columns(); + int currentCount = cols.size(); + + // Scan any existing maps for column additions + + for (int i = 0; i < prevCount; i++) { + ColumnState colState = cols.get(i); + if (! colState.schema().isProjected()) { + continue; + } + if (colState.schema().isMap()) { + updateTuple((TupleState) ((BaseMapColumnState) colState).mapState(), destProxy.mapProxy(i)); + } + } + + // Add new columns, which may be maps + + for (int i = prevCount; i < currentCount; i++) { + ColumnState colState = cols.get(i); + if (! colState.schema().isProjected()) { + continue; + } + + // If the column was added after the output schema version cutoff, + // skip that column for now. + + if (colState.addVersion > outputSchemaVersion) { + break; + } + if (colState.schema().isMap()) { + buildMap(destProxy, (BaseMapColumnState) colState); + } else { + destProxy.add(colState.vector()); + destProxy.schema.addColumn(colState.schema()); + assert destProxy.size() == destProxy.schema.size(); + } + } + } + + @SuppressWarnings("resource") + private void buildMap(TupleProxy parentTuple, BaseMapColumnState colModel) { + + // Creating the map vector will create its contained vectors if we + // give it a materialized field with children. Instead, pass a clone + // without children so that we can add the children ourselves. + + ColumnMetadata mapColSchema = colModel.schema().cloneEmpty(); + + // Don't get the map vector from the vector cache.
Map vectors may + // have content that varies from batch to batch. Only the leaf + // vectors can be cached. + + AbstractMapVector mapVector; + if (mapColSchema.isArray()) { + + // The external repeated map shares its offsets vector with the + // internal repeated map. + + UInt4Vector offsets = (UInt4Vector) colModel.vector(); + mapVector = new RepeatedMapVector(mapColSchema.schema(), offsets, null); + } else { + mapVector = new MapVector(mapColSchema.schema(), allocator(), null); + } + + // Add the map vector and schema to the parent tuple + + parentTuple.add(mapVector); + int index = parentTuple.schema.addColumn(mapColSchema); + assert parentTuple.size() == parentTuple.schema.size(); + + // Update the tuple, which will add the new columns in the map + + updateTuple(colModel.mapState(), parentTuple.mapProxy(index)); + } + + public TupleMetadata schema() { return schema; } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java new file mode 100644 index 0000000..4a1c698 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/VectorState.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; + +/** + * Handles batch and overflow operations for a (possibly compound) vector. + * <p> + * The data model is the following: + * <ul> + * <li>Column model<ul> + * <li>Value vector itself</li> + * <li>Column writer</li> + * <li>Column schema</li> + * <li>Column coordinator (this class)</li> + * </ul></li></ul> + * The vector state coordinates events between the result set loader + * on the one side and the vectors, writers and schema on the other. + * For example: + * <pre><code> + * Result Set Vector + * Loader <--> State <--> Vectors + * </code></pre> + * Events from the row set loader deal with allocation, roll-over, + * harvesting completed batches and so on. Events from the writer, + * via the tuple model, deal with adding columns and column + * overflow. + */ + +public interface VectorState { + + /** + * Allocate a new vector with the number of elements given. If the vector + * is an array, then the cardinality given is the number of arrays.
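+ * <p> + * For example (sizes illustrative only): when starting a batch with a target of 1000 rows, a scalar column would be allocated as <tt>allocate(1000)</tt>, while a repeated column would also receive 1000: the count of arrays, not of the values within them.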
+ * @param cardinality number of elements desired in the allocated + * vector + * + * @return the number of bytes allocated + */ + + int allocate(int cardinality); + + /** + * A vector has overflowed. Create a new look-ahead vector of the given + * cardinality, then copy the overflow values from the main vector to the + * look-ahead vector. + * + * @param cardinality the number of elements in the new vector. If this + * vector is an array, then this is the number of arrays + */ + + void rollover(int cardinality); + + /** + * A batch is being harvested after an overflow. Put the full batch + * back into the main vector so it can be harvested. + */ + + void harvestWithLookAhead(); + + /** + * A new batch is starting while a look-ahead vector exists. Move + * the look-ahead buffers into the main vector to prepare for writing + * the rest of the batch. + */ + + void startBatchWithLookAhead(); + + /** + * Clear the vector(s) associated with this state. + */ + + void reset(); + + /** + * Underlying vector: the one presented to the consumer of the + * result set loader. + */ + + ValueVector vector(); + + void dump(HierarchicalFormatter format); +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java new file mode 100644 index 0000000..2158dd1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.impl; + +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; + +/** + * Writer index that points to each row in the row set. The index starts at + * the 0th row and advances one row on each increment. This allows writers to + * start positioned at the first row. Writes happen in the current row. + * Calling <tt>next()</tt> advances to the next position, effectively saving + * the current row. The most recent row can be abandoned simply by not + * calling <tt>next()</tt>. This means that the number of completed rows is + * the same as the row index. + * <p> + * The writer index enforces the row count limit for a new batch.
The + * limit is set by the result set loader and can vary from batch to batch, + * if the client chooses, in order to adjust the row count based on actual + * data size. + */ + +class WriterIndexImpl implements ColumnWriterIndex { + + private final ResultSetLoader rsLoader; + private int rowIndex = 0; + + public WriterIndexImpl(ResultSetLoader rsLoader) { + this.rsLoader = rsLoader; + } + + @Override + public int vectorIndex() { return rowIndex; } + + @Override + public int rowStartIndex() { return rowIndex; } + + public boolean next() { + if (++rowIndex < rsLoader.targetRowCount()) { + return true; + } else { + // Should not call next() again once the batch is full. + rowIndex = rsLoader.targetRowCount(); + return false; + } + } + + public int size() { + + // The index always points to the next slot past the + // end of valid rows. + + return rowIndex; + } + + public boolean valid() { return rowIndex < rsLoader.targetRowCount(); } + + @Override + public void rollover() { + + // The top-level index always rolls over to 0 -- + // the first row position in the new vectors. + + reset(); + } + + public void reset() { rowIndex = 0; } + + @Override + public void nextElement() { } + + @Override + public ColumnWriterIndex outerIndex() { return null; } + + @Override + public String toString() { + return new StringBuilder() + .append("[") + .append(getClass().getSimpleName()) + .append(" rowIndex = ") + .append(rowIndex) + .append("]") + .toString(); + } +}
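A minimal sketch of how this index drives a write loop (illustrative only; in practice the result set loader owns the index and client code goes through the row set writer instead). Only methods defined in WriterIndexImpl itself are used:

    WriterIndexImpl index = new WriterIndexImpl(rsLoader);
    do {
      // ... write each column value for the row at index.vectorIndex()
    } while (index.next());     // returns false once the batch is full
    // index.size() rows are now complete. An in-progress row is
    // abandoned simply by never calling next() for it.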
