Fix late type binding for JSON record reader
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b1e48b32 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b1e48b32 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b1e48b32 Branch: refs/heads/master Commit: b1e48b32e3bc5e240a01d75f83ac5d2be4b2e7ae Parents: a15f5b1 Author: Timothy Chen <[email protected]> Authored: Sun Aug 11 11:55:24 2013 -0700 Committer: Timothy Chen <[email protected]> Committed: Sun Aug 11 11:55:24 2013 -0700 ---------------------------------------------------------------------- .../org/apache/drill/common/types/Types.java | 27 ++++- .../org/apache/drill/exec/schema/Field.java | 111 +++++++++++-------- .../exec/schema/json/jackson/JacksonHelper.java | 1 + .../drill/exec/store/JSONRecordReader.java | 49 +++++--- .../drill/exec/store/JSONRecordReaderTest.java | 41 +++++-- .../src/test/resources/scan_json_test_5.json | 33 +++--- 6 files changed, 170 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java index e81bc89..f07f726 100644 --- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java +++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/types/Types.java @@ -4,6 +4,8 @@ import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; +import static org.apache.drill.common.types.TypeProtos.DataMode.REPEATED; + public class Types { static 
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Types.class); @@ -16,7 +18,7 @@ public class Types { } public static boolean isNumericType(MajorType type){ - if(type.getMode() == DataMode.REPEATED) return false; + if(type.getMode() == REPEATED) return false; switch(type.getMinorType()){ case BIGINT: @@ -40,7 +42,7 @@ public class Types { } public static boolean usesHolderForGet(MajorType type){ - if(type.getMode() == DataMode.REPEATED) return true; + if(type.getMode() == REPEATED) return true; switch(type.getMinorType()){ case BIGINT: case DECIMAL4: @@ -76,7 +78,7 @@ public class Types { public static boolean isStringScalarType(MajorType type){ - if(type.getMode() == DataMode.REPEATED) return false; + if(type.getMode() == REPEATED) return false; switch(type.getMinorType()){ case FIXEDCHAR: case FIXED16CHAR: @@ -89,7 +91,7 @@ public class Types { } public static boolean isBytesScalarType(MajorType type){ - if(type.getMode() == DataMode.REPEATED) return false; + if(type.getMode() == REPEATED) return false; switch(type.getMinorType()){ case FIXEDBINARY: case VARBINARY: @@ -100,7 +102,7 @@ public class Types { } public static Comparability getComparability(MajorType type){ - if(type.getMode() == DataMode.REPEATED) return Comparability.NONE; + if(type.getMode() == REPEATED) return Comparability.NONE; if(type.getMinorType() == MinorType.LATE) return Comparability.UNKNOWN; switch(type.getMinorType()){ @@ -144,12 +146,25 @@ public class Types { } public static MajorType repeated(MinorType type){ - return MajorType.newBuilder().setMode(DataMode.REPEATED).setMinorType(type).build(); + return MajorType.newBuilder().setMode(REPEATED).setMinorType(type).build(); } public static MajorType optional(MinorType type){ return MajorType.newBuilder().setMode(DataMode.OPTIONAL).setMinorType(type).build(); } + + public static MajorType overrideMinorType(MajorType originalMajorType, MinorType overrideMinorType) { + switch(originalMajorType.getMode()) { + case REPEATED: + 
return repeated(overrideMinorType); + case OPTIONAL: + return optional(overrideMinorType); + case REQUIRED: + return required(overrideMinorType); + default: + throw new UnsupportedOperationException(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java index 85bbdf3..080be92 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.schema; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.record.MaterializedField; @@ -27,64 +28,80 @@ import com.google.common.base.Objects; import com.google.common.base.Strings; public abstract class Field { - final MajorType fieldType; - final String prefixFieldName; - RecordSchema schema; - RecordSchema parentSchema; - boolean read; - - public Field(RecordSchema parentSchema, MajorType type, String prefixFieldName) { - fieldType = type; - this.prefixFieldName = prefixFieldName; - this.parentSchema = parentSchema; + final String prefixFieldName; + MajorType fieldType; + RecordSchema schema; + RecordSchema parentSchema; + boolean read; + + public Field(RecordSchema parentSchema, MajorType type, String prefixFieldName) { + fieldType = type; + this.prefixFieldName = prefixFieldName; + this.parentSchema = parentSchema; + } + + public MaterializedField getAsMaterializedField() { + return 
MaterializedField.create(new SchemaPath(getFieldName(), ExpressionPosition.UNKNOWN), fieldType); + } + + public abstract String getFieldName(); + + public String getFullFieldName() { + String fieldName = getFieldName(); + if(Strings.isNullOrEmpty(prefixFieldName)) { + return fieldName; + } else if(Strings.isNullOrEmpty(fieldName)) { + return prefixFieldName; + } else { + return prefixFieldName + "." + getFieldName(); } + } - public MaterializedField getAsMaterializedField(){ - return MaterializedField.create(new SchemaPath(getFieldName(), ExpressionPosition.UNKNOWN), fieldType); - } - - public abstract String getFieldName(); + public void setRead(boolean read) { + this.read = read; + } - public String getFullFieldName() { - return Strings.isNullOrEmpty(prefixFieldName) ? getFieldName() : prefixFieldName + "." + getFieldName(); - } + protected abstract Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper); - public void setRead(boolean read) { - this.read = read; - } + Objects.ToStringHelper getAttributesStringHelper() { + return Objects.toStringHelper(this).add("type", fieldType) + .add("fullFieldName", getFullFieldName()) + .add("schema", schema == null ? null : schema.toSchemaString()).omitNullValues(); + } - protected abstract Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper); + @Override + public String toString() { + return addAttributesToHelper(getAttributesStringHelper()).toString(); + } - Objects.ToStringHelper getAttributesStringHelper() { - return Objects.toStringHelper(this).add("type", fieldType) - .add("fullFieldName", getFullFieldName()) - .add("schema", schema == null ? 
null : schema.toSchemaString()).omitNullValues(); - } + public RecordSchema getAssignedSchema() { + return schema; + } - @Override - public String toString() { - return addAttributesToHelper(getAttributesStringHelper()).toString(); + public void assignSchemaIfNull(RecordSchema newSchema) { + if (!hasSchema()) { + schema = newSchema; } + } - public RecordSchema getAssignedSchema() { - return schema; - } + public boolean isRead() { + return read; + } - public void assignSchemaIfNull(RecordSchema newSchema) { - if (!hasSchema()) { - schema = newSchema; - } - } + public boolean hasSchema() { + return schema != null; + } - public boolean isRead() { - return read; - } + public MajorType getFieldType() { + return fieldType; + } - public boolean hasSchema() { - return schema != null; - } + public void setFieldType(MajorType fieldType) { + this.fieldType = fieldType; + } - public MajorType getFieldType() { - return fieldType; - } + @Override + public int hashCode() { + return getFullFieldName().hashCode(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java index d8f0646..22167b1 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java @@ -92,6 +92,7 @@ public class JacksonHelper { case BIT: return parser.getBooleanValue(); case LATE: + case NULL: return null; default: throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString()); 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java index a4887c0..21b8c1b 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/JSONRecordReader.java @@ -16,6 +16,7 @@ import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -42,7 +43,7 @@ public class JSONRecordReader implements RecordReader { private final String inputPath; - private final Map<Field, VectorHolder> valueVectorMap; + private final Map<String, VectorHolder> valueVectorMap; private JsonParser parser; private SchemaIdGenerator generator; @@ -181,13 +182,11 @@ public class JSONRecordReader implements RecordReader { @Override public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) { return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType); - //return new OrderedField(parentSchema, fieldType, prefixFieldName, index); } @Override public RecordSchema createSchema() throws IOException { return new ObjectSchema(); - //return new ListSchema(); } }, OBJECT(END_OBJECT) { @@ -287,18 +286,30 @@ public class 
JSONRecordReader implements RecordReader { int colIndex, int groupCount) throws IOException, SchemaChangeException { RecordSchema currentSchema = reader.getCurrentSchema(); - Field field = currentSchema.getField(fieldName, colIndex); + Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex); boolean isFieldFound = field != null; List<Field> removedFields = reader.getRemovedFields(); - if (!isFieldFound || !field.getFieldType().equals(fieldType)) { - if (isFieldFound) { + boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE); + + if (isFieldFound && !field.getFieldType().equals(fieldType)) { + boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE); + + if (newFieldLateBound && !existingFieldLateBound) { + fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType()); + } else if (!newFieldLateBound && existingFieldLateBound) { + field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType())); + } else if (!newFieldLateBound && !existingFieldLateBound) { if (field.hasSchema()) { removeChildFields(removedFields, field); } removedFields.add(field); currentSchema.removeField(field, colIndex); + + isFieldFound = false; } + } + if (!isFieldFound) { field = createField( currentSchema, prefixFieldName, @@ -316,16 +327,19 @@ public class JSONRecordReader implements RecordReader { VectorHolder holder = getOrCreateVectorHolder(reader, field); if (readType != null) { RecordSchema fieldSchema = field.getAssignedSchema(); - reader.setCurrentSchema(fieldSchema); - RecordSchema newSchema = readType.createSchema(); - field.assignSchemaIfNull(newSchema); - if (fieldSchema == null) reader.setCurrentSchema(newSchema); - readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount); + if (readType != ReadType.ARRAY) { + reader.setCurrentSchema(fieldSchema); + if (fieldSchema == null) reader.setCurrentSchema(newSchema); + 
readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount); + } else { + readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount); + } reader.setCurrentSchema(currentSchema); - } else { + + } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) { return addValueToVector( rowIndex, holder, @@ -447,22 +461,23 @@ public class JSONRecordReader implements RecordReader { } private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException { - VectorHolder holder = valueVectorMap.get(field); + String fullFieldName = field.getFullFieldName(); + VectorHolder holder = valueVectorMap.get(fullFieldName); if (holder == null) { MajorType type = field.getFieldType(); - MaterializedField f = MaterializedField.create(new SchemaPath(field.getFullFieldName(), ExpressionPosition.UNKNOWN), type); - - MinorType minorType = f.getType().getMinorType(); + MinorType minorType = type.getMinorType(); if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) { return null; } + MaterializedField f = MaterializedField.create(new SchemaPath(fullFieldName, ExpressionPosition.UNKNOWN), type); + ValueVector v = TypeHelper.getNewVector(f, allocator); AllocationHelper.allocate(v, batchSize, 50); holder = new VectorHolder(batchSize, v); - valueVectorMap.put(field, holder); + valueVectorMap.put(fullFieldName, holder); outputMutator.addField(v); return holder; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java index b39ac8a..6b353ae 100644 --- 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java @@ -70,10 +70,6 @@ public class JSONRecordReaderTest { assertEquals(expectedMinorType, def.getMajorType().getMinorType()); String[] parts = name.split("\\."); int expected = parts.length; - boolean expectingArray = List.class.isAssignableFrom(value.getClass()); - if (expectingArray) { - expected += 1; - } assertEquals(expected, def.getNameList().size()); for(int i = 0; i < parts.length; ++i) { assertEquals(parts[i], def.getName(i).getName()); @@ -203,12 +199,12 @@ public class JSONRecordReaderTest { assertEquals("c", removedFields.get(0).getName()); removedFields.clear(); assertEquals(1, jr.next()); - assertEquals(8, addFields.size()); // The reappearing of field 'c' is also included + assertEquals(7, addFields.size()); // The reappearing of field 'c' is also included assertField(addFields.get(0), 0, MinorType.INT, 12345, "test"); assertField(addFields.get(3), 0, MinorType.BIT, true, "bool"); assertField(addFields.get(5), 0, MinorType.INT, 6, "d"); - assertField(addFields.get(6), 0, MinorType.FLOAT4, (float) 5.16, "c"); - assertField(addFields.get(7), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2"); + assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 5.16, "c"); + assertField(addFields.get(6), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2"); assertEquals(2, removedFields.size()); Iterables.find(removedFields, new Predicate<MaterializedField>() { @Override @@ -282,4 +278,35 @@ public class JSONRecordReaderTest { assertEquals(0, jr.next()); assertTrue(mutator.getRemovedFields().isEmpty()); } + + @Test + public void testRepeatedMissingFields(@Injectable final FragmentContext context) throws ExecutionSetupException { + new Expectations() { + { + context.getAllocator(); + returns(new DirectBufferAllocator()); + } + }; + + JSONRecordReader jr 
= new JSONRecordReader(context, getResource("scan_json_test_5.json")); + + MockOutputMutator mutator = new MockOutputMutator(); + List<ValueVector> addFields = mutator.getAddFields(); + jr.setup(mutator); + assertEquals(9, jr.next()); + assertEquals(1, addFields.size()); + assertField(addFields.get(0), 0, MinorType.INT, Arrays.<Integer>asList(), "test"); + assertField(addFields.get(0), 1, MinorType.INT, Arrays.asList(1, 2, 3), "test"); + assertField(addFields.get(0), 2, MinorType.INT, Arrays.<Integer>asList(), "test"); + assertField(addFields.get(0), 3, MinorType.INT, Arrays.<Integer>asList(), "test"); + assertField(addFields.get(0), 4, MinorType.INT, Arrays.asList(4, 5, 6), "test"); + assertField(addFields.get(0), 5, MinorType.INT, Arrays.<Integer>asList(), "test"); + assertField(addFields.get(0), 6, MinorType.INT, Arrays.<Integer>asList(), "test"); + assertField(addFields.get(0), 7, MinorType.INT, Arrays.asList(7, 8, 9), "test"); + assertField(addFields.get(0), 8, MinorType.INT, Arrays.<Integer>asList(), "test"); + + + assertEquals(0, jr.next()); + assertTrue(mutator.getRemovedFields().isEmpty()); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1e48b32/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json index ae1aaf2..4977c60 100644 --- a/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json +++ b/sandbox/prototype/exec/java-exec/src/test/resources/scan_json_test_5.json @@ -1,21 +1,24 @@ { - "test": 123, - "test2": [1,2,3], - "a": { - "b": 1 - } + "test": [] } { - "test": 1234, - "test3": false, - "a": { - "b": 2 - } + "test": [1,2,3] +} +{ + "test": [] +} +{ + "test": null +} +{ + "test": [4,5,6] +} +{ +} +{ +} +{ + "test": [7,8,9] } { - "test": 1234, - "test2": 1.5, - 
"a": { - "b": 2 - } } \ No newline at end of file
