http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java deleted file mode 100644 index f093f9d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java +++ /dev/null @@ -1,623 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.index.bst; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; -import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; -import org.apache.tajo.storage.index.IndexMethod; -import org.apache.tajo.storage.index.IndexWriter; -import org.apache.tajo.storage.index.OrderIndexReader; - -import java.io.Closeable; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.LinkedList; -import java.util.Set; -import java.util.TreeMap; - -import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; - -/** - * This is two-level binary search tree index. This is one of the value-list - * index structure. Thus, it is inefficient in the case where - * the many of the values are same. Also, the BST shows the fast performance - * when the selectivity of rows to be retrieved is less than 5%. - * BSTIndexWriter is not thread-safe, whereas BSTIndexReader is thread-safe. 
- */ -public class BSTIndex implements IndexMethod { - private static final Log LOG = LogFactory.getLog(BSTIndex.class); - - public static final int ONE_LEVEL_INDEX = 1; - public static final int TWO_LEVEL_INDEX = 2; - - private final Configuration conf; - - public BSTIndex(final Configuration conf) { - this.conf = conf; - } - - @Override - public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException { - return new BSTIndexWriter(fileName, level, keySchema, comparator); - } - - @Override - public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, TupleComparator comparator) throws IOException { - return new BSTIndexReader(fileName, keySchema, comparator); - } - - public BSTIndexReader getIndexReader(Path fileName) throws IOException { - return new BSTIndexReader(fileName); - } - - public class BSTIndexWriter extends IndexWriter implements Closeable { - private FSDataOutputStream out; - private FileSystem fs; - private int level; - private int loadNum = 4096; - private Path fileName; - - private final Schema keySchema; - private final TupleComparator compartor; - private final KeyOffsetCollector collector; - private KeyOffsetCollector rootCollector; - - private Tuple firstKey; - private Tuple lastKey; - - private RowStoreEncoder rowStoreEncoder; - - // private Tuple lastestKey = null; - - /** - * constructor - * - * @param level - * : IndexCreater.ONE_LEVEL_INDEX or IndexCreater.TWO_LEVEL_INDEX - * @throws IOException - */ - public BSTIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException { - this.fileName = fileName; - this.level = level; - this.keySchema = keySchema; - this.compartor = comparator; - this.collector = new KeyOffsetCollector(comparator); - this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema); - } - - public void setLoadNum(int loadNum) { - this.loadNum = loadNum; - } - - public void open() throws 
IOException { - fs = fileName.getFileSystem(conf); - if (fs.exists(fileName)) { - throw new IOException("ERROR: index file (" + fileName + " already exists"); - } - out = fs.create(fileName); - } - - @Override - public void write(Tuple key, long offset) throws IOException { - if (firstKey == null || compartor.compare(key, firstKey) < 0) { - firstKey = key; - } - if (lastKey == null || compartor.compare(lastKey, key) < 0) { - lastKey = key; - } - - collector.put(key, offset); - } - - public TupleComparator getComparator() { - return this.compartor; - } - - public void flush() throws IOException { - out.flush(); - } - - public void writeHeader(int entryNum) throws IOException { - // schema - byte [] schemaBytes = keySchema.getProto().toByteArray(); - out.writeInt(schemaBytes.length); - out.write(schemaBytes); - - // comparator - byte [] comparatorBytes = compartor.getProto().toByteArray(); - out.writeInt(comparatorBytes.length); - out.write(comparatorBytes); - - // level - out.writeInt(this.level); - // entry - out.writeInt(entryNum); - if (entryNum > 0) { - byte [] minBytes = rowStoreEncoder.toBytes(firstKey); - out.writeInt(minBytes.length); - out.write(minBytes); - byte [] maxBytes = rowStoreEncoder.toBytes(lastKey); - out.writeInt(maxBytes.length); - out.write(maxBytes); - } - out.flush(); - } - - public void close() throws IOException { - /* two level initialize */ - if (this.level == TWO_LEVEL_INDEX) { - rootCollector = new KeyOffsetCollector(this.compartor); - } - - /* data writing phase */ - TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap(); - Set<Tuple> keySet = keyOffsetMap.keySet(); - - int entryNum = keySet.size(); - writeHeader(entryNum); - - int loadCount = this.loadNum - 1; - for (Tuple key : keySet) { - - if (this.level == TWO_LEVEL_INDEX) { - loadCount++; - if (loadCount == this.loadNum) { - rootCollector.put(key, out.getPos()); - loadCount = 0; - } - } - /* key writing */ - byte[] buf = rowStoreEncoder.toBytes(key); - 
out.writeInt(buf.length); - out.write(buf); - - /**/ - LinkedList<Long> offsetList = keyOffsetMap.get(key); - /* offset num writing */ - int offsetSize = offsetList.size(); - out.writeInt(offsetSize); - /* offset writing */ - for (Long offset : offsetList) { - out.writeLong(offset); - } - } - - out.flush(); - out.close(); - keySet.clear(); - collector.clear(); - - FSDataOutputStream rootOut = null; - /* root index creating phase */ - if (this.level == TWO_LEVEL_INDEX) { - TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap(); - keySet = rootMap.keySet(); - - rootOut = fs.create(new Path(fileName + ".root")); - rootOut.writeInt(this.loadNum); - rootOut.writeInt(keySet.size()); - - /* root key writing */ - for (Tuple key : keySet) { - byte[] buf = rowStoreEncoder.toBytes(key); - rootOut.writeInt(buf.length); - rootOut.write(buf); - - LinkedList<Long> offsetList = rootMap.get(key); - if (offsetList.size() > 1 || offsetList.size() == 0) { - throw new IOException("Why root index doen't have one offset?"); - } - rootOut.writeLong(offsetList.getFirst()); - - } - rootOut.flush(); - rootOut.close(); - - keySet.clear(); - rootCollector.clear(); - } - } - - private class KeyOffsetCollector { - private TreeMap<Tuple, LinkedList<Long>> map; - - public KeyOffsetCollector(TupleComparator comparator) { - map = new TreeMap<Tuple, LinkedList<Long>>(comparator); - } - - public void put(Tuple key, long offset) { - if (map.containsKey(key)) { - map.get(key).add(offset); - } else { - LinkedList<Long> list = new LinkedList<Long>(); - list.add(offset); - map.put(key, list); - } - } - - public TreeMap<Tuple, LinkedList<Long>> getMap() { - return this.map; - } - - public void clear() { - this.map.clear(); - } - } - } - - /** - * BSTIndexReader is thread-safe. 
- */ - public class BSTIndexReader implements OrderIndexReader , Closeable{ - private Path fileName; - private Schema keySchema; - private TupleComparator comparator; - - private FileSystem fs; - private FSDataInputStream indexIn; - private FSDataInputStream subIn; - - private int level; - private int entryNum; - private int loadNum = -1; - private Tuple firstKey; - private Tuple lastKey; - - // the cursors of BST - private int rootCursor; - private int keyCursor; - private int offsetCursor; - - // mutex - private final Object mutex = new Object(); - - private RowStoreDecoder rowStoreDecoder; - - /** - * - * @param fileName - * @param keySchema - * @param comparator - * @throws IOException - */ - public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException { - this.fileName = fileName; - this.keySchema = keySchema; - this.comparator = comparator; - this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema); - } - - public BSTIndexReader(final Path fileName) throws IOException { - this.fileName = fileName; - } - - public Schema getKeySchema() { - return this.keySchema; - } - - public TupleComparator getComparator() { - return this.comparator; - } - - private void readHeader() throws IOException { - // schema - int schemaByteSize = indexIn.readInt(); - byte [] schemaBytes = new byte[schemaByteSize]; - StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize); - - SchemaProto.Builder builder = SchemaProto.newBuilder(); - builder.mergeFrom(schemaBytes); - SchemaProto proto = builder.build(); - this.keySchema = new Schema(proto); - this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema); - - // comparator - int compByteSize = indexIn.readInt(); - byte [] compBytes = new byte[compByteSize]; - StorageUtil.readFully(indexIn, compBytes, 0, compByteSize); - - TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder(); - compProto.mergeFrom(compBytes); - this.comparator = new 
BaseTupleComparator(compProto.build()); - - // level - this.level = indexIn.readInt(); - // entry - this.entryNum = indexIn.readInt(); - if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values - byte [] minBytes = new byte[indexIn.readInt()]; - StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length); - this.firstKey = rowStoreDecoder.toTuple(minBytes); - - byte [] maxBytes = new byte[indexIn.readInt()]; - StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length); - this.lastKey = rowStoreDecoder.toTuple(maxBytes); - } - } - - public void open() - throws IOException { - /* init the index file */ - fs = fileName.getFileSystem(conf); - if (!fs.exists(fileName)) { - throw new FileNotFoundException("ERROR: does not exist " + fileName.toString()); - } - - indexIn = fs.open(this.fileName); - readHeader(); - fillData(); - } - - private void fillData() throws IOException { - /* load on memory */ - if (this.level == TWO_LEVEL_INDEX) { - - Path rootPath = new Path(this.fileName + ".root"); - if (!fs.exists(rootPath)) { - throw new FileNotFoundException("root index did not created"); - } - - subIn = indexIn; - indexIn = fs.open(rootPath); - /* root index header reading : type => loadNum => indexSize */ - this.loadNum = indexIn.readInt(); - this.entryNum = indexIn.readInt(); - /**/ - fillRootIndex(entryNum, indexIn); - - } else { - fillLeafIndex(entryNum, indexIn, -1); - } - } - - /** - * - * @return - * @throws IOException - */ - public long find(Tuple key) throws IOException { - return find(key, false); - } - - @Override - public long find(Tuple key, boolean nextKey) throws IOException { - synchronized (mutex) { - int pos = -1; - if (this.level == ONE_LEVEL_INDEX) { - pos = oneLevBS(key); - } else if (this.level == TWO_LEVEL_INDEX) { - pos = twoLevBS(key, this.loadNum + 1); - } else { - throw new IOException("More than TWL_LEVEL_INDEX is not supported."); - } - - if (nextKey) { - if (pos + 1 >= this.offsetSubIndex.length) { - return -1; - 
} - keyCursor = pos + 1; - offsetCursor = 0; - } else { - if (correctable) { - keyCursor = pos; - offsetCursor = 0; - } else { - return -1; - } - } - - return this.offsetSubIndex[keyCursor][offsetCursor]; - } - } - - public long next() throws IOException { - synchronized (mutex) { - if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) { - offsetCursor++; - } else { - if (offsetSubIndex.length - 1 > keyCursor) { - keyCursor++; - offsetCursor = 0; - } else { - if (offsetIndex.length -1 > rootCursor) { - rootCursor++; - fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]); - keyCursor = 1; - offsetCursor = 0; - } else { - return -1; - } - } - } - - return this.offsetSubIndex[keyCursor][offsetCursor]; - } - } - - public boolean isCurInMemory() { - return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor); - } - - private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos) - throws IOException { - int counter = 0; - try { - if (pos != -1) { - in.seek(pos); - } - this.dataSubIndex = new Tuple[entryNum]; - this.offsetSubIndex = new long[entryNum][]; - - byte[] buf; - for (int i = 0; i < entryNum; i++) { - counter++; - buf = new byte[in.readInt()]; - StorageUtil.readFully(in, buf, 0, buf.length); - dataSubIndex[i] = rowStoreDecoder.toTuple(buf); - - int offsetNum = in.readInt(); - this.offsetSubIndex[i] = new long[offsetNum]; - for (int j = 0; j < offsetNum; j++) { - this.offsetSubIndex[i][j] = in.readLong(); - } - - } - - } catch (IOException e) { - counter--; - if (pos != -1) { - in.seek(pos); - } - this.dataSubIndex = new Tuple[counter]; - this.offsetSubIndex = new long[counter][]; - - byte[] buf; - for (int i = 0; i < counter; i++) { - buf = new byte[in.readInt()]; - StorageUtil.readFully(in, buf, 0, buf.length); - dataSubIndex[i] = rowStoreDecoder.toTuple(buf); - - int offsetNum = in.readInt(); - this.offsetSubIndex[i] = new long[offsetNum]; - for (int j = 0; j < offsetNum; j++) { - this.offsetSubIndex[i][j] = in.readLong(); - } - - } - 
} - } - - public Tuple getFirstKey() { - return this.firstKey; - } - - public Tuple getLastKey() { - return this.lastKey; - } - - private void fillRootIndex(int entryNum, FSDataInputStream in) - throws IOException { - this.dataIndex = new Tuple[entryNum]; - this.offsetIndex = new long[entryNum]; - Tuple keyTuple; - byte[] buf; - for (int i = 0; i < entryNum; i++) { - buf = new byte[in.readInt()]; - StorageUtil.readFully(in, buf, 0, buf.length); - keyTuple = rowStoreDecoder.toTuple(buf); - dataIndex[i] = keyTuple; - this.offsetIndex[i] = in.readLong(); - } - } - - /* memory index, only one is used. */ - private Tuple[] dataIndex = null; - private Tuple[] dataSubIndex = null; - - /* offset index */ - private long[] offsetIndex = null; - private long[][] offsetSubIndex = null; - - private boolean correctable = true; - - private int oneLevBS(Tuple key) throws IOException { - correctable = true; - int pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length); - return pos; - } - - private int twoLevBS(Tuple key, int loadNum) throws IOException { - int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length); - if(pos > 0) { - rootCursor = pos; - } else { - rootCursor = 0; - } - fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]); - pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length); - - return pos; - } - - private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) { - int offset = -1; - int start = startPos; - int end = endPos; - - //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541 - int centerPos = (start + end) >>> 1; - while (true) { - if (comparator.compare(arr[centerPos], key) > 0) { - if (centerPos == 0) { - correctable = false; - break; - } else if (comparator.compare(arr[centerPos - 1], key) < 0) { - correctable = false; - offset = centerPos - 1; - break; - } else { - end = centerPos; - centerPos = (start + end) / 2; - } - } else if (comparator.compare(arr[centerPos], key) < 0) { - if 
(centerPos == arr.length - 1) { - correctable = false; - offset = centerPos; - break; - } else if (comparator.compare(arr[centerPos + 1], key) > 0) { - correctable = false; - offset = centerPos; - break; - } else { - start = centerPos + 1; - centerPos = (start + end) / 2; - } - } else { - correctable = true; - offset = centerPos; - break; - } - } - return offset; - } - - @Override - public void close() throws IOException { - this.indexIn.close(); - this.subIn.close(); - } - - @Override - public String toString() { - return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName; - } - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java deleted file mode 100644 index b10d423..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.parquet; - -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.storage.StorageConstants; -import parquet.hadoop.ParquetOutputFormat; -import parquet.hadoop.metadata.CompressionCodecName; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.storage.FileAppender; -import org.apache.tajo.storage.TableStatistics; -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; - -/** - * FileAppender for writing to Parquet files. - */ -public class ParquetAppender extends FileAppender { - private TajoParquetWriter writer; - private int blockSize; - private int pageSize; - private CompressionCodecName compressionCodecName; - private boolean enableDictionary; - private boolean validating; - private TableStatistics stats; - - /** - * Creates a new ParquetAppender. - * - * @param conf Configuration properties. - * @param schema The table schema. - * @param meta The table metadata. - * @param workDir The path of the Parquet file to write to. 
- */ - public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta, - Path workDir) throws IOException { - super(conf, taskAttemptId, schema, meta, workDir); - this.blockSize = Integer.parseInt( - meta.getOption(ParquetOutputFormat.BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE)); - this.pageSize = Integer.parseInt( - meta.getOption(ParquetOutputFormat.PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE)); - this.compressionCodecName = CompressionCodecName.fromConf( - meta.getOption(ParquetOutputFormat.COMPRESSION, StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME)); - this.enableDictionary = Boolean.parseBoolean( - meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY, StorageConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED)); - this.validating = Boolean.parseBoolean( - meta.getOption(ParquetOutputFormat.VALIDATION, StorageConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED)); - } - - /** - * Initializes the Appender. This method creates a new TajoParquetWriter - * and initializes the table statistics if enabled. - */ - public void init() throws IOException { - writer = new TajoParquetWriter(path, - schema, - compressionCodecName, - blockSize, - pageSize, - enableDictionary, - validating); - if (enabledStats) { - this.stats = new TableStatistics(schema); - } - super.init(); - } - - /** - * Gets the current offset. Tracking offsets is currenly not implemented, so - * this method always returns 0. - * - * @return 0 - */ - @Override - public long getOffset() throws IOException { - return 0; - } - - /** - * Write a Tuple to the Parquet file. - * - * @param tuple The Tuple to write. 
- */ - @Override - public void addTuple(Tuple tuple) throws IOException { - if (enabledStats) { - for (int i = 0; i < schema.size(); ++i) { - stats.analyzeField(i, tuple.get(i)); - } - } - writer.write(tuple); - if (enabledStats) { - stats.incrementRow(); - } - } - - /** - * The ParquetWriter does not need to be flushed, so this is a no-op. - */ - @Override - public void flush() throws IOException { - } - - /** - * Closes the Appender. - */ - @Override - public void close() throws IOException { - writer.close(); - } - - public long getEstimatedOutputSize() throws IOException { - return writer.getEstimatedWrittenSize(); - } - - /** - * If table statistics is enabled, retrieve the table statistics. - * - * @return Table statistics if enabled or null otherwise. - */ - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java deleted file mode 100644 index 2f8efcf..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.FileScanner; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.fragment.Fragment; - -import java.io.IOException; - -/** - * FileScanner for reading Parquet files - */ -public class ParquetScanner extends FileScanner { - private TajoParquetReader reader; - - /** - * Creates a new ParquetScanner. - * - * @param conf - * @param schema - * @param meta - * @param fragment - */ - public ParquetScanner(Configuration conf, final Schema schema, - final TableMeta meta, final Fragment fragment) { - super(conf, schema, meta, fragment); - } - - /** - * Initializes the ParquetScanner. This method initializes the - * TajoParquetReader. - */ - @Override - public void init() throws IOException { - if (targets == null) { - targets = schema.toArray(); - } - reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets)); - super.init(); - } - - /** - * Reads the next Tuple from the Parquet file. - * - * @return The next Tuple from the Parquet file or null if end of file is - * reached. - */ - @Override - public Tuple next() throws IOException { - return reader.read(); - } - - /** - * Resets the scanner - */ - @Override - public void reset() throws IOException { - } - - /** - * Closes the scanner. 
- */ - @Override - public void close() throws IOException { - if (reader != null) { - reader.close(); - } - } - - /** - * Returns whether this scanner is projectable. - * - * @return true - */ - @Override - public boolean isProjectable() { - return true; - } - - /** - * Returns whether this scanner is selectable. - * - * @return false - */ - @Override - public boolean isSelectable() { - return false; - } - - /** - * Returns whether this scanner is splittable. - * - * @return false - */ - @Override - public boolean isSplittable() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java deleted file mode 100644 index a765f48..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.parquet; - -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.thirdparty.parquet.ParquetReader; -import parquet.filter.UnboundRecordFilter; - -import java.io.IOException; - -/** - * Tajo implementation of {@link ParquetReader} to read Tajo records from a - * Parquet file. Users should use {@link ParquetScanner} and not this class - * directly. - */ -public class TajoParquetReader extends ParquetReader<Tuple> { - /** - * Creates a new TajoParquetReader. - * - * @param file The file to read from. - * @param readSchema Tajo schema of the table. - */ - public TajoParquetReader(Path file, Schema readSchema) throws IOException { - super(file, new TajoReadSupport(readSchema)); - } - - /** - * Creates a new TajoParquetReader. - * - * @param file The file to read from. - * @param readSchema Tajo schema of the table. - * @param requestedSchema Tajo schema of the projection. - */ - public TajoParquetReader(Path file, Schema readSchema, - Schema requestedSchema) throws IOException { - super(file, new TajoReadSupport(readSchema, requestedSchema)); - } - - /** - * Creates a new TajoParquetReader. - * - * @param file The file to read from. - * @param readSchema Tajo schema of the table. - * @param recordFilter Record filter. - */ - public TajoParquetReader(Path file, Schema readSchema, - UnboundRecordFilter recordFilter) - throws IOException { - super(file, new TajoReadSupport(readSchema), recordFilter); - } - - /** - * Creates a new TajoParquetReader. - * - * @param file The file to read from. - * @param readSchema Tajo schema of the table. - * @param requestedSchema Tajo schema of the projection. - * @param recordFilter Record filter. 
- */ - public TajoParquetReader(Path file, Schema readSchema, - Schema requestedSchema, - UnboundRecordFilter recordFilter) - throws IOException { - super(file, new TajoReadSupport(readSchema, requestedSchema), - recordFilter); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java deleted file mode 100644 index 69b76c4..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.parquet; - -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter; -import parquet.hadoop.metadata.CompressionCodecName; - -import java.io.IOException; - -/** - * Tajo implementation of {@link ParquetWriter} to write Tajo records to a - * Parquet file. Users should use {@link ParquetAppender} and not this class - * directly. - */ -public class TajoParquetWriter extends ParquetWriter<Tuple> { - /** - * Create a new TajoParquetWriter - * - * @param file The file name to write to. - * @param schema The Tajo schema of the table. - * @param compressionCodecName Compression codec to use, or - * CompressionCodecName.UNCOMPRESSED. - * @param blockSize The block size threshold. - * @param pageSize See parquet write up. Blocks are subdivided into pages - * for alignment. - * @throws IOException - */ - public TajoParquetWriter(Path file, - Schema schema, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize) throws IOException { - super(file, - new TajoWriteSupport(schema), - compressionCodecName, - blockSize, - pageSize); - } - - /** - * Create a new TajoParquetWriter. - * - * @param file The file name to write to. - * @param schema The Tajo schema of the table. - * @param compressionCodecName Compression codec to use, or - * CompressionCodecName.UNCOMPRESSED. - * @param blockSize The block size threshold. - * @param pageSize See parquet write up. Blocks are subdivided into pages - * for alignment. - * @param enableDictionary Whether to use a dictionary to compress columns. - * @param validating Whether to turn on validation. 
- * @throws IOException - */ - public TajoParquetWriter(Path file, - Schema schema, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - boolean enableDictionary, - boolean validating) throws IOException { - super(file, - new TajoWriteSupport(schema), - compressionCodecName, - blockSize, - pageSize, - enableDictionary, - validating); - } - - /** - * Creates a new TajoParquetWriter. The default block size is 128 MB. - * The default page size is 1 MB. Default compression is no compression. - * - * @param file The Path of the file to write to. - * @param schema The Tajo schema of the table. - * @throws IOException - */ - public TajoParquetWriter(Path file, Schema schema) throws IOException { - this(file, - schema, - CompressionCodecName.UNCOMPRESSED, - DEFAULT_BLOCK_SIZE, - DEFAULT_PAGE_SIZE); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java deleted file mode 100644 index 269f782..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import java.util.Map; - -import parquet.Log; -import parquet.hadoop.api.InitContext; -import parquet.hadoop.api.ReadSupport; -import parquet.io.api.RecordMaterializer; -import parquet.schema.MessageType; - -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.json.CatalogGsonHelper; -import org.apache.tajo.storage.Tuple; - -/** - * Tajo implementation of {@link ReadSupport} for {@link Tuple}s. - * Users should use {@link ParquetScanner} and not this class directly. - */ -public class TajoReadSupport extends ReadSupport<Tuple> { - private static final Log LOG = Log.getLog(TajoReadSupport.class); - - private Schema readSchema; - private Schema requestedSchema; - - /** - * Creates a new TajoReadSupport. - * - * @param requestedSchema The Tajo schema of the requested projection passed - * down by ParquetScanner. - */ - public TajoReadSupport(Schema readSchema, Schema requestedSchema) { - super(); - this.readSchema = readSchema; - this.requestedSchema = requestedSchema; - } - - /** - * Creates a new TajoReadSupport. - * - * @param readSchema The schema of the table. - */ - public TajoReadSupport(Schema readSchema) { - super(); - this.readSchema = readSchema; - this.requestedSchema = readSchema; - } - - /** - * Initializes the ReadSupport. - * - * @param context The InitContext. - * @return A ReadContext that defines how to read the file. 
- */ - @Override - public ReadContext init(InitContext context) { - if (requestedSchema == null) { - throw new RuntimeException("requestedSchema is null."); - } - MessageType requestedParquetSchema = - new TajoSchemaConverter().convert(requestedSchema); - LOG.debug("Reading data with projection:\n" + requestedParquetSchema); - return new ReadContext(requestedParquetSchema); - } - - /** - * Prepares for read. - * - * @param configuration The job configuration. - * @param keyValueMetaData App-specific metadata from the file. - * @param fileSchema The schema of the Parquet file. - * @param readContext Returned by the init method. - */ - @Override - public RecordMaterializer<Tuple> prepareForRead( - Configuration configuration, - Map<String, String> keyValueMetaData, - MessageType fileSchema, - ReadContext readContext) { - MessageType parquetRequestedSchema = readContext.getRequestedSchema(); - return new TajoRecordMaterializer(parquetRequestedSchema, requestedSchema, readSchema); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java deleted file mode 100644 index 7c3d79d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java +++ /dev/null @@ -1,386 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import com.google.protobuf.Message; -import com.google.protobuf.InvalidProtocolBufferException; - -import java.nio.ByteBuffer; - -import parquet.io.api.GroupConverter; -import parquet.io.api.Converter; -import parquet.io.api.PrimitiveConverter; -import parquet.io.api.Binary; -import parquet.schema.Type; -import parquet.schema.GroupType; - -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.common.TajoDataTypes.DataType; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.BlobDatum; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.datum.ProtobufDatumFactory; - -/** - * Converter to convert a Parquet record into a Tajo Tuple. - */ -public class TajoRecordConverter extends GroupConverter { - private final GroupType parquetSchema; - private final Schema tajoReadSchema; - private final int[] projectionMap; - private final int tupleSize; - - private final Converter[] converters; - - private Tuple currentTuple; - - /** - * Creates a new TajoRecordConverter. - * - * @param parquetSchema The Parquet schema of the projection. - * @param tajoReadSchema The Tajo schema of the table. - * @param projectionMap An array mapping the projection column to the column - * index in the table. 
- */ - public TajoRecordConverter(GroupType parquetSchema, Schema tajoReadSchema, - int[] projectionMap) { - this.parquetSchema = parquetSchema; - this.tajoReadSchema = tajoReadSchema; - this.projectionMap = projectionMap; - this.tupleSize = tajoReadSchema.size(); - - // The projectionMap.length does not match parquetSchema.getFieldCount() - // when the projection contains NULL_TYPE columns. We will skip over the - // NULL_TYPE columns when we construct the converters and populate the - // NULL_TYPE columns with NullDatums in start(). - int index = 0; - this.converters = new Converter[parquetSchema.getFieldCount()]; - for (int i = 0; i < projectionMap.length; ++i) { - final int projectionIndex = projectionMap[i]; - Column column = tajoReadSchema.getColumn(projectionIndex); - if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { - continue; - } - Type type = parquetSchema.getType(index); - converters[index] = newConverter(column, type, new ParentValueContainer() { - @Override - void add(Object value) { - TajoRecordConverter.this.set(projectionIndex, value); - } - }); - ++index; - } - } - - private void set(int index, Object value) { - currentTuple.put(index, (Datum)value); - } - - private Converter newConverter(Column column, Type type, - ParentValueContainer parent) { - DataType dataType = column.getDataType(); - switch (dataType.getType()) { - case BOOLEAN: - return new FieldBooleanConverter(parent); - case BIT: - return new FieldBitConverter(parent); - case CHAR: - return new FieldCharConverter(parent); - case INT2: - return new FieldInt2Converter(parent); - case INT4: - return new FieldInt4Converter(parent); - case INT8: - return new FieldInt8Converter(parent); - case FLOAT4: - return new FieldFloat4Converter(parent); - case FLOAT8: - return new FieldFloat8Converter(parent); - case INET4: - return new FieldInet4Converter(parent); - case INET6: - throw new RuntimeException("No converter for INET6"); - case TEXT: - return new 
FieldTextConverter(parent); - case PROTOBUF: - return new FieldProtobufConverter(parent, dataType); - case BLOB: - return new FieldBlobConverter(parent); - case NULL_TYPE: - throw new RuntimeException("No converter for NULL_TYPE."); - default: - throw new RuntimeException("Unsupported data type"); - } - } - - /** - * Gets the converter for a specific field. - * - * @param fieldIndex Index of the field in the projection. - * @return The converter for the field. - */ - @Override - public Converter getConverter(int fieldIndex) { - return converters[fieldIndex]; - } - - /** - * Called before processing fields. This method fills any fields that have - * NULL values or have type NULL_TYPE with a NullDatum. - */ - @Override - public void start() { - currentTuple = new VTuple(tupleSize); - } - - /** - * Called after all fields have been processed. - */ - @Override - public void end() { - for (int i = 0; i < projectionMap.length; ++i) { - final int projectionIndex = projectionMap[i]; - Column column = tajoReadSchema.getColumn(projectionIndex); - if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE - || currentTuple.get(projectionIndex) == null) { - set(projectionIndex, NullDatum.get()); - } - } - } - - /** - * Returns the current record converted by this converter. - * - * @return The current record. - */ - public Tuple getCurrentRecord() { - return currentTuple; - } - - static abstract class ParentValueContainer { - /** - * Adds the value to the parent. - * - * @param value The value to add. 
- */ - abstract void add(Object value); - } - - static final class FieldBooleanConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldBooleanConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addBoolean(boolean value) { - parent.add(DatumFactory.createBool(value)); - } - } - - static final class FieldBitConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldBitConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createBit((byte)(value & 0xff))); - } - } - - static final class FieldCharConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldCharConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addBinary(Binary value) { - parent.add(DatumFactory.createChar(value.toStringUsingUTF8())); - } - } - - static final class FieldInt2Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldInt2Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createInt2((short)value)); - } - } - - static final class FieldInt4Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldInt4Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createInt4(value)); - } - } - - static final class FieldInt8Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldInt8Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addLong(long value) { - parent.add(DatumFactory.createInt8(value)); - } - - @Override - 
final public void addInt(int value) { - parent.add(DatumFactory.createInt8(Long.valueOf(value))); - } - } - - static final class FieldFloat4Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldFloat4Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createFloat4(Float.valueOf(value))); - } - - @Override - final public void addLong(long value) { - parent.add(DatumFactory.createFloat4(Float.valueOf(value))); - } - - @Override - final public void addFloat(float value) { - parent.add(DatumFactory.createFloat4(value)); - } - } - - static final class FieldFloat8Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldFloat8Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createFloat8(Double.valueOf(value))); - } - - @Override - final public void addLong(long value) { - parent.add(DatumFactory.createFloat8(Double.valueOf(value))); - } - - @Override - final public void addFloat(float value) { - parent.add(DatumFactory.createFloat8(Double.valueOf(value))); - } - - @Override - final public void addDouble(double value) { - parent.add(DatumFactory.createFloat8(value)); - } - } - - static final class FieldInet4Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldInet4Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addBinary(Binary value) { - parent.add(DatumFactory.createInet4(value.getBytes())); - } - } - - static final class FieldTextConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldTextConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addBinary(Binary value) { - 
parent.add(DatumFactory.createText(value.toStringUsingUTF8())); - } - } - - static final class FieldBlobConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldBlobConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addBinary(Binary value) { - parent.add(new BlobDatum(ByteBuffer.wrap(value.getBytes()))); - } - } - - static final class FieldProtobufConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - private final DataType dataType; - - public FieldProtobufConverter(ParentValueContainer parent, - DataType dataType) { - this.parent = parent; - this.dataType = dataType; - } - - @Override - final public void addBinary(Binary value) { - try { - ProtobufDatumFactory factory = - ProtobufDatumFactory.get(dataType.getCode()); - Message.Builder builder = factory.newBuilder(); - builder.mergeFrom(value.getBytes()); - parent.add(factory.createDatum(builder)); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java deleted file mode 100644 index e31828c..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import parquet.io.api.GroupConverter; -import parquet.io.api.RecordMaterializer; -import parquet.schema.MessageType; - -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.storage.Tuple; - -/** - * Materializes a Tajo Tuple from a stream of Parquet data. - */ -class TajoRecordMaterializer extends RecordMaterializer<Tuple> { - private final TajoRecordConverter root; - - /** - * Creates a new TajoRecordMaterializer. - * - * @param parquetSchema The Parquet schema of the projection. - * @param tajoSchema The Tajo schema of the projection. - * @param tajoReadSchema The Tajo schema of the table. - */ - public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema, - Schema tajoReadSchema) { - int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema); - this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema, - projectionMap); - } - - private int[] getProjectionMap(Schema schema, Schema projection) { - Column[] targets = projection.toArray(); - int[] projectionMap = new int[targets.length]; - for (int i = 0; i < targets.length; ++i) { - int tid = schema.getColumnId(targets[i].getQualifiedName()); - projectionMap[i] = tid; - } - return projectionMap; - } - - /** - * Returns the current record being materialized. - * - * @return The record being materialized. 
- */ - @Override - public Tuple getCurrentRecord() { - return root.getCurrentRecord(); - } - - /** - * Returns the root converter. - * - * @return The root converter - */ - @Override - public GroupConverter getRootConverter() { - return root; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java deleted file mode 100644 index 2592231..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.parquet; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes; -import parquet.schema.MessageType; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; -import parquet.schema.Type; - -import java.util.ArrayList; -import java.util.List; - -/** - * Converts between Parquet and Tajo schemas. See package documentation for - * details on the mapping. - */ -public class TajoSchemaConverter { - private static final String TABLE_SCHEMA = "table_schema"; - - /** - * Creates a new TajoSchemaConverter. - */ - public TajoSchemaConverter() { - } - - /** - * Converts a Parquet schema to a Tajo schema. - * - * @param parquetSchema The Parquet schema to convert. - * @return The resulting Tajo schema. - */ - public Schema convert(MessageType parquetSchema) { - return convertFields(parquetSchema.getFields()); - } - - private Schema convertFields(List<Type> parquetFields) { - List<Column> columns = new ArrayList<Column>(); - for (int i = 0; i < parquetFields.size(); ++i) { - Type fieldType = parquetFields.get(i); - if (fieldType.isRepetition(Type.Repetition.REPEATED)) { - throw new RuntimeException("REPEATED not supported outside LIST or" + - " MAP. 
Type: " + fieldType); - } - columns.add(convertField(fieldType)); - } - Column[] columnsArray = new Column[columns.size()]; - columnsArray = columns.toArray(columnsArray); - return new Schema(columnsArray); - } - - private Column convertField(final Type fieldType) { - if (fieldType.isPrimitive()) { - return convertPrimitiveField(fieldType); - } else { - return convertComplexField(fieldType); - } - } - - private Column convertPrimitiveField(final Type fieldType) { - final String fieldName = fieldType.getName(); - final PrimitiveTypeName parquetPrimitiveTypeName = - fieldType.asPrimitiveType().getPrimitiveTypeName(); - final OriginalType originalType = fieldType.getOriginalType(); - return parquetPrimitiveTypeName.convert( - new PrimitiveType.PrimitiveTypeNameConverter<Column, RuntimeException>() { - @Override - public Column convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.BOOLEAN); - } - - @Override - public Column convertINT32(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.INT4); - } - - @Override - public Column convertINT64(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.INT8); - } - - @Override - public Column convertFLOAT(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.FLOAT4); - } - - @Override - public Column convertDOUBLE(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.FLOAT8); - } - - @Override - public Column convertFIXED_LEN_BYTE_ARRAY( - PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.BLOB); - } - - @Override - public Column convertBINARY(PrimitiveTypeName primitiveTypeName) { - if (originalType == OriginalType.UTF8) { - return new Column(fieldName, TajoDataTypes.Type.TEXT); - } else { - return new Column(fieldName, TajoDataTypes.Type.BLOB); - } - } - - @Override - public Column 
convertINT96(PrimitiveTypeName primitiveTypeName) { - throw new RuntimeException("Converting from INT96 not supported."); - } - }); - } - - private Column convertComplexField(final Type fieldType) { - throw new RuntimeException("Complex types not supported."); - } - - /** - * Converts a Tajo schema to a Parquet schema. - * - * @param tajoSchema The Tajo schema to convert. - * @return The resulting Parquet schema. - */ - public MessageType convert(Schema tajoSchema) { - List<Type> types = new ArrayList<Type>(); - for (int i = 0; i < tajoSchema.size(); ++i) { - Column column = tajoSchema.getColumn(i); - if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { - continue; - } - types.add(convertColumn(column)); - } - return new MessageType(TABLE_SCHEMA, types); - } - - private Type convertColumn(Column column) { - TajoDataTypes.Type type = column.getDataType().getType(); - switch (type) { - case BOOLEAN: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BOOLEAN); - case BIT: - case INT2: - case INT4: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.INT32); - case INT8: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.INT64); - case FLOAT4: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.FLOAT); - case FLOAT8: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.DOUBLE); - case CHAR: - case TEXT: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BINARY, - OriginalType.UTF8); - case PROTOBUF: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BINARY); - case BLOB: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BINARY); - case INET4: - case INET6: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BINARY); - default: - throw new RuntimeException("Cannot convert Tajo type: " + type); - } - } - - private PrimitiveType 
primitive(String name, - PrimitiveType.PrimitiveTypeName primitive) { - return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, null); - } - - private PrimitiveType primitive(String name, - PrimitiveType.PrimitiveTypeName primitive, - OriginalType originalType) { - return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, - originalType); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java deleted file mode 100644 index 35165de..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.parquet; - -import java.util.Map; -import java.util.HashMap; -import java.util.List; - -import parquet.hadoop.api.WriteSupport; -import parquet.io.api.Binary; -import parquet.io.api.RecordConsumer; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.Type; - -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.datum.Datum; - -/** - * Tajo implementation of {@link WriteSupport} for {@link Tuple}s. - * Users should use {@link ParquetAppender} and not this class directly. - */ -public class TajoWriteSupport extends WriteSupport<Tuple> { - private RecordConsumer recordConsumer; - private MessageType rootSchema; - private Schema rootTajoSchema; - - /** - * Creates a new TajoWriteSupport. - * - * @param tajoSchema The Tajo schema for the table. - */ - public TajoWriteSupport(Schema tajoSchema) { - this.rootSchema = new TajoSchemaConverter().convert(tajoSchema); - this.rootTajoSchema = tajoSchema; - } - - /** - * Initializes the WriteSupport. - * - * @param configuration The job's configuration. - * @return A WriteContext that describes how to write the file. - */ - @Override - public WriteContext init(Configuration configuration) { - Map<String, String> extraMetaData = new HashMap<String, String>(); - return new WriteContext(rootSchema, extraMetaData); - } - - /** - * Called once per row group. - * - * @param recordConsumer The {@link RecordConsumer} to write to. - */ - @Override - public void prepareForWrite(RecordConsumer recordConsumer) { - this.recordConsumer = recordConsumer; - } - - /** - * Writes a Tuple to the file. - * - * @param tuple The Tuple to write to the file. 
- */ - @Override - public void write(Tuple tuple) { - recordConsumer.startMessage(); - writeRecordFields(rootSchema, rootTajoSchema, tuple); - recordConsumer.endMessage(); - } - - private void writeRecordFields(GroupType schema, Schema tajoSchema, - Tuple tuple) { - List<Type> fields = schema.getFields(); - // Parquet ignores Tajo NULL_TYPE columns, so the index may differ. - int index = 0; - for (int tajoIndex = 0; tajoIndex < tajoSchema.size(); ++tajoIndex) { - Column column = tajoSchema.getColumn(tajoIndex); - if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { - continue; - } - Datum datum = tuple.get(tajoIndex); - Type fieldType = fields.get(index); - if (!tuple.isNull(tajoIndex)) { - recordConsumer.startField(fieldType.getName(), index); - writeValue(fieldType, column, datum); - recordConsumer.endField(fieldType.getName(), index); - } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) { - throw new RuntimeException("Null-value for required field: " + - column.getSimpleName()); - } - ++index; - } - } - - private void writeValue(Type fieldType, Column column, Datum datum) { - switch (column.getDataType().getType()) { - case BOOLEAN: - recordConsumer.addBoolean((Boolean) datum.asBool()); - break; - case BIT: - case INT2: - case INT4: - recordConsumer.addInteger(datum.asInt4()); - break; - case INT8: - recordConsumer.addLong(datum.asInt8()); - break; - case FLOAT4: - recordConsumer.addFloat(datum.asFloat4()); - break; - case FLOAT8: - recordConsumer.addDouble(datum.asFloat8()); - break; - case CHAR: - case TEXT: - recordConsumer.addBinary(Binary.fromString(datum.asChars())); - break; - case PROTOBUF: - case BLOB: - case INET4: - case INET6: - recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray())); - break; - default: - break; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java deleted file mode 100644 index d7d16b7..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * <p> - * Provides read and write support for Parquet files. Tajo schemas are - * converted to Parquet schemas according to the following mapping of Tajo - * and Parquet types: - * </p> - * - * <table> - * <tr> - * <th>Tajo type</th> - * <th>Parquet type</th> - * </tr> - * <tr> - * <td>NULL_TYPE</td> - * <td>No type. 
The field is not encoded in Parquet.</td> - * </tr> - * <tr> - * <td>BOOLEAN</td> - * <td>BOOLEAN</td> - * </tr> - * <tr> - * <td>BIT</td> - * <td>INT32</td> - * </tr> - * <tr> - * <td>INT2</td> - * <td>INT32</td> - * </tr> - * <tr> - * <td>INT4</td> - * <td>INT32</td> - * </tr> - * <tr> - * <td>INT8</td> - * <td>INT64</td> - * </tr> - * <tr> - * <td>FLOAT4</td> - * <td>FLOAT</td> - * </tr> - * <tr> - * <td>FLOAT8</td> - * <td>DOUBLE</td> - * </tr> - * <tr> - * <td>CHAR</td> - * <td>BINARY (with OriginalType UTF8)</td> - * </tr> - * <tr> - * <td>TEXT</td> - * <td>BINARY (with OriginalType UTF8)</td> - * </tr> - * <tr> - * <td>PROTOBUF</td> - * <td>BINARY</td> - * </tr> - * <tr> - * <td>BLOB</td> - * <td>BINARY</td> - * </tr> - * <tr> - * <td>INET4</td> - * <td>BINARY</td> - * </tr> - * </table> - * - * <p> - * Because Tajo fields can be NULL, all Parquet fields are marked as optional. - * </p> - * - * <p> - * The conversion from Tajo to Parquet is lossy without the original Tajo - * schema. As a result, Parquet files are read using the Tajo schema saved in - * the Tajo catalog for the table the Parquet files belong to, which was - * defined when the table was created. - * </p> - */ - -package org.apache.tajo.storage.parquet; http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java deleted file mode 100644 index 5e200a0..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.rcfile;

import com.google.common.base.Objects;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

/**
 * <tt>BytesRefArrayWritable</tt> holds an array of {@link BytesRefWritable}
 * references and is able to resize without recreating the array when that is
 * not necessary.
 * <p>
 *
 * Each <tt>BytesRefArrayWritable</tt> instance has a <i>valid</i> field, which
 * is the number of leading <tt>BytesRefWritable</tt> slots currently in use.
 * <tt>resetValid</tt> changes that count but does not touch the
 * <tt>BytesRefWritable</tt> objects stored in the slots.
 * <p>
 *
 * {@link #compareTo}, {@link #equals} and {@link #hashCode} look only at the
 * valid elements and ignore their positions: two instances are equal when
 * they hold the same elements (compared by content) the same number of times.
 */
public class BytesRefArrayWritable implements Writable,
    Comparable<BytesRefArrayWritable> {

  private BytesRefWritable[] bytesRefWritables = null;

  private int valid = 0;

  /**
   * Constructs an empty array with the specified capacity.
   *
   * @param capacity
   *          initial capacity
   * @exception IllegalArgumentException
   *              if the specified initial capacity is negative
   */
  public BytesRefArrayWritable(int capacity) {
    if (capacity < 0) {
      throw new IllegalArgumentException("Capacity can not be negative.");
    }
    bytesRefWritables = new BytesRefWritable[0];
    ensureCapacity(capacity);
  }

  /**
   * Constructs an empty array with a capacity of ten.
   */
  public BytesRefArrayWritable() {
    this(10);
  }

  /**
   * Returns the number of valid elements.
   *
   * @return the number of valid elements
   */
  public int size() {
    return valid;
  }

  /**
   * Gets the BytesRefWritable at the specified position. Make sure the
   * position is valid by first calling resetValid.
   *
   * @param index
   *          the position index, starting from zero
   * @return the element stored at <tt>index</tt>
   * @throws IndexOutOfBoundsException
   *           if <tt>index</tt> is not below the number of valid elements
   */
  public BytesRefWritable get(int index) {
    if (index >= valid) {
      throw new IndexOutOfBoundsException(
          "This BytesRefArrayWritable only has " + valid + " valid values.");
    }
    return bytesRefWritables[index];
  }

  /**
   * Gets the BytesRefWritable at the specified position without checking it
   * against the number of valid elements.
   *
   * @param index
   *          the position index, starting from zero
   * @return the element stored in the backing array at <tt>index</tt>
   * @throws IndexOutOfBoundsException
   *           if <tt>index</tt> is outside the backing array
   */
  public BytesRefWritable unCheckedGet(int index) {
    return bytesRefWritables[index];
  }

  /**
   * Sets the BytesRefWritable at the specified position, growing the backing
   * array and the valid count when <tt>index</tt> lies beyond them.
   *
   * @param index
   *          index position
   * @param bytesRefWritable
   *          the new element
   * @throws IllegalArgumentException
   *           if the specified new element is null
   */
  public void set(int index, BytesRefWritable bytesRefWritable) {
    if (bytesRefWritable == null) {
      throw new IllegalArgumentException("Can not assign null.");
    }
    ensureCapacity(index + 1);
    bytesRefWritables[index] = bytesRefWritable;
    if (valid <= index) {
      valid = index + 1;
    }
  }

  /**
   * Orders instances first by their number of valid elements and then by
   * their valid elements, compared in sorted order so that element positions
   * do not matter.
   * <p>
   * The result is antisymmetric, as {@link Comparable} requires: for two
   * different arrays of the same size, exactly one direction is negative.
   *
   * @param other
   *          the instance to compare with
   * @return a negative value, zero or a positive value as this instance is
   *         less than, equal to or greater than <tt>other</tt>
   * @throws IllegalArgumentException
   *           if <tt>other</tt> is null
   */
  @Override
  public int compareTo(BytesRefArrayWritable other) {
    if (other == null) {
      throw new IllegalArgumentException("Argument can not be null.");
    }
    if (this == other) {
      return 0;
    }
    int sizeDiff = valid - other.valid;
    if (sizeDiff != 0) {
      return sizeDiff;
    }
    // Sorting copies keeps the comparison independent of element order
    // while still yielding a consistent sign in both directions.
    BytesRefWritable[] mine = sortedValidElements();
    BytesRefWritable[] theirs = other.sortedValidElements();
    for (int i = 0; i < mine.length; i++) {
      int cmp = mine[i].compareTo(theirs[i]);
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0;
  }

  /**
   * Returns a sorted copy of the valid elements; the backing array itself is
   * left untouched.
   */
  private BytesRefWritable[] sortedValidElements() {
    // resetValid() stores its argument unchecked, so guard against a
    // negative count instead of failing inside Arrays.copyOf.
    BytesRefWritable[] sorted =
        Arrays.copyOf(bytesRefWritables, Math.max(valid, 0));
    Arrays.sort(sorted);
    return sorted;
  }

  /**
   * Returns a hash code derived from the number of valid elements and the
   * bytes they reference. The per-element hashes are summed, so the result
   * does not depend on element order and agrees with {@link #equals}.
   *
   * @throws RuntimeException
   *           if reading an element's bytes fails with an IOException
   */
  @Override
  public int hashCode() {
    int contentSum = 0;
    for (int i = 0; i < valid; i++) {
      contentSum += contentHash(bytesRefWritables[i]);
    }
    return Objects.hashCode(valid, contentSum);
  }

  /**
   * Hashes the byte range referenced by <tt>ref</tt>. The element's own
   * hashCode() is deliberately not used, so that this class stays consistent
   * with the content-based element equality regardless of how the element
   * hashes itself.
   */
  private static int contentHash(BytesRefWritable ref) {
    try {
      byte[] data = ref.getData();
      int hash = 1;
      int end = ref.getStart() + ref.getLength();
      for (int i = ref.getStart(); i < end; i++) {
        hash = 31 * hash + data[i];
      }
      return hash;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Returns <tt>true</tt> if at least one valid element equals the specified
   * BytesRefWritable.
   *
   * @param bytesRefWritable
   *          BytesRefWritable element to be tested
   * @return <tt>true</tt> if this instance contains the specified element
   * @throws IllegalArgumentException
   *           if the specified element is null
   */
  public boolean contains(BytesRefWritable bytesRefWritable) {
    if (bytesRefWritable == null) {
      throw new IllegalArgumentException("Argument can not be null.");
    }
    for (int i = 0; i < valid; i++) {
      if (bytesRefWritables[i].equals(bytesRefWritable)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns <tt>true</tt> when <tt>o</tt> is a BytesRefArrayWritable that
   * {@link #compareTo} reports as equal to this instance.
   */
  @Override
  public boolean equals(Object o) {
    // instanceof is false for null, so no separate null check is needed.
    if (!(o instanceof BytesRefArrayWritable)) {
      return false;
    }
    return compareTo((BytesRefArrayWritable) o) == 0;
  }

  /**
   * Removes all elements by resetting the valid count to zero. The backing
   * array and the objects in it are kept for reuse.
   */
  public void clear() {
    valid = 0;
  }

  /**
   * Enlarges the capacity if necessary so that it can hold the number of
   * elements given by <tt>newValidCapacity</tt>, then sets the valid count to
   * that number. The BytesRefWritable objects already stored in the slots are
   * not modified.
   *
   * @param newValidCapacity
   *          the desired number of valid elements
   */
  public void resetValid(int newValidCapacity) {
    ensureCapacity(newValidCapacity);
    valid = newValidCapacity;
  }

  /**
   * Grows the backing array to <tt>newCapacity</tt> slots when it is smaller,
   * filling every new slot with a fresh BytesRefWritable. The array is never
   * shrunk.
   *
   * @param newCapacity
   *          the minimum number of slots required
   */
  protected void ensureCapacity(int newCapacity) {
    int size = bytesRefWritables.length;
    if (size < newCapacity) {
      bytesRefWritables = Arrays.copyOf(bytesRefWritables, newCapacity);
      while (size < newCapacity) {
        bytesRefWritables[size] = new BytesRefWritable();
        size++;
      }
    }
  }

  /**
   * Reads an element count followed by that many elements, reusing the
   * BytesRefWritable objects already held in the slots.
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    int count = in.readInt();
    ensureCapacity(count);
    for (int i = 0; i < count; i++) {
      bytesRefWritables[i].readFields(in);
    }
    valid = count;
  }

  /**
   * Writes the valid count followed by each valid element.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(valid);

    for (int i = 0; i < valid; i++) {
      BytesRefWritable cu = bytesRefWritables[i];
      cu.write(out);
    }
  }

  static {
    WritableFactories.setFactory(BytesRefArrayWritable.class,
        new WritableFactory() {

          @Override
          public Writable newInstance() {
            return new BytesRefArrayWritable();
          }

        });
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.rcfile;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * <tt>BytesRefWritable</tt> references a section of a byte array. It can be
 * used to avoid unnecessary byte copies.
 * <p>
 * The referenced section is described by <tt>start</tt> and <tt>length</tt>.
 * The bytes are either given directly or obtained on first use from a
 * {@link LazyDecompressionCallback}. Equality, ordering and hashing are all
 * based on the referenced bytes, so an instance should not be modified while
 * it is used as a key in a hash-based collection.
 */
public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> {

  private static final byte[] EMPTY_BYTES = new byte[0];
  public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable();

  int start = 0;
  int length = 0;
  byte[] bytes = null;

  LazyDecompressionCallback lazyDecompressObj;

  /**
   * Create a zero-size bytes.
   */
  public BytesRefWritable() {
    this(EMPTY_BYTES);
  }

  /**
   * Create a BytesRefWritable with <tt>length</tt> bytes.
   */
  public BytesRefWritable(int length) {
    assert length > 0;
    this.length = length;
    bytes = new byte[this.length];
    start = 0;
  }

  /**
   * Create a BytesRefWritable referenced to the given bytes.
   */
  public BytesRefWritable(byte[] bytes) {
    this.bytes = bytes;
    length = bytes.length;
    start = 0;
  }

  /**
   * Create a BytesRefWritable referenced to one section of the given bytes.
   * The section is determined by argument <tt>offset</tt> and <tt>len</tt>.
   */
  public BytesRefWritable(byte[] data, int offset, int len) {
    bytes = data;
    start = offset;
    length = len;
  }

  /**
   * Create a BytesRefWritable referenced to one section of the given bytes.
   * The argument <tt>lazyDecompressData</tt> refers to a
   * LazyDecompressionCallback object. The arguments <tt>offset</tt> and
   * <tt>len</tt> refer to the uncompressed bytes of
   * <tt>lazyDecompressData</tt>; they are applied after the data has been
   * uncompressed.
   */
  public BytesRefWritable(LazyDecompressionCallback lazyDecompressData,
      int offset, int len) {
    lazyDecompressObj = lazyDecompressData;
    start = offset;
    length = len;
  }

  /**
   * Fetches the bytes from the decompression callback the first time they
   * are needed. Does nothing when the bytes are already present or when no
   * callback is set.
   */
  private void lazyDecompress() throws IOException {
    if (bytes == null && lazyDecompressObj != null) {
      bytes = lazyDecompressObj.decompress();
    }
  }

  /**
   * Returns a copy of the underlying bytes referenced by this instance.
   *
   * @return a new copied byte array
   * @throws IOException
   *           if lazily decompressing the data fails
   */
  public byte[] getBytesCopy() throws IOException {
    lazyDecompress();
    byte[] bb = new byte[length];
    System.arraycopy(bytes, start, bb, 0, length);
    return bb;
  }

  /**
   * Returns the underlying byte array itself (not only the referenced
   * section), decompressing it first when necessary.
   *
   * @throws IOException
   *           if lazily decompressing the data fails
   */
  public byte[] getData() throws IOException {
    lazyDecompress();
    return bytes;
  }

  /**
   * Points this instance at a section of <tt>newData</tt> and drops any
   * decompression callback. readFields() will corrupt the array, so use the
   * set method whenever possible.
   *
   * @see #readFields(DataInput)
   */
  public void set(byte[] newData, int offset, int len) {
    bytes = newData;
    start = offset;
    length = len;
    lazyDecompressObj = null;
  }

  /**
   * Points this instance at a section of the data produced by
   * <tt>newData</tt>; the bytes are fetched on first use. readFields() will
   * corrupt the array, so use the set method whenever possible.
   *
   * @see #readFields(DataInput)
   */
  public void set(LazyDecompressionCallback newData, int offset, int len) {
    bytes = null;
    start = offset;
    length = len;
    lazyDecompressObj = newData;
  }

  /**
   * Writes only the referenced bytes to <tt>out</tt>, without a length
   * prefix.
   */
  public void writeDataTo(DataOutput out) throws IOException {
    lazyDecompress();
    out.write(bytes, start, length);
  }

  /**
   * Reads a length-prefixed record. The current bytes array is reused when it
   * is at least as long as the record; otherwise a new one is created.
   * readFields will therefore overwrite an array passed in earlier. Please
   * use set() whenever possible.
   * <p>
   * Any decompression callback is dropped, because after this call the
   * instance refers to the bytes that were just read.
   *
   * @throws IOException
   *           if the stream reports a negative length or reading fails
   * @see #set(byte[], int, int)
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    int len = in.readInt();
    if (len < 0) {
      // Fail before touching any state so the instance stays consistent.
      throw new IOException("Invalid negative length: " + len);
    }
    // bytes is null when the instance was set up with a lazy callback.
    if (bytes == null || len > bytes.length) {
      bytes = new byte[len];
    }
    lazyDecompressObj = null;
    start = 0;
    length = len;
    in.readFully(bytes, start, length);
  }

  /**
   * Writes the length of the referenced section followed by its bytes.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    lazyDecompress();
    out.writeInt(length);
    out.write(bytes, start, length);
  }

  /**
   * Returns a hash of the referenced bytes, so that instances which are
   * {@link #equals equal} also have equal hash codes.
   *
   * @throws RuntimeException
   *           if lazily decompressing the data fails with an IOException
   */
  @Override
  public int hashCode() {
    try {
      return WritableComparator.hashBytes(getData(), start, getLength());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Returns the referenced bytes as two-digit lowercase hex values separated
   * by single blanks.
   *
   * @throws RuntimeException
   *           if lazily decompressing the data fails with an IOException
   */
  @Override
  public String toString() {
    final byte[] data;
    try {
      data = getData();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    StringBuilder sb = new StringBuilder(3 * length);
    // The section spans [start, start + length), not [start, length).
    final int end = start + length;
    for (int idx = start; idx < end; idx++) {
      // if not the first, put a blank separator in
      if (idx != start) {
        sb.append(' ');
      }
      String num = Integer.toHexString(0xff & data[idx]);
      // if it is only one digit, add a leading 0.
      if (num.length() < 2) {
        sb.append('0');
      }
      sb.append(num);
    }
    return sb.toString();
  }

  /**
   * Compares the referenced byte sections using
   * {@link WritableComparator#compareBytes}.
   *
   * @throws IllegalArgumentException
   *           if <tt>other</tt> is null
   * @throws RuntimeException
   *           if lazily decompressing either side fails with an IOException
   */
  @Override
  public int compareTo(BytesRefWritable other) {
    if (other == null) {
      throw new IllegalArgumentException("Argument can not be null.");
    }
    if (this == other) {
      return 0;
    }
    try {
      return WritableComparator.compareBytes(getData(), start, getLength(),
          other.getData(), other.start, other.getLength());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Returns <tt>true</tt> when <tt>right_obj</tt> is a BytesRefWritable whose
   * referenced bytes compare equal to those of this instance.
   */
  @Override
  public boolean equals(Object right_obj) {
    // instanceof is false for null, so no separate null check is needed.
    if (!(right_obj instanceof BytesRefWritable)) {
      return false;
    }
    return compareTo((BytesRefWritable) right_obj) == 0;
  }

  static {
    WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() {

      @Override
      public Writable newInstance() {
        return new BytesRefWritable();
      }

    });
  }

  /**
   * Returns the number of bytes in the referenced section.
   */
  public int getLength() {
    return length;
  }

  /**
   * Returns the offset of the referenced section within the byte array.
   */
  public int getStart() {
    return start;
  }
}
