Repository: tajo
Updated Branches:
  refs/heads/master a2a2c3ca9 -> e06ffa93c
TAJO-692: Missing Null handling for INET4 in RowStoreUtil. (jihoon) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e06ffa93 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e06ffa93 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e06ffa93 Branch: refs/heads/master Commit: e06ffa93cf60ace4949ab7218077b7755c2f2425 Parents: a2a2c3c Author: Jihoon Son <[email protected]> Authored: Wed Mar 26 18:09:24 2014 +0900 Committer: Jihoon Son <[email protected]> Committed: Wed Mar 26 18:09:24 2014 +0900 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../engine/planner/RangePartitionAlgorithm.java | 2 +- .../org/apache/tajo/engine/utils/TupleUtil.java | 13 +- .../tajo/master/querymaster/Repartitioner.java | 4 +- .../tajo/worker/RangeRetrieverHandler.java | 8 +- .../planner/physical/TestBSTIndexExec.java | 5 - .../planner/physical/TestPhysicalPlanner.java | 6 +- .../apache/tajo/engine/query/TestNetTypes.java | 11 ++ .../apache/tajo/engine/util/TestTupleUtil.java | 15 ++- .../tajo/worker/TestRangeRetrieverHandler.java | 6 +- .../dataset/TestNetTypes/table2/table2.tbl | 3 +- .../queries/TestNetTypes/testSort2.sql | 1 + .../results/TestNetTypes/testJoin.result | 2 +- .../results/TestNetTypes/testSort2.result | 6 + .../tajo/pullserver/PullServerAuxService.java | 7 +- .../tajo/pullserver/TajoPullServerService.java | 7 +- .../org/apache/tajo/storage/RowStoreUtil.java | 122 +++++++++++-------- .../apache/tajo/storage/index/bst/BSTIndex.java | 30 +++-- 18 files changed, 156 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ad923bb..abadb62 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -283,6 +283,8 @@ Release 0.8.0 - unreleased BUG 
FIXES + TAJO-692: Missing Null handling for INET4 in RowStoreUtil. (jihoon) + TAJO-701: Invalid bytes when creating BlobDatum with offset. (jinho) TAJO-708: Test failure after a successful test. (jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java index c4b1ae1..35876ba 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java @@ -148,7 +148,7 @@ public abstract class RangePartitionAlgorithm { throw new UnsupportedOperationException(dataType + " is not supported yet"); } - return inclusive ? columnCard.add(new BigDecimal(1)) : columnCard; + return inclusive ? 
columnCard.add(new BigDecimal(1)).abs() : columnCard.abs(); } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java index 54c6f74..9809aee 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java @@ -30,7 +30,7 @@ import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.engine.eval.EvalNode; -import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.VTuple; @@ -46,11 +46,14 @@ public class TupleUtil { public static String rangeToQuery(Schema schema, TupleRange range, boolean last) throws UnsupportedEncodingException { + return rangeToQuery(range, last, RowStoreEncoder.createInstance(schema)); + } + + public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncoder encoder) + throws UnsupportedEncodingException { StringBuilder sb = new StringBuilder(); - byte [] firstKeyBytes = RowStoreUtil.RowStoreEncoder - .toBytes(schema, range.getStart()); - byte [] endKeyBytes = RowStoreUtil.RowStoreEncoder - .toBytes(schema, range.getEnd()); + byte [] firstKeyBytes = encoder.toBytes(range.getStart()); + byte [] endKeyBytes = encoder.toBytes(range.getEnd()); String firstKeyBase64 = new String(Base64.encodeBase64(firstKeyBytes)); String lastKeyBase64 = new String(Base64.encodeBase64(endKeyBytes)); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 36203bb..b2adaa4 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -43,6 +43,7 @@ import org.apache.tajo.exception.InternalException; import org.apache.tajo.master.TaskSchedulerContext; import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; import org.apache.tajo.storage.AbstractStorageManager; +import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.TUtil; @@ -378,11 +379,12 @@ public class Repartitioner { Set<URI> uris; try { + RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.RowStoreEncoder.createInstance(sortSchema); for (int i = 0; i < ranges.length; i++) { uris = new HashSet<URI>(); for (String uri: basicFetchURIs) { String rangeParam = - TupleUtil.rangeToQuery(sortSchema, ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0); + TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? 
i == (ranges.length - 1) : i == 0, encoder); URI finalUri = URI.create(uri + "&" + rangeParam); uris.add(finalUri); } http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java index a54fa80..0e8ae72 100644 --- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java +++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/RangeRetrieverHandler.java @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.bst.BSTIndex; @@ -57,6 +57,7 @@ public class RangeRetrieverHandler implements RetrieverHandler { private final BSTIndex.BSTIndexReader idxReader; private final Schema schema; private final TupleComparator comp; + private final RowStoreDecoder decoder; public RangeRetrieverHandler(File outDir, Schema schema, TupleComparator comp) throws IOException { this.file = outDir; @@ -70,6 +71,7 @@ public class RangeRetrieverHandler implements RetrieverHandler { this.idxReader.open(); LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey()); + this.decoder = RowStoreDecoder.createInstance(schema); } @Override @@ -78,11 +80,11 @@ public class RangeRetrieverHandler implements RetrieverHandler { // its validity of the file. 
File data = new File(this.file, "data/data"); byte [] startBytes = Base64.decodeBase64(kvs.get("start").get(0)); - Tuple start = RowStoreUtil.RowStoreDecoder.toTuple(schema, startBytes); + Tuple start = decoder.toTuple(startBytes); byte [] endBytes; Tuple end; endBytes = Base64.decodeBase64(kvs.get("end").get(0)); - end = RowStoreUtil.RowStoreDecoder.toTuple(schema, endBytes); + end = decoder.toTuple(endBytes); boolean last = kvs.containsKey("final"); if(!comp.isAscendingFirstKey()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java index b74527a..a47bde3 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java @@ -187,11 +187,6 @@ public class TestBSTIndexExec { assertEquals(tupleCount , counter); } - @After - public void shutdown() { - - } - private class TmpPlanner extends PhysicalPlannerImpl { public TmpPlanner(TajoConf conf, AbstractStorageManager sm) { super(conf, sm); http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index f32cd1e..cf17d89 100644 --- 
a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -48,6 +48,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.session.Session; import org.apache.tajo.storage.*; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.util.CommonTestingUtil; @@ -895,6 +896,7 @@ public class TestPhysicalPlanner { // The below is for testing RangeRetrieverHandler. + RowStoreEncoder encoder = RowStoreEncoder.createInstance(keySchema); RangeRetrieverHandler handler = new RangeRetrieverHandler( new File(new Path(workDir, "output").toUri()), keySchema, comp); Map<String,List<String>> kvs = Maps.newHashMap(); @@ -902,12 +904,12 @@ public class TestPhysicalPlanner { startTuple.put(0, DatumFactory.createInt4(50)); kvs.put("start", Lists.newArrayList( new String(Base64.encodeBase64( - RowStoreUtil.RowStoreEncoder.toBytes(keySchema, startTuple), false)))); + encoder.toBytes(startTuple), false)))); Tuple endTuple = new VTuple(1); endTuple.put(0, DatumFactory.createInt4(80)); kvs.put("end", Lists.newArrayList( new String(Base64.encodeBase64( - RowStoreUtil.RowStoreEncoder.toBytes(keySchema, endTuple), false)))); + encoder.toBytes(endTuple), false)))); FileChunk chunk = handler.get(kvs); scanner.seek(chunk.startOffset()); http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java 
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java index 17235a6..182bf5b 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestNetTypes.java @@ -79,6 +79,17 @@ public class TestNetTypes extends QueryTestCaseBase { } @Test + public final void testSort2() throws Exception { + // Skip all tests when HCatalogStore is used. + if (!testingCluster.isHCatalogStoreRunning()) { + // select addr from table2 order by addr; + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + } + + @Test public final void testJoin() throws Exception { // Skip all tests when HCatalogStore is used. if (!testingCluster.isHCatalogStoreRunning()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java index c6ec236..86fa798 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java @@ -28,7 +28,12 @@ import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.RangePartitionAlgorithm; import org.apache.tajo.engine.planner.UniformRangePartition; import org.apache.tajo.engine.utils.TupleUtil; -import org.apache.tajo.storage.*; +import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.storage.TupleRange; +import 
org.apache.tajo.storage.VTuple; import org.junit.Test; import static org.junit.Assert.*; @@ -64,9 +69,11 @@ public class TestTupleUtil { DatumFactory.createBlob("hyunsik".getBytes()), DatumFactory.createInet4("192.168.0.1") }); - - byte [] bytes = RowStoreUtil.RowStoreEncoder.toBytes(schema, tuple); - Tuple tuple2 = RowStoreUtil.RowStoreDecoder.toTuple(schema, bytes); + + RowStoreEncoder encoder = RowStoreEncoder.createInstance(schema); + RowStoreDecoder decoder = RowStoreDecoder.createInstance(schema); + byte [] bytes = encoder.toBytes(tuple); + Tuple tuple2 = decoder.toTuple(bytes); assertEquals(tuple, tuple2); } http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java index de4560e..4e770ce 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java @@ -42,6 +42,7 @@ import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.planner.physical.ProjectionExec; import org.apache.tajo.engine.planner.physical.RangeShuffleFileWriteExec; import org.apache.tajo.storage.*; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.util.CommonTestingUtil; @@ -355,13 +356,14 @@ public class TestRangeRetrieverHandler { private FileChunk getFileChunk(RangeRetrieverHandler handler, Schema keySchema, TupleRange range, boolean last) throws IOException { Map<String,List<String>> kvs = Maps.newHashMap(); 
+ RowStoreEncoder encoder = RowStoreEncoder.createInstance(keySchema); kvs.put("start", Lists.newArrayList( new String(Base64.encodeBase64( - RowStoreUtil.RowStoreEncoder.toBytes(keySchema, range.getStart()), + encoder.toBytes(range.getStart()), false)))); kvs.put("end", Lists.newArrayList( new String(Base64.encodeBase64( - RowStoreUtil.RowStoreEncoder.toBytes(keySchema, range.getEnd()), false)))); + encoder.toBytes(range.getEnd()), false)))); if (last) { kvs.put("final", Lists.newArrayList("true")); http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl index f33b22c..9d46bc0 100644 --- a/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl +++ b/tajo-core/tajo-core-backend/src/test/resources/dataset/TestNetTypes/table2/table2.tbl @@ -1,4 +1,5 @@ 1|NULL|NULL|a|127.0.0.8 2|NULL|NULL|b|127.0.0.8 NULL|NULL|10.0|c|NULL -NULL|NULL|20.0|d|127.0.0.1 \ No newline at end of file +NULL|NULL|20.0|d|127.0.0.1 +NULL|NULL|20.0|d|255.255.255.255 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/test/resources/queries/TestNetTypes/testSort2.sql ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestNetTypes/testSort2.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestNetTypes/testSort2.sql new file mode 100644 index 0000000..b613d4a --- /dev/null +++ b/tajo-core/tajo-core-backend/src/test/resources/queries/TestNetTypes/testSort2.sql @@ -0,0 +1 @@ +select addr from table2 order by addr; \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result index b5817f8..9d2cdf1 100644 --- a/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result +++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testJoin.result @@ -1,6 +1,6 @@ id,name,score,type,addr,id,name,score,type,addr ------------------------------- -0,,20.0,d,127.0.0.1,1,ooo,1.1,a,127.0.0.1 1,,0.0,a,127.0.0.8,3,qqq,3.4,c,127.0.0.8 2,,0.0,b,127.0.0.8,3,qqq,3.4,c,127.0.0.8 +0,,20.0,d,127.0.0.1,1,ooo,1.1,a,127.0.0.1 0,,20.0,d,127.0.0.1,4,rrr,4.5,d,127.0.0.1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testSort2.result ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testSort2.result b/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testSort2.result new file mode 100644 index 0000000..debbb98 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/test/resources/results/TestNetTypes/testSort2.result @@ -0,0 +1,6 @@ +addr +------------------------------- +127.0.0.1 +127.0.0.8 +127.0.0.8 +255.255.255.255 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java 
b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java index 8054a40..d098797 100644 --- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java +++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java @@ -48,7 +48,7 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.pullserver.retriever.FileChunk; -import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.bst.BSTIndex; @@ -563,17 +563,18 @@ public class PullServerAuxService extends AuxiliaryService { byte [] startBytes = Base64.decodeBase64(startKey); byte [] endBytes = Base64.decodeBase64(endKey); + RowStoreDecoder decoder = RowStoreDecoder.createInstance(keySchema); Tuple start; Tuple end; try { - start = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, startBytes); + start = decoder.toTuple(startBytes); } catch (Throwable t) { throw new IllegalArgumentException("StartKey: " + startKey + ", decoded byte size: " + startBytes.length, t); } try { - end = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, endBytes); + end = decoder.toTuple(endBytes); } catch (Throwable t) { throw new IllegalArgumentException("EndKey: " + endKey + ", decoded byte size: " + endBytes.length, t); http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 
a427635..c1fcef1 100644 --- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -45,7 +45,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.RpcChannelFactory; -import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.bst.BSTIndex; @@ -560,17 +560,18 @@ public class TajoPullServerService extends AbstractService { byte [] startBytes = Base64.decodeBase64(startKey); byte [] endBytes = Base64.decodeBase64(endKey); + RowStoreDecoder decoder = RowStoreDecoder.createInstance(keySchema); Tuple start; Tuple end; try { - start = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, startBytes); + start = decoder.toTuple(startBytes); } catch (Throwable t) { throw new IllegalArgumentException("StartKey: " + startKey + ", decoded byte size: " + startBytes.length, t); } try { - end = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, endBytes); + end = decoder.toTuple(endBytes); } catch (Throwable t) { throw new IllegalArgumentException("EndKey: " + endKey + ", decoded byte size: " + endBytes.length, t); http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index 7a421a8..66d016b 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -22,7 +22,7 @@ 
import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.BitArray; import java.nio.ByteBuffer; @@ -48,101 +48,90 @@ public class RowStoreUtil { public static class RowStoreDecoder { - public static Tuple toTuple(Schema schema, byte [] bytes) { + private Schema schema; + private BitArray nullFlags; + private int headerSize; + + public static RowStoreDecoder createInstance(Schema schema) { + return new RowStoreDecoder(schema); + } + + private RowStoreDecoder(Schema schema) { + this.schema = schema; + nullFlags = new BitArray(schema.size()); + headerSize = nullFlags.bytesLength(); + } + + + public Tuple toTuple(byte [] bytes) { + nullFlags.clear(); ByteBuffer bb = ByteBuffer.wrap(bytes); Tuple tuple = new VTuple(schema.size()); Column col; TajoDataTypes.DataType type; + + bb.limit(headerSize); + nullFlags.fromByteBuffer(bb); + bb.limit(bytes.length); + for (int i =0; i < schema.size(); i++) { + if (nullFlags.get(i)) { + tuple.put(i, DatumFactory.createNullDatum()); + continue; + } + col = schema.getColumn(i); type = col.getDataType(); switch (type.getType()) { case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break; case BIT: byte b = bb.get(); - if(b == 0) { - tuple.put(i, DatumFactory.createNullDatum()); - } else { - tuple.put(i, DatumFactory.createBit(b)); - } + tuple.put(i, DatumFactory.createBit(b)); break; case CHAR: byte c = bb.get(); - if(c == 0) { - tuple.put(i, DatumFactory.createNullDatum()); - } else { - tuple.put(i, DatumFactory.createChar(c)); - } + tuple.put(i, DatumFactory.createChar(c)); break; case INT2: short s = bb.getShort(); - if(s < Short.MIN_VALUE + 1) { - tuple.put(i, DatumFactory.createNullDatum()); - }else { - tuple.put(i, DatumFactory.createInt2(s)); - } + tuple.put(i, DatumFactory.createInt2(s)); break; case INT4: case DATE: int i_ = bb.getInt(); - if ( i_ 
< Integer.MIN_VALUE + 1) { - tuple.put(i, DatumFactory.createNullDatum()); - } else { - tuple.put(i, DatumFactory.createFromInt4(type, i_)); - } + tuple.put(i, DatumFactory.createFromInt4(type, i_)); break; case INT8: case TIME: case TIMESTAMP: long l = bb.getLong(); - if ( l < Long.MIN_VALUE + 1) { - tuple.put(i, DatumFactory.createNullDatum()); - }else { - tuple.put(i, DatumFactory.createFromInt8(type, l)); - } + tuple.put(i, DatumFactory.createFromInt8(type, l)); break; case FLOAT4: float f = bb.getFloat(); - if (Float.isNaN(f)) { - tuple.put(i, DatumFactory.createNullDatum()); - }else { - tuple.put(i, DatumFactory.createFloat4(f)); - } + tuple.put(i, DatumFactory.createFloat4(f)); break; case FLOAT8: double d = bb.getDouble(); - if(Double.isNaN(d)) { - tuple.put(i, DatumFactory.createNullDatum()); - }else { - tuple.put(i, DatumFactory.createFloat8(d)); - } + tuple.put(i, DatumFactory.createFloat8(d)); break; case TEXT: byte [] _string = new byte[bb.getInt()]; bb.get(_string); - String str = new String(_string); - if(str.compareTo("NULL") == 0) { - tuple.put(i, DatumFactory.createNullDatum()); - }else { - tuple.put(i, DatumFactory.createText(str)); - } + tuple.put(i, DatumFactory.createText(_string)); break; case BLOB: byte [] _bytes = new byte[bb.getInt()]; bb.get(_bytes); - if(Bytes.compareTo(bytes, Bytes.toBytes("NULL")) == 0) { - tuple.put(i, DatumFactory.createNullDatum()); - } else { - tuple.put(i, DatumFactory.createBlob(_bytes)); - } + tuple.put(i, DatumFactory.createBlob(_bytes)); break; case INET4: @@ -156,17 +145,40 @@ public class RowStoreUtil { } return tuple; } + + public Schema getSchema() { + return schema; + } } public static class RowStoreEncoder { + private Schema schema; + private BitArray nullFlags; + private int headerSize; - public static byte [] toBytes(Schema schema, Tuple tuple) { + public static RowStoreEncoder createInstance(Schema schema) { + return new RowStoreEncoder(schema); + } + + private RowStoreEncoder(Schema schema) { + 
this.schema = schema; + nullFlags = new BitArray(schema.size()); + headerSize = nullFlags.bytesLength(); + } + public byte [] toBytes(Tuple tuple) { + nullFlags.clear(); int size = StorageUtil.getRowByteSize(schema); - ByteBuffer bb = ByteBuffer.allocate(size); + ByteBuffer bb = ByteBuffer.allocate(size+headerSize); + bb.position(headerSize); Column col; for (int i = 0; i < schema.size(); i++) { + if (tuple.isNull(i)) { + nullFlags.set(i); + } + col = schema.getColumn(i); switch (col.getDataType().getType()) { + case NULL_TYPE: nullFlags.set(i); break; case BOOLEAN: bb.put(tuple.get(i).asByte()); break; case BIT: bb.put(tuple.get(i).asByte()); break; case CHAR: bb.put(tuple.get(i).asByte()); break; @@ -199,10 +211,20 @@ public class RowStoreUtil { } } + byte[] flags = nullFlags.toArray(); + int finalPosition = bb.position(); + bb.position(0); + bb.put(flags); + + bb.position(finalPosition); bb.flip(); byte [] buf = new byte [bb.limit()]; bb.get(buf); return buf; } + + public Schema getSchema() { + return schema; + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/e06ffa93/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java index bc8fe96..b149584 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java @@ -27,7 +27,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; -import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.Tuple; 
import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.IndexMethod; @@ -94,6 +95,7 @@ public class BSTIndex implements IndexMethod { private Tuple firstKey; private Tuple lastKey; + private RowStoreEncoder rowStoreEncoder; // private Tuple lastestKey = null; @@ -111,6 +113,7 @@ public class BSTIndex implements IndexMethod { this.keySchema = keySchema; this.compartor = comparator; this.collector = new KeyOffsetCollector(comparator); + this.rowStoreEncoder = RowStoreEncoder.createInstance(keySchema); } public void setLoadNum(int loadNum) { @@ -161,12 +164,10 @@ public class BSTIndex implements IndexMethod { // entry out.writeInt(entryNum); if (entryNum > 0) { - byte [] minBytes = RowStoreUtil.RowStoreEncoder.toBytes(keySchema, - firstKey); + byte [] minBytes = rowStoreEncoder.toBytes(firstKey); out.writeInt(minBytes.length); out.write(minBytes); - byte [] maxBytes = RowStoreUtil.RowStoreEncoder.toBytes(keySchema, - lastKey); + byte [] maxBytes = rowStoreEncoder.toBytes(lastKey); out.writeInt(maxBytes.length); out.write(maxBytes); } @@ -197,7 +198,7 @@ public class BSTIndex implements IndexMethod { } } /* key writing */ - byte[] buf = RowStoreUtil.RowStoreEncoder.toBytes(this.keySchema, key); + byte[] buf = rowStoreEncoder.toBytes(key); out.writeInt(buf.length); out.write(buf); @@ -229,7 +230,7 @@ public class BSTIndex implements IndexMethod { /* root key writing */ for (Tuple key : keySet) { - byte[] buf = RowStoreUtil.RowStoreEncoder.toBytes(keySchema, key); + byte[] buf = rowStoreEncoder.toBytes(key); rootOut.writeInt(buf.length); rootOut.write(buf); @@ -301,6 +302,8 @@ public class BSTIndex implements IndexMethod { // mutex private final Object mutex = new Object(); + private RowStoreDecoder rowStoreDecoder; + /** * * @param fileName @@ -312,6 +315,7 @@ public class BSTIndex implements IndexMethod { this.fileName = fileName; this.keySchema = keySchema; this.comparator = comparator; + this.rowStoreDecoder = 
RowStoreDecoder.createInstance(keySchema); } public BSTIndexReader(final Path fileName) throws IOException { @@ -336,6 +340,7 @@ public class BSTIndex implements IndexMethod { builder.mergeFrom(schemaBytes); SchemaProto proto = builder.build(); this.keySchema = new Schema(proto); + this.rowStoreDecoder = RowStoreDecoder.createInstance(keySchema); // comparator int compByteSize = indexIn.readInt(); @@ -353,11 +358,11 @@ public class BSTIndex implements IndexMethod { if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values byte [] minBytes = new byte[indexIn.readInt()]; Bytes.readFully(indexIn, minBytes, 0, minBytes.length); - this.firstKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, minBytes); + this.firstKey = rowStoreDecoder.toTuple(minBytes); byte [] maxBytes = new byte[indexIn.readInt()]; Bytes.readFully(indexIn, maxBytes, 0, maxBytes.length); - this.lastKey = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, maxBytes); + this.lastKey = rowStoreDecoder.toTuple(maxBytes); } } @@ -476,12 +481,11 @@ public class BSTIndex implements IndexMethod { this.offsetSubIndex = new long[entryNum][]; byte[] buf; - for (int i = 0; i < entryNum; i++) { counter++; buf = new byte[in.readInt()]; Bytes.readFully(in, buf, 0, buf.length); - dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf); + dataSubIndex[i] = rowStoreDecoder.toTuple(buf); int offsetNum = in.readInt(); this.offsetSubIndex[i] = new long[offsetNum]; @@ -503,7 +507,7 @@ public class BSTIndex implements IndexMethod { for (int i = 0; i < counter; i++) { buf = new byte[in.readInt()]; Bytes.readFully(in, buf, 0, buf.length); - dataSubIndex[i] = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf); + dataSubIndex[i] = rowStoreDecoder.toTuple(buf); int offsetNum = in.readInt(); this.offsetSubIndex[i] = new long[offsetNum]; @@ -532,7 +536,7 @@ public class BSTIndex implements IndexMethod { for (int i = 0; i < entryNum; i++) { buf = new byte[in.readInt()]; Bytes.readFully(in, 
buf, 0, buf.length); - keyTuple = RowStoreUtil.RowStoreDecoder.toTuple(keySchema, buf); + keyTuple = rowStoreDecoder.toTuple(buf); dataIndex[i] = keyTuple; this.offsetIndex[i] = in.readLong(); }
