http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/package-info.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/package-info.java new file mode 100644 index 0000000..4c11499 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/package-info.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Handles the details of the result set loader implementation. + * <p> + * The primary purpose of this loader, and the most complex to understand and + * maintain, is overflow handling. + * + * <h4>Detailed Use Cases</h4> + * + * Let's examine it by considering a number of + * use cases. 
+ * <table style="border: 1px solid; border-collapse: collapse;"> + * <tr><th>Row</th><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th><th>f</th><th>g</th><th>h</th></tr> + * <tr><td>n-2</td><td>X</td><td>X</td><td>X</td><td>X</td><td>X</td><td>X</td><td>-</td><td>-</td></tr> + * <tr><td>n-1</td><td>X</td><td>X</td><td>X</td><td>X</td><td> </td><td> </td><td>-</td><td>-</td></tr> + * <tr><td>n </td><td>X</td><td>!</td><td>O</td><td> </td><td>O</td><td> </td><td>O</td><td> </td></tr> + * </table> + * Here: + * <ul> + * <li>n-2, n-1, and n are rows. n is the overflow row.</li> + * <li>X indicates a value was written before overflow.</li> + * <li>Blank indicates no value was written in that row.</li> + * <li>! indicates the value that triggered overflow.</li> + * <li>- indicates a column that did not exist prior to overflow.</li> + * <li>O indicates a value written after overflow.</li> + * </ul> + * Column a is written before overflow occurs, b causes overflow, and all other + * columns either are not written, or written after overflow. + * <p> + * The scenarios, identified by column names above, are: + * <dl> + * <dt>a</dt> + * <dd>a contains values for all three rows. + * <ul> + * <li>Two values were written in the "main" batch, while a third was written to + * what becomes the overflow row.</li> + * <li>When overflow occurs, the last write position is at n. It must be moved + * back to n-1.</li> + * <li>Since data was written to the overflow row, it is copied to the look- + * ahead batch.</li> + * <li>The last write position in the lookahead batch is 0 (since data was + * copied into the 0th row.)</li> + * <li>When harvesting, no empty-filling is needed. 
Values in the main + * batch are zero-filled when the batch is finished, values in the look-ahead + * batch are back-filled when the first value is written.</li> + * <li>When starting the next batch, the last write position must be set to 0 to + * reflect the presence of the value for row n.</li> + * </ul> + * </dd> + * <dt>b</dt> + * <dd>b contains values for all three rows. The value for row n triggers + * overflow. + * <ul> + * <li>The last write position is at n-1, which is kept for the "main" + * vector.</li> + * <li>A new overflow vector is created and starts empty, with the last write + * position at -1.</li> + * <li>Once created, b is immediately written to the overflow vector, advancing + * the last write position to 0.</li> + * <li>Harvesting, and starting the next batch for column b works the same as column + * a.</li> + * </ul> + * </dd> + * <dt>c</dt> + * <dd>Column c has values for all rows. + * <ul> + * <li>The value for row n is written after overflow.</li> + * <li>At overflow, the last write position is at n-1.</li> + * <li>At overflow, a new lookahead vector is created with the last write + * position at -1.</li> + * <li>The value of c is written to the lookahead vector, advancing the last + * write position to 0.</li> + * <li>Harvesting, and starting the next batch for column c works the same as column + * a.</li> + * </ul> + * </dd> + * <dt>d</dt> + * <dd>Column d writes values to the last two rows before overflow, but not to + * the overflow row. + * <ul> + * <li>The last write position for the main batch is at n-1.</li> + * <li>The last write position in the lookahead batch remains at -1.</li> + * <li>Harvesting for column d requires filling an empty value for row n-1.</li> + * <li>When starting the next batch, the last write position must be set to -1, + * indicating no data yet written.</li> + * </ul> + * </dd> + * <dt>f</dt> + * <dd>Column f has no data in the last position of the main batch, and no data + * in the overflow row. 
+ * <ul> + * <li>The last write position is at n-2.</li> + * <li>An empty value must be written into position n-1 during harvest.</li> + * <li>On start of the next batch, the last write position starts at -1.</li> + * </ul> + * </dd> + * <dt>g</dt> + * <dd>Column g is added after overflow, and has a value written to the overflow + * row. + * <ul> + * <li>On harvest, column g is simply skipped.</li> + * <li>On start of the next row, the last write position can be left unchanged + * since no "exchange" was done.</li> + * </ul> + * </dd> + * <dt>h</dt> + * <dd>Column h is added after overflow, but does not have data written to it + * during the overflow row. Similar to column g, but the last write position + * starts at -1 for the next batch.</dd> + * </dl> + * + * <h4>General Rules</h4> + * + * The above can be summarized into a smaller set of rules: + * <p> + * At the time of overflow on row n: + * <ul> + * <li>Create or clear the lookahead vector.</li> + * <li>Copy (last write position - n + 1) values from row n in the old vector to 0 + * in the new one. If the copy count is negative, copy nothing. (A negative + * copy count means that the last write position is behind the current + * row position. Should not occur after back-filling.)</li> + * <li>Save the last write position from the old vector, clamped at n. + * (That is, if the last write position is before n, keep it. If at + * n+1, set it back to n.)</li> + * <li>Set the last write position of the overflow vector to (original last + * write position - n), clamped at -1. That is, if the original last write + * position was before n, the new one is -1. If the original last write + * position is after n, shift it down by n places.</li> + * <li>Swap buffers from the main vectors and the overflow vectors. 
This sets + * aside the main values, and allows writing to continue using the overflow + * buffers.</li> + * </ul> + * <p> + * As the overflow write proceeds: + * <ul> + * <li>For existing columns, write as normal. The last write position moves from + * -1 to 0.</li> + * <li>Columns not written leave the last write position at -1.</li> + * <li>If a new column appears, set its last write position to -1. If it is then + * written, proceed as in the first point above.</li> + * </ul> + * <p> + * At harvest time: + * <ul> + * <li>For every writer, save the last write position.</li> + * <li>Swap the overflow and main buffers to put the main batch back into the + * main vectors.</li> + * <li>Reset the last write position for all writers to the values saved at + * overflow time above.</li> + * <li>Finish the batch for the main vectors as normal. No special handling + * needed.</li> + * </ul> + * <p> + * When starting the next batch: + * <ul> + * <li>Swap buffers again, putting the overflow row back into the main vectors. + * (At this point, the harvested vectors should all have zero buffers.)</li> + * <li>Restore the last write position saved during harvest.</li> + * </ul> + * <h4>Constraints</h4> + * A number of constraints are worth pointing out: + * <ul> + * <li>Writers are bound to vectors, so we can't easily swap vectors during + * overflow.</li> + * <li>The project operator to which this operator feeds data also binds to + * vectors, so the same set of vectors must be presented on every batch.</li> + * <li>The client binds to writers, so we cannot swap writers between main and + * overflow batches.</li> + * <li>Therefore, the unit of swapping is the buffer that backs the vectors. 
+ * </li> + * <li>Swapping is not copying; it is only exchanging pointers.</li> + * <li>The only copying in this entire process occurs when moving previously- + * written values in the overflow row to the new vector at the time of + * overflow.</li> + * </ul> + * + * <h4>Arrays</h4> + * + * The above covers the case of scalar, top-level columns. The extension to + * scalar maps is straightforward: at run time, the members of maps are just + * simple scalar vectors that reside in a map name space, but the structure + * of map fields is the same as for top-level fields. (Think of map fields + * as being "flattened" into the top-level tuple.) + * <p> + * Arrays are a different matter: each row can have many values associated + * with it. Consider an array of scalars. We have: + * <pre><code> + * Row 0 Row 1 Row 2 + * 0 1 2 3 4 5 6 7 8 + * [ [a b c] [d e f] | [g h i] ] + * </code></pre> + * Here, the letters indicate values. The brackets show the overall vector + * (outer brackets) and individual rows (inner brackets). The vertical line + * shows where overflow occurred. The same rules as discussed earlier still + * apply, but we must consider both the row indexes and the array indexes. + * <ul> + * <li>Overflow occurs at the row level. Here row 2 overflowed and must + * be moved to the look-ahead vector.</li> + * <li>Value movement occurs at the value level. Here, values 6, 7 and 8 + * must be moved to the look-ahead vector.</li> + * </ul> + * The result, after overflow, is: + * <pre><code> + * Row 0 Row 1 Row 0 + * 0 1 2 3 4 5 0 1 2 + * [ [a b c] [d e f] ] [ [g h i] ] + * </code></pre> + * Further, we must consider lists: a column may consist of a list of + * arrays. Or, a column may consist of an array of maps, one of which is + * a list of arrays. So, the above reasoning must apply recursively down + * the value tree. 
+ * <p> + * As it turns out, there is a simple recursive algorithm, which is a + * simple extension of the reasoning for the top-level scalar case, that can + * handle arrays: + * <ul> + * <li>Start with the row index of the overflow row.</li> + * <li>If column c, say, is an array, obtain the index of the first value for + * the overflow row.</li> + * <li>If c is a list, or a repeated map, then repeat the above, for each + * member of c (a single column for a list, a set of columns for a map), but + * replace the row index with the index of the first element.</li> + * </ul> + * The result will be a walk of the value tree in which the overflow index + * starts as an index relative to the result set (a row index), and is + * recursively replaced with an array offset for each level of the array. + * + * <h4>Resynching Writers after Overflow</h4> + * + * When an overflow occurs, our focus starts with the single top-level row + * that will not fit into the current batch. We move this row to the look-ahead + * vectors. Doing so is quite simple when each row is a simple tuple. As + * described above, the work is quite a bit more complex when the structure + * is a JSON-like tree flattened into vectors. + * <p> + * Consider the writers. Each writer corresponds to a single vector. Writers + * are grouped into logical tree nodes. Those in the root node write to + * (single, scalar) columns that are either top-level columns, or nested + * some level down in single-value (not array) tuples. Another tree level + * occurs in an array: the elements of the array use a different + * (faster-changing) index than the top (row-level) writers. Different arrays + * have different indexes: a row may have, say, four elements in array A, + * but 20 elements in array B. + * <p> + * Further, arrays can be singular (a repeated int, say) or for an entire + * tuple (a repeated map.) 
And, since Drill supports the full JSON model, in + * the most general case, there is a tree of array indexes that can be nested + * to an arbitrary level. (A row can have an array of maps which contains a + * column that is, itself, a list of repeated maps, a field of which is an + * array of ints.) + * <p> + * Writers handle this index tree via a tree of {@link ColumnWriterIndex} + * objects, often specialized for various tasks. + * <p> + * Now we can get to the key concept in this section: how we update those indexes + * after an overflow. The top-level index reverts to zero. (We start writing + * the 0th row in the new look-ahead batch.) But, nested indexes (those for arrays) + * will start at some other position depending on the number of elements already + * written in an overflow row. The number of such elements is determined by a + * top-down traversal of the tree (to determine the start offset of each array + * for the row.) Resetting the writer indexes is a bottom-up process: based on + * the number of elements in that array, the writer index is reset to match. + * <p> + * This flow is the opposite of the "normal" case in which a new batch is started + * top-down, with each index being reset to zero. + * + * <h4>The Need for a Uniform Structure</h4> + * + * Drill has vastly different implementations and interfaces for: + * <ul> + * <li>Result sets (as a {@link VectorContainer}),</li> + * <li>Arrays (as a generated repeated vector),</li> + * <li>Lists (as a {@link ListVector}),</li> + * <li>Repeated lists (as a {@link RepeatedListVector}), and</li> + * <li>Repeated maps ({@link RepeatedMapVector}).</li> + * </ul> + * If we were to work directly with the above abstractions the code would be + * vastly complex. Instead, we abstract out the common structure into the + * {@link TupleModel} abstraction. In particular, we use the + * single tuple model which works with a single batch. 
This model provides a + * simple, uniform interface to work with columns and tuples (rows, maps), + * and a simple way to work with arrays. This interface reduces the above + * array algorithm to a simple set of recursive method calls. + */ + +package org.apache.drill.exec.physical.rowSet.impl; \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/BaseTupleModel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/BaseTupleModel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/BaseTupleModel.java new file mode 100644 index 0000000..40da4ec --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/BaseTupleModel.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.model; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.TupleSchema; +import org.apache.drill.exec.record.TupleSchema.AbstractColumnMetadata; +import org.apache.drill.exec.record.VectorContainer; + +/** + * Base implementation for a tuple model which is common to the "single" + * and "hyper" cases. 
Deals primarily with the structure of the model, + * which is common between the two physical implementations. + */ + +public abstract class BaseTupleModel implements TupleModel { + + public static abstract class BaseColumnModel implements ColumnModel { + + /** + * Extended schema associated with a column. + */ + + protected final ColumnMetadata schema; + + public BaseColumnModel(ColumnMetadata schema) { + this.schema = schema; + } + + @Override + public ColumnMetadata schema() { return schema; } + + @Override + public TupleModel mapModel() { return null; } + } + + /** + * Columns within the tuple. Columns may, themselves, be represented + * as tuples. + */ + + protected final List<ColumnModel> columns; + + /** + * Descriptive schema associated with the columns above. Unlike a + * {@link VectorContainer}, this abstraction keeps the schema in sync + * with vectors as columns are added. + */ + + protected final TupleSchema schema; + + public BaseTupleModel() { + + // Schema starts empty and is built as columns are added. + // This ensures that the schema stays in sync with the + // backing vectors. + + schema = new TupleSchema(); + columns = new ArrayList<>(); + } + + public BaseTupleModel(TupleSchema schema, List<ColumnModel> columns) { + this.schema = schema; + this.columns = columns; + assert schema.size() == columns.size(); + } + + @Override + public TupleMetadata schema() { return schema; } + + @Override + public int size() { return schema.size(); } + + @Override + public ColumnModel column(int index) { + return columns.get(index); + } + + @Override + public ColumnModel column(String name) { + return column(schema.index(name)); + } + + /** + * Perform the work of keeping the list of columns and schema in-sync + * as columns are added. This is protected because derived classes + * must add logic to keep the new column in sync with the underlying + * container or map vector. 
+ * + * @param column column implementation to add + */ + + protected void addBaseColumn(BaseColumnModel column) { + schema.add((AbstractColumnMetadata) column.schema()); + columns.add(column); + assert columns.size() == schema.size(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ContainerVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ContainerVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ContainerVisitor.java new file mode 100644 index 0000000..28c8c59 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ContainerVisitor.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.model; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector; +import org.apache.drill.exec.vector.complex.ListVector; +import org.apache.drill.exec.vector.complex.RepeatedListVector; +import org.apache.drill.exec.vector.complex.RepeatedMapVector; + +public class ContainerVisitor<R, A> { + + public R apply(VectorContainer container, A arg) { + return visitContainer(container, arg); + } + + private R visitContainer(VectorContainer container, A arg) { + return visitChildren(container, arg); + } + + public R visitChildren(VectorContainer container, A arg) { + for (int i = 0; i < container.getNumberOfColumns(); i++) { + @SuppressWarnings("resource") + ValueVector vector = container.getValueVector(i).getValueVector(); + apply(vector, arg); + } + return null; + } + + protected R apply(ValueVector vector, A arg) { + MaterializedField schema = vector.getField(); + MajorType majorType = schema.getType(); + MinorType type = majorType.getMinorType(); + DataMode mode = majorType.getMode(); + switch (type) { + case MAP: + if (mode == DataMode.REPEATED) { + return visitRepeatedMap((RepeatedMapVector) vector, arg); + } else { + return visitMap((AbstractMapVector) vector, arg); + } + case LIST: + if (mode == DataMode.REPEATED) { + return visitRepeatedList((RepeatedListVector) vector, arg); + } else { + return visitList((ListVector) vector, arg); + } + default: + if (mode == DataMode.REPEATED) { + return visitRepeatedPrimitive((BaseRepeatedValueVector) vector, arg); + } else { + return visitPrimitive(vector, arg); + } 
+ } + } + + protected R visitRepeatedMap(RepeatedMapVector vector, A arg) { + visitChildren(vector, arg); + return visitVector(vector, arg); + } + + protected R visitMap(AbstractMapVector vector, A arg) { + visitChildren(vector, arg); + return visitVector(vector, arg); + } + + private R visitChildren(AbstractMapVector vector, A arg) { + for (int i = 0; i < vector.size(); i++) { + apply(vector.getChildByOrdinal(i), arg); + } + return null; + } + + protected R visitRepeatedList(RepeatedListVector vector, A arg) { + return visitVector(vector, arg); + } + + protected R visitList(ListVector vector, A arg) { + return visitVector(vector, arg); + } + + protected R visitRepeatedPrimitive(BaseRepeatedValueVector vector, A arg) { + return visitVector(vector, arg); + } + + protected R visitPrimitive(ValueVector vector, A arg) { + return visitVector(vector, arg); + } + + protected R visitVector(ValueVector vector, A arg) { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java new file mode 100644 index 0000000..bb5e18e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/MetadataProvider.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.model; + +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.TupleSchema; + +/** + * Interface for retrieving and/or creating metadata given + * a vector. + */ + +public interface MetadataProvider { + ColumnMetadata metadata(int index, MaterializedField field); + MetadataProvider childProvider(ColumnMetadata colMetadata); + TupleMetadata tuple(); + + public static class VectorDescrip { + public final MetadataProvider parent; + public final ColumnMetadata metadata; + + public VectorDescrip(MetadataProvider provider, int index, + MaterializedField field) { + parent = provider; + metadata = provider.metadata(index, field); + } + } + + public static class MetadataCreator implements MetadataProvider { + + private final TupleSchema tuple; + + public MetadataCreator() { + tuple = new TupleSchema(); + } + + public MetadataCreator(TupleSchema tuple) { + this.tuple = tuple; + } + + @Override + public ColumnMetadata metadata(int index, MaterializedField field) { + return tuple.addView(field); + } + + @Override + public MetadataProvider childProvider(ColumnMetadata colMetadata) { + return new MetadataCreator((TupleSchema) colMetadata.mapSchema()); + } + + @Override + public TupleMetadata tuple() { return tuple; } + } + + public static class MetadataRetrieval implements MetadataProvider { + + private final TupleMetadata tuple; + + public MetadataRetrieval(TupleMetadata 
schema) { + tuple = schema; + } + + @Override + public ColumnMetadata metadata(int index, MaterializedField field) { + return tuple.metadata(index); + } + + @Override + public MetadataProvider childProvider(ColumnMetadata colMetadata) { + return new MetadataRetrieval((TupleSchema) colMetadata.mapSchema()); + } + + @Override + public TupleMetadata tuple() { return tuple; } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java new file mode 100644 index 0000000..c4b0415 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/ReaderIndex.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.model; + +import org.apache.drill.exec.vector.accessor.ColumnReaderIndex; + +/** + * Row set index base class used when indexing rows within a row + * set for a row set reader. Keeps track of the current position, + * which starts before the first row, meaning that the client + * must call <tt>next()</tt> to advance to the first row. + */ + +public abstract class ReaderIndex implements ColumnReaderIndex { + + protected int rowIndex = -1; + protected final int rowCount; + + public ReaderIndex(int rowCount) { + this.rowCount = rowCount; + } + + public int position() { return rowIndex; } + public void set(int index) { rowIndex = index; } + + public boolean next() { + if (++rowIndex < rowCount ) { + return true; + } else { + rowIndex--; + return false; + } + } + + public int size() { return rowCount; } + + public boolean valid() { return rowIndex < rowCount; } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java new file mode 100644 index 0000000..3db01dd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/SchemaInference.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.model; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.TupleSchema; +import org.apache.drill.exec.record.VectorContainer; + +/** + * Produce a metadata schema from a vector container. Used when given a + * record batch without metadata. + */ + +public class SchemaInference { + + public TupleMetadata infer(VectorContainer container) { + List<ColumnMetadata> columns = new ArrayList<>(); + for (int i = 0; i < container.getNumberOfColumns(); i++) { + MaterializedField field = container.getValueVector(i).getField(); + columns.add(inferVector(field)); + } + return TupleSchema.fromColumns(columns); + } + + private ColumnMetadata inferVector(MaterializedField field) { + if (field.getType().getMinorType() == MinorType.MAP) { + return TupleSchema.newMap(field, inferMapSchema(field)); + } else { + return TupleSchema.fromField(field); + } + } + + private TupleSchema inferMapSchema(MaterializedField field) { + List<ColumnMetadata> columns = new ArrayList<>(); + for (MaterializedField child : field.getChildren()) { + columns.add(inferVector(child)); + } + return TupleSchema.fromColumns(columns); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/TupleModel.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/TupleModel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/TupleModel.java new file mode 100644 index 0000000..5fcba73 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/TupleModel.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.model; + +import javax.sql.RowSet; + +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.TupleSchema; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.complex.AbstractMapVector;

/**
 * Common interface to access a tuple backed by a vector container or a
 * map vector. Provides a visitor interface to apply tasks such as vector
 * allocation, reader or writer creation, and so on. Allows either static
 * or dynamic vector allocation.
 * <p>
 * The terminology used here:
 * <dl>
 * <dt>Row set</dt>
 * <dd>A collection of rows stored as value vectors. Elsewhere in
 * Drill we call this a "record batch", but that term has been overloaded to
 * mean the runtime implementation of an operator.</dd>
 * <dt>Tuple</dt>
 * <dd>The relational-theory term for a row. Drill maps have a fixed schema.
 * Impala, Hive and other tools use the term "structure" (or "struct") for
 * what Drill calls a map. A structure is simply a nested tuple, modeled
 * here by the same tuple abstraction used for rows.</dd>
 * <dt>Column</dt>
 * <dd>A column is represented by a vector (which may have internal
 * null-flag or offset vectors.) Maps are a kind of column that has an
 * associated tuple. Because this abstraction models structure, array
 * columns are grouped with single values: the array-ness is just cardinality.</dd>
 * <dt>Visitor</dt>
 * <dd>The visitor abstraction (classic Gang-of-Four pattern) allows adding
 * functionality without complicating the structure classes. Allows the same
 * abstraction to be used for the testing {@code RowSet} abstractions (the
 * Drill test row set, not the JDBC {@code javax.sql.RowSet}) and
 * the scan operator "loader" classes.</dd>
 * <dt>Metadata</dt>
 * <dd>Metadata is simply data about data. Here, data about tuples and columns.
 * The column metadata mostly expands on that available in {@link MaterializedField},
 * but also adds allocation hints.</dd>
 * </dl>
 * <p>
 * This abstraction is the physical dual of a {@link VectorContainer}.
 * The vectors are "owned" by
 * the associated container. The structure here simply applies additional
 * metadata and visitor behavior to allow much easier processing than is
 * possible with the raw container structure.
 * <p>
 * A key value of this abstraction is the extended {@link TupleSchema}
 * associated with the structure. Unlike a
 * {@link VectorContainer}, this abstraction keeps the schema in sync
 * with vectors as columns are added.
 * <p>
 * Some future version may wish to merge the two concepts. That way, metadata
 * discovered by one operator will be available to another. Complex recursive
 * functions can be replaced by a visitor with the recursion handled inside
 * implementations of this interface.
 * <p>
 * Tuples provide access to columns by both index and name. Both the schema and
 * model classes follow this convention. Compared with the VectorContainer and
 * {@link AbstractMapVector} classes, the vector index is a first-class concept:
 * the column model and schema are guaranteed to reside at the same index relative
 * to the enclosing tuple. In addition, name access is efficient using a hash
 * index.
 * <p>
 * Visitor classes are defined by the "simple" (single batch) and "hyper"
 * (multi-batch) implementations to allow vector implementations to work
 * with the specifics of each type of batch.
 */
public interface TupleModel {

  /**
   * Common interface to access a column vector, its metadata, and its
   * tuple definition (for maps.) Provides a visitor interface for common
   * vector tasks.
   */
  interface ColumnModel {

    /** @return the metadata describing this column */
    ColumnMetadata schema();

    /** @return the tuple model associated with this column, for map columns */
    TupleModel mapModel();
  }

  /**
   * Tuple-model interface for the top-level row (tuple) structure.
   * Provides access to the {@link VectorContainer} representation of the
   * row set (record batch.)
   */
  interface RowSetModel extends TupleModel {

    /** @return the vector container that backs this row set */
    VectorContainer container();
  }

  /** @return the metadata schema for this tuple */
  TupleMetadata schema();

  /** @return the size of this tuple; presumably its column count (confirm with implementations) */
  int size();

  /**
   * @param index position of the column within this tuple
   * @return the column model at that position
   */
  ColumnModel column(int index);

  /**
   * @param name name of the column within this tuple
   * @return the column model with that name
   */
  ColumnModel column(String name);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java new file mode 100644 index 0000000..ee856be --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/BaseReaderBuilder.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.drill.exec.physical.rowSet.model.hyper; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.VectorDescrip; +import org.apache.drill.exec.physical.rowSet.model.ReaderIndex; +import org.apache.drill.exec.record.HyperVectorWrapper; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.ColumnReaderIndex; +import org.apache.drill.exec.vector.accessor.impl.AccessorUtilities; +import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader; +import org.apache.drill.exec.vector.accessor.reader.ColumnReaderFactory; +import org.apache.drill.exec.vector.accessor.reader.MapReader; +import org.apache.drill.exec.vector.accessor.reader.ObjectArrayReader; +import org.apache.drill.exec.vector.accessor.reader.VectorAccessor; +import org.apache.drill.exec.vector.complex.AbstractMapVector; + +public abstract class BaseReaderBuilder { + + /** + * Read-only row index into the hyper row set with batch and index + * values mapping via an SV4. 
+ */ + + public static class HyperRowIndex extends ReaderIndex { + + private final SelectionVector4 sv4; + + public HyperRowIndex(SelectionVector4 sv4) { + super(sv4.getCount()); + this.sv4 = sv4; + } + + @Override + public int vectorIndex() { + return AccessorUtilities.sv4Index(sv4.get(rowIndex)); + } + + @Override + public int batchIndex( ) { + return AccessorUtilities.sv4Batch(sv4.get(rowIndex)); + } + } + + /** + * Vector accessor used by the column accessors to obtain the vector for + * each column value. That is, position 0 might be batch 4, index 3, + * while position 1 might be batch 1, index 7, and so on. + */ + + public static class HyperVectorAccessor implements VectorAccessor { + + private final ValueVector[] vectors; + private ColumnReaderIndex rowIndex; + + public HyperVectorAccessor(VectorWrapper<?> vw) { + vectors = vw.getValueVectors(); + } + + @Override + public void bind(ColumnReaderIndex index) { + rowIndex = index; + } + + @Override + public ValueVector vector() { + return vectors[rowIndex.batchIndex()]; + } + } + + + protected AbstractObjectReader[] buildContainerChildren( + VectorContainer container, MetadataProvider mdProvider) { + List<AbstractObjectReader> readers = new ArrayList<>(); + for (int i = 0; i < container.getNumberOfColumns(); i++) { + VectorWrapper<?> vw = container.getValueVector(i); + VectorDescrip descrip = new VectorDescrip(mdProvider, i, vw.getField()); + readers.add(buildVectorReader(vw, descrip)); + } + return readers.toArray(new AbstractObjectReader[readers.size()]); + } + + @SuppressWarnings("unchecked") + private AbstractObjectReader buildVectorReader(VectorWrapper<?> vw, VectorDescrip descrip) { + MajorType type = vw.getField().getType(); + if (type.getMinorType() == MinorType.MAP) { + if (type.getMode() == DataMode.REPEATED) { + return buildMapArrayReader((HyperVectorWrapper<? extends AbstractMapVector>) vw, descrip); + } else { + return buildMapReader((HyperVectorWrapper<? 
extends AbstractMapVector>) vw, descrip); + } + } else { + return buildPrimitiveReader(vw, descrip); + } + } + + private AbstractObjectReader buildMapArrayReader(HyperVectorWrapper<? extends AbstractMapVector> vectors, VectorDescrip descrip) { + AbstractObjectReader mapReader = MapReader.build(descrip.metadata, buildMap(vectors, descrip)); + return ObjectArrayReader.build(new HyperVectorAccessor(vectors), mapReader); + } + + private AbstractObjectReader buildMapReader(HyperVectorWrapper<? extends AbstractMapVector> vectors, VectorDescrip descrip) { + return MapReader.build(descrip.metadata, buildMap(vectors, descrip)); + } + + private AbstractObjectReader buildPrimitiveReader(VectorWrapper<?> vw, VectorDescrip descrip) { + return ColumnReaderFactory.buildColumnReader( + vw.getField().getType(), new HyperVectorAccessor(vw)); + } + + private List<AbstractObjectReader> buildMap(HyperVectorWrapper<? extends AbstractMapVector> vectors, VectorDescrip descrip) { + List<AbstractObjectReader> readers = new ArrayList<>(); + MetadataProvider provider = descrip.parent.childProvider(descrip.metadata); + MaterializedField mapField = vectors.getField(); + for (int i = 0; i < mapField.getChildren().size(); i++) { + HyperVectorWrapper<? extends ValueVector> child = (HyperVectorWrapper<? 
extends ValueVector>) vectors.getChildWrapper(new int[] {i}); + VectorDescrip childDescrip = new VectorDescrip(provider, i, child.getField()); + readers.add(buildVectorReader(child, childDescrip)); + i++; + } + return readers; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/package-info.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/package-info.java new file mode 100644 index 0000000..433231e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/hyper/package-info.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implementation of a row set model for hyper-batches. A hyper batch is + * one that contains a list of batches. The batch is logically comprised + * of "hyper-vectors" which are the individual vectors from each batch + * stacked "end-to-end." + * <p> + * Hyper batches allow only reading. 
So, the only services here are to + * parse a hyper-container into a row set model, then use that model to + * create a matching set of readers. + */ + +package org.apache.drill.exec.physical.rowSet.model.hyper; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/package-info.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/package-info.java new file mode 100644 index 0000000..6f24d33 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/package-info.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The "row set model" provides a "dual" of the vector structure used to create, + * allocate and work with a collection of vectors. The model provides an enhanced + * "metadata" schema, given by {@link TupleMetadata} and {@link ColumnMetadata}, + * with allocation hints that goes beyond the {@link MaterializedField} + * used by value vectors. 
+ * <p> + * In an ideal world, this structure would not be necessary; the vectors could, by + * themselves, provide the needed structure. However, vectors are used in many + * places, in many ways, and are hard to evolve. Further, Drill may eventually + * choose to move to Arrow, which would not have the structure provided here. + * <p> + * A set of visitor classes provides the logic to traverse the vector structure, + * avoiding the need for multiple implementations of vector traversal. (Traversal + * is needed because maps contain vectors, some of which can be maps, resulting + * in a tree structure.) Further, the API provided by containers (a top-level + * tuple) differs from that of a map vector (nested tuple.) This structure provides + * a uniform API for both cases. + * <p> + * Three primary tasks provided by this structure are: + * <ol> + * <li>Create writers for a set of vectors. Allow incremental write-time + * addition of columns, keeping the vectors, columns and metadata all in + * sync.</li> + * <li>Create readers for a set of vectors. Vectors are immutable once written, + * so the reader mechanism does not provide any dynamic schema change + * support.</li> + * <li>Allocate vectors based on metadata provided. Allocation metadata + * includes estimated widths for variable-width columns and estimated + * cardinality for array columns.</li> + * </ol> + * <p> + * Drill supports two kinds of batches, reflected by two implementations of + * the structure: + * <dl> + * <dt>Single batch</dt> + * <dd>Represents a single batch in which each column is backed by a single + * value vector. Single batches support both reading and writing. Writing can + * be done only for "new" batches; reading can be done only after writing + * is complete. Modeled by the {@link org.apache.drill.exec.physical.rowSet.model.single + * single} package.</dd> + * <dt>Hyper batch</dt> + * <dd>Represents a stacked set of batches in which each column is backed + * by a list of vectors. 
A hyper batch is indexed by an "sv4" (four-byte + * selection vector.) A hyper batch allows only reading. Modeled by the + * {@link org.apache.drill.exec.physical.rowSet.model.hyper hyper} package.</dd> + * </dl> + */ + +package org.apache.drill.exec.physical.rowSet.model; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java new file mode 100644 index 0000000..80ad19f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseReaderBuilder.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.rowSet.model.single; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.VectorDescrip; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.reader.AbstractObjectReader; +import org.apache.drill.exec.vector.accessor.reader.ColumnReaderFactory; +import org.apache.drill.exec.vector.accessor.reader.MapReader; +import org.apache.drill.exec.vector.accessor.reader.ObjectArrayReader; +import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.RepeatedMapVector;

/**
 * Base class that builds the tree of column readers for a single
 * (non-hyper) vector container, recursing into map vectors.
 */
public abstract class BaseReaderBuilder {

  /**
   * Builds one reader per top-level vector of the container.
   *
   * @param container the container whose vectors are to be read
   * @param mdProvider source of the metadata for each column
   * @return the column readers, in container column order
   */
  protected List<AbstractObjectReader> buildContainerChildren(
      VectorContainer container, MetadataProvider mdProvider) {
    List<AbstractObjectReader> readers = new ArrayList<>();
    int colIndex = 0;
    while (colIndex < container.getNumberOfColumns()) {
      @SuppressWarnings("resource")
      ValueVector colVector = container.getValueVector(colIndex).getValueVector();
      readers.add(buildVectorReader(colVector,
          new VectorDescrip(mdProvider, colIndex, colVector.getField())));
      colIndex++;
    }
    return readers;
  }

  /**
   * Dispatches on the vector's major type: primitive reader for non-map
   * vectors, map-array reader for repeated maps, map reader otherwise.
   */
  private AbstractObjectReader buildVectorReader(ValueVector vector, VectorDescrip descrip) {
    MajorType majorType = vector.getField().getType();
    if (majorType.getMinorType() != MinorType.MAP) {
      return buildPrimitiveReader(vector, descrip);
    }
    if (majorType.getMode() == DataMode.REPEATED) {
      return buildMapArrayReader((RepeatedMapVector) vector, descrip);
    }
    return buildMapReader((MapVector) vector, descrip);
  }

  /** Wraps a map reader for the members in an array reader over the repeated map vector. */
  private AbstractObjectReader buildMapArrayReader(RepeatedMapVector vector, VectorDescrip descrip) {
    List<AbstractObjectReader> members = buildMap(vector, descrip);
    AbstractObjectReader elementReader = MapReader.build(descrip.metadata, members);
    return ObjectArrayReader.build(vector, elementReader);
  }

  /** Builds a map reader from the readers of the map's members. */
  private AbstractObjectReader buildMapReader(MapVector vector, VectorDescrip descrip) {
    List<AbstractObjectReader> members = buildMap(vector, descrip);
    return MapReader.build(descrip.metadata, members);
  }

  /** Builds a column reader directly over the given vector. */
  private AbstractObjectReader buildPrimitiveReader(ValueVector vector, VectorDescrip descrip) {
    return ColumnReaderFactory.buildColumnReader(vector);
  }

  /**
   * Builds one reader for each child vector of a map. The child's position
   * equals the number of readers built so far, since exactly one reader is
   * appended per child.
   */
  private List<AbstractObjectReader> buildMap(AbstractMapVector vector, VectorDescrip descrip) {
    List<AbstractObjectReader> memberReaders = new ArrayList<>();
    MetadataProvider childProvider = descrip.parent.childProvider(descrip.metadata);
    for (ValueVector member : vector) {
      VectorDescrip memberDescrip =
          new VectorDescrip(childProvider, memberReaders.size(), member.getField());
      memberReaders.add(buildVectorReader(member, memberDescrip));
    }
    return memberReaders;
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java new file mode 100644 index 0000000..bab7b39 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BaseWriterBuilder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.model.single; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.VectorDescrip; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.ColumnWriterFactory; +import org.apache.drill.exec.vector.complex.AbstractMapVector;

/**
 * Builds the tree of column writers for a single (non-hyper) vector
 * container, recursing into map vectors.
 */
public abstract class BaseWriterBuilder {

  /**
   * Builds one writer per top-level vector of the container.
   *
   * @param container the container whose vectors are to be written
   * @param mdProvider source of the metadata for each column
   * @return the column writers, in container column order
   */
  protected List<AbstractObjectWriter> buildContainerChildren(VectorContainer container, MetadataProvider mdProvider) {
    List<AbstractObjectWriter> columnWriters = new ArrayList<>();
    int colIndex = 0;
    while (colIndex < container.getNumberOfColumns()) {
      @SuppressWarnings("resource")
      ValueVector colVector = container.getValueVector(colIndex).getValueVector();
      columnWriters.add(buildVectorWriter(colVector,
          new VectorDescrip(mdProvider, colIndex, colVector.getField())));
      colIndex++;
    }
    return columnWriters;
  }

  /**
   * Builds a map writer (with writers for all members) for a vector whose
   * minor type is MAP, or a plain column writer for any other vector.
   */
  private AbstractObjectWriter buildVectorWriter(ValueVector vector, VectorDescrip descrip) {
    MajorType majorType = vector.getField().getType();
    if (majorType.getMinorType() != MinorType.MAP) {
      return ColumnWriterFactory.buildColumnWriter(descrip.metadata, vector);
    }
    AbstractMapVector mapVector = (AbstractMapVector) vector;
    return ColumnWriterFactory.buildMapWriter(descrip.metadata,
        mapVector,
        buildMap(mapVector, descrip));
  }

  /**
   * Builds one writer for each child vector of a map. The child's position
   * equals the number of writers built so far, since exactly one writer is
   * appended per child.
   */
  private List<AbstractObjectWriter> buildMap(AbstractMapVector vector, VectorDescrip descrip) {
    List<AbstractObjectWriter> memberWriters = new ArrayList<>();
    MetadataProvider childProvider = descrip.parent.childProvider(descrip.metadata);
    for (ValueVector member : vector) {
      VectorDescrip memberDescrip =
          new VectorDescrip(childProvider, memberWriters.size(), member.getField());
      memberWriters.add(buildVectorWriter(member, memberDescrip));
    }
    return memberWriters;
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java new file mode 100644 index 0000000..30f60b3 --- /dev/null +++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/BuildVectorsFromMetadata.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.rowSet.model.single; + +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractMapVector;

/**
 * Build (materialize) a set of vectors based on a provided
 * metadata schema.
 */
public class BuildVectorsFromMetadata {

  private final BufferAllocator allocator;

  /**
   * @param allocator the allocator handed to the container and to every
   * vector this builder creates
   */
  public BuildVectorsFromMetadata(BufferAllocator allocator) {
    this.allocator = allocator;
  }

  /**
   * Creates a container holding one new vector per column of the schema,
   * then builds the container's schema with no selection vector.
   *
   * @param schema the metadata schema describing the columns to create
   * @return the populated container
   */
  public VectorContainer build(TupleMetadata schema) {
    VectorContainer result = new VectorContainer(allocator);
    int colIndex = 0;
    while (colIndex < schema.size()) {
      result.add(buildVector(schema.metadata(colIndex)));
      colIndex++;
    }

    // All vectors are in place; derive the container's batch schema
    // from them.

    result.buildSchema(SelectionVectorMode.NONE);
    return result;
  }

  /**
   * Creates the vector for one column: a map vector with its members for
   * map columns, otherwise a new vector of the column's type.
   */
  private ValueVector buildVector(ColumnMetadata metadata) {
    if (!metadata.isMap()) {
      return TypeHelper.getNewVector(metadata.schema(), allocator, null);
    }
    return buildMap(metadata);
  }

  /**
   * Build a map column including the members of the map given a map
   * column schema.
   *
   * @param schema the schema of the map column
   * @return the completed map vector
   */
  private AbstractMapVector buildMap(ColumnMetadata schema) {

    // Creating the map vector will create its contained vectors if we
    // give it a materialized field with children. So, instead pass a clone
    // without children so we can add them.

    MaterializedField original = schema.schema();
    MaterializedField childless = MaterializedField.create(original.getName(), original.getType());

    // Don't get the map vector from the vector cache. Map vectors may
    // have content that varies from batch to batch. Only the leaf
    // vectors can be cached.

    AbstractMapVector mapVector = (AbstractMapVector) TypeHelper.getNewVector(childless, allocator, null);

    // Add one child vector per member of the map's schema, recursing
    // into nested maps.

    TupleMetadata members = schema.mapSchema();
    int memberIndex = 0;
    while (memberIndex < members.size()) {
      ColumnMetadata member = members.metadata(memberIndex);
      mapVector.putChild(member.name(), buildVector(member));
      memberIndex++;
    }

    return mapVector;
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/VectorAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/VectorAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/VectorAllocator.java new file mode 100644 index 0000000..34a6960 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/VectorAllocator.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.drill.exec.physical.rowSet.model.single; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataCreator; +import org.apache.drill.exec.physical.rowSet.model.MetadataProvider.MetadataRetrieval; +import org.apache.drill.exec.record.ColumnMetadata; +import org.apache.drill.exec.record.TupleMetadata; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.AllocationHelper; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.complex.RepeatedMapVector;

/**
 * Given a vector container, and a metadata schema that matches the container,
 * walk the schema tree to allocate new vectors according to a given
 * row count and the size information provided in column metadata.
 * <p>
 * Related: {@code VectorInitializer}, an earlier cut at implementation
 * based on data from the {@link RecordBatchSizer}.
 *
 * @see AllocationHelper the class which this one replaces
 */

// TODO: Does not yet handle lists; lists are a simple extension
// of the array-handling logic below.

public class VectorAllocator {

  private final VectorContainer container;

  /**
   * @param container the container whose vectors are to be allocated
   */
  public VectorAllocator(VectorContainer container) {
    this.container = container;
  }

  /**
   * Allocates every vector for the given row count, taking column
   * metadata from a new {@code MetadataCreator}.
   */
  public void allocate(int rowCount) {
    MetadataProvider provider = new MetadataCreator();
    allocate(rowCount, provider);
  }

  /**
   * Allocates every vector for the given row count, taking column
   * metadata from the supplied schema.
   */
  public void allocate(int rowCount, TupleMetadata schema) {
    MetadataProvider provider = new MetadataRetrieval(schema);
    allocate(rowCount, provider);
  }

  /**
   * Allocates every top-level vector of the container, in column order,
   * using the metadata the provider returns for each column.
   */
  public void allocate(int rowCount, MetadataProvider mdProvider) {
    int colIndex = 0;
    while (colIndex < container.getNumberOfColumns()) {
      @SuppressWarnings("resource")
      ValueVector colVector = container.getValueVector(colIndex).getValueVector();
      ColumnMetadata colMetadata = mdProvider.metadata(colIndex, colVector.getField());
      allocateVector(colVector, colMetadata, rowCount, mdProvider);
      colIndex++;
    }
  }

  /**
   * Allocates one vector, dispatching on its major type. The asserts check
   * that the vector and its metadata agree on name and minor type.
   */
  private void allocateVector(ValueVector vector, ColumnMetadata metadata, int valueCount, MetadataProvider mdProvider) {
    MajorType majorType = vector.getField().getType();
    assert vector.getField().getName().equals(metadata.name());
    assert majorType.getMinorType() == metadata.type();
    if (majorType.getMinorType() != MinorType.MAP) {
      allocatePrimitive(vector, metadata, valueCount);
      return;
    }
    if (majorType.getMode() == DataMode.REPEATED) {
      allocateMapArray((RepeatedMapVector) vector, metadata, valueCount, mdProvider);
      return;
    }
    allocateMap((AbstractMapVector) vector, metadata, valueCount, mdProvider);
  }

  /**
   * Allocates a non-map vector using the expected width and expected
   * element count recorded in the column metadata.
   */
  private void allocatePrimitive(ValueVector vector,
      ColumnMetadata metadata, int valueCount) {
    int width = metadata.expectedWidth();
    int elementCount = metadata.expectedElementCount();
    AllocationHelper.allocatePrecomputedChildCount(vector,
        valueCount, width, elementCount);
  }

  /**
   * Allocates a repeated map: the offset vector is sized by the row count,
   * and the members by the row count times the expected element count.
   */
  private void allocateMapArray(RepeatedMapVector vector,
      ColumnMetadata metadata, int valueCount, MetadataProvider mdProvider) {
    vector.getOffsetVector().allocateNew(valueCount);
    int memberValueCount = valueCount * metadata.expectedElementCount();
    allocateMap(vector, metadata, memberValueCount, mdProvider);
  }

  /**
   * Allocates each child vector of a map, looking up the child's metadata
   * by its position through the map's child provider.
   */
  private void allocateMap(AbstractMapVector vector, ColumnMetadata metadata, int valueCount, MetadataProvider mdProvider) {
    MetadataProvider memberProvider = mdProvider.childProvider(metadata);
    TupleMetadata mapSchema = metadata.mapSchema();
    assert mapSchema != null;
    int memberIndex = 0;
    for (ValueVector member : vector) {
      ColumnMetadata memberMetadata = memberProvider.metadata(memberIndex, member.getField());
      allocateVector(member, memberMetadata, valueCount, memberProvider);
      memberIndex++;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/package-info.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/package-info.java new file mode 100644 index 0000000..6cb6f27 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/model/single/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This set of classes models the structure of a batch consisting + * of single vectors (as contrasted with a hyper batch.)
Provides tools + * for metadata-based construction, allocation, reading and writing of + * the vectors. + * <p> + * The classes here walk the container/map/vector tree to apply + * operations. + */ + +package org.apache.drill.exec.physical.rowSet.model.single; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/package-info.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/package-info.java new file mode 100644 index 0000000..d92c6b7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/package-info.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +/** + * Provides a second-generation row set (AKA "record batch") writer used + * by client code to<ul> + * <li>Define the schema of a result set.</li> + * <li>Write data into the vectors backing a row set.</li></ul> + * <p> + * <h4>Terminology</h4> + * The code here follows the "row/column" naming convention rather than + * the "record/field" convention. + * <dl> + * <dt>Result set</dt> + * <dd>A set of zero or more row sets that hold rows of data.</dd> + * <dt>Row set</dt> + * <dd>A collection of rows with a common schema. Also called a "row + * batch" or "record batch." (But, in Drill, the term "record batch" also + * usually means an operator on that set of records. Here, a row set is + * just the rows &ndash; separate from operations on that data.)</dd> + * <dt>Row</dt> + * <dd>A single row of data, in the usual database sense. Here, a row is + * a kind of tuple (see below) allowing both name and index access to + * columns.</dd> + * <dt>Tuple</dt> + * <dd>In relational theory, a row is a tuple: a collection of values + * defined by a schema. Tuple values are indexed by position or name.</dd> + * <dt>Column</dt> + * <dd>A single value within a row or row set. (Generally, the context + * makes clear if the term refers to a single value or all values for a + * column for a row set.) Columns are backed by value vectors.</dd> + * <dt>Map</dt> + * <dd>In Drill, a map is what other systems call a "structure". It is, + * in fact, a nested tuple. In a Java or Python map, each map instance has + * a distinct set of name/value pairs. But, in Drill, all map instances have + * the same schema; hence the so-called "map" is really a tuple. This + * implementation exploits that fact and treats the row, and nested maps, + * almost identically: both provide columns indexed by name or position.</dd> + * <dt>Row Set Mutator</dt> + * <dd>An awkward name, but retains the "mutator" name from the previous + * generation.
The mechanism to build a result set as a series of row sets.</dd> + * <dt>Tuple Loader</dt> + * <dd>Mechanism to build a single tuple (row or map) by providing name + * or index access to columns. A better name would be "tuple writer", but + * that name is already used elsewhere.</dd> + * <dt>Column Loader</dt> + * <dd>Mechanism to write values to a single column.</dd> + * </dl> + * <h4>Building the Schema</h4> + * The row set mutator works for two cases: a known schema or a discovered + * schema. A known schema occurs in the case, such as JDBC, where the + * underlying data source can describe the schema before reading any rows. + * In this case, client code can build the schema and pass that schema to + * the mutator directly. Alternatively, the client code can build the + * schema column-by-column before the first row is read. + * <p> + * Readers that discover schema can build the schema incrementally: add + * a column, load data for that column for one row, discover the next + * column, and so on. Almost any kind of column can be added at any time + * within the first batch:<ul> + * <li>Required columns are "back-filled" with zeros in the active batch, + * if that value + * makes sense for the column. (Date and Interval columns will throw an + * exception if added after the first row as there is no good "zero" + * value for that column. Varchar columns are back-filled with blanks.)</li> + * <li>Optional (nullable) columns can be added at any time; they are + * back-filled with nulls in the active batch. In general, if a column is + * added after the first row, it should be nullable, not required, unless + * the data source has a "missing = blank or zero" policy.</li> + * <li>Repeated (array) columns can be added at any time; they are + * back-filled with empty entries in the first batch.
Arrays can also be + * safely added at any time.</li></ul> + * Client code must be aware of the semantics of adding columns at various + * times.<ul> + * <li>Columns added before or during the first row are the trivial case; + * this works for all data types and modes.</li> + * <li>Required (non-nullable) structured columns (Date, Period) cannot be + * added after the first row (as there is no good zero-fill value.)</li> + * <li>Columns added within the first batch appear to the rest of Drill as + * if they were added before the first row: the downstream operators see the + * same schema from batch to batch.</li> + * <li>Columns added <i>after</i> the first batch will trigger a + * schema-change event downstream.</li> + * <li>The above is true during an "overflow row" (see below.) Once + * overflow occurs, columns added later in that overflow row will actually + * appear in the next batch, and will trigger a schema change when that + * batch is returned. That is, overflow "time shifts" a row addition from + * one batch to the next, and so it also time-shifts the column addition. + * </li></ul> + * Use the {@link LoaderSchema} class to build the schema. The schema class is + * part of the {@link TupleLoader} object available from the + * {@link #root()} method. + * <h4>Using the Schema</h4> + * Presents columns using a physical schema. That is, map columns appear + * as columns that provide a nested map schema. Presumes that column + * access is primarily structural: first get a map, then process all + * columns for the map. + * <p> + * If the input is a flat structure, then the physical schema has a + * flattened schema as the degenerate case. + * <p> + * In both cases, access to columns is by index or by name. If new columns + * are added while loading, their index is always at the end of the existing + * columns.
+ * <h4>Writing Data to the Batch</h4> + * Each batch is delimited by a call to {@link #startBatch()} and a call to + * {@link #harvestWithLookAhead()} to obtain the completed batch. Note that readers do not + * call these methods; the scan operator does this work. + * <p> + * Each row is delimited by a call to {@link #startValue()} and a call to + * {@link #saveRow()}. <tt>startRow()</tt> performs initialization necessary + * for some vectors such as repeated vectors. <tt>saveRow()</tt> moves the + * row pointer ahead. + * <p> + * A reader can easily reject a row by calling <tt>startRow()</tt>, beginning + * to load a row, but omitting the call to <tt>saveRow()</tt>. In this case, + * the next call to <tt>startRow()</tt> repositions the row pointer to the + * same row, and new data will overwrite the previous data, effectively erasing + * the unwanted row. This also works for the last row; omitting the call to + * <tt>saveRow()</tt> causes the batch to hold only the rows actually + * saved. + * <p> + * Readers then write to each column. Columns are accessible via index + * ({@link TupleLoader#column(int)}) or by name + * ({@link TupleLoader#column(String)}). Indexed access is much faster. + * Column indexes are defined by the order that columns are added. The first + * column is column 0, the second is column 1 and so on. + * <p> + * Each call to the above methods returns the same column writer, allowing the + * reader to cache column writers for additional performance. + * <p> + * All column writers are of the same class; there is no need to cast to a + * type corresponding to the vector. Instead, they provide a variety of + * <tt>set<i>Type</i></tt> methods, where the type is one of various Java + * primitive or structured types. Most vectors provide just one method, but + * others (such as VarChar) provide two. The implementation will throw an + * exception if the vector does not support a particular type.
+ * <p> + * Note that this class uses the term "loader" for row and column writers + * since the term "writer" is already used by the legacy record set mutator + * and column writers. + * <h4>Handling Batch Limits</h4> + * The mutator enforces two sets of batch limits:<ol> + * <li>The number of rows per batch. The limit defaults to 64K (the Drill + * maximum), but can be set lower by the client.</li> + * <li>The size of the largest vector, which is capped at 16 MB. (A future + * version may allow adjustable caps, or cap the memory of the entire + * batch.)</li></ol> + * Both limits are presented to the client via the {@link #isFull()} + * method. After each call to {@link #saveRow()}, the client should call + * <tt>isFull()</tt> to determine if the client can add another row. Note + * that failing to do this check will cause the next call to + * {@link #startBatch()} to throw an exception. + * <p> + * The limits have subtle differences, however. Row limits are simple: at + * the end of the last row, the mutator notices that no more rows are possible, + * and so does not allow starting a new row. + * <p> + * Vector overflow is more complex. A row may consist of columns (a, b, c). + * The client may write column a, but then column b might trigger a vector + * overflow. (For example, b is a Varchar, and the value for b is larger than + * the space left in the vector.) The client cannot stop and rewrite a. Instead, + * the client simply continues writing the row. The mutator, internally, moves + * this "overflow" row to a new batch. The overflow row becomes the first row + * of the next batch rather than the last row of the current batch. + * <p> + * For this reason, the client can treat the two overflow cases identically, + * as described above.
+ * <p> + * There are some subtle differences between the two cases that clients may + * occasionally need to expect:<ul> + * <li>When a vector overflow occurs, the returned batch will have one + * fewer row than the client might expect if it is simply counting the rows + * written.</li> + * <li>A new column added to the batch after overflow occurs will appear in + * the <i>next</i> batch, triggering a schema change between the current and + * next batches.</li></ul> + */ +package org.apache.drill.exec.physical.rowSet; http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index 0497cfd..2d01ef4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -96,6 +96,9 @@ public class BatchSchema implements Iterable<MaterializedField> { return result; } + // DRILL-5525: the semantics of this method are badly broken. + // Caveat emptor. + @Override public boolean equals(Object obj) { if (this == obj) { @@ -108,13 +111,24 @@ public class BatchSchema implements Iterable<MaterializedField> { return false; } BatchSchema other = (BatchSchema) obj; + if (selectionVectorMode != other.selectionVectorMode) { + return false; + } if (fields == null) { - if (other.fields != null) { - return false; - } - } else if (!fields.equals(other.fields)) { + return other.fields == null; + } + + // Compare names. + // (DRILL-5525: actually compares all fields.) + + if (!fields.equals(other.fields)) { return false; } + + // Compare types + // (DRILL-5525: this code is redundant because any differences + // will fail above.)
+ for (int i = 0; i < fields.size(); i++) { MajorType t1 = fields.get(i).getType(); MajorType t2 = other.fields.get(i).getType(); @@ -128,13 +142,25 @@ public class BatchSchema implements Iterable<MaterializedField> { } } } - if (selectionVectorMode != other.selectionVectorMode) { - return false; - } return true; } + /** + * Compare that two schemas are identical according to the rules defined + * in {@link MaterializedField#isEquivalent(MaterializedField)}. In particular, + * this method requires that the fields have a 1:1 ordered correspondence + * in the two schemas. + * + * @param other another non-null batch schema + * @return <tt>true</tt> if the two schemas are equivalent according to + * the {@link MaterializedField#isEquivalent(MaterializedField)} rules, + * false otherwise + */ + public boolean isEquivalent(BatchSchema other) { + if (this == other) { + return true; + } if (fields == null || other.fields == null) { return fields == other.fields; } @@ -172,7 +198,7 @@ public class BatchSchema implements Iterable<MaterializedField> { } /** - * Merge two schema to produce a new, merged schema. The caller is responsible + * Merge two schemas to produce a new, merged schema. The caller is responsible * for ensuring that column names are unique. The order of the fields in the * new schema is the same as that of this schema, with the other schema's fields * appended in the order defined in the other schema.
http://git-wip-us.apache.org/repos/asf/drill/blob/40de8ca4/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index b4ae2d2..acb7a9b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.record; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.vector.ValueVector; /** * A record batch contains a set of field values for a particular range of @@ -38,7 +39,7 @@ import org.apache.drill.exec.ops.FragmentContext; public interface RecordBatch extends VectorAccessible { /** max batch size, limited by 2-byte length in SV2: 65536 = 2^16 */ - public static final int MAX_BATCH_SIZE = 65536; + public static final int MAX_BATCH_SIZE = ValueVector.MAX_ROW_COUNT; /** * Describes the outcome of incrementing RecordBatch forward by a call to
