Repository: tajo Updated Branches: refs/heads/master c770fe75c -> ee6c2b5fe
TAJO-2000: BSTIndex can cause OOM. Closes #892 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ee6c2b5f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ee6c2b5f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ee6c2b5f Branch: refs/heads/master Commit: ee6c2b5fe75753ea0b5b54e833e86e368b7ef3b2 Parents: c770fe7 Author: Jinho Kim <[email protected]> Authored: Mon Dec 7 13:49:50 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Dec 7 13:49:50 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/storage/BufferPool.java | 30 +- .../org/apache/tajo/tuple/memory/HeapTuple.java | 15 + .../apache/tajo/tuple/memory/UnSafeTuple.java | 14 + .../java/org/apache/tajo/util/FileUtil.java | 22 + .../planner/physical/BSTIndexScanExec.java | 7 +- .../physical/RangeShuffleFileWriteExec.java | 11 +- .../engine/planner/physical/StoreIndexExec.java | 2 +- .../java/org/apache/tajo/worker/TaskImpl.java | 4 +- .../tajo/pullserver/TajoPullServerService.java | 162 ++++--- .../apache/tajo/storage/index/bst/BSTIndex.java | 431 +++++++++++++------ .../apache/tajo/storage/index/TestBSTIndex.java | 135 +++++- .../index/TestSingleCSVFileBSTIndex.java | 8 +- 13 files changed, 595 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index f1afd9e..e54e661 100644 --- a/CHANGES +++ b/CHANGES @@ -57,6 +57,8 @@ Release 0.12.0 - unreleased BUG FIXES + TAJO-2000: BSTIndex can cause OOM. (jinho) + TAJO-1992: \set timezone in cli doesn't work because of casesensitive (DaeMyung) TAJO-1993: Table Timezone doesn't work when Timezone is not exactly same.(DaeMyung) http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java index 4913d3b..7c4e288 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/BufferPool.java @@ -94,17 +94,39 @@ public class BufferPool { public static ByteBuf directBuffer(int size) { - return ALLOCATOR.directBuffer(size); + return directBuffer(size, ByteOrder.LITTLE_ENDIAN); + } + + /** + * @param size the initial capacity + * @param order the endianness + * @return allocated ByteBuf from pool + */ + public static ByteBuf directBuffer(int size, ByteOrder order) { + ByteBuf byteBuf = ALLOCATOR.directBuffer(size); + if (byteBuf.order() != order) byteBuf.order(order); + return byteBuf; } /** - * * @param size the initial capacity - * @param max the max capacity + * @param max the max capacity * @return allocated ByteBuf from pool */ public static ByteBuf directBuffer(int size, int max) { - return ALLOCATOR.directBuffer(size, max).order(ByteOrder.LITTLE_ENDIAN); + return directBuffer(size, max, ByteOrder.LITTLE_ENDIAN); + } + + /** + * @param size the initial capacity + * @param max the max capacity + * @param order the endianness + * @return allocated ByteBuf from pool + */ + public static ByteBuf directBuffer(int size, int max, ByteOrder order) { + ByteBuf byteBuf = ALLOCATOR.directBuffer(size, max); + if (byteBuf.order() != order) byteBuf.order(order); + return byteBuf; } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java index c6c7daf..330b363 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/HeapTuple.java @@ -33,6 +33,7 @@ import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.datetime.TimeMeta; import java.nio.ByteOrder; +import java.util.Arrays; import static org.apache.tajo.common.TajoDataTypes.DataType; @@ -296,6 +297,20 @@ public class HeapTuple extends ZeroCopyTuple implements Cloneable { } @Override + public int hashCode() { + return Arrays.hashCode(getValues()); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Tuple) { + Tuple other = (Tuple) obj; + return Arrays.equals(getValues(), other.getValues()); + } + return false; + } + + @Override public Tuple clone() throws CloneNotSupportedException { HeapTuple heapTuple = (HeapTuple) super.clone(); heapTuple.buffer = buffer.copy(getRelativePos(), getLength()); http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java index 26f7df3..dcff801 100644 --- a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java @@ -36,6 +36,7 @@ import sun.misc.Unsafe; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.util.Arrays; import static org.apache.tajo.common.TajoDataTypes.DataType; @@ -338,6 +339,19 @@ public class UnSafeTuple extends ZeroCopyTuple { } @Override + public int hashCode() { + return Arrays.hashCode(getValues()); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Tuple) { + Tuple other = (Tuple) obj; + return Arrays.equals(getValues(), other.getValues()); + } + return false; + } + @Override public String toString() { return VTuple.toDisplayString(getValues()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java index 118f42a..95700d0 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java @@ -120,4 +120,26 @@ public class FileUtil { } } } + + /** + * Close the Closeable objects and <b>throw</b> first {@link IOException}, if failed + * @param closeables the objects to close + */ + public static void cleanupAndthrowIfFailed(java.io.Closeable... closeables) throws IOException { + IOException ioe = null; + + for (java.io.Closeable c : closeables) { + if (c != null) { + try { + c.close(); + } catch (IOException e) { + if (ioe == null) ioe = e; + } + } + } + + if (ioe != null) { + throw ioe; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index ee3762f..89c5b3d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -90,7 +90,6 @@ public class BSTIndexScanExec extends ScanExec { Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment)); this.reader = new BSTIndex(context.getConf()). getIndexReader(indexPath, keySchema, comparator); - this.reader.open(); } private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, List<Target> targets, EvalNode qual) { @@ -101,9 +100,7 @@ public class BSTIndexScanExec extends ScanExec { qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree())); } for (Column column : originalSchema.getRootColumns()) { - if (subSchema.contains(column) - || qualAndTargets.contains(column) - || qualAndTargets.contains(column)) { + if (subSchema.contains(column) || qualAndTargets.contains(column)) { mergedSchema.addColumn(column); } } @@ -127,6 +124,8 @@ public class BSTIndexScanExec extends ScanExec { @Override public void init() throws IOException { + reader.init(); + Schema projected; // in the case where projected column or expression are given http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index bcd2b17..e4217b3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -86,9 +86,8 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { this.appender.enableStats(keySchema.getAllColumns()); this.appender.init(); this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); - this.indexWriter.setLoadNum(100); - this.indexWriter.open(); + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp, true); + this.indexWriter.init(); super.init(); } @@ -121,13 +120,11 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { super.close(); appender.flush(); - IOUtils.cleanup(LOG, appender); - indexWriter.flush(); - IOUtils.cleanup(LOG, indexWriter); - // Collect statistics data context.setResultStats(appender.getStats()); context.addShuffleFileOutput(0, context.getTaskId().toString()); + IOUtils.cleanup(LOG, appender); + indexWriter.close(); appender = null; indexWriter = null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java index fa9fe3c..c5e1093 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java @@ -78,7 +78,7 @@ public class StoreIndexExec extends UnaryPhysicalExec { this.comparator = new BaseTupleComparator(keySchema, sortSpecs); this.indexWriter = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator); this.indexWriter.setLoadNum(100); - this.indexWriter.open(); + this.indexWriter.init(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 873d9e0..74805ce 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -768,8 +768,8 @@ public class TaskImpl implements Task { try { chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); - } catch (Throwable t) { - LOG.error("getFileChunks() throws exception"); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 6bcb7b4..ef3d7e0 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -661,109 +661,107 @@ public class TajoPullServerService extends AbstractService { String endKey, boolean last) throws IOException { BSTIndex index = new BSTIndex(new TajoConf()); - BSTIndex.BSTIndexReader idxReader = - index.getIndexReader(new Path(outDir, "index")); - idxReader.open(); - Schema keySchema = idxReader.getKeySchema(); - TupleComparator comparator = idxReader.getComparator(); - - if (LOG.isDebugEnabled()) { - LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")"); - } + try (BSTIndex.BSTIndexReader idxReader = index.getIndexReader(new Path(outDir, "index"))) { + Schema keySchema = idxReader.getKeySchema(); + TupleComparator comparator = idxReader.getComparator(); - File data = new File(URI.create(outDir.toUri() + "/output")); - byte [] startBytes = Base64.decodeBase64(startKey); - byte [] endBytes = Base64.decodeBase64(endKey); + if (LOG.isDebugEnabled()) { + LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")"); + } - RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema); - Tuple start; - Tuple end; - try { - start = decoder.toTuple(startBytes); - } catch (Throwable t) { - throw new IllegalArgumentException("StartKey: " + startKey - + ", decoded byte size: " + startBytes.length, t); - } + File data = new File(URI.create(outDir.toUri() + "/output")); + byte[] startBytes = Base64.decodeBase64(startKey); + byte[] endBytes = Base64.decodeBase64(endKey); - try { - end = decoder.toTuple(endBytes); - } catch (Throwable t) { - throw new IllegalArgumentException("EndKey: " + endKey - + ", decoded byte size: " + endBytes.length, t); - } + RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema); + Tuple start; + Tuple end; + try { + start = decoder.toTuple(startBytes); + } catch (Throwable t) { + throw new IllegalArgumentException("StartKey: " + startKey + + ", decoded byte size: " + startBytes.length, t); + } - LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end + - (last ? ", last=true" : "") + ")"); + try { + end = decoder.toTuple(endBytes); + } catch (Throwable t) { + throw new IllegalArgumentException("EndKey: " + endKey + + ", decoded byte size: " + endBytes.length, t); + } - if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero - LOG.info("There is no contents"); - return null; - } + LOG.info("GET Request for " + data.getAbsolutePath() + " (start=" + start + ", end=" + end + + (last ? ", last=true" : "") + ")"); - if (comparator.compare(end, idxReader.getFirstKey()) < 0 || - comparator.compare(idxReader.getLastKey(), start) < 0) { - LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + - "], but request start:" + start + ", end: " + end); - return null; - } + if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero + LOG.info("There is no contents"); + return null; + } - long startOffset; - long endOffset; - try { - startOffset = idxReader.find(start); - } catch (IOException ioe) { - LOG.error("State Dump (the requested range: " - + "[" + start + ", " + end +")" + ", idx min: " - + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - throw ioe; - } - try { - endOffset = idxReader.find(end); - if (endOffset == -1) { - endOffset = idxReader.find(end, true); + if (comparator.compare(end, idxReader.getFirstKey()) < 0 || + comparator.compare(idxReader.getLastKey(), start) < 0) { + LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + + "], but request start:" + start + ", end: " + end); + return null; } - } catch (IOException ioe) { - LOG.error("State Dump (the requested range: " - + "[" + start + ", " + end +")" + ", idx min: " - + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - throw ioe; - } - // if startOffset == -1 then case 2-1 or case 3 - if (startOffset == -1) { // this is a hack - // if case 2-1 or case 3 + long startOffset; + long endOffset; try { - startOffset = idxReader.find(start, true); + idxReader.init(); + startOffset = idxReader.find(start); } catch (IOException ioe) { LOG.error("State Dump (the requested range: " - + "[" + start + ", " + end +")" + ", idx min: " + + "[" + start + ", " + end + ")" + ", idx min: " + + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + throw ioe; + } + try { + endOffset = idxReader.find(end); + if (endOffset == -1) { + endOffset = idxReader.find(end, true); + } + } catch (IOException ioe) { + LOG.error("State Dump (the requested range: " + + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " + idxReader.getLastKey()); throw ioe; } - } - if (startOffset == -1) { - throw new IllegalStateException("startOffset " + startOffset + " is negative \n" + - "State Dump (the requested range: " - + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - } + // if startOffset == -1 then case 2-1 or case 3 + if (startOffset == -1) { // this is a hack + // if case 2-1 or case 3 + try { + startOffset = idxReader.find(start, true); + } catch (IOException ioe) { + LOG.error("State Dump (the requested range: " + + "[" + start + ", " + end + ")" + ", idx min: " + + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + throw ioe; + } + } - // if greater than indexed values - if (last || (endOffset == -1 - && comparator.compare(idxReader.getLastKey(), end) < 0)) { - endOffset = data.length(); - } + if (startOffset == -1) { + throw new IllegalStateException("startOffset " + startOffset + " is negative \n" + + "State Dump (the requested range: " + + "[" + start + ", " + end + ")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " + + idxReader.getLastKey()); + } - idxReader.close(); + // if greater than indexed values + if (last || (endOffset == -1 + && comparator.compare(idxReader.getLastKey(), end) < 0)) { + endOffset = data.length(); + } - FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset); + FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset); - if(LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk); - return chunk; + if (LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk); + return chunk; + } } public static List<String> splitMaps(List<String> mapq) { http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java index 3affd50..d212c1c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java @@ -18,13 +18,13 @@ package org.apache.tajo.storage.index.bst; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; import org.apache.tajo.storage.*; @@ -33,12 +33,14 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.index.IndexMethod; import org.apache.tajo.storage.index.IndexWriter; import org.apache.tajo.storage.index.OrderIndexReader; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.FileUtil; -import java.io.Closeable; -import java.io.FileNotFoundException; -import java.io.IOException; +import java.io.*; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; import java.util.LinkedList; -import java.util.Set; +import java.util.Map; import java.util.TreeMap; import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; @@ -55,6 +57,9 @@ public class BSTIndex implements IndexMethod { public static final int ONE_LEVEL_INDEX = 1; public static final int TWO_LEVEL_INDEX = 2; + public static final int DEFAULT_INDEX_LOAD = 4096; + public static final int BUFFER_SIZE = 128 * StorageUnit.KB; + public static final String WRITER_INDEX_LOAD = "tajo.executor.index.writer.load-num"; private final Configuration conf; @@ -62,10 +67,16 @@ public class BSTIndex implements IndexMethod { this.conf = conf; } + + public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema, + TupleComparator comparator, boolean sorted) throws IOException { + return new BSTIndexWriter(fileName, level, keySchema, comparator, sorted); + } + @Override public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException { - return new BSTIndexWriter(fileName, level, keySchema, comparator); + TupleComparator comparator) throws IOException { + return getIndexWriter(fileName, level, keySchema, comparator, false); } @Override @@ -78,23 +89,38 @@ public class BSTIndex implements IndexMethod { } public class BSTIndexWriter extends IndexWriter implements Closeable { + private FileChannel outChannel; + private RandomAccessFile outRandomAccessFile; private FSDataOutputStream out; - private FileSystem fs; + private long filePos; + + private FileChannel rootOutChannel; + private RandomAccessFile rootOutRandomAccessFile; + private FSDataOutputStream rootOut; + + private boolean isLocal; + private int level; - private int loadNum = 4096; + private int loadNum; private Path fileName; + // Target data set is sorted or not + private boolean sorted; + private boolean writeRootIndex; private final Schema keySchema; private final TupleComparator compartor; private final KeyOffsetCollector collector; private KeyOffsetCollector rootCollector; + private ByteBuf indexBuffer; + private ByteBuf rootIndexBuffer; private Tuple firstKey; private Tuple lastKey; private RowStoreEncoder rowStoreEncoder; - - // private Tuple lastestKey = null; + private int loadCount; + private int entrySize; + private int rootEntrySize; /** * constructor @@ -104,25 +130,63 @@ public class BSTIndex implements IndexMethod { * @throws java.io.IOException */ public BSTIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException { + TupleComparator comparator, boolean sorted) throws IOException { this.fileName = fileName; this.level = level; + this.writeRootIndex = level == TWO_LEVEL_INDEX; this.keySchema = keySchema; this.compartor = comparator; this.collector = new KeyOffsetCollector(comparator); + this.rootCollector = new KeyOffsetCollector(this.compartor); this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema); + this.sorted = sorted; + this.indexBuffer = BufferPool.directBuffer(BUFFER_SIZE, ByteOrder.nativeOrder()); + this.rootIndexBuffer = BufferPool.directBuffer(BUFFER_SIZE, ByteOrder.nativeOrder()); + this.loadCount = loadNum = conf.getInt(WRITER_INDEX_LOAD, DEFAULT_INDEX_LOAD); } - public void setLoadNum(int loadNum) { + public void setLoadNum(int loadNum) { this.loadNum = loadNum; + this.loadCount = loadNum; } - public void open() throws IOException { - fs = fileName.getFileSystem(conf); - if (fs.exists(fileName)) { - throw new IOException("ERROR: index file (" + fileName + " already exists"); + public void init() throws IOException { + FileSystem fs = fileName.getFileSystem(conf); + Path rootPath = new Path(fileName + ".root"); + if (fs.exists(fileName) || fs.exists(rootPath)) { + throw new IOException("ERROR: index file " + fileName + " or " + rootPath + " already exists"); + } + + if (fs instanceof LocalFileSystem) { + File outFile; + try { + if (!fs.exists(fileName.getParent())) { + fs.mkdirs(fileName.getParent()); + } + + if (fileName.toUri().getScheme() != null) { + outFile = new File(fileName.toUri()); + } else { + outFile = new File(fileName.toString()); + } + } catch (IllegalArgumentException iae) { + throw new IOException(iae); + } + + outRandomAccessFile = new RandomAccessFile(outFile, "rw"); + outChannel = outRandomAccessFile.getChannel(); + + if (writeRootIndex) { + rootOutRandomAccessFile = new RandomAccessFile(new File(outFile.getAbsolutePath() + ".root"), "rw"); + rootOutChannel = rootOutRandomAccessFile.getChannel(); + } + isLocal = true; + } else { + out = fs.create(fileName, true); + if (writeRootIndex) { + rootOut = fs.create(rootPath, true); + } } - out = fs.create(fileName); } @Override @@ -140,7 +204,83 @@ public class BSTIndex implements IndexMethod { lastKey = keyTuple; } - collector.put(keyTuple, offset); + if (sorted) { + /* root index writing */ + if (writeRootIndex) { + if (loadCount == loadNum) { + loadCount = 0; + writeRootIndex(rootIndexBuffer, keyTuple, filePos + indexBuffer.writerIndex()); + } + loadCount++; + } + + /* leaf index writing */ + writeIndex(indexBuffer, keyTuple, offset); + } else { + collector.put(keyTuple, offset); + } + } + + private void writeIndex(ByteBuf byteBuf, Tuple tuple, Long... offsets) throws IOException { + + byte[] buf = rowStoreEncoder.toBytes(tuple); + int size = buf.length + 8 + (offsets.length * 8); + if (!byteBuf.isWritable(size)) { + byteBuf.ensureWritable(size); + } + + // key writing + byteBuf.writeInt(buf.length); + byteBuf.writeBytes(buf); + + //offset num writing + byteBuf.writeInt(offsets.length); + + /* offset writing */ + for (long offset : offsets) { + byteBuf.writeLong(offset); + } + + entrySize++; + // flush to file and reset buffer + if (byteBuf.writerIndex() >= BUFFER_SIZE) { + filePos += flushBuffer(byteBuf, outChannel, out); + } + } + + private void writeRootIndex(ByteBuf byteBuf, Tuple tuple, long offset) throws IOException { + byte[] buf = rowStoreEncoder.toBytes(tuple); + int size = buf.length + 12; + if (!byteBuf.isWritable(size)) { + byteBuf.ensureWritable(size); + } + + // key writing + byteBuf.writeInt(buf.length); + byteBuf.writeBytes(buf); + + // leaf offset writing + byteBuf.writeLong(offset); + + rootEntrySize++; + // flush to file and reset buffer + if (byteBuf.writerIndex() >= BUFFER_SIZE) { + flushBuffer(byteBuf, rootOutChannel, rootOut); + } + } + + private int flushBuffer(ByteBuf byteBuf, FileChannel channel, FSDataOutputStream out) throws IOException { + // write buffer to file + int readableBytes = byteBuf.readableBytes(); + if (readableBytes > 0) { + if (isLocal) { + byteBuf.readBytes(channel, readableBytes); + } else { + byteBuf.readBytes(out, readableBytes); + } + byteBuf.clear(); + } + return readableBytes; } public TupleComparator getComparator() { @@ -148,107 +288,128 @@ public class BSTIndex implements IndexMethod { } public void flush() throws IOException { - out.flush(); + if (out != null) { + flushBuffer(indexBuffer, outChannel, out); + out.flush(); + } + + if (writeRootIndex && rootOut != null) { + flushBuffer(rootIndexBuffer, rootOutChannel, rootOut); + rootOut.flush(); + } } - public void writeHeader(int entryNum) throws IOException { + public void writeFooter(int entryNum) throws IOException { + indexBuffer.clear(); + + long startPosition = filePos; // schema byte [] schemaBytes = keySchema.getProto().toByteArray(); - out.writeInt(schemaBytes.length); - out.write(schemaBytes); - // comparator byte [] comparatorBytes = compartor.getProto().toByteArray(); - out.writeInt(comparatorBytes.length); - out.write(comparatorBytes); + + int size = schemaBytes.length + comparatorBytes.length + 16; + if(!indexBuffer.isWritable(size)) { + indexBuffer.ensureWritable(size); + } + + indexBuffer.writeInt(schemaBytes.length); + indexBuffer.writeBytes(schemaBytes); + + indexBuffer.writeInt(comparatorBytes.length); + indexBuffer.writeBytes(comparatorBytes); // level - out.writeInt(this.level); + indexBuffer.writeInt(this.level); // entry - out.writeInt(entryNum); + indexBuffer.writeInt(entryNum); if (entryNum > 0) { byte [] minBytes = rowStoreEncoder.toBytes(firstKey); - out.writeInt(minBytes.length); - out.write(minBytes); byte [] maxBytes = rowStoreEncoder.toBytes(lastKey); - out.writeInt(maxBytes.length); - out.write(maxBytes); - } - out.flush(); - } - public void close() throws IOException { - /* two level initialize */ - if (this.level == TWO_LEVEL_INDEX) { - rootCollector = new KeyOffsetCollector(this.compartor); + size = minBytes.length + maxBytes.length + 12; + if(!indexBuffer.isWritable(size)) { + filePos += flushBuffer(indexBuffer, outChannel, out); + indexBuffer.ensureWritable(size); + } + + indexBuffer.writeInt(minBytes.length); + indexBuffer.writeBytes(minBytes); + indexBuffer.writeInt(maxBytes.length); + indexBuffer.writeBytes(maxBytes); } - /* data writing phase */ - TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap(); - Set<Tuple> keySet = keyOffsetMap.keySet(); + // write footer length + int footerSize = (int) (filePos + indexBuffer.readableBytes() + 4 - startPosition); + indexBuffer.writeInt(footerSize); - int entryNum = keySet.size(); - writeHeader(entryNum); + filePos += flushBuffer(indexBuffer, outChannel, out); + } - int loadCount = this.loadNum - 1; - for (Tuple key : keySet) { + public void close() throws IOException { + /* data writing phase */ + try { + if (sorted) { + // write remaining data to file + filePos += flushBuffer(indexBuffer, outChannel, out); + } else { + // flush collected index data + TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap(); + for (Map.Entry<Tuple, LinkedList<Long>> entry : keyOffsetMap.entrySet()) { + + /* two level initialize */ + if (writeRootIndex) { + if (loadCount == loadNum) { + loadCount = 0; + rootCollector.put(entry.getKey(), filePos + indexBuffer.writerIndex()); + } + loadCount++; + } - if (this.level == TWO_LEVEL_INDEX) { - loadCount++; - if (loadCount == this.loadNum) { - rootCollector.put(key, out.getPos()); - loadCount = 0; + LinkedList<Long> offsetList = entry.getValue(); + writeIndex(indexBuffer, entry.getKey(), offsetList.toArray(new Long[offsetList.size()])); } + filePos += flushBuffer(indexBuffer, outChannel, out); + collector.clear(); } - /* key writing */ - byte[] buf = rowStoreEncoder.toBytes(key); - out.writeInt(buf.length); - out.write(buf); - - /**/ - LinkedList<Long> offsetList = keyOffsetMap.get(key); - /* offset num writing */ - int offsetSize = offsetList.size(); - out.writeInt(offsetSize); - /* offset writing */ - for (Long offset : offsetList) { - out.writeLong(offset); - } - } - out.flush(); - out.close(); - keySet.clear(); - collector.clear(); + writeFooter(entrySize); - FSDataOutputStream rootOut = null; - /* root index creating phase */ - if (this.level == TWO_LEVEL_INDEX) { - TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap(); - keySet = rootMap.keySet(); - - rootOut = fs.create(new Path(fileName + ".root")); - rootOut.writeInt(this.loadNum); - rootOut.writeInt(keySet.size()); - - /* root key writing */ - for (Tuple key : keySet) { - byte[] buf = rowStoreEncoder.toBytes(key); - rootOut.writeInt(buf.length); - rootOut.write(buf); - - LinkedList<Long> offsetList = rootMap.get(key); - if (offsetList.size() > 1 || offsetList.size() == 0) { - throw new IOException("Why root index doen't have one offset?"); - } - rootOut.writeLong(offsetList.getFirst()); + /* root index creating phase */ + if (writeRootIndex) { + if (sorted) { + //write root index header + rootIndexBuffer.writeInt(loadNum); + rootIndexBuffer.writeInt(rootEntrySize); + // write remaining data to file + flushBuffer(rootIndexBuffer, rootOutChannel, rootOut); + } else { + TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap(); + rootIndexBuffer.clear(); + /* root key writing */ + for (Map.Entry<Tuple, LinkedList<Long>> entry : rootMap.entrySet()) { + LinkedList<Long> offsetList = entry.getValue(); + if (offsetList.size() != 1) { + throw new IOException("Why root index doen't have one offset? offsets:" + offsetList.size()); + } + writeRootIndex(rootIndexBuffer, entry.getKey(), offsetList.getFirst()); + } + + //write root index header + rootIndexBuffer.writeInt(this.loadNum); + rootIndexBuffer.writeInt(rootEntrySize); + + flushBuffer(rootIndexBuffer, rootOutChannel, rootOut); + rootCollector.clear(); + } } - rootOut.flush(); - rootOut.close(); + } finally { + indexBuffer.release(); + rootIndexBuffer.release(); - keySet.clear(); - rootCollector.clear(); + FileUtil.cleanupAndthrowIfFailed(outChannel, outRandomAccessFile, out, + rootOutChannel, rootOutRandomAccessFile, rootOut); } } @@ -289,7 +450,6 @@ public class BSTIndex implements IndexMethod { private FileSystem fs; private FSDataInputStream indexIn; - private FSDataInputStream subIn; private int level; private int entryNum; @@ -301,6 +461,7 @@ public class BSTIndex implements IndexMethod { private int rootCursor; private int keyCursor; private int offsetCursor; + private long dataLength; // mutex private final Object mutex = new Object(); @@ -319,10 +480,12 @@ public class BSTIndex implements IndexMethod { this.keySchema = keySchema; this.comparator = comparator; this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema); + open(); } public BSTIndexReader(final Path fileName) throws IOException { this.fileName = fileName; + open(); } public Schema getKeySchema() { @@ -333,11 +496,21 @@ public class BSTIndex implements IndexMethod { return this.comparator; } - private void readHeader() throws IOException { + private void loadFooter() throws IOException { + long fileLength = fs.getFileStatus(this.fileName).getLen(); + + //read footer + indexIn.seek(fileLength - 4); + int footerSize = indexIn.readInt(); + dataLength = fileLength - footerSize; + ByteBuf byteBuf = Unpooled.buffer(footerSize, footerSize); + indexIn.seek(dataLength); + byteBuf.writeBytes(indexIn, footerSize); + // schema - int schemaByteSize = indexIn.readInt(); + int schemaByteSize = byteBuf.readInt(); byte [] schemaBytes = new byte[schemaByteSize]; - StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize); + byteBuf.readBytes(schemaBytes); SchemaProto.Builder builder = SchemaProto.newBuilder(); builder.mergeFrom(schemaBytes); @@ -346,30 +519,36 @@ public class BSTIndex implements IndexMethod { this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema); // comparator - int compByteSize = indexIn.readInt(); + int compByteSize = byteBuf.readInt(); byte [] compBytes = new byte[compByteSize]; - StorageUtil.readFully(indexIn, compBytes, 0, compByteSize); + byteBuf.readBytes(compBytes); TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder(); compProto.mergeFrom(compBytes); this.comparator = new BaseTupleComparator(compProto.build()); // level - this.level = indexIn.readInt(); + this.level = byteBuf.readInt(); // entry - this.entryNum = indexIn.readInt(); + this.entryNum = byteBuf.readInt(); if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values - byte [] minBytes = new byte[indexIn.readInt()]; - StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length); + byte [] minBytes = new byte[byteBuf.readInt()]; + byteBuf.readBytes(minBytes); this.firstKey = rowStoreDecoder.toTuple(minBytes); - byte [] maxBytes = new byte[indexIn.readInt()]; - StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length); + byte [] maxBytes = new byte[byteBuf.readInt()]; + byteBuf.readBytes(maxBytes); this.lastKey = rowStoreDecoder.toTuple(maxBytes); } + byteBuf.release(); + } + + public void init() throws IOException { + open(); + fillData(); } - public void open() + private void open() throws IOException { /* init the index file */ fs = fileName.getFileSystem(conf); @@ -378,11 +557,11 @@ public class BSTIndex implements IndexMethod { } indexIn = fs.open(this.fileName); - readHeader(); - fillData(); + loadFooter(); } private void fillData() throws IOException { + indexIn.seek(0); /* load on memory */ if (this.level == TWO_LEVEL_INDEX) { @@ -391,13 +570,16 @@ public class BSTIndex implements IndexMethod { throw new FileNotFoundException("root index did not created"); } - subIn = indexIn; - indexIn = fs.open(rootPath); + try (FSDataInputStream rootIndexIn = fs.open(rootPath)) { + long fileLength = fs.getFileStatus(rootPath).getLen(); /* root index header reading : type => loadNum => indexSize */ - this.loadNum = indexIn.readInt(); - this.entryNum = indexIn.readInt(); - /**/ - fillRootIndex(entryNum, indexIn); + rootIndexIn.seek(fileLength - 8); + this.loadNum = rootIndexIn.readInt(); + this.entryNum = rootIndexIn.readInt(); + + rootIndexIn.seek(0); + fillRootIndex(entryNum, rootIndexIn); + } } else { fillLeafIndex(entryNum, indexIn, -1); @@ -455,7 +637,7 @@ public class BSTIndex implements IndexMethod { } else { if (offsetIndex.length -1 > rootCursor) { rootCursor++; - fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]); + fillLeafIndex(loadNum + 1, indexIn, this.offsetIndex[rootCursor]); keyCursor = 1; offsetCursor = 0; } else { @@ -485,6 +667,10 @@ public class BSTIndex implements IndexMethod { byte[] buf; for (int i = 0; i < entryNum; i++) { counter++; + + if (in.getPos() >= dataLength) + throw new EOFException("Path:" + fileName + ", Pos: " + in.getPos() + ", Data len:" + dataLength); + buf = new byte[in.readInt()]; StorageUtil.readFully(in, buf, 0, buf.length); dataSubIndex[i] = rowStoreDecoder.toTuple(buf); @@ -494,10 +680,10 @@ public class BSTIndex implements IndexMethod { for (int j = 0; j < offsetNum; j++) { this.offsetSubIndex[i][j] = in.readLong(); } - } } catch (IOException e) { + //TODO this block should fix correctly counter--; if (pos != -1) { in.seek(pos); @@ -567,9 +753,9 @@ public class BSTIndex implements IndexMethod { } else { rootCursor = 0; } - fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]); + fillLeafIndex(loadNum, indexIn, this.offsetIndex[rootCursor]); pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length); - + return pos; } @@ -618,7 +804,6 @@ public class BSTIndex implements IndexMethod { @Override public void close() throws IOException { this.indexIn.close(); - this.subIn.close(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index 30cea60..a9d8ce2 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -122,7 +122,7 @@ public class TestBSTIndex { BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -147,7 +147,7 @@ public class TestBSTIndex { tuple = new VTuple(keySchema.size()); BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); @@ -197,7 +197,7 @@ public class TestBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testBuildIndexWithAppender_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); Tuple tuple; long offset; @@ -227,7 +227,7 @@ public class TestBSTIndex { tuple = new VTuple(keySchema.size()); BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); @@ -290,7 +290,7 @@ public class TestBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindOmittedValue_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -315,7 +315,7 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindOmittedValue_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); for (int i = 1; i < TUPLE_NUM - 1; i += 2) { keyTuple.put(0, DatumFactory.createInt8(i)); keyTuple.put(1, DatumFactory.createFloat8(i)); @@ -363,7 +363,7 @@ public class TestBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValue_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -388,7 +388,7 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); @@ -456,7 +456,7 @@ public class TestBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyOmittedValue_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -481,7 +481,7 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); @@ -538,7 +538,7 @@ public class TestBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindMinValue_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -565,7 +565,7 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); @@ -624,7 +624,7 @@ public class TestBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testMinMax_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -649,7 +649,7 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testMinMax_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); Tuple min = reader.getFirstKey(); assertEquals(5, min.getInt4(0)); @@ -731,7 +731,7 @@ public class TestBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testConcurrentAccess_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -756,7 +756,7 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testConcurrentAccess_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); Thread[] threads = new Thread[5]; ConcurrentAccessor[] accs = new ConcurrentAccessor[5]; @@ -812,9 +812,9 @@ public class TestBSTIndex { BSTIndex bst = new BSTIndex(conf); BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValueDescOrder_" + dataFormat + ".idx"), - BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); + BSTIndex.TWO_LEVEL_INDEX, keySchema, comp, true); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -841,7 +841,7 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); scanner.init(); @@ -906,7 +906,7 @@ public class TestBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindNextKeyValueDescOrder_" + dataFormat + ".idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -932,7 +932,7 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValueDescOrder_" + dataFormat + ".idx"), keySchema, comp); - reader.open(); + reader.init(); assertEquals(keySchema, reader.getKeySchema()); assertEquals(comp, reader.getComparator()); @@ -965,4 +965,97 @@ public class TestBSTIndex { reader.close(); scanner.close(); } + + @Test + public void testFindValueASCOrder() throws IOException { + meta = CatalogUtil.newTableMeta(dataFormat); + + Path tablePath = new Path(testDir, "testFindValue_" + dataFormat); + Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()) + .getAppender(meta, schema, tablePath); + appender.init(); + Tuple tuple; + + // order by asc + for (int i = 0; i < TUPLE_NUM; i++) { + tuple = new VTuple(5); + tuple.put(0, DatumFactory.createInt4(i)); + tuple.put(1, DatumFactory.createInt8(i)); + tuple.put(2, DatumFactory.createFloat8(i)); + tuple.put(3, DatumFactory.createFloat4(i)); + tuple.put(4, DatumFactory.createText("field_" + i)); + appender.addTuple(tuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment tablet = new FileFragment("table1_1", status.getPath(), 0, fileLen); + + SortSpec[] sortKeys = new SortSpec[2]; + sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false); + sortKeys[1] = new SortSpec(schema.getColumn("double"), true, false); + + Schema keySchema = new Schema(); + keySchema.addColumn(new Column("long", Type.INT8)); + keySchema.addColumn(new Column("double", Type.FLOAT8)); + + BaseTupleComparator comp = new BaseTupleComparator(keySchema, sortKeys); + + BSTIndex bst = new BSTIndex(conf); + BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), + BSTIndex.TWO_LEVEL_INDEX, + keySchema, comp, true); + creater.setLoadNum(LOAD_NUM); + creater.init(); + + SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); + scanner.init(); + + Tuple keyTuple; + long offset; + while (true) { + keyTuple = new VTuple(2); + offset = scanner.getNextOffset(); + tuple = scanner.next(); + if (tuple == null) break; + + keyTuple.put(0, tuple.asDatum(1)); + keyTuple.put(1, tuple.asDatum(2)); + creater.write(keyTuple, offset); + } + + creater.flush(); + creater.close(); + scanner.close(); + + tuple = new VTuple(keySchema.size()); + BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp); + reader.init(); + scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()). + getSeekableScanner(meta, schema, tablet.getProto(), schema); + scanner.init(); + + for (int i = 0; i < TUPLE_NUM - 1; i++) { + tuple.put(0, DatumFactory.createInt8(i)); + tuple.put(1, DatumFactory.createFloat8(i)); + long offsets = reader.find(tuple); + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("seek check [" + (i) + " ," + (tuple.getInt8(1)) + "]", (i) == (tuple.getInt8(1))); + assertTrue("seek check [" + (i) + " ," + (tuple.getFloat8(2)) + "]", (i) == (tuple.getFloat8(2))); + + offsets = reader.next(); + if (offsets == -1) { + continue; + } + scanner.seek(offsets); + tuple = scanner.next(); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.getInt4(0))); + assertTrue("[seek check " + (i + 1) + " ]", (i + 1) == (tuple.getInt8(1))); + } + reader.close(); + scanner.close(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/ee6c2b5f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java index 8262073..b2ca5b8 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -109,7 +109,7 @@ public class TestSingleCSVFileBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()) .getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -135,7 +135,7 @@ public class TestSingleCSVFileBSTIndex { tuple = new VTuple(keySchema.size()); BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp); - reader.open(); + reader.init(); fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()) .getSeekableScanner(meta, schema, tablet.getProto(), schema); fileScanner.init(); @@ -200,7 +200,7 @@ public class TestSingleCSVFileBSTIndex { BSTIndexWriter creater = bst.getIndexWriter(new Path(testDir, "FindNextKeyValueInCSV.idx"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); - creater.open(); + creater.init(); SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()) .getSeekableScanner(meta, schema, tablet.getProto(), schema); @@ -223,7 +223,7 @@ public class TestSingleCSVFileBSTIndex { fileScanner.close(); BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp); - reader.open(); + reader.init(); fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()) .getSeekableScanner(meta, schema, tablet.getProto(), schema); fileScanner.init();
