DRILL-2288: Fix ScanBatch violation of IterOutcome protocol and a downstream chain of bugs.
Increments:

2288: Pt. 1 Core: Added unit test. [Drill2288GetColumnsMetadataWhenNoRowsTest, empty.json]
2288: Pt. 1 Core: Changed HBase test table #1's # of regions from 1 to 2. [HBaseTestsSuite] Also added TODO(DRILL-3954) comment about # of regions.
2288: Pt. 2 Core: Documented IterOutcome much more clearly. [RecordBatch] Also edited some related Javadoc.
2288: Pt. 2 Hyg.: Edited doc., added @Override, etc. [AbstractRecordBatch, RecordBatch] Purged unused SetupOutcome. Added @Override. Edited comments. Converted some comments to doc. comments.
2288: Pt. 3 Core & Hyg.: Added validation of IterOutcome sequence. [IteratorValidatorBatchIterator] Also renamed internal members for clarity and added comments.
2288: Pt. 4 Core: Fixed a NONE -> OK_NEW_SCHEMA in ScanBatch.next(). [ScanBatch] (With nearby comments.)
2288: Pt. 4 Hyg.: Edited comments, reordered, adjusted whitespace. [ScanBatch] Reordered. Added comments. Aligned.
2288: Pt. 4 Core+: Fixed UnionAllRecordBatch to receive the IterOutcome sequence correctly. (DRILL-3659) [UnionAllRecordBatch]
2288: Pt. 5 Core: Fixed ScanBatch.Mutator.isNewSchema() to stop spurious "new schema" reports (fixed short-circuit OR so the resetting method is always called). [ScanBatch]
2288: Pt. 5 Hyg.: Renamed, edited comments, reordered. [ScanBatch, SchemaChangeCallBack, AbstractSingleRecordBatch] Renamed getSchemaChange -> getSchemaChangedAndReset and schemaChange -> schemaChanged. Added doc. comments. Aligned.
2288: Pt. 6 Core: Avoided dummy NullableIntVector column in JsonReader when not needed (MapWriter.isEmptyMap()). [JsonReader, 3 vector files]
2288: Pt. 6 Hyg.: Edited comments and messages. [RecordReader, JSONFormatPlugin, JSONRecordReader, AbstractMapVector, JsonReader] Fixed message formatting. Fixed spurious line break.
2288: Pt. 7 Core: Added column families in HBaseRecordReader to avoid dummy NullableIntVector clash. [HBaseRecordReader]
2288: Pt. 8 Core.1: Cleared recordCount in OrderedPartitionRecordBatch.innerNext(). [OrderedPartitionRecordBatch]
2288: Pt. 8 Core.2: Cleared recordCount in ProjectRecordBatch.innerNext(). [ProjectRecordBatch]
2288: Pt. 8 Core.3: Cleared recordCount in TopNBatch.innerNext(). [TopNBatch]
2288: Pt. 9 Core: Had UnorderedReceiverBatch reset RecordBatchLoader's record count. [UnorderedReceiverBatch, RecordBatchLoader]
2288: Pt. 9 Hyg.: Added comments. [RecordBatchLoader]
2288: Pt. 10 Core: Worked around mismatched map child vectors in MapVector.getObject(). [MapVector]
2288: Pt. 11 Core: Added OK_NEW_SCHEMA schema comparison for HashAgg. [HashAggTemplate]
2288: Pt. 12 Core: Fixed memory leak in BaseTestQuery's printing: fixed bad skipping of RecordBatchLoader.clear(...) and QueryDataBatch.load(...) for zero-row batches in printResult(...). Also dropped suppression of the call to VectorUtil.showVectorAccessibleContent(...) (so zero-row batches are as visible as others).
2288: Pt. 13 Core: Fixed test that used unhandled periods in column alias identifiers.
2288: Misc.: Added # of rows to showVectorAccessibleContent's output. [VectorUtil]
2288: Misc.: Added simple/partial toString(). [VectorContainer, AbstractRecordReader, JSONRecordReader, BaseValueVector, FieldSelection, AbstractBaseWriter]
2288: Misc. Hyg.: Added doc. comments to VectorContainer. [VectorContainer]
2288: Misc. Hyg.: Edited comment. [DrillStringUtils]
2288: Misc. Hyg.: Clarified message for unhandled identifier containing a period.
2288: Pt. 3 Core & Hyg. Upd.: Added schema comparison result to logging. [IteratorValidatorBatchIterator]
2288: Pt. 7 Core Upd.: Handled HBase columns too re NullableIntVectors. [HBaseRecordReader, TestTableGenerator, TestHBaseFilterPushDown] Created map-child vectors for requested columns. Added unit test method testDummyColumnsAreAvoided, added a new row to the test table, and updated some row counts.
2288: Pt. 7 Hyg. Upd.: Edited comment. [HBaseRecordReader]
2288: Pt. 11 Core Upd.: REVERTED all of the bad OK_NEW_SCHEMA schema comparison for HashAgg. [HashAggTemplate] This reverts commit 0939660f4620c03da97f4e1bf25a27514e6d0b81.
2288: Pt. 6 Core Upd.: Added isEmptyMap override in new (just-rebased-in) PromotableWriter. [PromotableWriter] Adjusted definition and default implementation of isEmptyMap (to handle the MongoDB storage plugin's use of JsonReader).
2288: Pt. 6 Hyg. Upd.: Purged old atLeastOneWrite flag. [JsonReader]
2288: Pt. 14: Disabled newly dying test testNestedFlatten().

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a0be3ae0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a0be3ae0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a0be3ae0

Branch: refs/heads/master
Commit: a0be3ae0a5a69634be98cc517bcc31c11ffec91d
Parents: 194680c
Author: dbarclay <[email protected]>
Authored: Tue Oct 27 19:25:25 2015 -0700
Committer: Hanifi Gunes <[email protected]>
Committed: Mon Nov 9 12:17:03 2015 -0800

----------------------------------------------------------------------

 .../drill/common/expression/FieldReference.java | 6 +-
 .../drill/common/util/DrillStringUtils.java | 8 +-
 .../exec/store/hbase/HBaseRecordReader.java | 34 ++-
 .../drill/hbase/HBaseRecordReaderTest.java | 2 +-
 .../org/apache/drill/hbase/HBaseTestsSuite.java | 4 +-
 .../drill/hbase/TestHBaseFilterPushDown.java | 28 +-
 .../drill/hbase/TestHBaseProjectPushDown.java | 12 +-
 .../apache/drill/hbase/TestTableGenerator.java | 6 +
 .../codegen/templates/AbstractFieldWriter.java | 11 +
 .../src/main/codegen/templates/BaseWriter.java | 11 +
 .../src/main/codegen/templates/MapWriters.java | 5 +
 .../drill/exec/physical/impl/ScanBatch.java | 80 ++++--
 .../exec/physical/impl/TopN/TopNBatch.java | 1 +
 .../OrderedPartitionRecordBatch.java | 1 +
 .../impl/project/ProjectRecordBatch.java | 1 +
 .../impl/union/UnionAllRecordBatch.java | 43 +--
 .../UnorderedReceiverBatch.java | 1 +
 .../IteratorValidatorBatchIterator.java | 262 ++++++++++++++++---
 .../drill/exec/record/AbstractRecordBatch.java | 18 +-
 .../exec/record/AbstractSingleRecordBatch.java | 2 +-
 .../apache/drill/exec/record/RecordBatch.java | 244 ++++++++++++++---
 .../drill/exec/record/RecordBatchLoader.java | 28 ++
 .../drill/exec/record/VectorContainer.java | 16 ++
 .../drill/exec/store/AbstractRecordReader.java | 8 +
 .../apache/drill/exec/store/RecordReader.java | 3 +-
 .../exec/store/easy/json/JSONFormatPlugin.java | 3 +-
 .../exec/store/easy/json/JSONRecordReader.java | 12 +-
 .../org/apache/drill/exec/util/VectorUtil.java | 2 +
 .../drill/exec/vector/BaseValueVector.java | 5 +
 .../drill/exec/vector/SchemaChangeCallBack.java | 26 +-
 .../exec/vector/complex/AbstractMapVector.java | 4 +-
 .../drill/exec/vector/complex/MapVector.java | 9 +-
 .../exec/vector/complex/fn/FieldSelection.java | 9 +
 .../exec/vector/complex/fn/JsonReader.java | 37 +--
 .../vector/complex/impl/AbstractBaseWriter.java | 5 +
 .../vector/complex/impl/PromotableWriter.java | 5 +
 .../java/org/apache/drill/BaseTestQuery.java | 3 -
 .../complex/writer/TestComplexTypeReader.java | 2 +
 ...ill2288GetColumnsMetadataWhenNoRowsTest.java | 201 ++++++++++++++
 exec/jdbc/src/test/resources/empty.json | 0

40 files changed, 977 insertions(+), 181 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/common/src/main/java/org/apache/drill/common/expression/FieldReference.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/expression/FieldReference.java b/common/src/main/java/org/apache/drill/common/expression/FieldReference.java index d97be14..7d0e86f 100644 --- a/common/src/main/java/org/apache/drill/common/expression/FieldReference.java +++ b/common/src/main/java/org/apache/drill/common/expression/FieldReference.java @@ -55,7 +55,11 @@ public class FieldReference extends SchemaPath { private void checkSimpleString(CharSequence value) { if (value.toString().contains(".")) { - throw new UnsupportedOperationException("Field references must be singular names."); + throw new UnsupportedOperationException( + String.format( + "Unhandled field reference \"%s\"; a field reference identifier" + + " must not have the form of a qualified name (i.e., with \".\").", + value)); } } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java b/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java index 83bfdc1..b016184 100644 --- a/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java +++ b/common/src/main/java/org/apache/drill/common/util/DrillStringUtils.java @@ -53,11 +53,11 @@ public class DrillStringUtils { } /** - * Escapes the characters in a {@code String} using Java String rules. + * Escapes the characters in a {@code String} according to Java string literal + * rules. * - * Deals correctly with quotes and control-chars (tab, backslash, cr, ff, etc.) - * - * So a tab becomes the characters {@code '\\'} and + * Deals correctly with quotes and control-chars (tab, backslash, cr, ff, + * etc.) so, for example, a tab becomes the characters {@code '\\'} and + * {@code 't'}.
* * Example: http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index ba10592..32780f8 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -18,10 +18,12 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -44,6 +46,7 @@ import org.apache.drill.exec.vector.complex.MapVector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -133,7 +136,13 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas familyVectorMap = new HashMap<String, MapVector>(); try { - // Add Vectors to output in the order specified when creating reader + logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.", + hbaseTableName, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM), + hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + hTable = new HTable(hbaseConf, hbaseTableName); + + // Add top-level column-family map vectors to output in the order specified + // when creating reader (order of first appearance in query). for (SchemaPath column : getColumns()) { if (column.equals(ROW_KEY_PATH)) { MaterializedField field = MaterializedField.create(column, ROW_KEY_TYPE); @@ -142,10 +151,25 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas getOrCreateFamilyVector(column.getRootSegment().getPath(), false); } } - logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.", - hbaseTableName, hbaseConf.get(HConstants.ZOOKEEPER_QUORUM), - hbaseConf.get(HBASE_ZOOKEEPER_PORT), hbaseConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - hTable = new HTable(hbaseConf, hbaseTableName); + + // Add map and child vectors for any HBase column families and/or HBase + // columns that are requested (in order to avoid later creation of dummy + // NullableIntVectors for them). 
+ final Set<Map.Entry<byte[], NavigableSet<byte []>>> familiesEntries = + hbaseScan.getFamilyMap().entrySet(); + for (Map.Entry<byte[], NavigableSet<byte []>> familyEntry : familiesEntries) { + final String familyName = new String(familyEntry.getKey(), + StandardCharsets.UTF_8); + final MapVector familyVector = getOrCreateFamilyVector(familyName, false); + final Set<byte []> children = familyEntry.getValue(); + if (null != children) { + for (byte[] childNameBytes : children) { + final String childName = new String(childNameBytes, + StandardCharsets.UTF_8); + getOrCreateColumnVector(familyVector, childName); + } + } + } resultScanner = hTable.getScanner(hbaseScan); } catch (SchemaChangeException | IOException e) { throw new ExecutionSetupException(e); http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java index 79db8b6..6414f8b 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java @@ -24,7 +24,7 @@ public class HBaseRecordReaderTest extends BaseHBaseTest { @Test public void testLocalDistributed() throws Exception { String planName = "/hbase/hbase_scan_screen_physical.json"; - runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 7); + runHBasePhysicalVerifyCount(planName, HBaseTestsSuite.TEST_TABLE_1, 8); } @Test http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java index 2063503..4ecb4da 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java @@ -161,12 +161,14 @@ public class HBaseTestsSuite { } private static void createTestTables() throws Exception { + // TODO(DRILL-3954): Change number of regions from 1 to multiple for other + // tables and remaining problems not addressed by DRILL-2288 fixes. /* * We are seeing some issues with (Drill) Filter operator if a group scan span * multiple fragments. Hence the number of regions in the HBase table is set to 1. * Will revert to multiple region once the issue is resolved. 
*/ - TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 1); + TestTableGenerator.generateHBaseDataset1(admin, TEST_TABLE_1, 2); TestTableGenerator.generateHBaseDataset3(admin, TEST_TABLE_3, 1); TestTableGenerator.generateHBaseDatasetCompositeKeyDate(admin, TEST_TABLE_COMPOSITE_DATE, 1); TestTableGenerator.generateHBaseDatasetCompositeKeyTime(admin, TEST_TABLE_COMPOSITE_TIME, 1); http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java index 05fb0b7..7ef7954 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java @@ -18,6 +18,7 @@ package org.apache.drill.hbase; import org.apache.drill.PlanTestBase; +import org.junit.Ignore; import org.junit.Test; public class TestHBaseFilterPushDown extends BaseHBaseTest { @@ -517,7 +518,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest { + "WHERE\n" + " row_key > 'b4'"; - runHBaseSQLVerifyCount(sql, 3); + runHBaseSQLVerifyCount(sql, 4); final String[] expectedPlan = {".*startRow=b4\\\\x00.*stopRow=,.*"}; final String[] excludedPlan ={}; @@ -589,7 +590,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest { + "WHERE\n" + " (row_key >= 'b5' OR row_key <= 'a2') AND (t.f.c1 >= '1' OR t.f.c1 is null)"; - runHBaseSQLVerifyCount(sql, 4); + runHBaseSQLVerifyCount(sql, 5); final String[] expectedPlan = {".*startRow=, stopRow=, filter=FilterList OR.*GREATER_OR_EQUAL, b5.*LESS_OR_EQUAL, a2.*"}; final String[] excludedPlan ={}; @@ -623,7 +624,7 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest { + "WHERE\n" + " convert_from(row_key, 'UTF8') > 'b4'"; - runHBaseSQLVerifyCount(sql, 3); + runHBaseSQLVerifyCount(sql, 4); final String[] expectedPlan = {".*startRow=b4\\\\x00, stopRow=,.*"}; final String[] excludedPlan ={}; @@ -755,5 +756,26 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest { } + @Test + public void testDummyColumnsAreAvoided() throws Exception { + setColumnWidth(10); + // Key aspects: + // - HBase columns c2 and c3 are referenced in the query + // - column c2 appears in rows in one region but not in rows in a second + // region, and c3 appears only in the second region + // - a downstream operation (e.g., sorting) doesn't handle schema changes + final String sql = "SELECT\n" + + " row_key, \n" + + " t.f .c2, t.f .c3, \n" + + " t.f2.c2, t.f2.c3 \n" + + "FROM\n" + + " hbase.`[TABLE_NAME]` t\n" + + "WHERE\n" + + " row_key = 'a3' OR row_key = 'b7' \n" + + "ORDER BY row_key"; + + runHBaseSQLVerifyCount(sql, 2); + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java index b27b2a0..befe1d8 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java +++ 
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java @@ -28,7 +28,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest { + "row_key\n" + "FROM\n" + " hbase.`[TABLE_NAME]` tableName" - , 7); + , 8); } @Test @@ -45,10 +45,14 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest { public void testRowKeyAndColumnPushDown() throws Exception{ setColumnWidths(new int[] {8, 9, 6, 2, 6}); runHBaseSQLVerifyCount("SELECT\n" - + "row_key, t.f.c1*31 as `t.f.c1*31`, t.f.c2 as `t.f.c2`, 5 as `5`, 'abc' as `'abc'`\n" + // Note: Can't currently use period in column alias (not even with + // qualified identifier) because Drill internals don't currently encode + // names sufficiently. + + "row_key, t.f.c1 * 31 as `t dot f dot c1 * 31`, " + + "t.f.c2 as `t dot f dot c2`, 5 as `5`, 'abc' as `'abc'`\n" + "FROM\n" + " hbase.`[TABLE_NAME]` t" - , 7); + , 8); } @Test @@ -58,7 +62,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest { + "row_key, f, f2\n" + "FROM\n" + " hbase.`[TABLE_NAME]` tableName" - , 7); + , 8); } } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java index e738bba..77e9d64 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestTableGenerator.java @@ -118,6 +118,12 @@ public class TestTableGenerator { p.add("f".getBytes(), "c8".getBytes(), "5".getBytes()); p.add("f2".getBytes(), "c9".getBytes(), "6".getBytes()); table.put(p); + + p = new Put("b7".getBytes()); + p.add("f".getBytes(), "c1".getBytes(), "1".getBytes()); + p.add("f".getBytes(), "c2".getBytes(), "2".getBytes()); + table.put(p); + table.flushCommits(); table.close(); } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java index 2da7141..7ab5dce 100644 --- a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java +++ b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java @@ -72,6 +72,17 @@ abstract class AbstractFieldWriter extends AbstractBaseWriter implements FieldWr fail("${name}"); } + /** + * This implementation returns {@code false}. + * <p> + * Must be overridden by map writers. 
+ * </p> + */ + @Override + public boolean isEmptyMap() { + return false; + } + @Override public MapWriter map() { fail("Map"); http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/codegen/templates/BaseWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/BaseWriter.java b/exec/java-exec/src/main/codegen/templates/BaseWriter.java index da27e66..8a9ea56 100644 --- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java +++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java @@ -38,6 +38,17 @@ package org.apache.drill.exec.vector.complex.writer; MaterializedField getField(); + /** + * Whether this writer is a map writer and is empty (has no children). + * + * <p> + * Intended only for use in determining whether to add dummy vector to + * avoid empty (zero-column) schema, as in JsonReader. + * </p> + * + */ + boolean isEmptyMap(); + <#list vv.types as type><#list type.minor as minor> <#assign lowerName = minor.class?uncap_first /> <#if lowerName == "int" ><#assign lowerName = "integer" /></#if> http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/codegen/templates/MapWriters.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java index 27ffbdd..93f2edb 100644 --- a/exec/java-exec/src/main/codegen/templates/MapWriters.java +++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java @@ -70,6 +70,11 @@ public class ${mode}MapWriter extends AbstractFieldWriter { } @Override + public boolean isEmptyMap() { + return 0 == container.size(); + } + + @Override public MaterializedField getField() { return container.getField(); } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 1ac4f7b..dbb5e00 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -67,9 +67,13 @@ public class ScanBatch implements CloseableRecordBatch { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class); private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScanBatch.class); - private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap(); - + /** Main collection of fields' value vectors. */ private final VectorContainer container = new VectorContainer(); + + /** Fields' value vectors indexed by fields' keys. 
*/ + private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = + Maps.newHashMap(); + private int recordCount; private final FragmentContext context; private final OperatorContext oContext; @@ -85,8 +89,12 @@ public class ScanBatch implements CloseableRecordBatch { private boolean done = false; private SchemaChangeCallBack callBack = new SchemaChangeCallBack(); private boolean hasReadNonEmptyFile = false; - public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, OperatorContext oContext, - Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException { + + + public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, + OperatorContext oContext, Iterator<RecordReader> readers, + List<String[]> partitionColumns, + List<Integer> selectedPartitionColumns) throws ExecutionSetupException { this.context = context; this.readers = readers; if (!readers.hasNext()) { @@ -123,7 +131,8 @@ public class ScanBatch implements CloseableRecordBatch { addPartitionVectors(); } - public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) + public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, + Iterator<RecordReader> readers) throws ExecutionSetupException { this(subScanConfig, context, context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */), @@ -183,18 +192,27 @@ public class ScanBatch implements CloseableRecordBatch { while ((recordCount = currentReader.next()) == 0) { try { if (!readers.hasNext()) { + // We're on the last reader, and it has no (more) rows. currentReader.close(); releaseAssets(); - done = true; + done = true; // have any future call to next() return NONE + if (mutator.isNewSchema()) { + // This last reader has a new schema (e.g., we have a zero-row + // file or other source). (Note that some sources have a non- + // null/non-trivial schema even when there are no rows.) + container.buildSchema(SelectionVectorMode.NONE); schema = container.getSchema(); + + return IterOutcome.OK_NEW_SCHEMA; } return IterOutcome.NONE; } + // At this point, the reader that hit its end is not the last reader. // If all the files we have read so far are just empty, the schema is not useful - if(!hasReadNonEmptyFile) { + if (! hasReadNonEmptyFile) { container.clear(); for (ValueVector v : fieldVectorMap.values()) { v.clear(); @@ -221,6 +239,7 @@ public class ScanBatch implements CloseableRecordBatch { return IterOutcome.STOP; } } + // At this point, the current reader has read 1 or more rows. hasReadNonEmptyFile = true; populatePartitionVectors(); @@ -264,7 +283,7 @@ public class ScanBatch implements CloseableRecordBatch { for (int i : selectedPartitionColumns) { final MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), - Types.optional(MinorType.VARCHAR)); + Types.optional(MinorType.VARCHAR)); final ValueVector v = mutator.addField(field, NullableVarCharVector.class); partitionVectors.add(v); } @@ -313,19 +332,26 @@ public class ScanBatch implements CloseableRecordBatch { } private class Mutator implements OutputMutator { - private boolean schemaChange = true; + /** Whether schema has changed since last inquiry (via {@link #isNewSchema}). Is + * true before first inquiry. 
*/ + private boolean schemaChanged = true; + + @SuppressWarnings("unchecked") @Override - public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException { - // Check if the field exists + public <T extends ValueVector> T addField(MaterializedField field, + Class<T> clazz) throws SchemaChangeException { + // Check if the field exists. ValueVector v = fieldVectorMap.get(field.key()); if (v == null || v.getClass() != clazz) { - // Field does not exist add it to the map and the output container + // Field does not exist--add it to the map and the output container. v = TypeHelper.getNewVector(field, oContext.getAllocator(), callBack); if (!clazz.isAssignableFrom(v.getClass())) { - throw new SchemaChangeException(String.format( - "The class that was provided %s does not correspond to the expected vector type of %s.", - clazz.getSimpleName(), v.getClass().getSimpleName())); + throw new SchemaChangeException( + String.format( + "The class that was provided, %s, does not correspond to the " + + "expected vector type of %s.", + clazz.getSimpleName(), v.getClass().getSimpleName())); } final ValueVector old = fieldVectorMap.put(field.key(), v); @@ -335,8 +361,8 @@ public class ScanBatch implements CloseableRecordBatch { } container.add(v); - // Adding new vectors to the container mark that the schema has changed - schemaChange = true; + // Added new vectors to the container--mark that the schema has changed. + schemaChanged = true; } return clazz.cast(v); @@ -349,11 +375,21 @@ public class ScanBatch implements CloseableRecordBatch { } } + /** + * Reports whether schema has changed (field was added or re-added) since + * last call to {@link #isNewSchema}. Returns true at first call. + */ @Override public boolean isNewSchema() { - // Check if top level schema has changed, second condition checks if one of the deeper map schema has changed - if (schemaChange || callBack.getSchemaChange()) { - schemaChange = false; + // Check if top-level schema or any of the deeper map schemas has changed. + + // Note: Callback's getSchemaChangedAndReset() must get called in order + // to reset it and avoid false reports of schema changes in future. (Be + // careful with short-circuit OR (||) operator.) 
+ + final boolean deeperSchemaChanged = callBack.getSchemaChangedAndReset(); + if (schemaChanged || deeperSchemaChanged) { + schemaChanged = false; return true; } return false; @@ -392,6 +428,8 @@ public class ScanBatch implements CloseableRecordBatch { @Override public VectorContainer getOutgoingContainer() { - throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); + throw new UnsupportedOperationException( + String.format("You should not call getOutgoingContainer() for class %s", + this.getClass().getCanonicalName())); } } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 3ef6bfe..aca7549 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -159,6 +159,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { @Override public IterOutcome innerNext() { + recordCount = 0; if (state == BatchState.DONE) { return IterOutcome.NONE; } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 3061f99..629a3e2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -462,6 +462,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart @Override public IterOutcome innerNext() { + recordCount = 0; container.zeroVectors(); // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index ab01db4..ce7c5ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -124,6 +124,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { @Override public IterOutcome innerNext() { + recordCount = 0; if (hasRemainder) { handleRemainder(); return IterOutcome.OK; 
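Why the Pt. 5 isNewSchema() change above matters: with the old expression, schemaChange || callBack.getSchemaChange(), short-circuit evaluation skips the right-hand call whenever schemaChange is true, so the callback's flag is never reset and produces a spurious "new schema" report on the following call. The sketch below shows the hazard and the fix in isolation; ChangeFlag and SchemaTracker are hypothetical stand-ins, not Drill's actual SchemaChangeCallBack or Mutator.

    /** Hypothetical reset-on-read flag, standing in for SchemaChangeCallBack. */
    class ChangeFlag {
      private boolean changed;

      void markChanged() { changed = true; }

      /** Reads and resets the flag (like getSchemaChangedAndReset()). */
      boolean getChangedAndReset() {
        final boolean result = changed;
        changed = false;
        return result;
      }
    }

    /** Hypothetical tracker showing the bug and the fix in isNewSchema(). */
    class SchemaTracker {
      private final ChangeFlag deeperChanges = new ChangeFlag();
      private boolean schemaChanged = true;  // true before first inquiry

      boolean isNewSchemaBuggy() {
        // BUG: when schemaChanged is true, || short-circuits and never calls
        // getChangedAndReset(), so deeperChanges stays set and triggers a
        // spurious "new schema" report on the NEXT call.
        if (schemaChanged || deeperChanges.getChangedAndReset()) {
          schemaChanged = false;
          return true;
        }
        return false;
      }

      boolean isNewSchemaFixed() {
        // FIX: always evaluate the resetting call before combining results.
        final boolean deeperSchemaChanged = deeperChanges.getChangedAndReset();
        if (schemaChanged || deeperSchemaChanged) {
          schemaChanged = false;
          return true;
        }
        return false;
      }
    }

The fix simply forces the resetting call to be evaluated unconditionally before the two results are combined.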
http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 445568b..357269d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -103,7 +103,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { public IterOutcome innerNext() { try { IterOutcome upstream = unionAllInput.nextBatch(); - logger.debug("Upstream of Union-All: ", upstream.toString()); + logger.debug("Upstream of Union-All: {}", upstream); switch(upstream) { case NONE: case OUT_OF_MEMORY: @@ -306,28 +306,36 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { case OUT_OF_MEMORY: return iterLeft; - case NONE: - throw new SchemaChangeException("The left input of Union-All should not come from an empty data source"); - default: - throw new IllegalStateException(String.format("Unknown state %s.", iterLeft)); + throw new IllegalStateException( + String.format("Unexpected state %s.", iterLeft)); } IterOutcome iterRight = rightSide.nextBatch(); switch(iterRight) { case OK_NEW_SCHEMA: // Unless there is no record batch on the left side of the inputs, - // always start processing from the left side + // always start processing from the left side. unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch()); - inferOutputFields(); - break; - case NONE: - // If the right input side comes from an empty data source, - // use the left input side's schema directly - unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch()); - inferOutputFieldsFromLeftSide(); - rightIsFinish = true; + // If the record count of the first batch from right input is zero, + // there are two possibilities: + // 1. The right side is an empty input (e.g., file). + // 2. There will be more records carried by later batches. + if (rightSide.getRecordBatch().getRecordCount() == 0) { + iterRight = rightSide.nextBatch(); + + if (iterRight == IterOutcome.NONE) { + // Case 1: The right side was an empty input. + inferOutputFieldsFromLeftSide(); + rightIsFinish = true; + } else { + // Case 2: There are more records carried by the latter batches. + inferOutputFields(); + } + } else { + inferOutputFields(); + } break; case STOP: @@ -335,7 +343,8 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { return iterRight; default: - throw new IllegalStateException(String.format("Unknown state %s.", iterRight)); + throw new IllegalStateException( + String.format("Unexpected state %s.", iterRight)); } upstream = IterOutcome.OK_NEW_SCHEMA; @@ -387,7 +396,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { return upstream; default: - throw new SchemaChangeException("Schema change detected in the left input of Union-All. 
This is not currently supported"); + throw new IllegalStateException(String.format("Unknown state %s.", iterOutcome)); } } else { IterOutcome iterOutcome = leftSide.nextBatch(); @@ -535,4 +544,4 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index caabfce..fafa14e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -150,6 +150,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { @Override public IterOutcome next() { + batchLoader.resetRecordCount(); stats.startProcessing(); try{ RawFragmentBatch batch; http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index ed7da9b..01c3c92 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -33,36 +33,113 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.util.BatchPrinter; import org.apache.drill.exec.vector.VectorValidator; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.*; + + public class IteratorValidatorBatchIterator implements CloseableRecordBatch { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class); + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class); static final boolean VALIDATE_VECTORS = false; - private IterOutcome state = IterOutcome.NOT_YET; + /** For logging/debuggability only. */ + private static volatile int instanceCount; + + /** For logging/debuggability only. */ + private final int instNum; + { + instNum = ++instanceCount; + } + + /** + * The upstream batch, calls to which and return values from which are + * checked by this validator. + */ private final RecordBatch incoming; - private boolean first = true; + + /** Incoming batch's type (simple class name); for logging/debuggability + * only. */ + private final String batchTypeName; + + /** Exception state of incoming batch; last value thrown by its next() + * method. */ + private Throwable exceptionState = null; + + /** Main state of incoming batch; last value returned by its next() method. */ + private IterOutcome batchState = null; + + /** Last schema retrieved after OK_NEW_SCHEMA or OK from next(). Null if none + * yet. 
Currently for logging/debuggability only. */ + private BatchSchema lastSchema = null; + + /** Last schema retrieved after OK_NEW_SCHEMA from next(). Null if none yet. + * Currently for logging/debuggability only. */ + private BatchSchema lastNewSchema = null; + + /** + * {@link IterOutcome} return value sequence validation state. + * (Only needs enough to validate returns of OK.) + */ + private enum ValidationState { + /** Initial state: Have not gotten any OK_NEW_SCHEMA yet and not + * terminated. OK is not allowed yet. */ + INITIAL_NO_SCHEMA, + /** Have gotten OK_NEW_SCHEMA already and not terminated. OK is allowed + * now. */ + HAVE_SCHEMA, + /** Terminal state: Have seen NONE or STOP. Nothing more is allowed. */ + TERMINAL + } + + /** High-level IterOutcome sequence state. */ + private ValidationState validationState = ValidationState.INITIAL_NO_SCHEMA; + public IteratorValidatorBatchIterator(RecordBatch incoming) { this.incoming = incoming; + batchTypeName = incoming.getClass().getSimpleName(); + + // (Log construction and close() at same level to bracket instance's activity.) + logger.trace( "[#{}; on {}]: Being constructed.", instNum, batchTypeName); } - private void validateReadState() { - switch (state) { + @Override + public String toString() { + return + super.toString() + + "[" + + "instNum = " + instNum + + ", validationState = " + validationState + + ", batchState = " + batchState + + ", ... " + + "; incoming = " + incoming + + "]"; + } + + private void validateReadState(String operation) { + if (batchState == null) { + throw new IllegalStateException( + String.format( + "Batch data read operation (%s) attempted before first next() call" + + " on batch [#%d, %s].", + operation, instNum, batchTypeName)); + } + switch (batchState) { case OK: case OK_NEW_SCHEMA: return; default: throw new IllegalStateException( - String - .format( - "You tried to do a batch data read operation when you were in a state of %s. You can only do this type of operation when you are in a state of OK or OK_NEW_SCHEMA.", - state.name())); + String.format( + "Batch data read operation (%s) attempted when last next() call" + + " on batch [#%d, %s] returned %s (not %s or %s).", + operation, instNum, batchTypeName, batchState, OK, OK_NEW_SCHEMA)); } } @Override public Iterator<VectorWrapper<?>> iterator() { - validateReadState(); + validateReadState("iterator()"); return incoming.iterator(); } @@ -78,7 +155,7 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch { @Override public int getRecordCount() { - validateReadState(); + validateReadState("getRecordCount()"); return incoming.getRecordCount(); } @@ -89,19 +166,19 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch { @Override public SelectionVector2 getSelectionVector2() { - validateReadState(); + validateReadState("getSelectionVector2()"); return incoming.getSelectionVector2(); } @Override public SelectionVector4 getSelectionVector4() { - validateReadState(); + validateReadState("getSelectionVector4()"); return incoming.getSelectionVector4(); } @Override public TypedFieldId getValueVectorId(SchemaPath path) { - validateReadState(); + validateReadState("getValueVectorId(SchemaPath)"); return incoming.getValueVectorId(path); } @@ -113,48 +190,161 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch { @Override public IterOutcome next() { - if (state == IterOutcome.NONE ) { - throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. 
You should not be attempting to call next() again."); - } - state = incoming.next(); - if (first) { - first = !first; - } + logger.trace( "[#{}; on {}]: next() called.", instNum, batchTypeName); + final IterOutcome prevBatchState = batchState; + try { - if (state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) { - BatchSchema schema = incoming.getSchema(); - if (schema == null) { - return state; + // Check whether next() should even have been called in current state. + if (null != exceptionState) { + throw new IllegalStateException( + String.format( + "next() [on #%d; %s] called again after it threw %s (after" + + " returning %s). Caller should not have called next() again.", + instNum, batchTypeName, exceptionState, batchState)); } - - if (schema.getFieldCount() == 0) { - throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed."); + // (Note: This could use validationState.) + if (batchState == NONE || batchState == STOP) { + throw new IllegalStateException( + String.format( + "next() [on #%d, %s] called again after it returned %s." + " Caller should not have called next() again.", + instNum, batchTypeName, batchState)); } - if (incoming.getRecordCount() > MAX_BATCH_SIZE) { - throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d", incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE)); + + // Now get result from upstream next(). + batchState = incoming.next(); + + logger.trace("[#{}; on {}]: incoming next() return: ({} ->) {}", + instNum, batchTypeName, prevBatchState, batchState); + + // Check state transition and update high-level state. + switch (batchState) { + case OK_NEW_SCHEMA: + // OK_NEW_SCHEMA is allowed at any time, except if terminated (checked + // above). + // OK_NEW_SCHEMA moves to have-seen-schema state. + validationState = ValidationState.HAVE_SCHEMA; + break; + case OK: + // OK is allowed as long as OK_NEW_SCHEMA was seen, except if terminated + // (checked above). + if (validationState != ValidationState.HAVE_SCHEMA) { + throw new IllegalStateException( + String.format( + "next() returned %s without first returning %s [#%d, %s]", + batchState, OK_NEW_SCHEMA, instNum, batchTypeName)); + } + // OK doesn't change high-level state. + break; + case NONE: + // NONE is allowed as long as OK_NEW_SCHEMA was seen, except if + // already terminated (checked above). + if (validationState != ValidationState.HAVE_SCHEMA) { + throw new IllegalStateException( + String.format( + "next() returned %s without first returning %s [#%d, %s]", + batchState, OK_NEW_SCHEMA, instNum, batchTypeName)); + } + // NONE moves to terminal high-level state. + validationState = ValidationState.TERMINAL; + break; + case STOP: + // STOP is allowed at any time, except if already terminated (checked + // above). + // STOP moves to terminal high-level state. + validationState = ValidationState.TERMINAL; + break; + case NOT_YET: + case OUT_OF_MEMORY: + // NOT_YET and OUT_OF_MEMORY are allowed at any time, except if + // terminated (checked above). + // NOT_YET and OUT_OF_MEMORY don't change high-level state. + break; + default: + throw new AssertionError( + "Unhandled new " + IterOutcome.class.getSimpleName() + " value " + + batchState); + //break; } - if (VALIDATE_VECTORS) { - VectorValidator.validate(incoming); + // Validate schema when available. 
+ if (batchState == OK || batchState == OK_NEW_SCHEMA) { + final BatchSchema prevLastSchema = lastSchema; + final BatchSchema prevLastNewSchema = lastNewSchema; + + lastSchema = incoming.getSchema(); + if (batchState == OK_NEW_SCHEMA) { + lastNewSchema = lastSchema; + } + + if (logger.isTraceEnabled()) { + logger.trace("[#{}; on {}]: incoming next() return: #records = {}, " + + "\n schema:" + + "\n {}, " + + "\n prev. new ({}):" + + "\n {}", + instNum, batchTypeName, incoming.getRecordCount(), + lastSchema, + lastSchema.equals(prevLastNewSchema) ? "equal" : "not equal", + prevLastNewSchema); + } + + if (lastSchema == null) { + throw new IllegalStateException( + String.format( + "Incoming batch [#%d, %s] has a null schema. This is not allowed.", + instNum, batchTypeName)); + } + if (lastSchema.getFieldCount() == 0) { + throw new IllegalStateException( + String.format( + "Incoming batch [#%d, %s] has an empty schema. This is not allowed.", + instNum, batchTypeName)); + } + if (incoming.getRecordCount() > MAX_BATCH_SIZE) { + throw new IllegalStateException( + String.format( + "Incoming batch [#%d, %s] has size %d, which is beyond the" + + " limit of %d", + instNum, batchTypeName, incoming.getRecordCount(), MAX_BATCH_SIZE + )); + } + + if (VALIDATE_VECTORS) { + VectorValidator.validate(incoming); + } } - } - return state; + return batchState; + } + catch (RuntimeException | Error e) { + exceptionState = e; + logger.trace("[#{}, on {}]: incoming next() exception: ({} ->) {}", + instNum, batchTypeName, prevBatchState, exceptionState, + exceptionState); + throw e; + } } @Override public WritableBatch getWritableBatch() { - validateReadState(); + validateReadState("getWritableBatch()"); return incoming.getWritableBatch(); } @Override public void close() { + // (Log construction and close() calls at same logging level to bracket + // instance's activity.) + logger.trace( "[#{}; on {}]: close() called, state = {} / {}.", + instNum, batchTypeName, batchState, exceptionState); } @Override public VectorContainer getOutgoingContainer() { - throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); + throw new UnsupportedOperationException( + String.format("You should not call getOutgoingContainer() for class %s", + this.getClass().getCanonicalName())); } } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index aaa6f9e..f06c397 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -74,12 +74,18 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } protected static enum BatchState { - BUILD_SCHEMA, // Need to build schema and return - FIRST, // This is still the first data batch - NOT_FIRST, // The first data batch has already been returned - STOP, // The query most likely failed, we need to propagate STOP to the root - OUT_OF_MEMORY, // Out of Memory while building the Schema...Ouch! - DONE // All work is done, no more data to be sent + /** Need to build schema and return. 
*/ + BUILD_SCHEMA, + /** This is still the first data batch. */ + FIRST, + /** The first data batch has already been returned. */ + NOT_FIRST, + /** The query most likely failed, we need to propagate STOP to the root. */ + STOP, + /** Out of Memory while building the Schema...Ouch! */ + OUT_OF_MEMORY, + /** All work is done, no more data to be sent. */ + DONE } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index e84057b..4f91317 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -104,7 +104,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte } // Check if schema has changed - if (callBack.getSchemaChange()) { + if (callBack.getSchemaChangedAndReset()) { return IterOutcome.OK_NEW_SCHEMA; } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 6f10a1c..8229e58 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -23,60 +23,214 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; /** - * A record batch contains a set of field values for a particular range of records. In the case of a record batch - * composed of ValueVectors, ideally a batch fits within L2 cache (~256k per core). The set of value vectors do not - * change unless the next() IterOutcome is a *_NEW_SCHEMA type. - * - * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids - * provided utilizing getValueVectorId(); + * A record batch contains a set of field values for a particular range of + * records. + * <p> + * In the case of a record batch composed of ValueVectors, ideally a batch + * fits within L2 cache (~256kB per core). The set of value vectors does + * not change except during a call to {@link #next()} that returns + * {@link IterOutcome#OK_NEW_SCHEMA} value. + * </p> + * <p> + * A key thing to know is that the Iterator provided by a record batch must + * align with the rank positions of the field IDs provided using + * {@link getValueVectorId}. + * </p> */ public interface RecordBatch extends VectorAccessible { - /* max batch size, limited by 2-byte-length in SV2 : 65536 = 2^16 */ + /** max batch size, limited by 2-byte length in SV2: 65536 = 2^16 */ public static final int MAX_BATCH_SIZE = 65536; /** - * Describes the outcome of a RecordBatch being incremented forward. + * Describes the outcome of incrementing RecordBatch forward by a call to + * {@link #next()}. 
+ * <p> + * Key characteristics of the return value sequence: + * </p> + * <ul> + * <li> + * {@code OK_NEW_SCHEMA} always appears unless {@code STOP} appears. (A + * batch returns {@code OK_NEW_SCHEMA} before returning {@code NONE} even + * if the batch has zero rows.) + * </li> + * <li>{@code OK_NEW_SCHEMA} always appears before {@code OK} appears.</li> + * <li> + * The last value is always {@code NONE} or {@code STOP}, and {@code NONE} + * and {@code STOP} appear only as the last value. + * </li> + * </ul> + * <p> + * <strong>Details</strong>: + * </p> + * <p> + * For normal completion, the basic sequence of return values from calls to + * {@code next()} on a {@code RecordBatch} is: + * </p> + * <ol> + * <li> + * an {@link #OK_NEW_SCHEMA} value followed by zero or more {@link #OK} + * values, + * </li> + * <li> + * zero or more subsequences each having an {@code OK_NEW_SCHEMA} value + * followed by zero or more {@code OK} values, and then + * </li> + * <li> + * a {@link #NONE} value. + * </li> + * </ol> + * <p> + * In addition to that basic sequence, {@link #NOT_YET} and + * {@link #OUT_OF_MEMORY} values can appear anywhere in the subsequence + * before the terminal value ({@code NONE} or {@code STOP}). + * </p> + * <p> + * For abnormal termination, the sequence is truncated (before the + * {@code NONE}) and ends with {@link #STOP}. That is, the sequence begins + * with a subsequence that is some prefix of a normal-completion sequence + * and that does not contain {@code NONE}, and ends with {@code STOP}. + * </p> + * <p> + * (The normal-completion return sequence is matched by the following + * regular-expression-style grammar: + * <pre> + * ( ( NOT_YET | OUT_OF_MEMORY )* OK_NEW_SCHEMA + * ( ( NOT_YET | OUT_OF_MEMORY )* OK )* + * )+ + * ( NOT_YET | OUT_OF_MEMORY )* NONE + * </pre> + * ) + * </p> */ public static enum IterOutcome { - NONE, // No more records were found. - OK, // A new range of records have been provided. - OK_NEW_SCHEMA, // A full collection of records - STOP, // Informs parent nodes that the query has terminated. In this case, a consumer can consume their QueryContext - // to understand the current state of things. - NOT_YET, // used by batches that haven't received incoming data yet. - OUT_OF_MEMORY // an upstream operator was unable to allocate memory. A batch receiving this should release memory if it can - } + /** + * Normal completion of batch. + * <p> + * The call to {@link #next()} + * read no records, + * the batch has and will have no more results to return, + * and {@code next()} must not be called again. + * </p> + * <p> + * This value will be returned only after {@link #OK_NEW_SCHEMA} has been + * returned at least once (not necessarily <em>immediately</em> after). + * </p> + */ + NONE, + + /** + * Zero or more records with same schema. + * <p> + * The call to {@link #next()} + * read zero or more records, + * the schema has not changed since the last time {@code OK_NEW_SCHEMA} + * was returned, + * and the batch will have more results to return (at least completion or + * abnormal termination ({@code NONE} or {@code STOP})). + * ({@code next()} should be called again.) + * </p> + * <p> + * This will be returned only after {@link #OK_NEW_SCHEMA} has been + * returned at least once (not necessarily <em>immediately</em> after). + * </p> + */ + OK, + + /** + * New schema, maybe with records. 
+ * <p> + * The call to {@link #next()} + * changed the schema and vector structures + * and read zero or more records, + * and the batch will have more results to return (at least completion or + * abnormal termination ({@code NONE} or {@code STOP})). + * ({@code next()} should be called again.) + * </p> + */ + OK_NEW_SCHEMA, - public static enum SetupOutcome { - OK, OK_NEW_SCHEMA, FAILED + /** + * Non-completion (abnormal) termination. + * <p> + * The call to {@link #next()} + * reports that the query has terminated other than by normal completion, + * and that the caller must not call any of the schema-access or + * data-access methods nor call {@code next()} again. + * </p> + * <p> + * The caller can examine its QueryContext to understand the current state + * of things. + * </p> + */ + STOP, + + /** + * No data yet. + * <p> + * The call to {@link #next()} + * read no data, + * and the batch will have more results to return in the future (at least + * completion or abnormal termination ({@code NONE} or {@code STOP})). + * The caller should call {@code next()} again, but should do so later + * (including by returning {@code NOT_YET} to its caller). + * </p> + * <p> + * Normally, the caller should perform any locally available work while + * waiting for incoming data from the callee, for example, doing partial + * sorts on already received data while waiting for additional data to + * sort. + * </p> + * <p> + * Used by batches that haven't received incoming data yet. + * </p> + */ + NOT_YET, + + /** + * Out of memory (not fatal). + * <p> + * The call to {@link #next()} + * (or an operator upstream of it) was unable to allocate memory + * and did not read any records, + * and the batch will have more results to return (at least completion or + * abnormal termination ({@code NONE} or {@code STOP})). + * The caller should release memory if it can (including by returning + * {@code OUT_OF_MEMORY} to its caller) and call {@code next()} again. + * </p> + */ + OUT_OF_MEMORY } /** - * Access the FragmentContext of the current query fragment. Useful for reporting failure information or other query - * level information. - * - * @return + * Gets the FragmentContext of the current query fragment. Useful for + * reporting failure information or other query-level information. */ public FragmentContext getContext(); /** - * Provide the schema of the current RecordBatch. This changes if and only if a *_NEW_SCHEMA IterOutcome is provided. - * - * @return + * Gets the current schema of this record batch. + * <p> + * May be called only when the most recent call to {@link #next}, if any, + * returned {@link #OK_NEW_SCHEMA} or {@link #OK}. + * </p> + * <p> + * The schema changes when and only when {@link #next} returns + * {@link #OK_NEW_SCHEMA}. + * </p> */ + @Override public BatchSchema getSchema(); /** - * Provide the number of records that are within this record count - * - * @return + * Gets the number of records in this record batch. */ + @Override public int getRecordCount(); /** - * Inform child nodes that this query should be terminated. Child nodes should utilize the QueryContext to determine - * what has happened. + * Informs child nodes that this query should be terminated. Child nodes + * should use the QueryContext to determine what has happened. */ public void kill(boolean sendUpstream); @@ -88,32 +242,44 @@ public interface RecordBatch extends VectorAccessible { public VectorContainer getOutgoingContainer(); /** - * Get the value vector type and id for the given schema path.
The TypedFieldId should store a fieldId which is the - * same as the ordinal position of the field within the Iterator provided this classes implementation of - * Iterable<ValueVector>. + * Gets the value vector type and ID for the given schema path. The + * TypedFieldId should store a fieldId which is the same as the ordinal + * position of the field within the Iterator provided by this class's + * implementation of Iterable<ValueVector>. * * @param path * The path where the vector should be located. * @return The local field id associated with this vector. If no field matches this path, this will return a null * TypedFieldId */ + @Override public abstract TypedFieldId getValueVectorId(SchemaPath path); + @Override public abstract VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids); /** - * Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an - * IterOutcome.NONE, the consumer should no longer next(). Behavior at this point is undetermined and likely to throw - * an exception. + * Updates the data in each Field reading interface for the next range of + * records. + * <p> + * Once a RecordBatch's {@code next()} has returned {@link IterOutcome#NONE} + * or {@link IterOutcome#STOP}, the consumer should no longer call + * {@code next()}. Behavior at this point is undefined and likely to + * throw an exception. + * </p> + * <p> + * See {@link IterOutcome} for the protocol (possible sequences of return + * values). + * </p> + * + * @return An IterOutcome describing the result of the iteration. */ public IterOutcome next(); /** - * Get a writable version of this batch. Takes over owernship of existing buffers. - * - * @return + * Gets a writable version of this batch. Takes over ownership of existing + * buffers. */ public WritableBatch getWritableBatch(); http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index 55ae309..ed86358 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -40,6 +40,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Holds a record batch loaded from a record batch message. + */ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapper<?>>{ private final static Logger logger = LoggerFactory.getLogger(RecordBatchLoader.class); @@ -48,6 +51,10 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp private int valueCount; private BatchSchema schema; + + /** + * Constructs a loader using the given allocator for vector buffer allocation. + */ public RecordBatchLoader(BufferAllocator allocator) { this.allocator = Preconditions.checkNotNull(allocator); } @@ -72,6 +79,11 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp valueCount = def.getRecordCount(); boolean schemaChanged = schema == null; + // Load vectors from the batch buffer, while tracking added and/or removed + // vectors (relative to the previous call) in order to determine whether + // the schema has changed since the previous call.
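In outline, the load-time check just described works as follows. This is a condensed sketch of the surrounding method, using the same names as the diff; the real code also allocates the vectors and loads their buffers:

    // Sketch only: detect a schema change by diffing the incoming fields
    // against the previous batch's fields.
    final Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
    for (final VectorWrapper<?> wrapper : container) {
      final ValueVector v = wrapper.getValueVector();
      oldFields.put(v.getField(), v);          // index previous batch's fields
    }
    boolean schemaChanged = (schema == null);  // first batch counts as a change
    for (final SerializedField field : def.getFieldList()) {
      final MaterializedField fieldDef = MaterializedField.create(field);
      final ValueVector vector = oldFields.remove(fieldDef);
      if (vector == null                       // field was added, or ...
          || !vector.getField().getType().equals(fieldDef.getType())) {
        schemaChanged = true;                  // ... its type changed
      }
    }
    if (!oldFields.isEmpty()) {
      schemaChanged = true;                    // fields were removed
    }
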
+ + // Set up to recognize previous fields that no longer exist. final Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap(); for(final VectorWrapper<?> wrapper : container) { final ValueVector vector = wrapper.getValueVector(); oldFields.put(vector.getField(), vector); } @@ -87,15 +99,18 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp ValueVector vector = oldFields.remove(fieldDef); if (vector == null) { + // Field did not exist previously; this is a schema change. schemaChanged = true; vector = TypeHelper.getNewVector(fieldDef, allocator); } else if (!vector.getField().getType().equals(fieldDef.getType())) { + // Field had a different type before; this is a schema change. // clear previous vector vector.clear(); schemaChanged = true; vector = TypeHelper.getNewVector(fieldDef, allocator); } + // Load the vector. if (field.getValueCount() == 0) { AllocationHelper.allocate(vector, 0, 0, 0); } else { @@ -179,10 +194,23 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp return schema; } + public void resetRecordCount() { + valueCount = 0; + } + + /** + * Clears this loader, which clears the internal vector container (see + * {@link VectorContainer#clear}) and resets the record count to zero. + */ public void clear() { container.clear(); + resetRecordCount(); } + /** + * Sorts vectors into canonical order (by field name). Updates schema and + * internal vector container. + */ public void canonicalize() { //logger.debug( "RecordBatchLoader : before schema " + schema); container = VectorContainer.canonicalize(container); http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index ef22d52..815e2d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -55,6 +55,16 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess this.oContext = oContext; } + @Override + public String toString() { + return super.toString() + + "[recordCount = " + recordCount + + ", schemaChanged = " + schemaChanged + + ", schema = " + schema + + ", wrappers = " + wrappers + + ", ...]"; + } + /** * Get the OperatorContext. * * @return */ @@ -164,6 +174,9 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess return vc; } + /** + * Sorts vectors into canonical order (by field name) in a new VectorContainer. + */ public static VectorContainer canonicalize(VectorContainer original) { VectorContainer vc = new VectorContainer(); List<VectorWrapper<?>> canonicalWrappers = new ArrayList<VectorWrapper<?>>(original.wrappers); @@ -328,6 +341,9 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess return recordCount; } + /** + * Clears the contained vectors. (See {@link ValueVector#clear}.)
+ */ public void zeroVectors() { for (VectorWrapper<?> w : wrappers) { w.clear(); http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java index 8ca3ec8..41285c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java @@ -39,6 +39,14 @@ public abstract class AbstractRecordReader implements RecordReader { private boolean isStarQuery = false; private boolean isSkipQuery = false; + @Override + public String toString() { + return super.toString() + + "[columns = " + columns + + ", isStarQuery = " + isStarQuery + + ", isSkipQuery = " + isSkipQuery + "]"; + } + protected final void setColumns(Collection<SchemaPath> projected) { assert Preconditions.checkNotNull(projected, COL_NULL_ERROR).size() > 0 : COL_EMPTY_ERROR; if (projected instanceof ColumnList) { http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java index c2ab0d0..f1b55e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java @@ -44,7 +44,8 @@ public interface RecordReader extends AutoCloseable { void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException; /** - * Increment record reader forward, writing into the provided output batch. + * Increments this record reader forward, writing via the provided output + * mutator into the output batch. * * @return The number of additional records added to the output. 
*/ http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java index 015fcf6..72f7d84 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java @@ -46,8 +46,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; -public class - JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { +public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { private static final boolean IS_COMPRESSIBLE = true; private static final String DEFAULT_NAME = "json"; http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index 0e3c908..8160f1c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -66,7 +66,7 @@ public class JSONRecordReader extends AbstractRecordReader { * @param fragmentContext * @param inputPath * @param fileSystem - * @param columns + * @param columns pathnames of columns/subfields to read * @throws OutOfMemoryException */ public JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final DrillFileSystem fileSystem, @@ -79,7 +79,7 @@ public class JSONRecordReader extends AbstractRecordReader { * @param fragmentContext * @param embeddedContent * @param fileSystem - * @param columns + * @param columns pathnames of columns/subfields to read * @throws OutOfMemoryException */ public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent, @@ -114,6 +114,14 @@ public class JSONRecordReader extends AbstractRecordReader { } @Override + public String toString() { + return super.toString() + + "[hadoopPath = " + hadoopPath + + ", recordCount = " + recordCount + + ", runningRecordCount = " + runningRecordCount + ", ...]"; + } + + @Override public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { try{ if (hadoopPath != null) { http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java index efbd30d..9f4115b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java @@ -37,6 +37,7 @@ public class VectorUtil { public static void showVectorAccessibleContent(VectorAccessible va, final String delimiter) { int rows = 
va.getRecordCount(); + System.out.println(rows + " row(s):"); List<String> columns = Lists.newArrayList(); for (VectorWrapper<?> vw : va) { columns.add(vw.getValueVector().getField().getPath().getAsUnescapedPath()); @@ -138,6 +139,7 @@ public class VectorUtil { } int rows = va.getRecordCount(); + System.out.println(rows + " row(s):"); for (int row = 0; row < rows; row++) { // header, every 50 rows. if (row%50 == 0) { http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java index 7b3ab41..eb5dbcd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java @@ -47,6 +47,11 @@ public abstract class BaseValueVector implements ValueVector { } @Override + public String toString() { + return super.toString() + "[field = " + field + ", ...]"; + } + + @Override public void clear() { getMutator().reset(); } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java index de05131..4c2491c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java @@ -20,16 +20,32 @@ package org.apache.drill.exec.vector; import org.apache.drill.exec.util.CallBack; + public class SchemaChangeCallBack implements CallBack { - private boolean schemaChange = false; + private boolean schemaChanged = false; + + /** + * Constructs a schema-change callback with the schema-changed state set to + * {@code false}. + */ + public SchemaChangeCallBack() { + } + /** + * Sets the schema-changed state to {@code true}. + */ + @Override public void doWork() { - schemaChange = true; + schemaChanged = true; } - public boolean getSchemaChange() { - final boolean current = schemaChange; - schemaChange = false; + /** + * Returns the value of the schema-changed state, <strong>resetting</strong> + * it to {@code false}.
+ */ + public boolean getSchemaChangedAndReset() { + final boolean current = schemaChanged; + schemaChanged = false; return current; } } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java index 2c93c31..35df691 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java @@ -189,8 +189,8 @@ public abstract class AbstractMapVector extends AbstractContainerVector { Preconditions.checkNotNull(vector, "vector cannot be null") ); if (old != null && old != vector) { - logger.debug("Field [%s] mutated from [%s] to [%s]", name, old.getClass().getSimpleName(), - vector.getClass().getSimpleName()); + logger.debug("Field [{}] mutated from [{}] to [{}]", name, old.getClass().getSimpleName(), + vector.getClass().getSimpleName()); } } http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java index 8b4b858..048358c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java @@ -309,7 +309,14 @@ public class MapVector extends AbstractMapVector { Map<String, Object> vv = new JsonStringHashMap<>(); for (String child:getChildFieldNames()) { ValueVector v = getChild(child); - if (v != null) { + // TODO(DRILL-4001): Resolve this hack: + // The index/value count check in the following if statement is a hack + // to work around the current fact that RecordBatchLoader.load and + // MapVector.load leave child vectors with a length of zero (as opposed + // to matching the lengths of siblings and the parent map vector) + // because they don't remove (or set the lengths of) vectors from + // previous batches that aren't in the current batch. 
+ if (v != null && index < v.getAccessor().getValueCount()) { Object value = v.getAccessor().getObject(index); if (value != null) { vv.put(child, value); http://git-wip-us.apache.org/repos/asf/drill/blob/a0be3ae0/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java index 1857479..dfaf5de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/FieldSelection.java @@ -59,6 +59,15 @@ class FieldSelection { this.mode = mode; } + @Override + public String toString() { + return + super.toString() + + "[mode = " + mode + + ", children = " + children + + ", childrenInsensitive = " + childrenInsensitive + "]"; + } + /** * Create a new tree that has all leaves fixed to support full depth validity. */
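The IterOutcome protocol documented above is easiest to see from the consumer's side. The following is a minimal sketch of a conforming downstream caller; "incoming" is an upstream RecordBatch, and processRecords is a hypothetical stand-in for real per-batch work (real operators also manage transfer pairs, generated code, and memory):

    // Sketch of a caller honoring the IterOutcome sequence rules:
    // OK_NEW_SCHEMA precedes any OK, and NONE/STOP is terminal.
    boolean done = false;
    while (!done) {
      final RecordBatch.IterOutcome outcome = incoming.next();
      switch (outcome) {
        case OK_NEW_SCHEMA:
          // Rebuild anything schema-dependent, then handle the (possibly
          // zero) rows in this batch; intentionally falls through to OK.
        case OK:
          processRecords(incoming.getRecordCount());
          break;
        case NOT_YET:
        case OUT_OF_MEMORY:
          // Not terminal: typically relay the outcome to our own caller
          // (releasing memory for OUT_OF_MEMORY), then call next() again.
          break;
        case NONE:  // normal completion; next() must not be called again
        case STOP:  // abnormal termination; stop all work immediately
          done = true;
          break;
      }
    }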
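Likewise, the get-and-reset rename in SchemaChangeCallBack matters because reading the flag consumes it. A small sketch of the intended use, simplified from the AbstractSingleRecordBatch hunk near the top of this patch:

    // The callback is registered with child vectors; doWork() is invoked
    // whenever a vector's structure changes. Reading the flag resets it,
    // so each schema change is reported downstream exactly once.
    if (callBack.getSchemaChangedAndReset()) {
      return IterOutcome.OK_NEW_SCHEMA;
    }
    return IterOutcome.OK;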
