http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index 204f607..0d1c94a 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf; import net.minidev.json.JSONObject; import net.minidev.json.parser.JSONParser; import net.minidev.json.parser.ParseException; +import org.apache.tajo.catalog.*; import org.apache.commons.net.util.Base64; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; @@ -35,26 +36,212 @@ import org.apache.tajo.datum.TextDatum; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.text.TextLineDeserializer; import org.apache.tajo.storage.text.TextLineParsingError; +import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.TUtil; import java.io.IOException; +import java.util.Iterator; +import java.util.Map; public class JsonLineDeserializer extends TextLineDeserializer { private JSONParser parser; - private Type[] types; - private String[] columnNames; + // Full Path -> Type + private Map<String, Type> types; + private String [] projectedPaths; - public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { - super(schema, meta, targetColumnIndexes); + public JsonLineDeserializer(Schema schema, TableMeta meta, Column [] projected) { + super(schema, meta); + + projectedPaths = new String[projected.length]; + for (int i = 0; i < projected.length; i++) { + this.projectedPaths[i] = projected[i].getSimpleName(); + } } @Override public void init() { - types = SchemaUtil.toTypes(schema); - columnNames = SchemaUtil.toSimpleNames(schema); + types = TUtil.newHashMap(); + for (Column column : schema.getAllColumns()) { + // Keep types which only belong to projected paths + // For example, assume that a projected path is 'name/first_name', where name is RECORD and first_name is TEXT. + // In this case, we should keep two types: + // * name - RECORD + // * name/first_name TEXT + for (String p :projectedPaths) { + if (p.startsWith(column.getSimpleName())) { + types.put(column.getSimpleName(), column.getDataType().getType()); + } + } + } parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR); } + private static String makePath(String [] path, int depth) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i <= depth; i++) { + sb.append(path[i]); + if (i < depth) { + sb.append(NestedPathUtil.PATH_DELIMITER); + } + } + + return sb.toString(); + } + + /** + * + * + * @param object + * @param pathElements + * @param depth + * @param fieldIndex + * @param output + * @throws IOException + */ + private void getValue(JSONObject object, + String fullPath, + String [] pathElements, + int depth, + int fieldIndex, + Tuple output) throws IOException { + String fieldName = pathElements[depth]; + + if (!object.containsKey(fieldName)) { + output.put(fieldIndex, NullDatum.get()); + } + + switch (types.get(fullPath)) { + case BOOLEAN: + String boolStr = object.getAsString(fieldName); + if (boolStr != null) { + output.put(fieldIndex, DatumFactory.createBool(boolStr.equals("true"))); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case CHAR: + String charStr = object.getAsString(fieldName); + if (charStr != null) { + output.put(fieldIndex, DatumFactory.createChar(charStr)); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case INT1: + case INT2: + Number int2Num = object.getAsNumber(fieldName); + if (int2Num != null) { + output.put(fieldIndex, DatumFactory.createInt2(int2Num.shortValue())); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case INT4: + Number int4Num = object.getAsNumber(fieldName); + if (int4Num != null) { + output.put(fieldIndex, DatumFactory.createInt4(int4Num.intValue())); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case INT8: + Number int8Num = object.getAsNumber(fieldName); + if (int8Num != null) { + output.put(fieldIndex, DatumFactory.createInt8(int8Num.longValue())); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case FLOAT4: + Number float4Num = object.getAsNumber(fieldName); + if (float4Num != null) { + output.put(fieldIndex, DatumFactory.createFloat4(float4Num.floatValue())); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case FLOAT8: + Number float8Num = object.getAsNumber(fieldName); + if (float8Num != null) { + output.put(fieldIndex, DatumFactory.createFloat8(float8Num.doubleValue())); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case TEXT: + String textStr = object.getAsString(fieldName); + if (textStr != null) { + output.put(fieldIndex, DatumFactory.createText(textStr)); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case TIMESTAMP: + String timestampStr = object.getAsString(fieldName); + if (timestampStr != null) { + output.put(fieldIndex, DatumFactory.createTimestamp(timestampStr)); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case TIME: + String timeStr = object.getAsString(fieldName); + if (timeStr != null) { + output.put(fieldIndex, DatumFactory.createTime(timeStr)); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case DATE: + String dateStr = object.getAsString(fieldName); + if (dateStr != null) { + output.put(fieldIndex, DatumFactory.createDate(dateStr)); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + case BIT: + case BINARY: + case VARBINARY: + case BLOB: { + Object jsonObject = object.getAsString(fieldName); + + if (jsonObject == null) { + output.put(fieldIndex, NullDatum.get()); + break; + } + + output.put(fieldIndex, DatumFactory.createBlob(Base64.decodeBase64((String) jsonObject))); + break; + } + case INET4: + String inetStr = object.getAsString(fieldName); + if (inetStr != null) { + output.put(fieldIndex, DatumFactory.createInet4(inetStr)); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + + case RECORD: + JSONObject nestedObject = (JSONObject) object.get(fieldName); + if (nestedObject != null) { + getValue(nestedObject, fullPath + "/" + pathElements[depth+1], pathElements, depth + 1, fieldIndex, output); + } else { + output.put(fieldIndex, NullDatum.get()); + } + break; + + case NULL_TYPE: + output.put(fieldIndex, NullDatum.get()); + break; + + default: + throw new NotImplementedException(types.get(fullPath).name() + " is not supported."); + } + } + @Override public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError { byte[] line = new byte[buf.readableBytes()]; @@ -70,135 +257,9 @@ public class JsonLineDeserializer extends TextLineDeserializer { throw new TextLineParsingError(new String(line, TextDatum.DEFAULT_CHARSET), ae); } - for (int i = 0; i < targetColumnIndexes.length; i++) { - int actualIdx = targetColumnIndexes[i]; - String fieldName = columnNames[actualIdx]; - - if (!object.containsKey(fieldName)) { - output.put(actualIdx, NullDatum.get()); - continue; - } - - switch (types[actualIdx]) { - case BOOLEAN: - String boolStr = object.getAsString(fieldName); - if (boolStr != null) { - output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true"))); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case CHAR: - String charStr = object.getAsString(fieldName); - if (charStr != null) { - output.put(actualIdx, DatumFactory.createChar(charStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case INT1: - case INT2: - Number int2Num = object.getAsNumber(fieldName); - if (int2Num != null) { - output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case INT4: - Number int4Num = object.getAsNumber(fieldName); - if (int4Num != null) { - output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case INT8: - Number int8Num = object.getAsNumber(fieldName); - if (int8Num != null) { - output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case FLOAT4: - Number float4Num = object.getAsNumber(fieldName); - if (float4Num != null) { - output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case FLOAT8: - Number float8Num = object.getAsNumber(fieldName); - if (float8Num != null) { - output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case TEXT: - String textStr = object.getAsString(fieldName); - if (textStr != null) { - output.put(actualIdx, DatumFactory.createText(textStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case TIMESTAMP: - String timestampStr = object.getAsString(fieldName); - if (timestampStr != null) { - output.put(actualIdx, DatumFactory.createTimestamp(timestampStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case TIME: - String timeStr = object.getAsString(fieldName); - if (timeStr != null) { - output.put(actualIdx, DatumFactory.createTime(timeStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case DATE: - String dateStr = object.getAsString(fieldName); - if (dateStr != null) { - output.put(actualIdx, DatumFactory.createDate(dateStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case BIT: - case BINARY: - case VARBINARY: - case BLOB: { - Object jsonObject = object.getAsString(fieldName); - - if (jsonObject == null) { - output.put(actualIdx, NullDatum.get()); - break; - } - - output.put(actualIdx, DatumFactory.createBlob(Base64.decodeBase64((String) jsonObject))); - break; - } - case INET4: - String inetStr = object.getAsString(fieldName); - if (inetStr != null) { - output.put(actualIdx, DatumFactory.createInet4(inetStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - - case NULL_TYPE: - output.put(actualIdx, NullDatum.get()); - break; - - default: - throw new NotImplementedException(types[actualIdx].name() + " is not supported."); - } + for (int i = 0; i < projectedPaths.length; i++) { + String [] paths = projectedPaths[i].split(NestedPathUtil.PATH_DELIMITER); + getValue(object, paths[0], paths, 0, i, output); } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java index 6db2c29..5f12d76 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage.json; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.storage.text.TextLineDeserializer; @@ -26,8 +27,8 @@ import org.apache.tajo.storage.text.TextLineSerializer; public class JsonLineSerDe extends TextLineSerDe { @Override - public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { - return new JsonLineDeserializer(schema, meta, targetColumnIndexes); + public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, Column [] projected) { + return new JsonLineDeserializer(schema, meta, projected); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java index d6faf2d..34e9661 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -35,8 +35,6 @@ import java.io.IOException; import java.io.OutputStream; public class JsonLineSerializer extends TextLineSerializer { - private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); - private Type [] types; private String [] simpleNames; private int columnNum; http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java index a091eac..4c675a4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java @@ -77,10 +77,11 @@ public class TajoRecordConverter extends GroupConverter { continue; } Type type = parquetSchema.getType(index); + final int writeIndex = i; converters[index] = newConverter(column, type, new ParentValueContainer() { @Override void add(Object value) { - TajoRecordConverter.this.set(projectionIndex, value); + TajoRecordConverter.this.set(writeIndex, value); } }); ++index; @@ -145,7 +146,7 @@ public class TajoRecordConverter extends GroupConverter { */ @Override public void start() { - currentTuple = new VTuple(tupleSize); + currentTuple = new VTuple(projectionMap.length); } /** @@ -157,7 +158,7 @@ public class TajoRecordConverter extends GroupConverter { final int projectionIndex = projectionMap[i]; Column column = tajoReadSchema.getColumn(projectionIndex); if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE - || currentTuple.get(projectionIndex) == null) { + || currentTuple.get(i) == null) { set(projectionIndex, NullDatum.get()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index 62e5ed9..af260b4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -1635,7 +1635,7 @@ public class RCFile { return null; } - Tuple tuple = new VTuple(schema.size()); + Tuple tuple = new VTuple(targets.length); getCurrentRow(tuple); return tuple; } @@ -1705,16 +1705,16 @@ public class RCFile { for (int j = 0; j < selectedColumns.length; ++j) { SelectedColumn col = selectedColumns[j]; - int i = col.colIndex; + int actualColumnIdx = col.colIndex; if (col.isNulled) { - tuple.put(i, NullDatum.get()); + tuple.put(j, NullDatum.get()); } else { colAdvanceRow(j, col); - Datum datum = serde.deserialize(schema.getColumn(i), + Datum datum = serde.deserialize(schema.getColumn(actualColumnIdx), currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars); - tuple.put(i, datum); + tuple.put(j, datum); col.rowReadIndex += col.prvLength; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index 92a041c..af0973e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -108,9 +108,9 @@ public class SequenceFileScanner extends FileScanner { } - fieldIsNull = new boolean[schema.getColumns().size()]; - fieldStart = new int[schema.getColumns().size()]; - fieldLength = new int[schema.getColumns().size()]; + fieldIsNull = new boolean[schema.getRootColumns().size()]; + fieldStart = new int[schema.getRootColumns().size()]; + fieldLength = new int[schema.getRootColumns().size()]; prepareProjection(targets); @@ -172,7 +172,7 @@ public class SequenceFileScanner extends FileScanner { Text text = new Text(); reader.getCurrentValue(text); cells = BytesUtils.splitPreserveAllTokens(text.getBytes(), - delimiter, projectionMap, schema.getColumns().size()); + delimiter, projectionMap, schema.getRootColumns().size()); totalBytes += (long)text.getBytes().length; tuple = new LazyTuple(schema, cells, 0, nullChars, serde); } @@ -197,7 +197,7 @@ public class SequenceFileScanner extends FileScanner { * So, tajo must make a tuple after parsing hive style BinarySerDe. */ private Tuple makeTuple(BytesWritable value) throws IOException{ - Tuple tuple = new VTuple(schema.getColumns().size()); + Tuple tuple = new VTuple(schema.getRootColumns().size()); int start = 0; int length = value.getLength(); @@ -213,7 +213,7 @@ public class SequenceFileScanner extends FileScanner { int lastFieldByteEnd = start + 1; // Go through all bytes in the byte[] - for (int i = 0; i < schema.getColumns().size(); i++) { + for (int i = 0; i < schema.getRootColumns().size(); i++) { fieldIsNull[i] = true; if ((nullByte & (1 << (i % 8))) != 0) { fieldIsNull[i] = false; @@ -322,12 +322,12 @@ public class SequenceFileScanner extends FileScanner { @Override public boolean isProjectable() { - return true; + return false; } @Override public boolean isSelectable() { - return true; + return false; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java index 03a0a26..0901c0b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage.text; import io.netty.buffer.ByteBuf; +import org.apache.tajo.catalog.Column; import io.netty.buffer.ByteBufProcessor; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; @@ -28,6 +29,7 @@ import org.apache.tajo.storage.FieldSerializerDeserializer; import org.apache.tajo.storage.Tuple; import java.io.IOException; +import java.util.Arrays; public class CSVLineDeserializer extends TextLineDeserializer { private ByteBufProcessor processor; @@ -35,8 +37,18 @@ public class CSVLineDeserializer extends TextLineDeserializer { private ByteBuf nullChars; private int delimiterCompensation; - public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { - super(schema, meta, targetColumnIndexes); + private int [] targetColumnIndexes; + private Column [] projected; + + public CSVLineDeserializer(Schema schema, TableMeta meta, Column [] projected) { + super(schema, meta); + + this.projected = projected; + targetColumnIndexes = new int[projected.length]; + for (int i = 0; i < projected.length; i++) { + targetColumnIndexes[i] = schema.getColumnId(projected[i].getQualifiedName()); + } + Arrays.sort(targetColumnIndexes); } @Override @@ -66,7 +78,7 @@ public class CSVLineDeserializer extends TextLineDeserializer { final int rowLength = lineBuf.readableBytes(); int start = 0, fieldLength = 0, end = 0; - //Projection + // Projection int currentTarget = 0; int currentIndex = 0; @@ -83,10 +95,10 @@ public class CSVLineDeserializer extends TextLineDeserializer { lineBuf.setIndex(start, start + fieldLength); try { - Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); - output.put(currentIndex, datum); + Datum datum = fieldSerDer.deserialize(lineBuf, projected[currentTarget], currentIndex, nullChars); + output.put(currentTarget, datum); } catch (Exception e) { - output.put(currentIndex, NullDatum.get()); + output.put(currentTarget, NullDatum.get()); } currentTarget++; @@ -103,7 +115,7 @@ public class CSVLineDeserializer extends TextLineDeserializer { /* If a text row is less than table schema size, tuple should set to NullDatum */ if (projection.length > currentTarget) { for (; currentTarget < projection.length; currentTarget++) { - output.put(projection[currentTarget], NullDatum.get()); + output.put(currentTarget, NullDatum.get()); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java index 988d5d1..4ebdbe8 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage.text; import org.apache.commons.lang.StringEscapeUtils; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.storage.StorageConstants; @@ -26,8 +27,8 @@ import org.apache.tajo.util.Bytes; public class CSVLineSerDe extends TextLineSerDe { @Override - public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { - return new CSVLineDeserializer(schema, meta, targetColumnIndexes); + public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, Column [] projected) { + return new CSVLineDeserializer(schema, meta, projected); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 5e7bd94..55a2b96 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -275,7 +275,6 @@ public class DelimitedTextFile { private final long endOffset; /** The number of actual read records */ private int recordCount = 0; - private int[] targetColumnIndexes; private DelimitedLineReader reader; private TextLineDeserializer deserializer; @@ -321,13 +320,7 @@ public class DelimitedTextFile { targets = schema.toArray(); } - targetColumnIndexes = new int[targets.length]; - for (int i = 0; i < targets.length; i++) { - targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); - } - super.init(); - Arrays.sort(targetColumnIndexes); if (LOG.isDebugEnabled()) { LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset); } @@ -336,7 +329,7 @@ public class DelimitedTextFile { reader.readLine(); // skip first line; } - deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes); + deserializer = getLineSerde().createDeserializer(schema, meta, targets); deserializer.init(); } @@ -391,7 +384,7 @@ public class DelimitedTextFile { return EmptyTuple.get(); } - tuple = new VTuple(schema.size()); + tuple = new VTuple(targets.length); tuple.setOffset(offset); try { http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java index 89a7de9..f067cb3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java @@ -31,12 +31,10 @@ import java.io.IOException; public abstract class TextLineDeserializer { protected final Schema schema; protected final TableMeta meta; - protected final int[] targetColumnIndexes; - public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) { + public TextLineDeserializer(Schema schema, TableMeta meta) { this.schema = schema; this.meta = meta; - this.targetColumnIndexes = targetColumnIndexes; } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java index 1a53bb0..c09a83b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java @@ -21,6 +21,7 @@ package org.apache.tajo.storage.text; import io.netty.buffer.ByteBuf; import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang.StringUtils; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.datum.NullDatum; @@ -36,7 +37,7 @@ public abstract class TextLineSerDe { public TextLineSerDe() { } - public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes); + public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, Column [] projected); public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta); http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java index 561e2ef..322818d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java @@ -175,7 +175,19 @@ public class TestMergeScanner { Tuple tuple; while ((tuple = scanner.next()) != null) { totalCounts++; - if (isProjectableStorage(meta.getStoreType())) { + + if (storeType == StoreType.RAW) { + assertEquals(4, tuple.size()); + assertNotNull(tuple.get(0)); + assertNotNull(tuple.get(1)); + assertNotNull(tuple.get(2)); + assertNotNull(tuple.get(3)); + } else if (scanner.isProjectable()) { + assertEquals(2, tuple.size()); + assertNotNull(tuple.get(0)); + assertNotNull(tuple.get(1)); + } else { + assertEquals(4, tuple.size()); assertNotNull(tuple.get(0)); assertNull(tuple.get(1)); assertNotNull(tuple.get(2)); @@ -189,14 +201,13 @@ public class TestMergeScanner { private static boolean isProjectableStorage(StoreType type) { switch (type) { - case RCFILE: - case PARQUET: - case SEQUENCEFILE: - case CSV: - case AVRO: - return true; - default: - return false; + case CSV: + case SEQUENCEFILE: + case RAW: + case ROWFILE: + return false; + default: + return true; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0d1bf41f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 456ea00..a735307 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -297,15 +297,7 @@ public class TestStorages { int tupleCnt = 0; Tuple tuple; while ((tuple = scanner.next()) != null) { - if (storeType == StoreType.RCFILE - || storeType == StoreType.CSV - || storeType == StoreType.PARQUET - || storeType == StoreType.SEQUENCEFILE - || storeType == StoreType.AVRO) { - assertTrue(tuple.get(0) == null); - } - assertTrue(tupleCnt + 2 == tuple.get(1).asInt8()); - assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4()); + verifyProjectedFields(scanner.isProjectable(), tuple, tupleCnt); tupleCnt++; } scanner.close(); @@ -313,6 +305,20 @@ public class TestStorages { assertEquals(tupleNum, tupleCnt); } + private void verifyProjectedFields(boolean projectable, Tuple tuple, int tupleCnt) { + if (projectable) { + assertTrue(tupleCnt + 2 == tuple.get(0).asInt8()); + assertTrue(tupleCnt + 3 == tuple.get(1).asFloat4()); + } else { + // RAW and ROW always project all fields. + if (storeType != StoreType.RAW && storeType != StoreType.ROWFILE) { + assertTrue(tuple.get(0) == null); + } + assertTrue(tupleCnt + 2 == tuple.get(1).asInt8()); + assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4()); + } + } + @Test public void testVariousTypes() throws IOException { boolean handleProtobuf = storeType != StoreType.JSON; @@ -956,7 +962,7 @@ public class TestStorages { @Test public void testLessThanSchemaSize() throws IOException { /* RAW is internal storage. It must be same with schema size */ - if (storeType == StoreType.RAW || storeType == StoreType.AVRO){ + if (storeType == StoreType.RAW || storeType == StoreType.AVRO || storeType == StoreType.PARQUET) { return; } @@ -1008,7 +1014,12 @@ public class TestStorages { Tuple tuple = scanner.next(); scanner.close(); - assertEquals(expect.get(1), tuple.get(1)); - assertEquals(NullDatum.get(), tuple.get(4)); + if (scanner.isProjectable()) { + assertEquals(expect.get(1), tuple.get(0)); + assertEquals(NullDatum.get(), tuple.get(1)); + } else { + assertEquals(expect.get(1), tuple.get(1)); + assertEquals(NullDatum.get(), tuple.get(4)); + } } }
