Repository: tajo Updated Branches: refs/heads/branch-0.11.1 e63721eb4 -> f2e4773f3
TAJO-2010: Parquet can not read null value. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f2e4773f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f2e4773f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f2e4773f Branch: refs/heads/branch-0.11.1 Commit: f2e4773f3b85beba8e39f76d6cf362f72e3a0ddc Parents: e63721e Author: Jinho Kim <[email protected]> Authored: Fri Dec 4 11:38:32 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Dec 4 11:38:32 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../storage/parquet/TajoRecordConverter.java | 13 +-- .../storage/parquet/TajoRecordMaterializer.java | 21 +--- .../org/apache/tajo/storage/TestStorages.java | 116 ++++++++++++++++++- 4 files changed, 124 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/f2e4773f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9d64534..fffaaf7 100644 --- a/CHANGES +++ b/CHANGES @@ -24,6 +24,8 @@ Release 0.11.1 - unreleased BUG FIXES + TAJO-2010: Parquet can not read null value. (jinho) + TAJO-2001: DirectRawFileScanner.getProgress occasionally fails. (jinho) TAJO-1753: GlobalEngine causes NPE occurs occasionally. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/f2e4773f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java index 7f236b6..7d73021 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java @@ -35,6 +35,7 @@ import parquet.schema.GroupType; import parquet.schema.Type; import java.nio.ByteBuffer; +import java.util.Arrays; /** * Converter to convert a Parquet record into a Tajo Tuple. @@ -146,7 +147,9 @@ public class TajoRecordConverter extends GroupConverter { */ @Override public void start() { - currentTuple = new VTuple(projectionMap.length); + Datum[] datums = new Datum[projectionMap.length]; + Arrays.fill(datums, NullDatum.get()); + currentTuple = new VTuple(datums); } /** @@ -154,14 +157,6 @@ public class TajoRecordConverter extends GroupConverter { */ @Override public void end() { - for (int i = 0; i < projectionMap.length; ++i) { - final int projectionIndex = projectionMap[i]; - Column column = tajoReadSchema.getColumn(projectionIndex); - if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE - || currentTuple.isBlankOrNull(i)) { - set(projectionIndex, NullDatum.get()); - } - } } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/f2e4773f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java index 436159c..25610fc 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java @@ -18,8 +18,8 @@ package org.apache.tajo.storage.parquet; -import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Tuple; import parquet.io.api.GroupConverter; import parquet.io.api.RecordMaterializer; @@ -35,24 +35,13 @@ class TajoRecordMaterializer extends RecordMaterializer<Tuple> { * Creates a new TajoRecordMaterializer. * * @param parquetSchema The Parquet schema of the projection. - * @param tajoSchema The Tajo schema of the projection. + * @param tajoRequestSchema The Tajo schema of the projection. * @param tajoReadSchema The Tajo schema of the table. */ - public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema, + public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoRequestSchema, Schema tajoReadSchema) { - int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema); - this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema, - projectionMap); - } - - private int[] getProjectionMap(Schema schema, Schema projection) { - Column[] targets = projection.toArray(); - int[] projectionMap = new int[targets.length]; - for (int i = 0; i < targets.length; ++i) { - int tid = schema.getColumnId(targets[i].getQualifiedName()); - projectionMap[i] = tid; - } - return projectionMap; + int[] projectionMap = PlannerUtil.getTargetIds(tajoReadSchema, tajoRequestSchema.toArray()); + this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema, projectionMap); } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/f2e4773f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 1f6168a..15fc5f7 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -40,6 +40,7 @@ import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatumFactory; import org.apache.tajo.exception.ValueTooLongForTypeCharactersException; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.rcfile.RCFile; @@ -55,10 +56,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Random; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; @RunWith(Parameterized.class) public class TestStorages { @@ -541,6 +541,116 @@ public class TestStorages { } @Test + public void testNullHandlingTypesWithProjection() throws IOException { + if (internalType) return; + + boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON); + + Schema schema = new Schema(); + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + + if (handleProtobuf) { + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + } + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(dataFormat, options); + meta.setPropertySet(CatalogUtil.newDefaultProperty(dataFormat)); + meta.putProperty(StorageConstants.TEXT_NULL, "\\\\N"); + meta.putProperty(StorageConstants.RCFILE_NULL, "\\\\N"); + meta.putProperty(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); + meta.putProperty(StorageConstants.SEQUENCEFILE_NULL, "\\"); + if (dataFormat.equalsIgnoreCase("AVRO")) { + meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA); + } + + Path tablePath = new Path(testDir, "testProjectedNullHandlingTypes.data"); + FileTablespace sm = TablespaceManager.getLocalFs(); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); + int columnNum = 11 + (handleProtobuf ? 1 : 0); + VTuple seedTuple = new VTuple(columnNum); + seedTuple.put(new Datum[]{ + DatumFactory.createBool(true), // 0 + DatumFactory.createChar("hyunsik"), // 2 + DatumFactory.createInt2((short) 17), // 3 + DatumFactory.createInt4(59), // 4 + DatumFactory.createInt8(23l), // 5 + DatumFactory.createFloat4(77.9f), // 6 + DatumFactory.createFloat8(271.9f), // 7 + DatumFactory.createText("hyunsik"), // 8 + DatumFactory.createBlob("hyunsik".getBytes()),// 9 + DatumFactory.createInet4("192.168.0.1"), // 10 + NullDatum.get(), // 11 + }); + + if (handleProtobuf) { + seedTuple.put(11, factory.createDatum(queryid.getProto())); // 12 + } + + // Making tuples with different null column positions + Tuple tuple; + for (int i = 0; i < columnNum; i++) { + tuple = new VTuple(columnNum); + for (int j = 0; j < columnNum; j++) { + if (i == j) { // i'th column will have NULL value + tuple.put(j, NullDatum.get()); + } else { + tuple.put(j, seedTuple.get(j)); + } + } + appender.addTuple(tuple); + } + appender.flush(); + appender.close(); + + + // Making projection schema with different column positions + Schema target = new Schema(); + Random random = new Random(); + for (int i = 1; i < schema.size(); i++) { + int num = random.nextInt(schema.size() - 1) + 1; + if (i % num == 0) { + target.addColumn(schema.getColumn(i)); + } + } + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target); + scanner.init(); + + Tuple retrieved; + int[] targetIds = PlannerUtil.getTargetIds(schema, target.toArray()); + int i = 0; + while ((retrieved = scanner.next()) != null) { + assertEquals(target.size(), retrieved.size()); + for (int j = 0; j < targetIds.length; j++) { + if (i == targetIds[j]) { + assertEquals(NullDatum.get(), retrieved.asDatum(j)); + } else { + assertEquals(seedTuple.get(targetIds[j]), retrieved.asDatum(j)); + } + } + i++; + } + scanner.close(); + } + + @Test public void testRCFileTextSerializeDeserialize() throws IOException { if(!dataFormat.equalsIgnoreCase(BuiltinStorages.RCFILE)) return;
