http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java new file mode 100644 index 0000000..81a1ffd --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.tajo.storage.index.bst;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.index.IndexMethod;
import org.apache.tajo.storage.index.IndexWriter;
import org.apache.tajo.storage.index.OrderIndexReader;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;

/**
 * A two-level binary search tree (BST) index.
 *
 * <p>This is a value-list index: each distinct key is stored once, together with
 * the list of offsets at which it occurs. It is therefore inefficient when many
 * rows share the same key value. The BST index performs best when the selectivity
 * of the rows to be retrieved is below about 5%.
 *
 * <p>{@link BSTIndexWriter} is not thread-safe. {@link BSTIndexReader} serializes
 * {@code find} and {@code next} on an internal lock.
 */
public class BSTIndex implements IndexMethod {
  private static final Log LOG = LogFactory.getLog(BSTIndex.class);

  /** All keys and their offsets are stored in a single data file. */
  public static final int ONE_LEVEL_INDEX = 1;
  /** A data file plus a {@code .root} file holding every loadNum-th key and its data-file position. */
  public static final int TWO_LEVEL_INDEX = 2;

  private final Configuration conf;

  public BSTIndex(final Configuration conf) {
    this.conf = conf;
  }

  @Override
  public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
                                       TupleComparator comparator) throws IOException {
    return new BSTIndexWriter(fileName, level, keySchema, comparator);
  }

  @Override
  public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
    return new BSTIndexReader(fileName, keySchema, comparator);
  }

  public BSTIndexReader getIndexReader(Path fileName) throws IOException {
    return new BSTIndexReader(fileName);
  }

  /** Writes {@code bytes} preceded by its length as an int. */
  private static void writeLengthPrefixed(FSDataOutputStream out, byte[] bytes) throws IOException {
    out.writeInt(bytes.length);
    out.write(bytes);
  }

  /** Reads an int length followed by that many bytes; the inverse of {@link #writeLengthPrefixed}. */
  private static byte[] readLengthPrefixed(FSDataInputStream in) throws IOException {
    byte[] bytes = new byte[in.readInt()];
    StorageUtil.readFully(in, bytes, 0, bytes.length);
    return bytes;
  }

  /**
   * Collects (key, offset) pairs in memory and writes them, ordered by the
   * comparator, when {@link #close()} is called. Not thread-safe.
   *
   * <p>Data file layout: key schema proto, comparator proto, level, number of
   * distinct keys, and (only if that number is positive) the min and max keys.
   * Then, for every distinct key in order: the key bytes, the offset count and the
   * offsets. A two-level index also writes {@code fileName + ".root"}, containing
   * loadNum, the root entry count, and for every loadNum-th key its bytes and its
   * position in the data file.
   */
  public class BSTIndexWriter extends IndexWriter implements Closeable {
    private FSDataOutputStream out;
    private FileSystem fs;
    private final int level;
    private int loadNum = 4096;
    private final Path fileName;

    private final Schema keySchema;
    private final TupleComparator comparator;
    private final KeyOffsetCollector collector;
    private KeyOffsetCollector rootCollector;

    // smallest and largest keys seen so far, according to the comparator
    private Tuple firstKey;
    private Tuple lastKey;

    private final RowStoreEncoder rowStoreEncoder;

    /**
     * Creates a writer. Call {@link #open()} before writing any entry.
     *
     * @param fileName path of the index data file to create
     * @param level {@link BSTIndex#ONE_LEVEL_INDEX} or {@link BSTIndex#TWO_LEVEL_INDEX}
     * @param keySchema schema of the key tuples
     * @param comparator ordering applied to the keys
     * @throws java.io.IOException declared for API compatibility
     */
    public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
                          TupleComparator comparator) throws IOException {
      this.fileName = fileName;
      this.level = level;
      this.keySchema = keySchema;
      this.comparator = comparator;
      this.collector = new KeyOffsetCollector(comparator);
      this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema);
    }

    /**
     * Sets how many data entries each root entry covers (default 4096). This only
     * matters for a two-level index and must be called before {@link #close()}.
     *
     * @throws IllegalArgumentException if {@code loadNum} is not positive
     */
    public void setLoadNum(int loadNum) {
      if (loadNum <= 0) {
        throw new IllegalArgumentException("loadNum must be positive: " + loadNum);
      }
      this.loadNum = loadNum;
    }

    /**
     * Creates the index data file.
     *
     * @throws IOException if the file already exists or cannot be created
     */
    public void open() throws IOException {
      fs = fileName.getFileSystem(conf);
      if (fs.exists(fileName)) {
        throw new IOException("ERROR: index file (" + fileName + ") already exists");
      }
      out = fs.create(fileName);
    }

    /** Records that {@code key} occurs at {@code offset}. Nothing is written until close(). */
    @Override
    public void write(Tuple key, long offset) throws IOException {
      if (firstKey == null || comparator.compare(key, firstKey) < 0) {
        firstKey = key;
      }
      if (lastKey == null || comparator.compare(lastKey, key) < 0) {
        lastKey = key;
      }
      collector.put(key, offset);
    }

    public TupleComparator getComparator() {
      return this.comparator;
    }

    public void flush() throws IOException {
      out.flush();
    }

    /**
     * Writes the data-file header.
     *
     * @param entryNum number of distinct keys; min/max keys are written only if positive
     */
    public void writeHeader(int entryNum) throws IOException {
      writeLengthPrefixed(out, keySchema.getProto().toByteArray());
      writeLengthPrefixed(out, comparator.getProto().toByteArray());
      out.writeInt(this.level);
      out.writeInt(entryNum);
      if (entryNum > 0) {
        writeLengthPrefixed(out, rowStoreEncoder.toBytes(firstKey));
        writeLengthPrefixed(out, rowStoreEncoder.toBytes(lastKey));
      }
      out.flush();
    }

    /**
     * Writes the header and all collected entries, then (for a two-level index) the
     * root file. The data stream is closed even if writing fails.
     */
    @Override
    public void close() throws IOException {
      if (this.level == TWO_LEVEL_INDEX) {
        rootCollector = new KeyOffsetCollector(this.comparator);
      }

      TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
      try {
        writeHeader(keyOffsetMap.size());
        writeDataEntries(keyOffsetMap);
        out.flush();
      } finally {
        out.close();
      }
      collector.clear();

      if (this.level == TWO_LEVEL_INDEX) {
        writeRootIndex();
      }
    }

    /** Writes every key with its offsets and, for two levels, records every loadNum-th key's position. */
    private void writeDataEntries(TreeMap<Tuple, LinkedList<Long>> keyOffsetMap) throws IOException {
      // Start one short of loadNum so that the very first key always becomes a root entry.
      int loadCount = this.loadNum - 1;
      for (Map.Entry<Tuple, LinkedList<Long>> entry : keyOffsetMap.entrySet()) {
        Tuple key = entry.getKey();
        if (this.level == TWO_LEVEL_INDEX) {
          loadCount++;
          if (loadCount == this.loadNum) {
            rootCollector.put(key, out.getPos());
            loadCount = 0;
          }
        }
        writeLengthPrefixed(out, rowStoreEncoder.toBytes(key));

        LinkedList<Long> offsets = entry.getValue();
        out.writeInt(offsets.size());
        for (Long offset : offsets) {
          out.writeLong(offset);
        }
      }
    }

    /** Writes the {@code .root} file of a two-level index; the stream is closed even on failure. */
    private void writeRootIndex() throws IOException {
      TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
      FSDataOutputStream rootOut = fs.create(new Path(fileName + ".root"));
      try {
        rootOut.writeInt(this.loadNum);
        rootOut.writeInt(rootMap.size());
        for (Map.Entry<Tuple, LinkedList<Long>> entry : rootMap.entrySet()) {
          writeLengthPrefixed(rootOut, rowStoreEncoder.toBytes(entry.getKey()));

          LinkedList<Long> offsets = entry.getValue();
          if (offsets.size() != 1) {
            throw new IOException("Root index entry must have exactly one offset, but has " + offsets.size());
          }
          rootOut.writeLong(offsets.getFirst());
        }
        rootOut.flush();
      } finally {
        rootOut.close();
      }
      rootCollector.clear();
    }

    /** Sorted multimap from key to the offsets at which the key was written. */
    private class KeyOffsetCollector {
      private final TreeMap<Tuple, LinkedList<Long>> map;

      public KeyOffsetCollector(TupleComparator comparator) {
        map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
      }

      public void put(Tuple key, long offset) {
        LinkedList<Long> offsets = map.get(key);
        if (offsets == null) {
          offsets = new LinkedList<Long>();
          map.put(key, offsets);
        }
        offsets.add(offset);
      }

      public TreeMap<Tuple, LinkedList<Long>> getMap() {
        return this.map;
      }

      public void clear() {
        this.map.clear();
      }
    }
  }

  /**
   * Reads an index written by {@link BSTIndexWriter}. {@code find} and {@code next}
   * are serialized on an internal mutex. Other methods such as open(), close() and
   * isCurInMemory() are not synchronized.
   */
  public class BSTIndexReader implements OrderIndexReader, Closeable {
    private final Path fileName;
    private Schema keySchema;
    private TupleComparator comparator;

    private FileSystem fs;
    // data file for a one-level index; root file for a two-level index
    private FSDataInputStream indexIn;
    // data file for a two-level index; stays null for a one-level index
    private FSDataInputStream subIn;

    private int level;
    private int entryNum;
    private int loadNum = -1;
    private Tuple firstKey;
    private Tuple lastKey;

    // the cursors of BST
    private int rootCursor;
    private int keyCursor;
    private int offsetCursor;

    // guards the cursors and the in-memory leaf
    private final Object mutex = new Object();

    private RowStoreDecoder rowStoreDecoder;

    // root level of a two-level index (keys and their data-file positions)
    private Tuple[] dataIndex = null;
    private long[] offsetIndex = null;
    // currently loaded leaf: keys and, per key, its offsets
    private Tuple[] dataSubIndex = null;
    private long[][] offsetSubIndex = null;

    // true if the last binary search found an exact match
    private boolean correctable = true;

    /**
     * @param fileName path of the index data file
     * @param keySchema key schema; replaced by the schema stored in the file on open()
     * @param comparator key ordering; replaced by the comparator stored in the file on open()
     * @throws java.io.IOException declared for API compatibility
     */
    public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
      this.fileName = fileName;
      this.keySchema = keySchema;
      this.comparator = comparator;
      this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
    }

    public BSTIndexReader(final Path fileName) throws IOException {
      this.fileName = fileName;
    }

    public Schema getKeySchema() {
      return this.keySchema;
    }

    public TupleComparator getComparator() {
      return this.comparator;
    }

    /** Reads the data-file header written by {@link BSTIndexWriter#writeHeader(int)}. */
    private void readHeader() throws IOException {
      SchemaProto.Builder schemaBuilder = SchemaProto.newBuilder();
      schemaBuilder.mergeFrom(readLengthPrefixed(indexIn));
      this.keySchema = new Schema(schemaBuilder.build());
      this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);

      TupleComparatorProto.Builder compBuilder = TupleComparatorProto.newBuilder();
      compBuilder.mergeFrom(readLengthPrefixed(indexIn));
      this.comparator = new BaseTupleComparator(compBuilder.build());

      this.level = indexIn.readInt();
      this.entryNum = indexIn.readInt();
      if (entryNum > 0) { // an empty index stores no min/max keys
        this.firstKey = rowStoreDecoder.toTuple(readLengthPrefixed(indexIn));
        this.lastKey = rowStoreDecoder.toTuple(readLengthPrefixed(indexIn));
      }
    }

    /**
     * Opens the index and loads it into memory. A one-level index is loaded
     * entirely; a two-level index loads only its root file.
     *
     * @throws FileNotFoundException if the data file (or, for two levels, the root file) is missing
     */
    public void open() throws IOException {
      fs = fileName.getFileSystem(conf);
      if (!fs.exists(fileName)) {
        throw new FileNotFoundException("ERROR: does not exist " + fileName.toString());
      }

      indexIn = fs.open(this.fileName);
      readHeader();
      fillData();
    }

    private void fillData() throws IOException {
      if (this.level == TWO_LEVEL_INDEX) {
        Path rootPath = new Path(this.fileName + ".root");
        if (!fs.exists(rootPath)) {
          throw new FileNotFoundException("root index did not created");
        }

        subIn = indexIn;
        indexIn = fs.open(rootPath);
        // root file header: loadNum, then the number of root entries
        this.loadNum = indexIn.readInt();
        this.entryNum = indexIn.readInt();
        fillRootIndex(entryNum, indexIn);
      } else {
        fillLeafIndex(entryNum, indexIn, -1);
      }
    }

    /**
     * Equivalent to {@code find(key, false)}.
     *
     * @return the first offset of {@code key}, or -1 if the key is not in the index
     */
    public long find(Tuple key) throws IOException {
      return find(key, false);
    }

    /**
     * Positions the cursor for a lookup and returns the first offset at the new position.
     *
     * @param key the key to look up
     * @param nextKey if false, {@code key} must match exactly; if true, the cursor moves
     *        to the first key strictly greater than {@code key}
     * @return the first offset at the cursor, or -1 if there is no such key
     */
    @Override
    public long find(Tuple key, boolean nextKey) throws IOException {
      synchronized (mutex) {
        int pos;
        if (this.level == ONE_LEVEL_INDEX) {
          pos = oneLevBS(key);
        } else if (this.level == TWO_LEVEL_INDEX) {
          pos = twoLevBS(key, this.loadNum + 1);
        } else {
          throw new IOException("More than TWO_LEVEL_INDEX is not supported.");
        }

        if (nextKey) {
          if (pos + 1 >= this.offsetSubIndex.length) {
            return -1;
          }
          keyCursor = pos + 1;
        } else if (correctable) {
          keyCursor = pos;
        } else {
          return -1;
        }
        offsetCursor = 0;
        return this.offsetSubIndex[keyCursor][offsetCursor];
      }
    }

    /**
     * Advances the cursor to the next offset. It moves on to the next key, and for a
     * two-level index to the next leaf, when the current one is exhausted.
     *
     * @return the next offset, or -1 when the index is exhausted
     */
    public long next() throws IOException {
      synchronized (mutex) {
        if (keyCursor >= offsetSubIndex.length) {
          // empty leaf, or the cursor was left past the end of a shorter leaf
          return -1;
        }
        if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) {
          offsetCursor++;
        } else if (offsetSubIndex.length - 1 > keyCursor) {
          keyCursor++;
          offsetCursor = 0;
        } else if (offsetIndex != null && offsetIndex.length - 1 > rootCursor) {
          // Only two-level indexes have further leaves (offsetIndex is null for one level).
          rootCursor++;
          fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]);
          // Entry 0 of a freshly loaded leaf repeats the last entry of the previous leaf.
          keyCursor = 1;
          offsetCursor = 0;
          if (keyCursor >= offsetSubIndex.length) {
            return -1; // the final leaf held only that duplicated entry
          }
        } else {
          return -1;
        }

        return this.offsetSubIndex[keyCursor][offsetCursor];
      }
    }

    public boolean isCurInMemory() {
      return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor);
    }

    /**
     * Loads up to {@code entryNum} leaf entries from {@code in} into memory.
     *
     * @param pos position to seek to first, or -1 to read from the current position
     */
    private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos) throws IOException {
      if (pos != -1) {
        in.seek(pos);
      }
      long start = in.getPos();
      int loaded = 0;
      try {
        allocateLeaf(entryNum);
        for (; loaded < entryNum; loaded++) {
          readLeafEntry(in, loaded);
        }
      } catch (IOException e) {
        // A leaf block may hold fewer than entryNum entries. For example, the last block
        // of a two-level index ends the data file early. In that case rewind and load only
        // the entries that were read completely.
        in.seek(start);
        allocateLeaf(loaded);
        for (int i = 0; i < loaded; i++) {
          readLeafEntry(in, i);
        }
      }
    }

    /** Replaces the in-memory leaf with empty arrays of the given size. */
    private void allocateLeaf(int size) {
      this.dataSubIndex = new Tuple[size];
      this.offsetSubIndex = new long[size][];
    }

    /** Reads one key with its offset list into slot {@code i} of the in-memory leaf. */
    private void readLeafEntry(FSDataInputStream in, int i) throws IOException {
      dataSubIndex[i] = rowStoreDecoder.toTuple(readLengthPrefixed(in));
      int offsetNum = in.readInt();
      long[] offsets = new long[offsetNum];
      for (int j = 0; j < offsetNum; j++) {
        offsets[j] = in.readLong();
      }
      this.offsetSubIndex[i] = offsets;
    }

    public Tuple getFirstKey() {
      return this.firstKey;
    }

    public Tuple getLastKey() {
      return this.lastKey;
    }

    /** Loads all root entries (key plus data-file position) of a two-level index. */
    private void fillRootIndex(int entryNum, FSDataInputStream in) throws IOException {
      this.dataIndex = new Tuple[entryNum];
      this.offsetIndex = new long[entryNum];
      for (int i = 0; i < entryNum; i++) {
        dataIndex[i] = rowStoreDecoder.toTuple(readLengthPrefixed(in));
        this.offsetIndex[i] = in.readLong();
      }
    }

    private int oneLevBS(Tuple key) throws IOException {
      correctable = true;
      return binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
    }

    /** Picks the root entry covering {@code key}, loads that leaf, and searches it. */
    private int twoLevBS(Tuple key, int leafSize) throws IOException {
      if (this.dataIndex.length == 0) {
        // Empty index: there is no leaf to load.
        allocateLeaf(0);
        correctable = false;
        return -1;
      }
      int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length);
      rootCursor = pos > 0 ? pos : 0;
      fillLeafIndex(leafSize, subIn, this.offsetIndex[rootCursor]);
      return binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
    }

    /**
     * Searches the sorted range {@code arr[startPos, endPos)} for {@code key}.
     * Sets {@code correctable} to true only on an exact match.
     *
     * @return the index of {@code key} if present; otherwise the index of the greatest
     *         element smaller than {@code key}, or -1 if there is none (including an empty range)
     */
    private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) {
      int lo = startPos;
      int hi = endPos - 1;
      int floor = -1;
      while (lo <= hi) {
        int mid = (lo + hi) >>> 1; // overflow-safe midpoint
        int cmp = comparator.compare(arr[mid], key);
        if (cmp == 0) {
          correctable = true;
          return mid;
        } else if (cmp < 0) {
          floor = mid;
          lo = mid + 1;
        } else {
          hi = mid - 1;
        }
      }
      correctable = false;
      return floor;
    }

    /** Closes the open streams; a one-level index has no {@code subIn}. */
    @Override
    public void close() throws IOException {
      try {
        if (this.indexIn != null) {
          this.indexIn.close();
        }
      } finally {
        if (this.subIn != null) {
          this.subIn.close();
        }
      }
    }

    @Override
    public String toString() {
      return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java new file mode 100644 index 0000000..dfe36f6 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.tajo.storage.json;


import io.netty.buffer.ByteBuf;
import net.minidev.json.JSONArray;
import net.minidev.json.JSONObject;
import net.minidev.json.parser.JSONParser;
import net.minidev.json.parser.ParseException;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.common.exception.NotImplementedException;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.text.TextLineDeserializer;
import org.apache.tajo.storage.text.TextLineParsingError;

import java.io.IOException;
import java.util.Iterator;

/**
 * Deserializes one line holding a single JSON object into a {@link Tuple}.
 * Each target column is looked up by its simple column name. Absent keys and
 * explicit JSON nulls both become {@link NullDatum}.
 */
public class JsonLineDeserializer extends TextLineDeserializer {
  private JSONParser parser;
  private Type [] types;
  private String [] columnNames;

  public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
    super(schema, meta, targetColumnIndexes);
  }

  @Override
  public void init() {
    types = SchemaUtil.toTypes(schema);
    columnNames = SchemaUtil.toSimpleNames(schema);

    parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE);
  }

  /**
   * Parses the readable bytes of {@code buf} and fills the target columns of {@code output}.
   *
   * @throws TextLineParsingError if the line is not valid JSON
   * @throws IOException if a value cannot be converted to its column type, or the line
   *         is not a JSON object
   */
  @Override
  public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError {
    byte [] line = new byte[buf.readableBytes()];
    buf.readBytes(line);

    try {
      JSONObject object = (JSONObject) parser.parse(line);

      for (int i = 0; i < targetColumnIndexes.length; i++) {
        int actualIdx = targetColumnIndexes[i];
        String fieldName = columnNames[actualIdx];

        // A missing key and an explicit JSON null are both read as SQL NULL.
        if (object.get(fieldName) == null) {
          output.put(actualIdx, NullDatum.get());
        } else {
          output.put(actualIdx, toDatum(object, fieldName, types[actualIdx]));
        }
      }
    } catch (ParseException pe) {
      throw new TextLineParsingError(new String(line, TextDatum.DEFAULT_CHARSET), pe);
    } catch (IOException ioe) {
      throw ioe; // already meaningful; do not wrap it a second time
    } catch (RuntimeException re) {
      // e.g. the line is not a JSON object, or a value does not fit its column type
      throw new IOException(re);
    }
  }

  /**
   * Converts the (non-null) value of {@code fieldName} to a datum of {@code type}.
   *
   * @throws IOException if a binary column holds neither a string nor a JSON array
   * @throws NotImplementedException if {@code type} is not supported
   */
  private static Datum toDatum(JSONObject object, String fieldName, Type type) throws IOException {
    switch (type) {
      case BOOLEAN:
        return DatumFactory.createBool(object.getAsString(fieldName).equals("true"));
      case CHAR:
        return DatumFactory.createChar(object.getAsString(fieldName));
      case INT1:
      case INT2:
        return DatumFactory.createInt2(object.getAsNumber(fieldName).shortValue());
      case INT4:
        return DatumFactory.createInt4(object.getAsNumber(fieldName).intValue());
      case INT8:
        return DatumFactory.createInt8(object.getAsNumber(fieldName).longValue());
      case FLOAT4:
        return DatumFactory.createFloat4(object.getAsNumber(fieldName).floatValue());
      case FLOAT8:
        return DatumFactory.createFloat8(object.getAsNumber(fieldName).doubleValue());
      case TEXT:
        return DatumFactory.createText(object.getAsString(fieldName));
      case TIMESTAMP:
        return DatumFactory.createTimestamp(object.getAsString(fieldName));
      case TIME:
        return DatumFactory.createTime(object.getAsString(fieldName));
      case DATE:
        return DatumFactory.createDate(object.getAsString(fieldName));
      case BIT:
      case BINARY:
      case VARBINARY:
      case BLOB:
        return toBlobDatum(object.get(fieldName));
      case INET4:
        return DatumFactory.createInet4(object.getAsString(fieldName));
      case NULL_TYPE:
        return NullDatum.get();
      default:
        throw new NotImplementedException(type.name() + " is not supported.");
    }
  }

  /**
   * Converts a binary value given as a string or as a JSON array of numbers.
   * An empty array is read as NULL.
   */
  private static Datum toBlobDatum(Object value) throws IOException {
    if (value instanceof String) {
      return DatumFactory.createBlob((String) value);
    } else if (value instanceof JSONArray) {
      JSONArray jsonArray = (JSONArray) value;
      if (jsonArray.isEmpty()) {
        return NullDatum.get();
      }
      byte[] bytes = new byte[jsonArray.size()];
      int arrayIdx = 0;
      for (Object element : jsonArray) {
        // Accept any numeric representation the parser chooses (Integer, Long, ...).
        bytes[arrayIdx++] = ((Number) element).byteValue();
      }
      return DatumFactory.createBlob(bytes);
    } else {
      // Report the type of the offending value, not of the enclosing object.
      throw new IOException("Unknown json object: " + value.getClass().getSimpleName());
    }
  }

  @Override
  public void release() {
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java new file mode 100644 index 0000000..6db2c29 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.json; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.text.TextLineDeserializer; +import org.apache.tajo.storage.text.TextLineSerDe; +import org.apache.tajo.storage.text.TextLineSerializer; + +public class JsonLineSerDe extends TextLineSerDe { + @Override + public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { + return new JsonLineDeserializer(schema, meta, targetColumnIndexes); + } + + @Override + public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { + return new JsonLineSerializer(schema, meta); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java new file mode 100644 index 0000000..cd31ada --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.json; + + +import net.minidev.json.JSONObject; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.common.exception.NotImplementedException; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.text.TextLineSerializer; + +import java.io.IOException; +import java.io.OutputStream; + +public class JsonLineSerializer extends TextLineSerializer { + private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + + private Type [] types; + private String [] simpleNames; + private int columnNum; + + + public JsonLineSerializer(Schema schema, TableMeta meta) { + super(schema, meta); + } + + @Override + public void init() { + types = SchemaUtil.toTypes(schema); + simpleNames = SchemaUtil.toSimpleNames(schema); + columnNum = schema.size(); + } + + @Override + public int serialize(OutputStream out, Tuple input) throws IOException { + JSONObject jsonObject = new JSONObject(); + + for (int i = 0; i < columnNum; i++) { + if (input.isNull(i)) { + continue; + } + + String fieldName = simpleNames[i]; + Type type = types[i]; + + switch (type) { + + case BOOLEAN: + jsonObject.put(fieldName, input.getBool(i)); + break; + + case INT1: + case INT2: + jsonObject.put(fieldName, input.getInt2(i)); + break; + + case INT4: + jsonObject.put(fieldName, 
input.getInt4(i)); + break; + + case INT8: + jsonObject.put(fieldName, input.getInt8(i)); + break; + + case FLOAT4: + jsonObject.put(fieldName, input.getFloat4(i)); + break; + + case FLOAT8: + jsonObject.put(fieldName, input.getFloat8(i)); + break; + + case CHAR: + case TEXT: + case VARCHAR: + case INET4: + case TIMESTAMP: + case DATE: + case TIME: + case INTERVAL: + jsonObject.put(fieldName, input.getText(i)); + break; + + case BIT: + case BINARY: + case BLOB: + case VARBINARY: + jsonObject.put(fieldName, input.getBytes(i)); + break; + + case NULL_TYPE: + break; + + default: + throw new NotImplementedException(types[i].name() + " is not supported."); + } + } + + String jsonStr = jsonObject.toJSONString(); + byte [] jsonBytes = jsonStr.getBytes(TextDatum.DEFAULT_CHARSET); + out.write(jsonBytes); + return jsonBytes.length; + } + + @Override + public void release() { + + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java new file mode 100644 index 0000000..b10d423 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.storage.StorageConstants; +import parquet.hadoop.ParquetOutputFormat; +import parquet.hadoop.metadata.CompressionCodecName; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.storage.FileAppender; +import org.apache.tajo.storage.TableStatistics; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +/** + * FileAppender for writing to Parquet files. + */ +public class ParquetAppender extends FileAppender { + private TajoParquetWriter writer; + private int blockSize; + private int pageSize; + private CompressionCodecName compressionCodecName; + private boolean enableDictionary; + private boolean validating; + private TableStatistics stats; + + /** + * Creates a new ParquetAppender. + * + * @param conf Configuration properties. + * @param schema The table schema. + * @param meta The table metadata. + * @param workDir The path of the Parquet file to write to. 
+ */ + public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta, + Path workDir) throws IOException { + super(conf, taskAttemptId, schema, meta, workDir); + this.blockSize = Integer.parseInt( + meta.getOption(ParquetOutputFormat.BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE)); + this.pageSize = Integer.parseInt( + meta.getOption(ParquetOutputFormat.PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE)); + this.compressionCodecName = CompressionCodecName.fromConf( + meta.getOption(ParquetOutputFormat.COMPRESSION, StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME)); + this.enableDictionary = Boolean.parseBoolean( + meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY, StorageConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED)); + this.validating = Boolean.parseBoolean( + meta.getOption(ParquetOutputFormat.VALIDATION, StorageConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED)); + } + + /** + * Initializes the Appender. This method creates a new TajoParquetWriter + * and initializes the table statistics if enabled. + */ + public void init() throws IOException { + writer = new TajoParquetWriter(path, + schema, + compressionCodecName, + blockSize, + pageSize, + enableDictionary, + validating); + if (enabledStats) { + this.stats = new TableStatistics(schema); + } + super.init(); + } + + /** + * Gets the current offset. Tracking offsets is currenly not implemented, so + * this method always returns 0. + * + * @return 0 + */ + @Override + public long getOffset() throws IOException { + return 0; + } + + /** + * Write a Tuple to the Parquet file. + * + * @param tuple The Tuple to write. 
+ */ + @Override + public void addTuple(Tuple tuple) throws IOException { + if (enabledStats) { + for (int i = 0; i < schema.size(); ++i) { + stats.analyzeField(i, tuple.get(i)); + } + } + writer.write(tuple); + if (enabledStats) { + stats.incrementRow(); + } + } + + /** + * The ParquetWriter does not need to be flushed, so this is a no-op. + */ + @Override + public void flush() throws IOException { + } + + /** + * Closes the Appender. + */ + @Override + public void close() throws IOException { + writer.close(); + } + + public long getEstimatedOutputSize() throws IOException { + return writer.getEstimatedWrittenSize(); + } + + /** + * If table statistics is enabled, retrieve the table statistics. + * + * @return Table statistics if enabled or null otherwise. + */ + @Override + public TableStats getStats() { + if (enabledStats) { + return stats.getTableStat(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java new file mode 100644 index 0000000..2f8efcf --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.FileScanner; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.Fragment; + +import java.io.IOException; + +/** + * FileScanner for reading Parquet files + */ +public class ParquetScanner extends FileScanner { + private TajoParquetReader reader; + + /** + * Creates a new ParquetScanner. + * + * @param conf + * @param schema + * @param meta + * @param fragment + */ + public ParquetScanner(Configuration conf, final Schema schema, + final TableMeta meta, final Fragment fragment) { + super(conf, schema, meta, fragment); + } + + /** + * Initializes the ParquetScanner. This method initializes the + * TajoParquetReader. + */ + @Override + public void init() throws IOException { + if (targets == null) { + targets = schema.toArray(); + } + reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets)); + super.init(); + } + + /** + * Reads the next Tuple from the Parquet file. + * + * @return The next Tuple from the Parquet file or null if end of file is + * reached. + */ + @Override + public Tuple next() throws IOException { + return reader.read(); + } + + /** + * Resets the scanner + */ + @Override + public void reset() throws IOException { + } + + /** + * Closes the scanner. 
+ */ + @Override + public void close() throws IOException { + if (reader != null) { + reader.close(); + } + } + + /** + * Returns whether this scanner is projectable. + * + * @return true + */ + @Override + public boolean isProjectable() { + return true; + } + + /** + * Returns whether this scanner is selectable. + * + * @return false + */ + @Override + public boolean isSelectable() { + return false; + } + + /** + * Returns whether this scanner is splittable. + * + * @return false + */ + @Override + public boolean isSplittable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java new file mode 100644 index 0000000..a765f48 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.thirdparty.parquet.ParquetReader; +import parquet.filter.UnboundRecordFilter; + +import java.io.IOException; + +/** + * Tajo implementation of {@link ParquetReader} to read Tajo records from a + * Parquet file. Users should use {@link ParquetScanner} and not this class + * directly. + */ +public class TajoParquetReader extends ParquetReader<Tuple> { + /** + * Creates a new TajoParquetReader. + * + * @param file The file to read from. + * @param readSchema Tajo schema of the table. + */ + public TajoParquetReader(Path file, Schema readSchema) throws IOException { + super(file, new TajoReadSupport(readSchema)); + } + + /** + * Creates a new TajoParquetReader. + * + * @param file The file to read from. + * @param readSchema Tajo schema of the table. + * @param requestedSchema Tajo schema of the projection. + */ + public TajoParquetReader(Path file, Schema readSchema, + Schema requestedSchema) throws IOException { + super(file, new TajoReadSupport(readSchema, requestedSchema)); + } + + /** + * Creates a new TajoParquetReader. + * + * @param file The file to read from. + * @param readSchema Tajo schema of the table. + * @param recordFilter Record filter. + */ + public TajoParquetReader(Path file, Schema readSchema, + UnboundRecordFilter recordFilter) + throws IOException { + super(file, new TajoReadSupport(readSchema), recordFilter); + } + + /** + * Creates a new TajoParquetReader. + * + * @param file The file to read from. + * @param readSchema Tajo schema of the table. + * @param requestedSchema Tajo schema of the projection. + * @param recordFilter Record filter. 
+ */ + public TajoParquetReader(Path file, Schema readSchema, + Schema requestedSchema, + UnboundRecordFilter recordFilter) + throws IOException { + super(file, new TajoReadSupport(readSchema, requestedSchema), + recordFilter); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java new file mode 100644 index 0000000..5f220c5 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter; +import parquet.hadoop.metadata.CompressionCodecName; + +import java.io.IOException; + +/** + * Tajo implementation of {@link ParquetWriter} to write Tajo records to a + * Parquet file. Users should use {@link ParquetAppender} and not this class + * directly. + */ +public class TajoParquetWriter extends ParquetWriter<Tuple> { + /** + * Create a new TajoParquetWriter + * + * @param file The file name to write to. + * @param schema The Tajo schema of the table. + * @param compressionCodecName Compression codec to use, or + * CompressionCodecName.UNCOMPRESSED. + * @param blockSize The block size threshold. + * @param pageSize See parquet write up. Blocks are subdivided into pages + * for alignment. + * @throws java.io.IOException + */ + public TajoParquetWriter(Path file, + Schema schema, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize) throws IOException { + super(file, + new TajoWriteSupport(schema), + compressionCodecName, + blockSize, + pageSize); + } + + /** + * Create a new TajoParquetWriter. + * + * @param file The file name to write to. + * @param schema The Tajo schema of the table. + * @param compressionCodecName Compression codec to use, or + * CompressionCodecName.UNCOMPRESSED. + * @param blockSize The block size threshold. + * @param pageSize See parquet write up. Blocks are subdivided into pages + * for alignment. + * @param enableDictionary Whether to use a dictionary to compress columns. + * @param validating Whether to turn on validation. 
+ * @throws java.io.IOException + */ + public TajoParquetWriter(Path file, + Schema schema, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize, + boolean enableDictionary, + boolean validating) throws IOException { + super(file, + new TajoWriteSupport(schema), + compressionCodecName, + blockSize, + pageSize, + enableDictionary, + validating); + } + + /** + * Creates a new TajoParquetWriter. The default block size is 128 MB. + * The default page size is 1 MB. Default compression is no compression. + * + * @param file The Path of the file to write to. + * @param schema The Tajo schema of the table. + * @throws java.io.IOException + */ + public TajoParquetWriter(Path file, Schema schema) throws IOException { + this(file, + schema, + CompressionCodecName.UNCOMPRESSED, + DEFAULT_BLOCK_SIZE, + DEFAULT_PAGE_SIZE); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java new file mode 100644 index 0000000..a64e987 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.storage.Tuple; +import parquet.Log; +import parquet.hadoop.api.InitContext; +import parquet.hadoop.api.ReadSupport; +import parquet.io.api.RecordMaterializer; +import parquet.schema.MessageType; + +import java.util.Map; + +/** + * Tajo implementation of {@link parquet.hadoop.api.ReadSupport} for {@link org.apache.tajo.storage.Tuple}s. + * Users should use {@link org.apache.tajo.storage.parquet.ParquetScanner} and not this class directly. + */ +public class TajoReadSupport extends ReadSupport<Tuple> { + private static final Log LOG = Log.getLog(TajoReadSupport.class); + + private Schema readSchema; + private Schema requestedSchema; + + /** + * Creates a new TajoReadSupport. + * + * @param requestedSchema The Tajo schema of the requested projection passed + * down by ParquetScanner. + */ + public TajoReadSupport(Schema readSchema, Schema requestedSchema) { + super(); + this.readSchema = readSchema; + this.requestedSchema = requestedSchema; + } + + /** + * Creates a new TajoReadSupport. + * + * @param readSchema The schema of the table. + */ + public TajoReadSupport(Schema readSchema) { + super(); + this.readSchema = readSchema; + this.requestedSchema = readSchema; + } + + /** + * Initializes the ReadSupport. + * + * @param context The InitContext. + * @return A ReadContext that defines how to read the file. 
+ */ + @Override + public ReadContext init(InitContext context) { + if (requestedSchema == null) { + throw new RuntimeException("requestedSchema is null."); + } + MessageType requestedParquetSchema = + new TajoSchemaConverter().convert(requestedSchema); + LOG.debug("Reading data with projection:\n" + requestedParquetSchema); + return new ReadContext(requestedParquetSchema); + } + + /** + * Prepares for read. + * + * @param configuration The job configuration. + * @param keyValueMetaData App-specific metadata from the file. + * @param fileSchema The schema of the Parquet file. + * @param readContext Returned by the init method. + */ + @Override + public RecordMaterializer<Tuple> prepareForRead( + Configuration configuration, + Map<String, String> keyValueMetaData, + MessageType fileSchema, + ReadContext readContext) { + MessageType parquetRequestedSchema = readContext.getRequestedSchema(); + return new TajoRecordMaterializer(parquetRequestedSchema, requestedSchema, readSchema); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java new file mode 100644 index 0000000..a091eac --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.datum.*; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import parquet.io.api.Binary; +import parquet.io.api.Converter; +import parquet.io.api.GroupConverter; +import parquet.io.api.PrimitiveConverter; +import parquet.schema.GroupType; +import parquet.schema.Type; + +import java.nio.ByteBuffer; + +/** + * Converter to convert a Parquet record into a Tajo Tuple. + */ +public class TajoRecordConverter extends GroupConverter { + private final GroupType parquetSchema; + private final Schema tajoReadSchema; + private final int[] projectionMap; + private final int tupleSize; + + private final Converter[] converters; + + private Tuple currentTuple; + + /** + * Creates a new TajoRecordConverter. + * + * @param parquetSchema The Parquet schema of the projection. + * @param tajoReadSchema The Tajo schema of the table. + * @param projectionMap An array mapping the projection column to the column + * index in the table. 
+ */ + public TajoRecordConverter(GroupType parquetSchema, Schema tajoReadSchema, + int[] projectionMap) { + this.parquetSchema = parquetSchema; + this.tajoReadSchema = tajoReadSchema; + this.projectionMap = projectionMap; + this.tupleSize = tajoReadSchema.size(); + + // The projectionMap.length does not match parquetSchema.getFieldCount() + // when the projection contains NULL_TYPE columns. We will skip over the + // NULL_TYPE columns when we construct the converters and populate the + // NULL_TYPE columns with NullDatums in start(). + int index = 0; + this.converters = new Converter[parquetSchema.getFieldCount()]; + for (int i = 0; i < projectionMap.length; ++i) { + final int projectionIndex = projectionMap[i]; + Column column = tajoReadSchema.getColumn(projectionIndex); + if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { + continue; + } + Type type = parquetSchema.getType(index); + converters[index] = newConverter(column, type, new ParentValueContainer() { + @Override + void add(Object value) { + TajoRecordConverter.this.set(projectionIndex, value); + } + }); + ++index; + } + } + + private void set(int index, Object value) { + currentTuple.put(index, (Datum)value); + } + + private Converter newConverter(Column column, Type type, + ParentValueContainer parent) { + DataType dataType = column.getDataType(); + switch (dataType.getType()) { + case BOOLEAN: + return new FieldBooleanConverter(parent); + case BIT: + return new FieldBitConverter(parent); + case CHAR: + return new FieldCharConverter(parent); + case INT2: + return new FieldInt2Converter(parent); + case INT4: + return new FieldInt4Converter(parent); + case INT8: + return new FieldInt8Converter(parent); + case FLOAT4: + return new FieldFloat4Converter(parent); + case FLOAT8: + return new FieldFloat8Converter(parent); + case INET4: + return new FieldInet4Converter(parent); + case INET6: + throw new RuntimeException("No converter for INET6"); + case TEXT: + return new 
FieldTextConverter(parent); + case PROTOBUF: + return new FieldProtobufConverter(parent, dataType); + case BLOB: + return new FieldBlobConverter(parent); + case NULL_TYPE: + throw new RuntimeException("No converter for NULL_TYPE."); + default: + throw new RuntimeException("Unsupported data type"); + } + } + + /** + * Gets the converter for a specific field. + * + * @param fieldIndex Index of the field in the projection. + * @return The converter for the field. + */ + @Override + public Converter getConverter(int fieldIndex) { + return converters[fieldIndex]; + } + + /** + * Called before processing fields. This method fills any fields that have + * NULL values or have type NULL_TYPE with a NullDatum. + */ + @Override + public void start() { + currentTuple = new VTuple(tupleSize); + } + + /** + * Called after all fields have been processed. + */ + @Override + public void end() { + for (int i = 0; i < projectionMap.length; ++i) { + final int projectionIndex = projectionMap[i]; + Column column = tajoReadSchema.getColumn(projectionIndex); + if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE + || currentTuple.get(projectionIndex) == null) { + set(projectionIndex, NullDatum.get()); + } + } + } + + /** + * Returns the current record converted by this converter. + * + * @return The current record. + */ + public Tuple getCurrentRecord() { + return currentTuple; + } + + static abstract class ParentValueContainer { + /** + * Adds the value to the parent. + * + * @param value The value to add. 
+ */ + abstract void add(Object value); + } + + static final class FieldBooleanConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldBooleanConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBoolean(boolean value) { + parent.add(DatumFactory.createBool(value)); + } + } + + static final class FieldBitConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldBitConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createBit((byte)(value & 0xff))); + } + } + + static final class FieldCharConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldCharConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBinary(Binary value) { + parent.add(DatumFactory.createChar(value.getBytes())); + } + } + + static final class FieldInt2Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldInt2Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createInt2((short)value)); + } + } + + static final class FieldInt4Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldInt4Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createInt4(value)); + } + } + + static final class FieldInt8Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldInt8Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addLong(long value) { + parent.add(DatumFactory.createInt8(value)); + } + + @Override + final public 
void addInt(int value) { + parent.add(DatumFactory.createInt8(Long.valueOf(value))); + } + } + + static final class FieldFloat4Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldFloat4Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createFloat4(Float.valueOf(value))); + } + + @Override + final public void addLong(long value) { + parent.add(DatumFactory.createFloat4(Float.valueOf(value))); + } + + @Override + final public void addFloat(float value) { + parent.add(DatumFactory.createFloat4(value)); + } + } + + static final class FieldFloat8Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldFloat8Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createFloat8(Double.valueOf(value))); + } + + @Override + final public void addLong(long value) { + parent.add(DatumFactory.createFloat8(Double.valueOf(value))); + } + + @Override + final public void addFloat(float value) { + parent.add(DatumFactory.createFloat8(Double.valueOf(value))); + } + + @Override + final public void addDouble(double value) { + parent.add(DatumFactory.createFloat8(value)); + } + } + + static final class FieldInet4Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldInet4Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBinary(Binary value) { + parent.add(DatumFactory.createInet4(value.getBytes())); + } + } + + static final class FieldTextConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldTextConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBinary(Binary value) { + 
parent.add(DatumFactory.createText(value.getBytes())); + } + } + + static final class FieldBlobConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldBlobConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBinary(Binary value) { + parent.add(new BlobDatum(ByteBuffer.wrap(value.getBytes()))); + } + } + + static final class FieldProtobufConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + private final DataType dataType; + + public FieldProtobufConverter(ParentValueContainer parent, + DataType dataType) { + this.parent = parent; + this.dataType = dataType; + } + + @Override + final public void addBinary(Binary value) { + try { + ProtobufDatumFactory factory = + ProtobufDatumFactory.get(dataType.getCode()); + Message.Builder builder = factory.newBuilder(); + builder.mergeFrom(value.getBytes()); + parent.add(factory.createDatum(builder)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java new file mode 100644 index 0000000..436159c --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.storage.Tuple; +import parquet.io.api.GroupConverter; +import parquet.io.api.RecordMaterializer; +import parquet.schema.MessageType; + +/** + * Materializes a Tajo Tuple from a stream of Parquet data. + */ +class TajoRecordMaterializer extends RecordMaterializer<Tuple> { + private final TajoRecordConverter root; + + /** + * Creates a new TajoRecordMaterializer. + * + * @param parquetSchema The Parquet schema of the projection. + * @param tajoSchema The Tajo schema of the projection. + * @param tajoReadSchema The Tajo schema of the table. + */ + public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema, + Schema tajoReadSchema) { + int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema); + this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema, + projectionMap); + } + + private int[] getProjectionMap(Schema schema, Schema projection) { + Column[] targets = projection.toArray(); + int[] projectionMap = new int[targets.length]; + for (int i = 0; i < targets.length; ++i) { + int tid = schema.getColumnId(targets[i].getQualifiedName()); + projectionMap[i] = tid; + } + return projectionMap; + } + + /** + * Returns the current record being materialized. + * + * @return The record being materialized. 
+ */ + @Override + public Tuple getCurrentRecord() { + return root.getCurrentRecord(); + } + + /** + * Returns the root converter. + * + * @return The root converter + */ + @Override + public GroupConverter getRootConverter() { + return root; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java new file mode 100644 index 0000000..555b623 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.parquet; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import parquet.schema.MessageType; +import parquet.schema.OriginalType; +import parquet.schema.PrimitiveType; +import parquet.schema.PrimitiveType.PrimitiveTypeName; +import parquet.schema.Type; + +import java.util.ArrayList; +import java.util.List; + +/** + * Converts between Parquet and Tajo schemas. See package documentation for + * details on the mapping. + */ +public class TajoSchemaConverter { + private static final String TABLE_SCHEMA = "table_schema"; + + /** + * Creates a new TajoSchemaConverter. + */ + public TajoSchemaConverter() { + } + + /** + * Converts a Parquet schema to a Tajo schema. + * + * @param parquetSchema The Parquet schema to convert. + * @return The resulting Tajo schema. + */ + public Schema convert(MessageType parquetSchema) { + return convertFields(parquetSchema.getFields()); + } + + private Schema convertFields(List<Type> parquetFields) { + List<Column> columns = new ArrayList<Column>(); + for (int i = 0; i < parquetFields.size(); ++i) { + Type fieldType = parquetFields.get(i); + if (fieldType.isRepetition(Type.Repetition.REPEATED)) { + throw new RuntimeException("REPEATED not supported outside LIST or" + + " MAP. 
Type: " + fieldType); + } + columns.add(convertField(fieldType)); + } + Column[] columnsArray = new Column[columns.size()]; + columnsArray = columns.toArray(columnsArray); + return new Schema(columnsArray); + } + + private Column convertField(final Type fieldType) { + if (fieldType.isPrimitive()) { + return convertPrimitiveField(fieldType); + } else { + return convertComplexField(fieldType); + } + } + + private Column convertPrimitiveField(final Type fieldType) { + final String fieldName = fieldType.getName(); + final PrimitiveTypeName parquetPrimitiveTypeName = + fieldType.asPrimitiveType().getPrimitiveTypeName(); + final OriginalType originalType = fieldType.getOriginalType(); + return parquetPrimitiveTypeName.convert( + new PrimitiveType.PrimitiveTypeNameConverter<Column, RuntimeException>() { + @Override + public Column convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.BOOLEAN); + } + + @Override + public Column convertINT32(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.INT4); + } + + @Override + public Column convertINT64(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.INT8); + } + + @Override + public Column convertFLOAT(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.FLOAT4); + } + + @Override + public Column convertDOUBLE(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.FLOAT8); + } + + @Override + public Column convertFIXED_LEN_BYTE_ARRAY( + PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.BLOB); + } + + @Override + public Column convertBINARY(PrimitiveTypeName primitiveTypeName) { + if (originalType == OriginalType.UTF8) { + return new Column(fieldName, TajoDataTypes.Type.TEXT); + } else { + return new Column(fieldName, TajoDataTypes.Type.BLOB); + } + } + + @Override + public Column 
convertINT96(PrimitiveTypeName primitiveTypeName) { + throw new RuntimeException("Converting from INT96 not supported."); + } + }); + } + + private Column convertComplexField(final Type fieldType) { + throw new RuntimeException("Complex types not supported."); + } + + /** + * Converts a Tajo schema to a Parquet schema. + * + * @param tajoSchema The Tajo schema to convert. + * @return The resulting Parquet schema. + */ + public MessageType convert(Schema tajoSchema) { + List<Type> types = new ArrayList<Type>(); + for (int i = 0; i < tajoSchema.size(); ++i) { + Column column = tajoSchema.getColumn(i); + if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { + continue; + } + types.add(convertColumn(column)); + } + return new MessageType(TABLE_SCHEMA, types); + } + + private Type convertColumn(Column column) { + TajoDataTypes.Type type = column.getDataType().getType(); + switch (type) { + case BOOLEAN: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BOOLEAN); + case BIT: + case INT2: + case INT4: + return primitive(column.getSimpleName(), + PrimitiveTypeName.INT32); + case INT8: + return primitive(column.getSimpleName(), + PrimitiveTypeName.INT64); + case FLOAT4: + return primitive(column.getSimpleName(), + PrimitiveTypeName.FLOAT); + case FLOAT8: + return primitive(column.getSimpleName(), + PrimitiveTypeName.DOUBLE); + case CHAR: + case TEXT: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BINARY, + OriginalType.UTF8); + case PROTOBUF: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BINARY); + case BLOB: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BINARY); + case INET4: + case INET6: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BINARY); + default: + throw new RuntimeException("Cannot convert Tajo type: " + type); + } + } + + private PrimitiveType primitive(String name, + PrimitiveTypeName primitive) { + return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, 
null); + } + + private PrimitiveType primitive(String name, + PrimitiveTypeName primitive, + OriginalType originalType) { + return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, + originalType); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java new file mode 100644 index 0000000..e05aeaf --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.Tuple; +import parquet.hadoop.api.WriteSupport; +import parquet.io.api.Binary; +import parquet.io.api.RecordConsumer; +import parquet.schema.GroupType; +import parquet.schema.MessageType; +import parquet.schema.Type; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tajo implementation of {@link parquet.hadoop.api.WriteSupport} for {@link org.apache.tajo.storage.Tuple}s. + * Users should use {@link ParquetAppender} and not this class directly. + */ +public class TajoWriteSupport extends WriteSupport<Tuple> { + private RecordConsumer recordConsumer; + private MessageType rootSchema; + private Schema rootTajoSchema; + + /** + * Creates a new TajoWriteSupport. + * + * @param tajoSchema The Tajo schema for the table. + */ + public TajoWriteSupport(Schema tajoSchema) { + this.rootSchema = new TajoSchemaConverter().convert(tajoSchema); + this.rootTajoSchema = tajoSchema; + } + + /** + * Initializes the WriteSupport. + * + * @param configuration The job's configuration. + * @return A WriteContext that describes how to write the file. + */ + @Override + public WriteContext init(Configuration configuration) { + Map<String, String> extraMetaData = new HashMap<String, String>(); + return new WriteContext(rootSchema, extraMetaData); + } + + /** + * Called once per row group. + * + * @param recordConsumer The {@link parquet.io.api.RecordConsumer} to write to. + */ + @Override + public void prepareForWrite(RecordConsumer recordConsumer) { + this.recordConsumer = recordConsumer; + } + + /** + * Writes a Tuple to the file. + * + * @param tuple The Tuple to write to the file. 
+ */ + @Override + public void write(Tuple tuple) { + recordConsumer.startMessage(); + writeRecordFields(rootSchema, rootTajoSchema, tuple); + recordConsumer.endMessage(); + } + + private void writeRecordFields(GroupType schema, Schema tajoSchema, + Tuple tuple) { + List<Type> fields = schema.getFields(); + // Parquet ignores Tajo NULL_TYPE columns, so the index may differ. + int index = 0; + for (int tajoIndex = 0; tajoIndex < tajoSchema.size(); ++tajoIndex) { + Column column = tajoSchema.getColumn(tajoIndex); + if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { + continue; + } + Datum datum = tuple.get(tajoIndex); + Type fieldType = fields.get(index); + if (!tuple.isNull(tajoIndex)) { + recordConsumer.startField(fieldType.getName(), index); + writeValue(fieldType, column, datum); + recordConsumer.endField(fieldType.getName(), index); + } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) { + throw new RuntimeException("Null-value for required field: " + + column.getSimpleName()); + } + ++index; + } + } + + private void writeValue(Type fieldType, Column column, Datum datum) { + switch (column.getDataType().getType()) { + case BOOLEAN: + recordConsumer.addBoolean(datum.asBool()); + break; + case BIT: + case INT2: + case INT4: + recordConsumer.addInteger(datum.asInt4()); + break; + case INT8: + recordConsumer.addLong(datum.asInt8()); + break; + case FLOAT4: + recordConsumer.addFloat(datum.asFloat4()); + break; + case FLOAT8: + recordConsumer.addDouble(datum.asFloat8()); + break; + case CHAR: + case TEXT: + recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes())); + break; + case PROTOBUF: + case BLOB: + case INET4: + case INET6: + recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray())); + break; + default: + break; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java new file mode 100644 index 0000000..d7d16b7 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * <p> + * Provides read and write support for Parquet files. Tajo schemas are + * converted to Parquet schemas according to the following mapping of Tajo + * and Parquet types: + * </p> + * + * <table> + * <tr> + * <th>Tajo type</th> + * <th>Parquet type</th> + * </tr> + * <tr> + * <td>NULL_TYPE</td> + * <td>No type. 
The field is not encoded in Parquet.</td> + * </tr> + * <tr> + * <td>BOOLEAN</td> + * <td>BOOLEAN</td> + * </tr> + * <tr> + * <td>BIT</td> + * <td>INT32</td> + * </tr> + * <tr> + * <td>INT2</td> + * <td>INT32</td> + * </tr> + * <tr> + * <td>INT4</td> + * <td>INT32</td> + * </tr> + * <tr> + * <td>INT8</td> + * <td>INT64</td> + * </tr> + * <tr> + * <td>FLOAT4</td> + * <td>FLOAT</td> + * </tr> + * <tr> + * <td>FLOAT8</td> + * <td>DOUBLE</td> + * </tr> + * <tr> + * <td>CHAR</td> + * <td>BINARY (with OriginalType UTF8)</td> + * </tr> + * <tr> + * <td>TEXT</td> + * <td>BINARY (with OriginalType UTF8)</td> + * </tr> + * <tr> + * <td>PROTOBUF</td> + * <td>BINARY</td> + * </tr> + * <tr> + * <td>BLOB</td> + * <td>BINARY</td> + * </tr> + * <tr> + * <td>INET4</td> + * <td>BINARY</td> + * </tr> + * </table> + * + * <p> + * Because Tajo fields can be NULL, all Parquet fields are marked as optional. + * </p> + * + * <p> + * The conversion from Tajo to Parquet is lossy without the original Tajo + * schema. As a result, Parquet files are read using the Tajo schema saved in + * the Tajo catalog for the table the Parquet files belong to, which was + * defined when the table was created. + * </p> + */ + +package org.apache.tajo.storage.parquet;
