Support repeated (array) values in JSONRecordReader
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/38ab96f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/38ab96f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/38ab96f3 Branch: refs/heads/master Commit: 38ab96f335537c6bbdb6a4a64b1c6e13755172f6 Parents: 73fad99 Author: Timothy Chen <[email protected]> Authored: Sat Aug 3 15:29:35 2013 -0700 Committer: Timothy Chen <[email protected]> Committed: Thu Aug 8 23:36:41 2013 -0700 ---------------------------------------------------------------------- .../drill/common/expression/SchemaPath.java | 2 +- .../templates/RepeatedValueVectors.java | 10 +- .../exec/schema/json/jackson/JacksonHelper.java | 116 +++++++++++-------- .../drill/exec/store/JSONRecordReader.java | 86 ++++++++------ .../apache/drill/exec/store/VectorHolder.java | 20 ++-- .../drill/exec/vector/AllocationHelper.java | 2 +- .../drill/exec/vector/RepeatedMutator.java | 23 ---- .../physical/impl/TestSimpleFragmentRun.java | 81 ++++++++++++- .../drill/exec/store/JSONRecordReaderTest.java | 56 ++++++++- .../src/test/resources/scan_json_test_4.json | 23 ++-- 10 files changed, 279 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java index 19d1069..6f1a733 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java +++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/SchemaPath.java @@ -42,7 +42,7 @@ 
public class SchemaPath extends LogicalExpressionBase { ")*$"; // reads well in RegexBuddy - private static final String SEGMENT_REGEX = "(?:\n" + "(\\[\\d+\\])\n" + "|\n" + "'?\n" + private static final String SEGMENT_REGEX = "(?:\n" + "\\[(\\d+)\\]\n" + "|\n" + "'?\n" + "([^\\.\\[\\+\\-\\!\\]\\}]+) # identifier\n" + "'?\n" + ")\n" + "([\\+\\-\\!\\]\\}]?) # collision type"; private static final int GROUP_INDEX = 1; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java index c629a1d..f4a7049 100644 --- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java +++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java @@ -1,4 +1,4 @@ - +import org.apache.drill.exec.vector.ValueVector; <@pp.dropOutputFile /> <#list types as type> @@ -249,7 +249,7 @@ import com.google.common.collect.Lists; } } - public final class Mutator implements RepeatedMutator { + public final class Mutator implements ValueVector.Mutator { private Mutator(){ @@ -264,12 +264,18 @@ import com.google.common.collect.Lists; */ public void add(int index, <#if type.major == "VarLen">byte[]<#elseif (type.width < 4)>int<#else>${minor.javaType!type.javaType}</#if> value) { int nextOffset = offsets.getAccessor().get(index+1); + if (index > 0 && nextOffset == 0) { + nextOffset = offsets.getAccessor().get(index); + } values.getMutator().set(nextOffset, value); offsets.getMutator().set(index+1, nextOffset+1); } public void add(int index, ${minor.class}Holder holder){ int nextOffset = offsets.getAccessor().get(index+1); + if (index > 
0 && nextOffset == 0) { + nextOffset = offsets.getAccessor().get(index); + } values.getMutator().set(nextOffset, holder); offsets.getMutator().set(index+1, nextOffset+1); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java index 0e2c052..d8f0646 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java @@ -18,63 +18,83 @@ package org.apache.drill.exec.schema.json.jackson; -import java.io.IOException; - -import org.apache.drill.common.types.Types; -import org.apache.drill.common.types.TypeProtos.DataMode; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; +import java.io.IOException; public class JacksonHelper { - public static final MajorType STRING_TYPE = Types.optional(MinorType.VARCHAR); - public static final MajorType BOOLEAN_TYPE = Types.optional(MinorType.BIT); - public static final MajorType ARRAY_TYPE = Types.repeated(MinorType.LATE); - public static final MajorType MAP_TYPE = Types.repeated(MinorType.MAP); - public static final MajorType INT_TYPE = Types.optional(MinorType.INT); - public static final MajorType FLOAT_TYPE = Types.optional(MinorType.FLOAT4); - public 
static final MajorType NULL_TYPE = Types.optional(MinorType.LATE); + public static final MajorType STRING_TYPE = Types.optional(MinorType.VARCHAR); + public static final MajorType REPEATED_STRING_TYPE = Types.repeated(MinorType.VARCHAR); + public static final MajorType BOOLEAN_TYPE = Types.optional(MinorType.BIT); + public static final MajorType REPEATED_BOOLEAN_TYPE = Types.repeated(MinorType.BIT); + public static final MajorType ARRAY_TYPE = Types.repeated(MinorType.LATE); + public static final MajorType MAP_TYPE = Types.repeated(MinorType.MAP); + public static final MajorType INT_TYPE = Types.optional(MinorType.INT); + public static final MajorType REPEATED_INT_TYPE = Types.repeated(MinorType.INT); + public static final MajorType FLOAT_TYPE = Types.optional(MinorType.FLOAT4); + public static final MajorType REPEATED_FLOAT_TYPE = Types.repeated(MinorType.FLOAT4); + public static final MajorType NULL_TYPE = Types.optional(MinorType.LATE); - public static MajorType getFieldType(JsonToken token) { - switch(token) { - case VALUE_STRING: - return STRING_TYPE; - case VALUE_FALSE: - return BOOLEAN_TYPE; - case VALUE_TRUE: - return BOOLEAN_TYPE; - case START_ARRAY: - return ARRAY_TYPE; - case START_OBJECT: - return MAP_TYPE; - case VALUE_NUMBER_INT: - return INT_TYPE; - case VALUE_NUMBER_FLOAT: - return FLOAT_TYPE; - case VALUE_NULL: - return NULL_TYPE; - } + public static MajorType getFieldType(JsonToken token, boolean repeated) { + if (repeated) { + switch (token) { + case VALUE_STRING: + return REPEATED_STRING_TYPE; + case VALUE_FALSE: + case VALUE_TRUE: + return REPEATED_BOOLEAN_TYPE; + case START_ARRAY: + return ARRAY_TYPE; + case START_OBJECT: + return MAP_TYPE; + case VALUE_NUMBER_INT: + return REPEATED_INT_TYPE; + case VALUE_NUMBER_FLOAT: + return REPEATED_FLOAT_TYPE; + } + } else { - throw new UnsupportedOperationException("Unsupported Jackson type: " + token); + switch (token) { + case VALUE_STRING: + return STRING_TYPE; + case VALUE_FALSE: + case VALUE_TRUE: + 
return BOOLEAN_TYPE; + case START_ARRAY: + return ARRAY_TYPE; + case START_OBJECT: + return MAP_TYPE; + case VALUE_NUMBER_INT: + return INT_TYPE; + case VALUE_NUMBER_FLOAT: + return FLOAT_TYPE; + case VALUE_NULL: + return NULL_TYPE; + } } - public static Object getValueFromFieldType(JsonParser parser, MinorType fieldType) throws IOException { - switch (fieldType) { - case INT: - return parser.getIntValue(); - case VARCHAR: - return parser.getValueAsString(); - case FLOAT4: - return parser.getFloatValue(); - case BIT: - return parser.getBooleanValue(); - case LATE: - return null; - default: - throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString()); - } + throw new UnsupportedOperationException("Unsupported Jackson type: " + token + ", Repeated: " + repeated); + } + + public static Object getValueFromFieldType(JsonParser parser, MinorType fieldType) throws IOException { + switch (fieldType) { + case INT: + return parser.getIntValue(); + case VARCHAR: + return parser.getValueAsString(); + case FLOAT4: + return parser.getFloatValue(); + case BIT: + return parser.getBooleanValue(); + case LATE: + return null; + default: + throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString()); } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java index ff7d315..a4887c0 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java @@ -103,7 +103,7 @@ public class 
JSONRecordReader implements RecordReader { int nextRowIndex = 0; try { - while (ReadType.OBJECT.readRecord(null, this, null, nextRowIndex++, 0)) { + while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) { parser.nextToken(); // Read to START_OBJECT token if (!parser.hasCurrentToken()) { @@ -180,12 +180,14 @@ public class JSONRecordReader implements RecordReader { ARRAY(END_ARRAY) { @Override public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) { - return new OrderedField(parentSchema, fieldType, prefixFieldName, index); + return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType); + //return new OrderedField(parentSchema, fieldType, prefixFieldName, index); } @Override public RecordSchema createSchema() throws IOException { - return new ListSchema(); + return new ObjectSchema(); + //return new ListSchema(); } }, OBJECT(END_OBJECT) { @@ -215,8 +217,7 @@ public class JSONRecordReader implements RecordReader { } @SuppressWarnings("ConstantConditions") - public boolean readRecord(Field parentField, - JSONRecordReader reader, + public boolean readRecord(JSONRecordReader reader, String prefixFieldName, int rowIndex, int groupCount) throws IOException, SchemaChangeException { @@ -232,7 +233,7 @@ public class JSONRecordReader implements RecordReader { } String fieldName = parser.getCurrentName(); - MajorType fieldType = JacksonHelper.getFieldType(token); + MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY); ReadType readType = null; switch (token) { case START_ARRAY: @@ -246,17 +247,17 @@ public class JSONRecordReader implements RecordReader { } if (fieldType != null) { // Including nulls - isFull = isFull || - !recordData( - parentField, - readType, - reader, - fieldType, - prefixFieldName, - fieldName, - rowIndex, - colIndex, - groupCount); + boolean currentFieldFull = !recordData( + readType, + reader, + fieldType, + prefixFieldName, + 
fieldName, + rowIndex, + colIndex, + groupCount); + + isFull = isFull || currentFieldFull; } token = parser.nextToken(); colIndex += 1; @@ -277,8 +278,7 @@ public class JSONRecordReader implements RecordReader { } } - private boolean recordData(Field parentField, - JSONRecordReader.ReadType readType, + private boolean recordData(JSONRecordReader.ReadType readType, JSONRecordReader reader, MajorType fieldType, String prefixFieldName, @@ -322,18 +322,13 @@ public class JSONRecordReader implements RecordReader { field.assignSchemaIfNull(newSchema); if (fieldSchema == null) reader.setCurrentSchema(newSchema); - if(readType == ReadType.ARRAY) { - readType.readRecord(field, reader, field.getFullFieldName(), rowIndex, groupCount); - } else { - readType.readRecord(field, reader, field.getFullFieldName(), rowIndex, groupCount); - } + readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount); reader.setCurrentSchema(currentSchema); } else { return addValueToVector( rowIndex, holder, - reader.getAllocator(), JacksonHelper.getValueFromFieldType( reader.getParser(), fieldType.getMinorType() @@ -346,10 +341,10 @@ public class JSONRecordReader implements RecordReader { return true; } - private static <T> boolean addValueToVector(int index, VectorHolder holder, BufferAllocator allocator, T val, MinorType minorType, int groupCount) { + private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) { switch (minorType) { case INT: { - holder.incAndCheckLength(32); + holder.incAndCheckLength(32 + 1); if (groupCount == 0) { if (val != null) { NullableIntVector int4 = (NullableIntVector) holder.getValueVector(); @@ -363,13 +358,14 @@ public class JSONRecordReader implements RecordReader { RepeatedIntVector repeatedInt4 = (RepeatedIntVector) holder.getValueVector(); RepeatedIntVector.Mutator m = repeatedInt4.getMutator(); + holder.setGroupCount(index); m.add(index, (Integer) val); } - return 
holder.hasEnoughSpace(32); + return holder.hasEnoughSpace(32 + 1); } case FLOAT4: { - holder.incAndCheckLength(32); + holder.incAndCheckLength(32 + 1); if (groupCount == 0) { if (val != null) { NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector(); @@ -383,9 +379,10 @@ public class JSONRecordReader implements RecordReader { RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector(); RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator(); - m.add(groupCount, (Float) val); + holder.setGroupCount(index); + m.add(index, (Float) val); } - return holder.hasEnoughSpace(32); + return holder.hasEnoughSpace(32 + 1); } case VARCHAR: { if (val == null) { @@ -401,16 +398,29 @@ public class JSONRecordReader implements RecordReader { } else { RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector(); RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator(); + holder.setGroupCount(index); m.add(index, bytes); } - return holder.hasEnoughSpace(length); + return holder.hasEnoughSpace(length + 4 + 1); } } case BIT: { - holder.incAndCheckLength(1); - NullableBitVector bit = (NullableBitVector) holder.getValueVector(); - if (val != null) { - bit.getMutator().set(index, (Boolean) val ? 1 : 0); + holder.incAndCheckLength(1 + 1); + if (groupCount == 0) { + if (val != null) { + NullableBitVector bit = (NullableBitVector) holder.getValueVector(); + NullableBitVector.Mutator m = bit.getMutator(); + m.set(index, (Boolean) val ? 1 : 0); + } + } else { + if (val == null) { + throw new UnsupportedOperationException("Nullable repeated boolean is not supported."); + } + + RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector(); + RepeatedBitVector.Mutator m = repeatedBit.getMutator(); + holder.setGroupCount(index); + m.add(index, (Boolean) val ? 
1 : 0); } return holder.hasEnoughSpace(1 + 1); } @@ -443,7 +453,9 @@ public class JSONRecordReader implements RecordReader { MajorType type = field.getFieldType(); MaterializedField f = MaterializedField.create(new SchemaPath(field.getFullFieldName(), ExpressionPosition.UNKNOWN), type); - if (f.getType().getMinorType().equals(MinorType.MAP)) { + MinorType minorType = f.getType().getMinorType(); + + if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java index 43d3cd9..2c28082 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java @@ -64,22 +64,18 @@ public class VectorHolder { public void populateVectorLength() { ValueVector.Mutator mutator = vector.getMutator(); - if(mutator instanceof NonRepeatedMutator) { - ((NonRepeatedMutator)mutator).setValueCount(count); - } else if(mutator instanceof RepeatedMutator) { - ((RepeatedMutator)mutator).setGroupAndValueCount(groupCount, count); + if(vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) { + mutator.setValueCount(groupCount); } else { - throw new UnsupportedOperationException("Mutator not supported: " + mutator.getClass().getName()); + mutator.setValueCount(count); } } public void allocateNew(int valueLength) { - if (vector instanceof FixedWidthVector) { - ((FixedWidthVector) vector).allocateNew(valueLength); - } else if (vector instanceof VariableWidthVector) { - 
((VariableWidthVector) vector).allocateNew(valueLength * 10, valueLength); - } else { - throw new UnsupportedOperationException(); - } + AllocationHelper.allocate(vector, valueLength, 10, 5); + } + + public void allocateNew(int valueLength, int repeatedPerTop) { + AllocationHelper.allocate(vector, valueLength, 10, repeatedPerTop); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java index 69c17f4..5007dbd 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java @@ -10,7 +10,7 @@ public class AllocationHelper { public static void allocate(ValueVector v, int valueCount, int bytesPerValue, int repeatedPerTop){ if(v instanceof FixedWidthVector){ ((FixedWidthVector) v).allocateNew(valueCount); - }else if(v instanceof VariableWidthVector){ + } else if (v instanceof VariableWidthVector) { ((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount); }else if(v instanceof RepeatedFixedWidthVector){ ((RepeatedFixedWidthVector) v).allocateNew(valueCount, valueCount * repeatedPerTop); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java deleted file mode 100644 index 1227d02..0000000 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java +++ /dev/null @@ -1,23 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- ******************************************************************************/ - -package org.apache.drill.exec.vector; - -public interface RepeatedMutator extends ValueVector.Mutator { - public void setGroupAndValueCount(int groupCount, int valueCount); -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java index 5d4e700..cabe9b3 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java @@ -87,7 +87,6 @@ public class TestSimpleFragmentRun extends PopUnitTestBase { System.out.println(); } - for (int i = 0; i < batchLoader.getRecordCount(); i++) { boolean first = true; recordCount++; @@ -101,13 +100,87 @@ public class TestSimpleFragmentRun extends PopUnitTestBase { } if(!first) System.out.println(); } - - - } logger.debug("Received results {}", results); assertEquals(recordCount, 200); } } + + @Test + public void runJSONScanPopFragment() throws Exception { + try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + Drillbit bit = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + // run query. 
+ bit.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_json_scan_test1.json"), Charsets.UTF_8)); + + // look at records + RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator()); + int recordCount = 0; + + int expectedBatchCount = 2; + + assertEquals(expectedBatchCount, results.size()); + + for (int i = 0; i < results.size(); ++i) { + QueryResultBatch batch = results.get(i); + if (i == 0) { + assertTrue(batch.hasData()); + } else { + assertFalse(batch.hasData()); + continue; /* trailing batch carries no data; keep going so the final recordCount assertion is reached */ + } + + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); + boolean firstColumn = true; + + // print headers. + System.out.println("\n\n========NEW SCHEMA=========\n\n"); + for (VectorWrapper<?> v : batchLoader) { + + if (firstColumn) { + firstColumn = false; + } else { + System.out.print("\t"); + } + System.out.print(v.getField().getName()); + System.out.print("["); + System.out.print(v.getField().getType().getMinorType()); + System.out.print("]"); + } + + System.out.println(); + + + for (int r = 0; r < batchLoader.getRecordCount(); r++) { /* bound on the row index r, not the batch index i */ + boolean first = true; + recordCount++; + for (VectorWrapper<?> v : batchLoader) { + if (first) { + first = false; + } else { + System.out.print("\t"); + } + + ValueVector.Accessor accessor = v.getValueVector().getAccessor(); + + if (v.getField().getType().getMinorType() == TypeProtos.MinorType.VARCHAR) { + System.out.print(new String((byte[]) accessor.getObject(r), UTF_8)); + } else { + System.out.print(accessor.getObject(r)); + } + } + if (!first) System.out.println(); + } + + } + + assertEquals(2, recordCount); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java ---------------------------------------------------------------------- diff --git
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java index 0ebb529..b39ac8a 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java @@ -5,6 +5,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -68,7 +69,12 @@ public class JSONRecordReaderTest { SchemaDefProtos.FieldDef def = metadata.getDef(); assertEquals(expectedMinorType, def.getMajorType().getMinorType()); String[] parts = name.split("\\."); - assertEquals(parts.length, def.getNameList().size()); + int expected = parts.length; + boolean expectingArray = List.class.isAssignableFrom(value.getClass()); + if (expectingArray) { + expected += 1; + } + assertEquals(expected, def.getNameList().size()); for(int i = 0; i < parts.length; ++i) { assertEquals(parts[i], def.getName(i).getName()); } @@ -78,10 +84,21 @@ public class JSONRecordReaderTest { } T val = (T) valueVector.getAccessor().getObject(index); - if (val instanceof byte[]) { - assertTrue(Arrays.equals((byte[]) value, (byte[]) val)); + assertValue(value, val); + } + + private void assertValue(Object expected, Object found) { + if (found instanceof byte[]) { + assertTrue(Arrays.equals((byte[]) expected, (byte[]) found)); + } else if(found instanceof ArrayList) { + List expectedArray = (List) expected; + List foundArray = (List) found; + assertEquals(expectedArray.size(), foundArray.size()); + for(int i = 0; i < expectedArray.size(); ++i) { + assertValue(expectedArray.get(i), foundArray.get(i)); + } } else { - assertEquals(value, val); + assertEquals(expected, found); } } @@ -234,4 +251,35 @@ public class 
JSONRecordReaderTest { assertEquals(0, jr.next()); assertTrue(mutator.getRemovedFields().isEmpty()); } + + @Test + public void testRepeatedFields(@Injectable final FragmentContext context) throws ExecutionSetupException { + new Expectations() { + { + context.getAllocator(); + returns(new DirectBufferAllocator()); + } + }; + + JSONRecordReader jr = new JSONRecordReader(context, getResource("scan_json_test_4.json")); + + MockOutputMutator mutator = new MockOutputMutator(); + List<ValueVector> addFields = mutator.getAddFields(); + jr.setup(mutator); + assertEquals(2, jr.next()); + assertEquals(7, addFields.size()); + assertField(addFields.get(0), 0, MinorType.INT, 123, "test"); + assertField(addFields.get(1), 0, MinorType.INT, Arrays.asList(1, 2, 3), "test2"); + assertField(addFields.get(2), 0, MinorType.INT, Arrays.asList(4, 5, 6), "test3.a"); + assertField(addFields.get(3), 0, MinorType.INT, Arrays.asList(7, 8, 9), "test3.b"); + assertField(addFields.get(4), 0, MinorType.INT, Arrays.asList(10, 11, 12), "test3.c.d"); + assertField(addFields.get(5), 0, MinorType.FLOAT4, Arrays.<Float>asList((float) 1.1, (float) 1.2, (float) 1.3), "testFloat"); + assertField(addFields.get(6), 0, MinorType.VARCHAR, Arrays.asList("hello".getBytes(UTF_8), "drill".getBytes(UTF_8)), "testStr"); + assertField(addFields.get(1), 1, MinorType.INT, Arrays.asList(1, 2), "test2"); + assertField(addFields.get(2), 1, MinorType.INT, Arrays.asList(7, 7, 7, 8), "test3.a"); + assertField(addFields.get(5), 1, MinorType.FLOAT4, Arrays.<Float>asList((float) 2.2, (float) 2.3,(float) 2.4), "testFloat"); + + assertEquals(0, jr.next()); + assertTrue(mutator.getRemovedFields().isEmpty()); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/38ab96f3/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json 
b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json index 0fb3202..fd003ac 100644 --- a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json +++ b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_4.json @@ -1,14 +1,21 @@ { "test": 123, "test2": [1,2,3], - "a": { - "b": 1 - } + "test3": { + "a": [4,5,6], + "b": [7,8,9], + "c": { + "d": [10, 11, 12] + } + }, + "testFloat": [1.1, 1.2, 1.3], + "testStr": ["hello", "drill"] } { - "test": 1234, - "test3": false, - "a": { - "b": 2 - } + "test2": [1,2], + "test3": { + "a": [7,7,7,8], + "b": [] + }, + "testFloat": [2.2, 2.3, 2.4] } \ No newline at end of file
