http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java new file mode 100644 index 0000000..81a1ffd --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.index.bst; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; +import org.apache.tajo.storage.index.IndexMethod; +import org.apache.tajo.storage.index.IndexWriter; +import org.apache.tajo.storage.index.OrderIndexReader; + +import java.io.Closeable; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.LinkedList; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; + +/** + * This is a two-level binary search tree index. It is a value-list index + * structure, so it is inefficient when many of the indexed values are + * identical. The BST index performs best when the selectivity of the rows + * to be retrieved is less than 5%. + * BSTIndexWriter is not thread-safe, whereas BSTIndexReader is thread-safe.
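+ * + * <p>A hypothetical usage sketch. The calls below are the ones defined in this file; the setup values (conf, indexPath, keySchema, comparator, keyTuple, offset) are illustrative only: + * <pre> + * BSTIndex bst = new BSTIndex(conf); + * BSTIndexWriter writer = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator); + * writer.open(); + * writer.write(keyTuple, offset); // once per key/offset pair + * writer.close(); + * + * BSTIndexReader reader = bst.getIndexReader(indexPath, keySchema, comparator); + * reader.open(); + * long found = reader.find(keyTuple); // -1 if the key is absent + * long following = reader.next(); // next offset for the same key, or -1 + * reader.close(); + * </pre>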
+ */ +public class BSTIndex implements IndexMethod { + private static final Log LOG = LogFactory.getLog(BSTIndex.class); + + public static final int ONE_LEVEL_INDEX = 1; + public static final int TWO_LEVEL_INDEX = 2; + + private final Configuration conf; + + public BSTIndex(final Configuration conf) { + this.conf = conf; + } + + @Override + public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema, + TupleComparator comparator) throws IOException { + return new BSTIndexWriter(fileName, level, keySchema, comparator); + } + + @Override + public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, TupleComparator comparator) throws IOException { + return new BSTIndexReader(fileName, keySchema, comparator); + } + + public BSTIndexReader getIndexReader(Path fileName) throws IOException { + return new BSTIndexReader(fileName); + } + + public class BSTIndexWriter extends IndexWriter implements Closeable { + private FSDataOutputStream out; + private FileSystem fs; + private int level; + private int loadNum = 4096; + private Path fileName; + + private final Schema keySchema; + private final TupleComparator comparator; + private final KeyOffsetCollector collector; + private KeyOffsetCollector rootCollector; + + private Tuple firstKey; + private Tuple lastKey; + + private RowStoreEncoder rowStoreEncoder; + + /** + * Constructor. + * + * @param level + * BSTIndex.ONE_LEVEL_INDEX or BSTIndex.TWO_LEVEL_INDEX + * @throws java.io.IOException + */ + public BSTIndexWriter(final Path fileName, int level, Schema keySchema, + TupleComparator comparator) throws IOException { + this.fileName = fileName; + this.level = level; + this.keySchema = keySchema; + this.comparator = comparator; + this.collector = new KeyOffsetCollector(comparator); + this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema); + } + + public void setLoadNum(int loadNum) { + this.loadNum = loadNum; + } + + public void open() throws IOException { + fs = fileName.getFileSystem(conf); + if (fs.exists(fileName)) { + throw new IOException("ERROR: index file (" + fileName + ") already exists"); + } + out = fs.create(fileName); + } + + @Override + public void write(Tuple key, long offset) throws IOException { + if (firstKey == null || comparator.compare(key, firstKey) < 0) { + firstKey = key; + } + if (lastKey == null || comparator.compare(lastKey, key) < 0) { + lastKey = key; + } + + collector.put(key, offset); + } + + public TupleComparator getComparator() { + return this.comparator; + } + + public void flush() throws IOException { + out.flush(); + } + + public void writeHeader(int entryNum) throws IOException { + // schema + byte [] schemaBytes = keySchema.getProto().toByteArray(); + out.writeInt(schemaBytes.length); + out.write(schemaBytes); + + // comparator + byte [] comparatorBytes = comparator.getProto().toByteArray(); + out.writeInt(comparatorBytes.length); + out.write(comparatorBytes); + + // level + out.writeInt(this.level); + // entry + out.writeInt(entryNum); + if (entryNum > 0) { + byte [] minBytes = rowStoreEncoder.toBytes(firstKey); + out.writeInt(minBytes.length); + out.write(minBytes); + byte [] maxBytes = rowStoreEncoder.toBytes(lastKey); + out.writeInt(maxBytes.length); + out.write(maxBytes); + } + out.flush(); + } + + public void close() throws IOException { + /* two level initialize */ + if (this.level == TWO_LEVEL_INDEX) { + rootCollector = new KeyOffsetCollector(this.comparator); + } + + /* data writing phase */ + TreeMap<Tuple,
LinkedList<Long>> keyOffsetMap = collector.getMap(); + Set<Tuple> keySet = keyOffsetMap.keySet(); + + int entryNum = keySet.size(); + writeHeader(entryNum); + + int loadCount = this.loadNum - 1; + for (Tuple key : keySet) { + + if (this.level == TWO_LEVEL_INDEX) { + loadCount++; + if (loadCount == this.loadNum) { + rootCollector.put(key, out.getPos()); + loadCount = 0; + } + } + /* key writing */ + byte[] buf = rowStoreEncoder.toBytes(key); + out.writeInt(buf.length); + out.write(buf); + + LinkedList<Long> offsetList = keyOffsetMap.get(key); + /* offset num writing */ + int offsetSize = offsetList.size(); + out.writeInt(offsetSize); + /* offset writing */ + for (Long offset : offsetList) { + out.writeLong(offset); + } + } + + out.flush(); + out.close(); + keySet.clear(); + collector.clear(); + + FSDataOutputStream rootOut = null; + /* root index creating phase */ + if (this.level == TWO_LEVEL_INDEX) { + TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap(); + keySet = rootMap.keySet(); + + rootOut = fs.create(new Path(fileName + ".root")); + rootOut.writeInt(this.loadNum); + rootOut.writeInt(keySet.size()); + + /* root key writing */ + for (Tuple key : keySet) { + byte[] buf = rowStoreEncoder.toBytes(key); + rootOut.writeInt(buf.length); + rootOut.write(buf); + + LinkedList<Long> offsetList = rootMap.get(key); + if (offsetList.size() != 1) { + throw new IOException("A root index entry must have exactly one offset, but has " + offsetList.size()); + } + rootOut.writeLong(offsetList.getFirst()); + + } + rootOut.flush(); + rootOut.close(); + + keySet.clear(); + rootCollector.clear(); + } + } + + private class KeyOffsetCollector { + private TreeMap<Tuple, LinkedList<Long>> map; + + public KeyOffsetCollector(TupleComparator comparator) { + map = new TreeMap<Tuple, LinkedList<Long>>(comparator); + } + + public void put(Tuple key, long offset) { + if (map.containsKey(key)) { + map.get(key).add(offset); + } else { + LinkedList<Long> list = new LinkedList<Long>(); + list.add(offset); + map.put(key, list); + } + } + + public TreeMap<Tuple, LinkedList<Long>> getMap() { + return this.map; + } + + public void clear() { + this.map.clear(); + } + } + } + + /** + * BSTIndexReader is thread-safe.
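+ * + * <p>The on-disk layout (written by BSTIndexWriter above and read back by readHeader() below) is, in order: the key schema and the comparator as length-prefixed protobuf blobs, the index level, the entry count, the length-prefixed min and max keys (present only when the entry count is positive), and then one record per key: the encoded key length and bytes, an offset count, and the offsets themselves.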
+ */ + public class BSTIndexReader implements OrderIndexReader, Closeable { + private Path fileName; + private Schema keySchema; + private TupleComparator comparator; + + private FileSystem fs; + private FSDataInputStream indexIn; + private FSDataInputStream subIn; + + private int level; + private int entryNum; + private int loadNum = -1; + private Tuple firstKey; + private Tuple lastKey; + + // the cursors of BST + private int rootCursor; + private int keyCursor; + private int offsetCursor; + + // mutex + private final Object mutex = new Object(); + + private RowStoreDecoder rowStoreDecoder; + + /** + * @param fileName the path of the index file + * @param keySchema the schema of the index keys + * @param comparator the comparator used to order the index keys + * @throws java.io.IOException + */ + public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException { + this.fileName = fileName; + this.keySchema = keySchema; + this.comparator = comparator; + this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema); + } + + public BSTIndexReader(final Path fileName) throws IOException { + this.fileName = fileName; + } + + public Schema getKeySchema() { + return this.keySchema; + } + + public TupleComparator getComparator() { + return this.comparator; + } + + private void readHeader() throws IOException { + // schema + int schemaByteSize = indexIn.readInt(); + byte [] schemaBytes = new byte[schemaByteSize]; + StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize); + + SchemaProto.Builder builder = SchemaProto.newBuilder(); + builder.mergeFrom(schemaBytes); + SchemaProto proto = builder.build(); + this.keySchema = new Schema(proto); + this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema); + + // comparator + int compByteSize = indexIn.readInt(); + byte [] compBytes = new byte[compByteSize]; + StorageUtil.readFully(indexIn, compBytes, 0, compByteSize); + + TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder(); + compProto.mergeFrom(compBytes); + this.comparator = new BaseTupleComparator(compProto.build()); + + // level + this.level = indexIn.readInt(); + // entry + this.entryNum = indexIn.readInt(); + if (entryNum > 0) { // if there are no entries, do not read the firstKey/lastKey values + byte [] minBytes = new byte[indexIn.readInt()]; + StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length); + this.firstKey = rowStoreDecoder.toTuple(minBytes); + + byte [] maxBytes = new byte[indexIn.readInt()]; + StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length); + this.lastKey = rowStoreDecoder.toTuple(maxBytes); + } + } + + public void open() + throws IOException { + /* init the index file */ + fs = fileName.getFileSystem(conf); + if (!fs.exists(fileName)) { + throw new FileNotFoundException("ERROR: " + fileName + " does not exist"); + } + + indexIn = fs.open(this.fileName); + readHeader(); + fillData(); + } + + private void fillData() throws IOException { + /* load on memory */ + if (this.level == TWO_LEVEL_INDEX) { + + Path rootPath = new Path(this.fileName + ".root"); + if (!fs.exists(rootPath)) { + throw new FileNotFoundException("root index was not created: " + rootPath); + } + + subIn = indexIn; + indexIn = fs.open(rootPath); + /* root index header reading : type => loadNum => indexSize */ + this.loadNum = indexIn.readInt(); + this.entryNum = indexIn.readInt(); + fillRootIndex(entryNum, indexIn); + + } else { + fillLeafIndex(entryNum, indexIn, -1); + } + } + + /** + * Finds the offset of the first value indexed by the given key. + * + * @param key the search key + * @return the offset of the matching entry, or -1 if no match exists + * @throws java.io.IOException + */ + public long find(Tuple key) throws IOException { + return find(key, false); + } + +
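+ /** + * Finds the offset of the value indexed by the given key. + * + * @param key the search key + * @param nextKey if true, position the cursor on the entry following the given key instead of the key itself + * @return the offset of the located entry, or -1 if none exists + * @throws java.io.IOException + */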
@Override + public long find(Tuple key, boolean nextKey) throws IOException { + synchronized (mutex) { + int pos = -1; + if (this.level == ONE_LEVEL_INDEX) { + pos = oneLevBS(key); + } else if (this.level == TWO_LEVEL_INDEX) { + pos = twoLevBS(key, this.loadNum + 1); + } else { + throw new IOException("An index level above TWO_LEVEL_INDEX is not supported."); + } + + if (nextKey) { + if (pos + 1 >= this.offsetSubIndex.length) { + return -1; + } + keyCursor = pos + 1; + offsetCursor = 0; + } else { + if (correctable) { + keyCursor = pos; + offsetCursor = 0; + } else { + return -1; + } + } + + return this.offsetSubIndex[keyCursor][offsetCursor]; + } + } + + public long next() throws IOException { + synchronized (mutex) { + if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) { + offsetCursor++; + } else { + if (offsetSubIndex.length - 1 > keyCursor) { + keyCursor++; + offsetCursor = 0; + } else { + if (offsetIndex.length - 1 > rootCursor) { + rootCursor++; + fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]); + keyCursor = 1; + offsetCursor = 0; + } else { + return -1; + } + } + } + + return this.offsetSubIndex[keyCursor][offsetCursor]; + } + } + + public boolean isCurInMemory() { + return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor); + } + + private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos) + throws IOException { + int counter = 0; + try { + if (pos != -1) { + in.seek(pos); + } + this.dataSubIndex = new Tuple[entryNum]; + this.offsetSubIndex = new long[entryNum][]; + + byte[] buf; + for (int i = 0; i < entryNum; i++) { + counter++; + buf = new byte[in.readInt()]; + StorageUtil.readFully(in, buf, 0, buf.length); + dataSubIndex[i] = rowStoreDecoder.toTuple(buf); + + int offsetNum = in.readInt(); + this.offsetSubIndex[i] = new long[offsetNum]; + for (int j = 0; j < offsetNum; j++) { + this.offsetSubIndex[i][j] = in.readLong(); + } + + } + + } catch (IOException e) { + // the leaf block held fewer than entryNum entries; reread only the entries that exist + counter--; + if (pos != -1) { + in.seek(pos); + } + this.dataSubIndex = new Tuple[counter]; + this.offsetSubIndex = new long[counter][]; + + byte[] buf; + for (int i = 0; i < counter; i++) { + buf = new byte[in.readInt()]; + StorageUtil.readFully(in, buf, 0, buf.length); + dataSubIndex[i] = rowStoreDecoder.toTuple(buf); + + int offsetNum = in.readInt(); + this.offsetSubIndex[i] = new long[offsetNum]; + for (int j = 0; j < offsetNum; j++) { + this.offsetSubIndex[i][j] = in.readLong(); + } + + } + } + } + + public Tuple getFirstKey() { + return this.firstKey; + } + + public Tuple getLastKey() { + return this.lastKey; + } + + private void fillRootIndex(int entryNum, FSDataInputStream in) + throws IOException { + this.dataIndex = new Tuple[entryNum]; + this.offsetIndex = new long[entryNum]; + Tuple keyTuple; + byte[] buf; + for (int i = 0; i < entryNum; i++) { + buf = new byte[in.readInt()]; + StorageUtil.readFully(in, buf, 0, buf.length); + keyTuple = rowStoreDecoder.toTuple(buf); + dataIndex[i] = keyTuple; + this.offsetIndex[i] = in.readLong(); + } + } + + /* memory index, only one is used.
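+ dataIndex/offsetIndex hold the root level of a two-level index, while + dataSubIndex/offsetSubIndex hold the leaf block that is currently loaded; + a one-level index keeps everything in the sub arrays.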
*/ + private Tuple[] dataIndex = null; + private Tuple[] dataSubIndex = null; + + /* offset index */ + private long[] offsetIndex = null; + private long[][] offsetSubIndex = null; + + private boolean correctable = true; + + private int oneLevBS(Tuple key) throws IOException { + correctable = true; + int pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length); + return pos; + } + + private int twoLevBS(Tuple key, int loadNum) throws IOException { + int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length); + if (pos > 0) { + rootCursor = pos; + } else { + rootCursor = 0; + } + fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]); + pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length); + + return pos; + } + + private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) { + int offset = -1; + int start = startPos; + int end = endPos; + + // use an unsigned shift to avoid overflow: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541 + int centerPos = (start + end) >>> 1; + while (true) { + if (comparator.compare(arr[centerPos], key) > 0) { + if (centerPos == 0) { + correctable = false; + break; + } else if (comparator.compare(arr[centerPos - 1], key) < 0) { + correctable = false; + offset = centerPos - 1; + break; + } else { + end = centerPos; + centerPos = (start + end) / 2; + } + } else if (comparator.compare(arr[centerPos], key) < 0) { + if (centerPos == arr.length - 1) { + correctable = false; + offset = centerPos; + break; + } else if (comparator.compare(arr[centerPos + 1], key) > 0) { + correctable = false; + offset = centerPos; + break; + } else { + start = centerPos + 1; + centerPos = (start + end) / 2; + } + } else { + correctable = true; + offset = centerPos; + break; + } + } + return offset; + } + + @Override + public void close() throws IOException { + this.indexIn.close(); + if (this.subIn != null) { + // subIn is set only for two-level indexes; guard against an NPE for one-level indexes + this.subIn.close(); + } + } + + @Override + public String toString() { + return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName; + } + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java new file mode 100644 index 0000000..b10d423 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.storage.StorageConstants; +import parquet.hadoop.ParquetOutputFormat; +import parquet.hadoop.metadata.CompressionCodecName; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.storage.FileAppender; +import org.apache.tajo.storage.TableStatistics; +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +/** + * FileAppender for writing to Parquet files. + */ +public class ParquetAppender extends FileAppender { + private TajoParquetWriter writer; + private int blockSize; + private int pageSize; + private CompressionCodecName compressionCodecName; + private boolean enableDictionary; + private boolean validating; + private TableStatistics stats; + + /** + * Creates a new ParquetAppender. + * + * @param conf Configuration properties. + * @param schema The table schema. + * @param meta The table metadata. + * @param workDir The path of the Parquet file to write to. 
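+ * @param taskAttemptId The task attempt id. + * @throws java.io.IOException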
+ */ + public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta, + Path workDir) throws IOException { + super(conf, taskAttemptId, schema, meta, workDir); + this.blockSize = Integer.parseInt( + meta.getOption(ParquetOutputFormat.BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE)); + this.pageSize = Integer.parseInt( + meta.getOption(ParquetOutputFormat.PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE)); + this.compressionCodecName = CompressionCodecName.fromConf( + meta.getOption(ParquetOutputFormat.COMPRESSION, StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME)); + this.enableDictionary = Boolean.parseBoolean( + meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY, StorageConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED)); + this.validating = Boolean.parseBoolean( + meta.getOption(ParquetOutputFormat.VALIDATION, StorageConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED)); + } + + /** + * Initializes the Appender. This method creates a new TajoParquetWriter + * and initializes the table statistics if enabled. + */ + public void init() throws IOException { + writer = new TajoParquetWriter(path, + schema, + compressionCodecName, + blockSize, + pageSize, + enableDictionary, + validating); + if (enabledStats) { + this.stats = new TableStatistics(schema); + } + super.init(); + } + + /** + * Gets the current offset. Tracking offsets is currently not implemented, so + * this method always returns 0. + * + * @return 0 + */ + @Override + public long getOffset() throws IOException { + return 0; + } + + /** + * Writes a Tuple to the Parquet file. + * + * @param tuple The Tuple to write. + */ + @Override + public void addTuple(Tuple tuple) throws IOException { + if (enabledStats) { + for (int i = 0; i < schema.size(); ++i) { + stats.analyzeField(i, tuple.get(i)); + } + } + writer.write(tuple); + if (enabledStats) { + stats.incrementRow(); + } + } + + /** + * The ParquetWriter does not need to be flushed, so this is a no-op. + */ + @Override + public void flush() throws IOException { + } + + /** + * Closes the Appender. + */ + @Override + public void close() throws IOException { + writer.close(); + } + + public long getEstimatedOutputSize() throws IOException { + return writer.getEstimatedWrittenSize(); + } + + /** + * If table statistics are enabled, returns the table statistics. + * + * @return Table statistics if enabled or null otherwise. + */ + @Override + public TableStats getStats() { + if (enabledStats) { + return stats.getTableStat(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java new file mode 100644 index 0000000..2f8efcf --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.FileScanner; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.Fragment; + +import java.io.IOException; + +/** + * FileScanner for reading Parquet files + */ +public class ParquetScanner extends FileScanner { + private TajoParquetReader reader; + + /** + * Creates a new ParquetScanner. + * + * @param conf + * @param schema + * @param meta + * @param fragment + */ + public ParquetScanner(Configuration conf, final Schema schema, + final TableMeta meta, final Fragment fragment) { + super(conf, schema, meta, fragment); + } + + /** + * Initializes the ParquetScanner. This method initializes the + * TajoParquetReader. + */ + @Override + public void init() throws IOException { + if (targets == null) { + targets = schema.toArray(); + } + reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets)); + super.init(); + } + + /** + * Reads the next Tuple from the Parquet file. + * + * @return The next Tuple from the Parquet file or null if end of file is + * reached. + */ + @Override + public Tuple next() throws IOException { + return reader.read(); + } + + /** + * Resets the scanner + */ + @Override + public void reset() throws IOException { + } + + /** + * Closes the scanner. + */ + @Override + public void close() throws IOException { + if (reader != null) { + reader.close(); + } + } + + /** + * Returns whether this scanner is projectable. + * + * @return true + */ + @Override + public boolean isProjectable() { + return true; + } + + /** + * Returns whether this scanner is selectable. + * + * @return false + */ + @Override + public boolean isSelectable() { + return false; + } + + /** + * Returns whether this scanner is splittable. + * + * @return false + */ + @Override + public boolean isSplittable() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java new file mode 100644 index 0000000..a765f48 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.thirdparty.parquet.ParquetReader; +import parquet.filter.UnboundRecordFilter; + +import java.io.IOException; + +/** + * Tajo implementation of {@link ParquetReader} to read Tajo records from a + * Parquet file. Users should use {@link ParquetScanner} and not this class + * directly. + */ +public class TajoParquetReader extends ParquetReader<Tuple> { + /** + * Creates a new TajoParquetReader. + * + * @param file The file to read from. + * @param readSchema Tajo schema of the table. + */ + public TajoParquetReader(Path file, Schema readSchema) throws IOException { + super(file, new TajoReadSupport(readSchema)); + } + + /** + * Creates a new TajoParquetReader. + * + * @param file The file to read from. + * @param readSchema Tajo schema of the table. + * @param requestedSchema Tajo schema of the projection. + */ + public TajoParquetReader(Path file, Schema readSchema, + Schema requestedSchema) throws IOException { + super(file, new TajoReadSupport(readSchema, requestedSchema)); + } + + /** + * Creates a new TajoParquetReader. + * + * @param file The file to read from. + * @param readSchema Tajo schema of the table. + * @param recordFilter Record filter. + */ + public TajoParquetReader(Path file, Schema readSchema, + UnboundRecordFilter recordFilter) + throws IOException { + super(file, new TajoReadSupport(readSchema), recordFilter); + } + + /** + * Creates a new TajoParquetReader. + * + * @param file The file to read from. + * @param readSchema Tajo schema of the table. + * @param requestedSchema Tajo schema of the projection. + * @param recordFilter Record filter. + */ + public TajoParquetReader(Path file, Schema readSchema, + Schema requestedSchema, + UnboundRecordFilter recordFilter) + throws IOException { + super(file, new TajoReadSupport(readSchema, requestedSchema), + recordFilter); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java new file mode 100644 index 0000000..5f220c5 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter; +import parquet.hadoop.metadata.CompressionCodecName; + +import java.io.IOException; + +/** + * Tajo implementation of {@link ParquetWriter} to write Tajo records to a + * Parquet file. Users should use {@link ParquetAppender} and not this class + * directly. + */ +public class TajoParquetWriter extends ParquetWriter<Tuple> { + /** + * Create a new TajoParquetWriter + * + * @param file The file name to write to. + * @param schema The Tajo schema of the table. + * @param compressionCodecName Compression codec to use, or + * CompressionCodecName.UNCOMPRESSED. + * @param blockSize The block size threshold. + * @param pageSize See parquet write up. Blocks are subdivided into pages + * for alignment. + * @throws java.io.IOException + */ + public TajoParquetWriter(Path file, + Schema schema, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize) throws IOException { + super(file, + new TajoWriteSupport(schema), + compressionCodecName, + blockSize, + pageSize); + } + + /** + * Create a new TajoParquetWriter. + * + * @param file The file name to write to. + * @param schema The Tajo schema of the table. + * @param compressionCodecName Compression codec to use, or + * CompressionCodecName.UNCOMPRESSED. + * @param blockSize The block size threshold. + * @param pageSize See parquet write up. Blocks are subdivided into pages + * for alignment. + * @param enableDictionary Whether to use a dictionary to compress columns. + * @param validating Whether to turn on validation. + * @throws java.io.IOException + */ + public TajoParquetWriter(Path file, + Schema schema, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize, + boolean enableDictionary, + boolean validating) throws IOException { + super(file, + new TajoWriteSupport(schema), + compressionCodecName, + blockSize, + pageSize, + enableDictionary, + validating); + } + + /** + * Creates a new TajoParquetWriter. The default block size is 128 MB. + * The default page size is 1 MB. Default compression is no compression. + * + * @param file The Path of the file to write to. + * @param schema The Tajo schema of the table. 
+ * @throws java.io.IOException + */ + public TajoParquetWriter(Path file, Schema schema) throws IOException { + this(file, + schema, + CompressionCodecName.UNCOMPRESSED, + DEFAULT_BLOCK_SIZE, + DEFAULT_PAGE_SIZE); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java new file mode 100644 index 0000000..a64e987 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.storage.Tuple; +import parquet.Log; +import parquet.hadoop.api.InitContext; +import parquet.hadoop.api.ReadSupport; +import parquet.io.api.RecordMaterializer; +import parquet.schema.MessageType; + +import java.util.Map; + +/** + * Tajo implementation of {@link parquet.hadoop.api.ReadSupport} for {@link org.apache.tajo.storage.Tuple}s. + * Users should use {@link org.apache.tajo.storage.parquet.ParquetScanner} and not this class directly. + */ +public class TajoReadSupport extends ReadSupport<Tuple> { + private static final Log LOG = Log.getLog(TajoReadSupport.class); + + private Schema readSchema; + private Schema requestedSchema; + + /** + * Creates a new TajoReadSupport. + * + * @param requestedSchema The Tajo schema of the requested projection passed + * down by ParquetScanner. + */ + public TajoReadSupport(Schema readSchema, Schema requestedSchema) { + super(); + this.readSchema = readSchema; + this.requestedSchema = requestedSchema; + } + + /** + * Creates a new TajoReadSupport. + * + * @param readSchema The schema of the table. + */ + public TajoReadSupport(Schema readSchema) { + super(); + this.readSchema = readSchema; + this.requestedSchema = readSchema; + } + + /** + * Initializes the ReadSupport. + * + * @param context The InitContext. + * @return A ReadContext that defines how to read the file. + */ + @Override + public ReadContext init(InitContext context) { + if (requestedSchema == null) { + throw new RuntimeException("requestedSchema is null."); + } + MessageType requestedParquetSchema = + new TajoSchemaConverter().convert(requestedSchema); + LOG.debug("Reading data with projection:\n" + requestedParquetSchema); + return new ReadContext(requestedParquetSchema); + } + + /** + * Prepares for read. 
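+ * Returns a record materializer that turns the Parquet records selected by the requested projection into Tajo Tuples.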
+ * + * @param configuration The job configuration. + * @param keyValueMetaData App-specific metadata from the file. + * @param fileSchema The schema of the Parquet file. + * @param readContext Returned by the init method. + */ + @Override + public RecordMaterializer<Tuple> prepareForRead( + Configuration configuration, + Map<String, String> keyValueMetaData, + MessageType fileSchema, + ReadContext readContext) { + MessageType parquetRequestedSchema = readContext.getRequestedSchema(); + return new TajoRecordMaterializer(parquetRequestedSchema, requestedSchema, readSchema); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java new file mode 100644 index 0000000..4375fa4 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java @@ -0,0 +1,380 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.datum.*; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import parquet.io.api.Binary; +import parquet.io.api.Converter; +import parquet.io.api.GroupConverter; +import parquet.io.api.PrimitiveConverter; +import parquet.schema.GroupType; +import parquet.schema.Type; + +import java.nio.ByteBuffer; + +/** + * Converter to convert a Parquet record into a Tajo Tuple. + */ +public class TajoRecordConverter extends GroupConverter { + private final GroupType parquetSchema; + private final Schema tajoReadSchema; + private final int[] projectionMap; + private final int tupleSize; + + private final Converter[] converters; + + private Tuple currentTuple; + + /** + * Creates a new TajoRecordConverter. + * + * @param parquetSchema The Parquet schema of the projection. + * @param tajoReadSchema The Tajo schema of the table. + * @param projectionMap An array mapping the projection column to the column + * index in the table. 
+ */ + public TajoRecordConverter(GroupType parquetSchema, Schema tajoReadSchema, + int[] projectionMap) { + this.parquetSchema = parquetSchema; + this.tajoReadSchema = tajoReadSchema; + this.projectionMap = projectionMap; + this.tupleSize = tajoReadSchema.size(); + + // The projectionMap.length does not match parquetSchema.getFieldCount() + // when the projection contains NULL_TYPE columns. We will skip over the + // NULL_TYPE columns when we construct the converters and populate the + // NULL_TYPE columns with NullDatums in start(). + int index = 0; + this.converters = new Converter[parquetSchema.getFieldCount()]; + for (int i = 0; i < projectionMap.length; ++i) { + final int projectionIndex = projectionMap[i]; + Column column = tajoReadSchema.getColumn(projectionIndex); + if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { + continue; + } + Type type = parquetSchema.getType(index); + converters[index] = newConverter(column, type, new ParentValueContainer() { + @Override + void add(Object value) { + TajoRecordConverter.this.set(projectionIndex, value); + } + }); + ++index; + } + } + + private void set(int index, Object value) { + currentTuple.put(index, (Datum)value); + } + + private Converter newConverter(Column column, Type type, + ParentValueContainer parent) { + DataType dataType = column.getDataType(); + switch (dataType.getType()) { + case BOOLEAN: + return new FieldBooleanConverter(parent); + case BIT: + return new FieldBitConverter(parent); + case CHAR: + return new FieldCharConverter(parent); + case INT2: + return new FieldInt2Converter(parent); + case INT4: + return new FieldInt4Converter(parent); + case INT8: + return new FieldInt8Converter(parent); + case FLOAT4: + return new FieldFloat4Converter(parent); + case FLOAT8: + return new FieldFloat8Converter(parent); + case INET4: + return new FieldInet4Converter(parent); + case INET6: + throw new RuntimeException("No converter for INET6"); + case TEXT: + return new FieldTextConverter(parent); + case PROTOBUF: + return new FieldProtobufConverter(parent, dataType); + case BLOB: + return new FieldBlobConverter(parent); + case NULL_TYPE: + throw new RuntimeException("No converter for NULL_TYPE."); + default: + throw new RuntimeException("Unsupported data type"); + } + } + + /** + * Gets the converter for a specific field. + * + * @param fieldIndex Index of the field in the projection. + * @return The converter for the field. + */ + @Override + public Converter getConverter(int fieldIndex) { + return converters[fieldIndex]; + } + + /** + * Called before processing fields. This method fills any fields that have + * NULL values or have type NULL_TYPE with a NullDatum. + */ + @Override + public void start() { + currentTuple = new VTuple(tupleSize); + } + + /** + * Called after all fields have been processed. + */ + @Override + public void end() { + for (int i = 0; i < projectionMap.length; ++i) { + final int projectionIndex = projectionMap[i]; + Column column = tajoReadSchema.getColumn(projectionIndex); + if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE + || currentTuple.get(projectionIndex) == null) { + set(projectionIndex, NullDatum.get()); + } + } + } + + /** + * Returns the current record converted by this converter. + * + * @return The current record. + */ + public Tuple getCurrentRecord() { + return currentTuple; + } + + static abstract class ParentValueContainer { + /** + * Adds the value to the parent. + * + * @param value The value to add. 
+ */ + abstract void add(Object value); + } + + static final class FieldBooleanConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldBooleanConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBoolean(boolean value) { + parent.add(DatumFactory.createBool(value)); + } + } + + static final class FieldBitConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldBitConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createBit((byte)(value & 0xff))); + } + } + + static final class FieldCharConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldCharConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBinary(Binary value) { + parent.add(DatumFactory.createChar(value.toStringUsingUTF8())); + } + } + + static final class FieldInt2Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldInt2Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createInt2((short)value)); + } + } + + static final class FieldInt4Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldInt4Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createInt4(value)); + } + } + + static final class FieldInt8Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldInt8Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addLong(long value) { + parent.add(DatumFactory.createInt8(value)); + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createInt8(Long.valueOf(value))); + } + } + + static final class FieldFloat4Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldFloat4Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createFloat4(Float.valueOf(value))); + } + + @Override + final public void addLong(long value) { + parent.add(DatumFactory.createFloat4(Float.valueOf(value))); + } + + @Override + final public void addFloat(float value) { + parent.add(DatumFactory.createFloat4(value)); + } + } + + static final class FieldFloat8Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldFloat8Converter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(DatumFactory.createFloat8(Double.valueOf(value))); + } + + @Override + final public void addLong(long value) { + parent.add(DatumFactory.createFloat8(Double.valueOf(value))); + } + + @Override + final public void addFloat(float value) { + parent.add(DatumFactory.createFloat8(Double.valueOf(value))); + } + + @Override + final public void addDouble(double value) { + parent.add(DatumFactory.createFloat8(value)); + } + } + + static final class FieldInet4Converter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldInet4Converter(ParentValueContainer parent) { + 
this.parent = parent; + } + + @Override + final public void addBinary(Binary value) { + parent.add(DatumFactory.createInet4(value.getBytes())); + } + } + + static final class FieldTextConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldTextConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBinary(Binary value) { + parent.add(DatumFactory.createText(value.toStringUsingUTF8())); + } + } + + static final class FieldBlobConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + + public FieldBlobConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBinary(Binary value) { + parent.add(new BlobDatum(ByteBuffer.wrap(value.getBytes()))); + } + } + + static final class FieldProtobufConverter extends PrimitiveConverter { + private final ParentValueContainer parent; + private final DataType dataType; + + public FieldProtobufConverter(ParentValueContainer parent, + DataType dataType) { + this.parent = parent; + this.dataType = dataType; + } + + @Override + final public void addBinary(Binary value) { + try { + ProtobufDatumFactory factory = + ProtobufDatumFactory.get(dataType.getCode()); + Message.Builder builder = factory.newBuilder(); + builder.mergeFrom(value.getBytes()); + parent.add(factory.createDatum(builder)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java new file mode 100644 index 0000000..436159c --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.storage.Tuple; +import parquet.io.api.GroupConverter; +import parquet.io.api.RecordMaterializer; +import parquet.schema.MessageType; + +/** + * Materializes a Tajo Tuple from a stream of Parquet data. + */ +class TajoRecordMaterializer extends RecordMaterializer<Tuple> { + private final TajoRecordConverter root; + + /** + * Creates a new TajoRecordMaterializer. + * + * @param parquetSchema The Parquet schema of the projection. 
+ * @param tajoSchema The Tajo schema of the projection. + * @param tajoReadSchema The Tajo schema of the table. + */ + public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema, + Schema tajoReadSchema) { + int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema); + this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema, + projectionMap); + } + + private int[] getProjectionMap(Schema schema, Schema projection) { + Column[] targets = projection.toArray(); + int[] projectionMap = new int[targets.length]; + for (int i = 0; i < targets.length; ++i) { + int tid = schema.getColumnId(targets[i].getQualifiedName()); + projectionMap[i] = tid; + } + return projectionMap; + } + + /** + * Returns the current record being materialized. + * + * @return The record being materialized. + */ + @Override + public Tuple getCurrentRecord() { + return root.getCurrentRecord(); + } + + /** + * Returns the root converter. + * + * @return The root converter + */ + @Override + public GroupConverter getRootConverter() { + return root; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java new file mode 100644 index 0000000..555b623 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.parquet; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import parquet.schema.MessageType; +import parquet.schema.OriginalType; +import parquet.schema.PrimitiveType; +import parquet.schema.PrimitiveType.PrimitiveTypeName; +import parquet.schema.Type; + +import java.util.ArrayList; +import java.util.List; + +/** + * Converts between Parquet and Tajo schemas. See package documentation for + * details on the mapping. + */ +public class TajoSchemaConverter { + private static final String TABLE_SCHEMA = "table_schema"; + + /** + * Creates a new TajoSchemaConverter. + */ + public TajoSchemaConverter() { + } + + /** + * Converts a Parquet schema to a Tajo schema. + * + * @param parquetSchema The Parquet schema to convert. + * @return The resulting Tajo schema. 
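+ * + * <p>The primitive mappings applied below: BOOLEAN to BOOLEAN, INT32 to INT4, INT64 to INT8, FLOAT to FLOAT4, DOUBLE to FLOAT8, FIXED_LEN_BYTE_ARRAY to BLOB, and BINARY to TEXT when annotated with UTF8 or to BLOB otherwise; INT96 and complex types are rejected.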
+ */ + public Schema convert(MessageType parquetSchema) { + return convertFields(parquetSchema.getFields()); + } + + private Schema convertFields(List<Type> parquetFields) { + List<Column> columns = new ArrayList<Column>(); + for (int i = 0; i < parquetFields.size(); ++i) { + Type fieldType = parquetFields.get(i); + if (fieldType.isRepetition(Type.Repetition.REPEATED)) { + throw new RuntimeException("REPEATED not supported outside LIST or" + + " MAP. Type: " + fieldType); + } + columns.add(convertField(fieldType)); + } + Column[] columnsArray = new Column[columns.size()]; + columnsArray = columns.toArray(columnsArray); + return new Schema(columnsArray); + } + + private Column convertField(final Type fieldType) { + if (fieldType.isPrimitive()) { + return convertPrimitiveField(fieldType); + } else { + return convertComplexField(fieldType); + } + } + + private Column convertPrimitiveField(final Type fieldType) { + final String fieldName = fieldType.getName(); + final PrimitiveTypeName parquetPrimitiveTypeName = + fieldType.asPrimitiveType().getPrimitiveTypeName(); + final OriginalType originalType = fieldType.getOriginalType(); + return parquetPrimitiveTypeName.convert( + new PrimitiveType.PrimitiveTypeNameConverter<Column, RuntimeException>() { + @Override + public Column convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.BOOLEAN); + } + + @Override + public Column convertINT32(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.INT4); + } + + @Override + public Column convertINT64(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.INT8); + } + + @Override + public Column convertFLOAT(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.FLOAT4); + } + + @Override + public Column convertDOUBLE(PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.FLOAT8); + } + + @Override + public Column convertFIXED_LEN_BYTE_ARRAY( + PrimitiveTypeName primitiveTypeName) { + return new Column(fieldName, TajoDataTypes.Type.BLOB); + } + + @Override + public Column convertBINARY(PrimitiveTypeName primitiveTypeName) { + if (originalType == OriginalType.UTF8) { + return new Column(fieldName, TajoDataTypes.Type.TEXT); + } else { + return new Column(fieldName, TajoDataTypes.Type.BLOB); + } + } + + @Override + public Column convertINT96(PrimitiveTypeName primitiveTypeName) { + throw new RuntimeException("Converting from INT96 not supported."); + } + }); + } + + private Column convertComplexField(final Type fieldType) { + throw new RuntimeException("Complex types not supported."); + } + + /** + * Converts a Tajo schema to a Parquet schema. + * + * @param tajoSchema The Tajo schema to convert. + * @return The resulting Parquet schema. 
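+ * + * <p>NULL_TYPE columns are omitted from the Parquet schema; every remaining column is emitted as an OPTIONAL Parquet primitive field (see convertColumn() below).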
+ */ + public MessageType convert(Schema tajoSchema) { + List<Type> types = new ArrayList<Type>(); + for (int i = 0; i < tajoSchema.size(); ++i) { + Column column = tajoSchema.getColumn(i); + if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { + continue; + } + types.add(convertColumn(column)); + } + return new MessageType(TABLE_SCHEMA, types); + } + + private Type convertColumn(Column column) { + TajoDataTypes.Type type = column.getDataType().getType(); + switch (type) { + case BOOLEAN: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BOOLEAN); + case BIT: + case INT2: + case INT4: + return primitive(column.getSimpleName(), + PrimitiveTypeName.INT32); + case INT8: + return primitive(column.getSimpleName(), + PrimitiveTypeName.INT64); + case FLOAT4: + return primitive(column.getSimpleName(), + PrimitiveTypeName.FLOAT); + case FLOAT8: + return primitive(column.getSimpleName(), + PrimitiveTypeName.DOUBLE); + case CHAR: + case TEXT: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BINARY, + OriginalType.UTF8); + case PROTOBUF: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BINARY); + case BLOB: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BINARY); + case INET4: + case INET6: + return primitive(column.getSimpleName(), + PrimitiveTypeName.BINARY); + default: + throw new RuntimeException("Cannot convert Tajo type: " + type); + } + } + + private PrimitiveType primitive(String name, + PrimitiveTypeName primitive) { + return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, null); + } + + private PrimitiveType primitive(String name, + PrimitiveTypeName primitive, + OriginalType originalType) { + return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, + originalType); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java new file mode 100644 index 0000000..00aadf4 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.Tuple; +import parquet.hadoop.api.WriteSupport; +import parquet.io.api.Binary; +import parquet.io.api.RecordConsumer; +import parquet.schema.GroupType; +import parquet.schema.MessageType; +import parquet.schema.Type; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tajo implementation of {@link parquet.hadoop.api.WriteSupport} for {@link org.apache.tajo.storage.Tuple}s. + * Users should use {@link ParquetAppender} and not this class directly. + */ +public class TajoWriteSupport extends WriteSupport<Tuple> { + private RecordConsumer recordConsumer; + private MessageType rootSchema; + private Schema rootTajoSchema; + + /** + * Creates a new TajoWriteSupport. + * + * @param tajoSchema The Tajo schema for the table. + */ + public TajoWriteSupport(Schema tajoSchema) { + this.rootSchema = new TajoSchemaConverter().convert(tajoSchema); + this.rootTajoSchema = tajoSchema; + } + + /** + * Initializes the WriteSupport. + * + * @param configuration The job's configuration. + * @return A WriteContext that describes how to write the file. + */ + @Override + public WriteContext init(Configuration configuration) { + Map<String, String> extraMetaData = new HashMap<String, String>(); + return new WriteContext(rootSchema, extraMetaData); + } + + /** + * Called once per row group. + * + * @param recordConsumer The {@link parquet.io.api.RecordConsumer} to write to. + */ + @Override + public void prepareForWrite(RecordConsumer recordConsumer) { + this.recordConsumer = recordConsumer; + } + + /** + * Writes a Tuple to the file. + * + * @param tuple The Tuple to write to the file. + */ + @Override + public void write(Tuple tuple) { + recordConsumer.startMessage(); + writeRecordFields(rootSchema, rootTajoSchema, tuple); + recordConsumer.endMessage(); + } + + private void writeRecordFields(GroupType schema, Schema tajoSchema, + Tuple tuple) { + List<Type> fields = schema.getFields(); + // Parquet ignores Tajo NULL_TYPE columns, so the index may differ. 
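+ // Illustrative example: for a Tajo schema (a INT4, b NULL_TYPE, c TEXT), + // the converted Parquet schema holds the fields [a, c], so column c has + // tajoIndex 2 on the Tajo side but index 1 on the Parquet side. The + // 'index' counter below tracks the Parquet side.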
+ int index = 0; + for (int tajoIndex = 0; tajoIndex < tajoSchema.size(); ++tajoIndex) { + Column column = tajoSchema.getColumn(tajoIndex); + if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { + continue; + } + Datum datum = tuple.get(tajoIndex); + Type fieldType = fields.get(index); + if (!tuple.isNull(tajoIndex)) { + recordConsumer.startField(fieldType.getName(), index); + writeValue(fieldType, column, datum); + recordConsumer.endField(fieldType.getName(), index); + } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) { + throw new RuntimeException("Null-value for required field: " + + column.getSimpleName()); + } + ++index; + } + } + + private void writeValue(Type fieldType, Column column, Datum datum) { + switch (column.getDataType().getType()) { + case BOOLEAN: + recordConsumer.addBoolean(datum.asBool()); + break; + case BIT: + case INT2: + case INT4: + recordConsumer.addInteger(datum.asInt4()); + break; + case INT8: + recordConsumer.addLong(datum.asInt8()); + break; + case FLOAT4: + recordConsumer.addFloat(datum.asFloat4()); + break; + case FLOAT8: + recordConsumer.addDouble(datum.asFloat8()); + break; + case CHAR: + case TEXT: + recordConsumer.addBinary(Binary.fromString(datum.asChars())); + break; + case PROTOBUF: + case BLOB: + case INET4: + case INET6: + recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray())); + break; + // Unreachable in practice: TajoSchemaConverter.convertColumn() rejects + // unsupported types when the schema is converted. + default: + break; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java new file mode 100644 index 0000000..d7d16b7 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * <p> + * Provides read and write support for Parquet files. Tajo schemas are + * converted to Parquet schemas according to the following mapping of Tajo + * and Parquet types: + * </p> + * + * <table> + * <tr> + * <th>Tajo type</th> + * <th>Parquet type</th> + * </tr> + * <tr> + * <td>NULL_TYPE</td> + * <td>No type.
The field is not encoded in Parquet.</td> + * </tr> + * <tr> + * <td>BOOLEAN</td> + * <td>BOOLEAN</td> + * </tr> + * <tr> + * <td>BIT</td> + * <td>INT32</td> + * </tr> + * <tr> + * <td>INT2</td> + * <td>INT32</td> + * </tr> + * <tr> + * <td>INT4</td> + * <td>INT32</td> + * </tr> + * <tr> + * <td>INT8</td> + * <td>INT64</td> + * </tr> + * <tr> + * <td>FLOAT4</td> + * <td>FLOAT</td> + * </tr> + * <tr> + * <td>FLOAT8</td> + * <td>DOUBLE</td> + * </tr> + * <tr> + * <td>CHAR</td> + * <td>BINARY (with OriginalType UTF8)</td> + * </tr> + * <tr> + * <td>TEXT</td> + * <td>BINARY (with OriginalType UTF8)</td> + * </tr> + * <tr> + * <td>PROTOBUF</td> + * <td>BINARY</td> + * </tr> + * <tr> + * <td>BLOB</td> + * <td>BINARY</td> + * </tr> + * <tr> + * <td>INET4</td> + * <td>BINARY</td> + * </tr> + * <tr> + * <td>INET6</td> + * <td>BINARY</td> + * </tr> + * </table> + * + * <p> + * Because Tajo fields can be NULL, all Parquet fields are marked as optional. + * </p> + * + * <p> + * The conversion from Tajo to Parquet is lossy without the original Tajo + * schema. As a result, Parquet files are read using the Tajo schema saved in + * the Tajo catalog for the table the Parquet files belong to, which was + * defined when the table was created. + * </p> + */ + +package org.apache.tajo.storage.parquet; http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java new file mode 100644 index 0000000..5e200a0 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rcfile; + +import com.google.common.base.Objects; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +/** + * <tt>BytesRefArrayWritable</tt> holds an array of references to + * BytesRefWritable and can grow without recreating the array unless + * necessary. + * <p> + * + * Each <tt>BytesRefArrayWritable</tt> instance has a <i>valid</i> field, + * which is the number of <tt>BytesRefWritable</tt> elements it currently + * treats as valid. + * <tt>resetValid</tt> can change this count, but it does not touch the + * underlying <tt>BytesRefWritable</tt> elements.
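+ * <p> + * For example (a sketch): after <tt>resetValid(2)</tt> on an instance whose + * underlying array holds five elements, <tt>size()</tt> returns 2 and only + * positions 0 and 1 may be read through <tt>get</tt>; the remaining elements + * stay allocated but are treated as invalid.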
+ */ + +public class BytesRefArrayWritable implements Writable, + Comparable<BytesRefArrayWritable> { + + private BytesRefWritable[] bytesRefWritables = null; + + private int valid = 0; + + /** + * Constructs an empty array with the specified capacity. + * + * @param capacity + * initial capacity + * @exception IllegalArgumentException + * if the specified initial capacity is negative + */ + public BytesRefArrayWritable(int capacity) { + if (capacity < 0) { + throw new IllegalArgumentException("Capacity cannot be negative."); + } + bytesRefWritables = new BytesRefWritable[0]; + ensureCapacity(capacity); + } + + /** + * Constructs an empty array with a capacity of ten. + */ + public BytesRefArrayWritable() { + this(10); + } + + /** + * Returns the number of valid elements. + * + * @return the number of valid elements + */ + public int size() { + return valid; + } + + /** + * Gets the BytesRefWritable at the specified position. Make sure the position + * is valid by first calling resetValid. + * + * @param index + * the position index, starting from zero + * @throws IndexOutOfBoundsException + */ + public BytesRefWritable get(int index) { + if (index >= valid) { + throw new IndexOutOfBoundsException( + "This BytesRefArrayWritable only has " + valid + " valid values."); + } + return bytesRefWritables[index]; + } + + /** + * Gets the BytesRefWritable at the specified position without checking + * whether it is valid. + * + * @param index + * the position index, starting from zero + */ + public BytesRefWritable unCheckedGet(int index) { + return bytesRefWritables[index]; + } + + /** + * Sets the element at the specified position to the given BytesRefWritable. + * + * @param index + * index position + * @param bytesRefWritable + * the new element + * @throws IllegalArgumentException + * if the specified new element is null + */ + public void set(int index, BytesRefWritable bytesRefWritable) { + if (bytesRefWritable == null) { + throw new IllegalArgumentException("Cannot assign null."); + } + ensureCapacity(index + 1); + bytesRefWritables[index] = bytesRefWritable; + if (valid <= index) { + valid = index + 1; + } + } + + /** + * {@inheritDoc} + */ + @Override + public int compareTo(BytesRefArrayWritable other) { + if (other == null) { + throw new IllegalArgumentException("Argument cannot be null."); + } + if (this == other) { + return 0; + } + int sizeDiff = valid - other.valid; + if (sizeDiff != 0) { + return sizeDiff; + } + // Element order is ignored: arrays with the same valid count compare as + // equal only when every element of this array also appears in the other. + for (int i = 0; i < valid; i++) { + if (!other.contains(bytesRefWritables[i])) { + return 1; + } + } + return 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(bytesRefWritables); + } + + /** + * Returns <tt>true</tt> if this instance contains the specified + * BytesRefWritable at least once.
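+ * Elements are compared by byte content via <tt>BytesRefWritable.equals</tt>, + * scanning the valid elements linearly.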
+ * + * @param bytesRefWritable + * BytesRefWritable element to be tested + * @return <tt>true</tt> if it contains the specified element + * @throws IllegalArgumentException + * if the specified element is null + */ + public boolean contains(BytesRefWritable bytesRefWritable) { + if (bytesRefWritable == null) { + throw new IllegalArgumentException("Argument cannot be null."); + } + for (int i = 0; i < valid; i++) { + if (bytesRefWritables[i].equals(bytesRefWritable)) { + return true; + } + } + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object o) { + if (!(o instanceof BytesRefArrayWritable)) { + return false; + } + return compareTo((BytesRefArrayWritable) o) == 0; + } + + /** + * Removes all elements. + */ + public void clear() { + valid = 0; + } + + /** + * Enlarges the capacity if necessary, ensuring that the array can hold the + * number of elements specified by the newValidCapacity argument, and then + * sets the valid count to that value. The valid count can grow or shrink, + * but already-stored, now-invalid BytesRefWritable elements are left + * untouched. + * + * @param newValidCapacity + * the desired valid count + */ + public void resetValid(int newValidCapacity) { + ensureCapacity(newValidCapacity); + valid = newValidCapacity; + } + + protected void ensureCapacity(int newCapacity) { + int size = bytesRefWritables.length; + if (size < newCapacity) { + bytesRefWritables = Arrays.copyOf(bytesRefWritables, newCapacity); + while (size < newCapacity) { + bytesRefWritables[size] = new BytesRefWritable(); + size++; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void readFields(DataInput in) throws IOException { + int count = in.readInt(); + ensureCapacity(count); + for (int i = 0; i < count; i++) { + bytesRefWritables[i].readFields(in); + } + valid = count; + } + + /** + * {@inheritDoc} + */ + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(valid); + + for (int i = 0; i < valid; i++) { + BytesRefWritable cu = bytesRefWritables[i]; + cu.write(out); + } + } + + static { + WritableFactories.setFactory(BytesRefArrayWritable.class, + new WritableFactory() { + + @Override + public Writable newInstance() { + return new BytesRefArrayWritable(); + } + + }); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java new file mode 100644 index 0000000..158c740 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rcfile; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * <tt>BytesRefWritable</tt> references a section of a byte array. It can be + * used to avoid unnecessary byte copies. + */ +public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> { + + private static final byte[] EMPTY_BYTES = new byte[0]; + public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable(); + + int start = 0; + int length = 0; + byte[] bytes = null; + + LazyDecompressionCallback lazyDecompressObj; + + /** + * Creates a zero-length instance. + */ + public BytesRefWritable() { + this(EMPTY_BYTES); + } + + /** + * Creates a BytesRefWritable with <tt>length</tt> bytes. + */ + public BytesRefWritable(int length) { + assert length > 0; + this.length = length; + bytes = new byte[this.length]; + start = 0; + } + + /** + * Creates a BytesRefWritable that references the given bytes. + */ + public BytesRefWritable(byte[] bytes) { + this.bytes = bytes; + length = bytes.length; + start = 0; + } + + /** + * Creates a BytesRefWritable that references one section of the given bytes, + * determined by the <tt>offset</tt> and <tt>len</tt> arguments. + */ + public BytesRefWritable(byte[] data, int offset, int len) { + bytes = data; + start = offset; + length = len; + } + + /** + * Creates a BytesRefWritable that references one section of the given bytes. + * The argument <tt>lazyDecompressData</tt> refers to a + * LazyDecompressionCallback object. The <tt>offset</tt> and <tt>len</tt> + * arguments refer to positions in the uncompressed bytes of + * <tt>lazyDecompressData</tt>, so use them only after the data has been + * uncompressed. + */ + public BytesRefWritable(LazyDecompressionCallback lazyDecompressData, + int offset, int len) { + lazyDecompressObj = lazyDecompressData; + start = offset; + length = len; + } + + private void lazyDecompress() throws IOException { + if (bytes == null && lazyDecompressObj != null) { + bytes = lazyDecompressObj.decompress(); + } + } + + /** + * Returns a copy of the underlying bytes referenced by this instance. + * + * @return a new copied byte array + * @throws java.io.IOException + */ + public byte[] getBytesCopy() throws IOException { + lazyDecompress(); + byte[] bb = new byte[length]; + System.arraycopy(bytes, start, bb, 0, length); + return bb; + } + + /** + * Returns the underlying bytes. + * + * @throws java.io.IOException + */ + public byte[] getData() throws IOException { + lazyDecompress(); + return bytes; + } + + /** + * readFields() will corrupt the referenced array, so use the set method + * whenever possible. + * + * @see #readFields(java.io.DataInput) + */ + public void set(byte[] newData, int offset, int len) { + bytes = newData; + start = offset; + length = len; + lazyDecompressObj = null; + } + + /** + * readFields() will corrupt the referenced array, so use the set method + * whenever possible.
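+ * <p> + * A zero-copy sketch (hypothetical names): {@code ref.set(callback, 0, len)} + * only records the decompression callback and section bounds; the bytes are + * not materialized until a read such as {@code getData()} triggers + * {@code lazyDecompress()}.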
+ * + * @see #readFields(java.io.DataInput) + */ + public void set(LazyDecompressionCallback newData, int offset, int len) { + bytes = null; + start = offset; + length = len; + lazyDecompressObj = newData; + } + + public void writeDataTo(DataOutput out) throws IOException { + lazyDecompress(); + out.write(bytes, start, length); + } + + /** + * Reuses the existing bytes array if its length is equal to or greater than + * the length of the current record; otherwise allocates a new one. + * readFields will corrupt the referenced array, so please use set() + * whenever possible. + * + * @see #set(byte[], int, int) + */ + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + if (len > bytes.length) { + bytes = new byte[len]; + } + start = 0; + length = len; + in.readFully(bytes, start, length); + } + + /** {@inheritDoc} */ + public void write(DataOutput out) throws IOException { + lazyDecompress(); + out.writeInt(length); + out.write(bytes, start, length); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + // Note: identity-based, which is inconsistent with the content-based + // equals() below. + return super.hashCode(); + } + + /** {@inheritDoc} */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(3 * length); + // iterate over the referenced section only: [start, start + length) + for (int idx = start; idx < start + length; idx++) { + // if not the first, put a blank separator in + if (idx != start) { + sb.append(' '); + } + String num = Integer.toHexString(0xff & bytes[idx]); + // if it is only one digit, add a leading 0. + if (num.length() < 2) { + sb.append('0'); + } + sb.append(num); + } + return sb.toString(); + } + + /** {@inheritDoc} */ + @Override + public int compareTo(BytesRefWritable other) { + if (other == null) { + throw new IllegalArgumentException("Argument cannot be null."); + } + if (this == other) { + return 0; + } + try { + return WritableComparator.compareBytes(getData(), start, getLength(), + other.getData(), other.start, other.getLength()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object right_obj) { + if (!(right_obj instanceof BytesRefWritable)) { + return false; + } + return compareTo((BytesRefWritable) right_obj) == 0; + } + + static { + WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() { + + @Override + public Writable newInstance() { + return new BytesRefWritable(); + } + + }); + } + + public int getLength() { + return length; + } + + public int getStart() { + return start; + } +}
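A minimal usage sketch for the two RCFile helper classes above (illustrative only; the demo class and values are hypothetical, and the bytes are assumed uncompressed, so no LazyDecompressionCallback is involved):

import java.io.IOException;
import org.apache.tajo.storage.rcfile.BytesRefArrayWritable;
import org.apache.tajo.storage.rcfile.BytesRefWritable;

public class BytesRefDemo {
  public static void main(String[] args) throws IOException {
    // One backing buffer; each column value references a slice of it (zero copy).
    byte[] buffer = "abcdef".getBytes();
    BytesRefArrayWritable row = new BytesRefArrayWritable(3);
    row.set(0, new BytesRefWritable(buffer, 0, 2)); // "ab"
    row.set(1, new BytesRefWritable(buffer, 2, 2)); // "cd"
    row.set(2, new BytesRefWritable(buffer, 4, 2)); // "ef"

    // Shrink the valid count: positions >= 2 are now treated as invalid.
    row.resetValid(2);
    for (int i = 0; i < row.size(); i++) {
      byte[] copy = row.get(i).getBytesCopy(); // copies only the referenced slice
      System.out.println(new String(copy));    // prints "ab", then "cd"
    }
  }
}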
