Repository: tajo Updated Branches: refs/heads/master e5b30e542 -> 4820610f4
http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java index d177c48..c09e85e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java @@ -35,7 +35,6 @@ import org.apache.tajo.storage.VTuple; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -67,6 +66,8 @@ public class WindowAggExec extends UnaryPhysicalExec { private boolean [] windowFuncFlags; private boolean [] endUnboundedFollowingFlags; private boolean [] endCurrentRowFlags; + private Tuple currentKey; + private Tuple evaluatedTuple; // operator state enum WindowState { @@ -79,10 +80,9 @@ public class WindowAggExec extends UnaryPhysicalExec { // Transient state boolean firstTime = true; - List<Tuple> evaluatedTuples = null; - List<Tuple> accumulatedInTuples = null; - List<Tuple> nextAccumulatedProjected = null; - List<Tuple> nextAccumulatedInTuples = null; + TupleList evaluatedTuples = null; + TupleList accumulatedInTuples = null; + TupleList nextAccumulatedInTuples = null; WindowState state = WindowState.NEW_WINDOW; Iterator<Tuple> tupleInFrameIterator = null; @@ -105,6 +105,8 @@ public class WindowAggExec extends UnaryPhysicalExec { hasPartitionKeys = false; } + currentKey = new VTuple(partitionKeyNum); + if (plan.hasAggFunctions()) { functions = plan.getWindowFunctions(); functionNum = functions.length; @@ -169,7 +171,7 @@ public class WindowAggExec extends UnaryPhysicalExec { schemaForOrderBy = outSchema; } - + evaluatedTuple = new VTuple(schemaForOrderBy.size()); nonFunctionColumnNum = plan.getTargets().length - functionNum; nonFunctionColumns = new int[nonFunctionColumnNum]; for (int idx = 0; idx < plan.getTargets().length - functionNum; idx++) { @@ -193,7 +195,6 @@ public class WindowAggExec extends UnaryPhysicalExec { @Override public Tuple next() throws IOException { - Tuple currentKey = null; Tuple readTuple = null; while(!context.isStopped() && state != WindowState.END_OF_TUPLE) { @@ -212,7 +213,6 @@ public class WindowAggExec extends UnaryPhysicalExec { } if (readTuple != null && hasPartitionKeys) { // get a key tuple - currentKey = new VTuple(partitionKeyIds.length); for (int i = 0; i < partitionKeyIds.length; i++) { currentKey.put(i, readTuple.asDatum(partitionKeyIds[i])); } @@ -244,7 +244,7 @@ public class WindowAggExec extends UnaryPhysicalExec { private void initWindow() { if (firstTime) { - accumulatedInTuples = Lists.newArrayList(); + accumulatedInTuples = new TupleList(); contexts = new FunctionContext[functionNum]; for(int evalIdx = 0; evalIdx < functionNum; evalIdx++) { @@ -257,8 +257,10 @@ public class WindowAggExec extends UnaryPhysicalExec { private void accumulatingWindow(Tuple currentKey, Tuple inTuple) { if (lastKey == null || lastKey.equals(currentKey)) { // if the current key is same to the previous key - accumulatedInTuples.add(new VTuple(inTuple)); - + accumulatedInTuples.add(inTuple); + if (lastKey == null) { + lastKey = new VTuple(currentKey.size()); + } } else { // if the current key is different from the previous key, // the current key belongs to the next window frame. preaccumulatingNextWindow() will @@ -267,37 +269,30 @@ public class WindowAggExec extends UnaryPhysicalExec { transition(WindowState.EVALUATION); } - lastKey = currentKey; + lastKey.put(currentKey.getValues()); } private void preAccumulatingNextWindow(Tuple inTuple) { - Tuple projectedTuple = new VTuple(outSchema.size()); - for(int idx = 0; idx < nonFunctionColumnNum; idx++) { - projectedTuple.put(idx, inTuple.asDatum(nonFunctionColumns[idx])); - } - nextAccumulatedProjected = Lists.newArrayList(); - nextAccumulatedProjected.add(projectedTuple); - nextAccumulatedInTuples = Lists.newArrayList(); - nextAccumulatedInTuples.add(new VTuple(inTuple)); + nextAccumulatedInTuples = new TupleList(); + nextAccumulatedInTuples.add(inTuple); } private void evaluationWindowFrame() { TupleComparator comp; - evaluatedTuples = new ArrayList<Tuple>(); + evaluatedTuples = new TupleList(); for (int i = 0; i <accumulatedInTuples.size(); i++) { Tuple inTuple = accumulatedInTuples.get(i); - Tuple projectedTuple = new VTuple(schemaForOrderBy.size()); for (int c = 0; c < nonFunctionColumnNum; c++) { - projectedTuple.put(c, inTuple.asDatum(nonFunctionColumns[c])); + evaluatedTuple.put(c, inTuple.asDatum(nonFunctionColumns[c])); } for (int c = 0; c < sortKeyColumns.length; c++) { - projectedTuple.put(outputColumnNum + c, inTuple.asDatum(sortKeyColumns[c])); + evaluatedTuple.put(outputColumnNum + c, inTuple.asDatum(sortKeyColumns[c])); } - evaluatedTuples.add(projectedTuple); + evaluatedTuples.add(evaluatedTuple); } for (int idx = 0; idx < functions.length; idx++) { @@ -354,4 +349,22 @@ public class WindowAggExec extends UnaryPhysicalExec { lastKey = null; noMoreTuples = false; } + + @Override + public void close() throws IOException { + if (evaluatedTuples != null) { + evaluatedTuples.clear(); + evaluatedTuples = null; + } + if (accumulatedInTuples != null) { + accumulatedInTuples.clear(); + accumulatedInTuples = null; + } + if (nextAccumulatedInTuples != null) { + nextAccumulatedInTuples.clear(); + nextAccumulatedInTuples = null; + } + + super.close(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java index da2f2ad..7279e8e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java @@ -29,6 +29,8 @@ import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.session.Session; import java.net.URI; +import java.util.HashMap; +import java.util.Map; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto; @@ -42,7 +44,10 @@ public class QueryContext extends OverridableConf { public QueryContext(TajoConf conf, Session session) { super(conf, ConfigKey.ConfigType.QUERY, ConfigKey.ConfigType.SESSION); - putAll(session.getAllVariables()); + Map<String, String> copy = new HashMap<String, String>(session.getAllVariables()); + // Among session variables, timezone must not be + copy.remove("TIMEZONE"); + putAll(copy); } public QueryContext(TajoConf conf, KeyValueSetProto proto) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java index addca49..05936be 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java @@ -20,15 +20,14 @@ package org.apache.tajo.engine.utils; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.storage.Tuple; +import org.apache.tajo.engine.planner.physical.TupleList; +import org.apache.tajo.engine.planner.physical.TupleMap; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.Deallocatable; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.List; -import java.util.Map; public interface CacheHolder<T> { @@ -52,19 +51,19 @@ public interface CacheHolder<T> { * This is a cache-holder for a join table * It will release when execution block is finished */ - public static class BroadcastCacheHolder implements CacheHolder<Map<Tuple, List<Tuple>>> { - private Map<Tuple, List<Tuple>> data; + class BroadcastCacheHolder implements CacheHolder<TupleMap<TupleList>> { + private TupleMap<TupleList> data; private Deallocatable rowBlock; private TableStats tableStats; - public BroadcastCacheHolder(Map<Tuple, List<Tuple>> data, TableStats tableStats, Deallocatable rowBlock){ + public BroadcastCacheHolder(TupleMap<TupleList> data, TableStats tableStats, Deallocatable rowBlock){ this.data = data; this.tableStats = tableStats; this.rowBlock = rowBlock; } @Override - public Map<Tuple, List<Tuple>> getData() { + public TupleMap<TupleList> getData() { return data; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java index 1dcbcab..0a04213 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java @@ -19,7 +19,6 @@ package org.apache.tajo.engine.utils; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; @@ -30,17 +29,13 @@ import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; -import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.VTuple; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; -import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -177,20 +172,4 @@ public class TupleUtil { } return new TupleRange(sortSpecs, startTuple, endTuple); } - - /** - * It creates a tuple of a given size filled with NULL values in all fields - * It is usually used in outer join algorithms. - * - * @param size The number of columns of a creating tuple - * @return The created tuple filled with NULL values - */ - public static Tuple createNullPaddedTuple(int size){ - VTuple aTuple = new VTuple(size); - int i; - for(i = 0; i < size; i++){ - aTuple.put(i, DatumFactory.createNullDatum()); - } - return aTuple; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 52c8d79..228c0b8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -694,8 +694,7 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult @Override public Tuple next() throws IOException { Tuple aTuple; - Tuple outTuple = new VTuple(outColumnNum); - + if (isClosed) { return null; } @@ -707,7 +706,7 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult if (!scanNode.hasQual()) { if (currentRow < cachedData.size()) { aTuple = cachedData.get(currentRow++); - projector.eval(aTuple, outTuple); + Tuple outTuple = projector.eval(aTuple); outTuple.setOffset(aTuple.getOffset()); return outTuple; } @@ -716,7 +715,8 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult while (currentRow < cachedData.size()) { aTuple = cachedData.get(currentRow++); if (qual.eval(aTuple).isTrue()) { - projector.eval(aTuple, outTuple); + Tuple outTuple = projector.eval(aTuple); + outTuple.setOffset(aTuple.getOffset()); return outTuple; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 60349dd..25b306f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -154,6 +154,7 @@ public class TajoTestingCluster { // Memory cache termination conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1); + // Python function path conf.setStrings(ConfVars.PYTHON_CODE_DIR.varname, getClass().getResource("/python").toString()); /* Since Travis CI limits the size of standard output log up to 4MB */ http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index c01adb5..d383662 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -40,6 +40,7 @@ import org.apache.tajo.engine.function.builtin.SumInt; import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.plan.*; import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; @@ -1334,5 +1335,4 @@ public class TestLogicalPlanner { assertEquals(alterTableNode.getPartitionValues()[1], "01"); assertEquals(alterTableNode.getPartitionValues()[2], "11"); } - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java index ea1214e..4061b25 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java @@ -33,13 +33,7 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; import org.junit.Test; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.TreeSet; +import java.util.*; import static org.junit.Assert.assertArrayEquals; @@ -80,7 +74,8 @@ public class TestTupleSorter { long[] time1 = new long[ITERATION]; long[] time2 = new long[ITERATION]; for(int iteration = 0; iteration < ITERATION; iteration++) { - List<Tuple> target = Arrays.asList(Arrays.copyOf(tuples, tuples.length)); + TupleList target = new TupleList(tuples.length); + target.addAll(Arrays.asList(Arrays.copyOf(tuples, tuples.length))); Set<Integer> keys = new TreeSet<Integer>(); for (int i = 0; i < MAX_SORT_KEY; i++) { keys.add(rnd.nextInt(schema.size())); http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index 3055362..a5caf38 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -26,8 +26,8 @@ import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.querymaster.Query; import org.apache.tajo.querymaster.QueryMasterTask; -import org.apache.tajo.querymaster.Task; import org.apache.tajo.querymaster.Stage; +import org.apache.tajo.querymaster.Task; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; @@ -565,12 +565,12 @@ public class TestGroupByQuery extends QueryTestCaseBase { } @Test + @Option(sort = true) + @SimpleTest public final void testHavingWithAggFunction() throws Exception { // select l_orderkey, avg(l_partkey) total, sum(l_linenumber) as num from lineitem group by l_orderkey // having avg(l_partkey) = 2.5 or num = 1; - ResultSet res = executeQuery(); - assertResultSet(res); - cleanupQuery(res); + runSimpleTests(); } @Test @@ -693,6 +693,8 @@ public class TestGroupByQuery extends QueryTestCaseBase { @Test public final void testNumShufflePartition() throws Exception { + + Thread.sleep(5000); KeyValueSet tableOptions = new KeyValueSet(); tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N"); @@ -801,17 +803,17 @@ public class TestGroupByQuery extends QueryTestCaseBase { } @Test + @Option(sort = true) + @SimpleTest public final void testGroupbyWithPythonFunc() throws Exception { - ResultSet res = executeQuery(); - assertResultSet(res); - cleanupQuery(res); + runSimpleTests(); } @Test + @Option(sort = true) + @SimpleTest public final void testGroupbyWithPythonFunc2() throws Exception { - ResultSet res = executeQuery(); - assertResultSet(res); - cleanupQuery(res); + runSimpleTests(); } @Test @@ -829,10 +831,10 @@ public class TestGroupByQuery extends QueryTestCaseBase { } @Test + @Option(sort = true) + @SimpleTest public final void testPythonUdaf3() throws Exception { - ResultSet res = executeQuery(); - assertResultSet(res); - cleanupQuery(res); + runSimpleTests(); } // TODO: this test cannot be executed due to the bug of logical planner (TAJO-1588) http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json index 7a60c64..072cf9b 100644 --- a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithJson.json @@ -42,57 +42,86 @@ } ], "Expr": { - "HavingCondition": { - "LeftExpr": { - "LeftExpr": { - "ColumnName": "total", + "SortSpecs": [ + { + "SortKey": { + "ColumnName": "l_orderkey", "OpType": "Column" }, - "RightExpr": { - "Value": "2", - "ValueType": "Unsigned_Integer", - "OpType": "Literal" + "IsAsc": true, + "IsNullFirst": false + }, + { + "SortKey": { + "ColumnName": "total", + "OpType": "Column" }, - "OpType": "GreaterThanOrEquals" + "IsAsc": true, + "IsNullFirst": false }, - "RightExpr": { - "LeftExpr": { + { + "SortKey": { "ColumnName": "num", "OpType": "Column" }, + "IsAsc": true, + "IsNullFirst": false + } + ], + "Expr": { + "HavingCondition": { + "LeftExpr": { + "LeftExpr": { + "ColumnName": "total", + "OpType": "Column" + }, + "RightExpr": { + "Value": "2", + "ValueType": "Unsigned_Integer", + "OpType": "Literal" + }, + "OpType": "GreaterThanOrEquals" + }, "RightExpr": { - "Value": "3", - "ValueType": "Unsigned_Integer", - "OpType": "Literal" + "LeftExpr": { + "ColumnName": "num", + "OpType": "Column" + }, + "RightExpr": { + "Value": "3", + "ValueType": "Unsigned_Integer", + "OpType": "Literal" + }, + "OpType": "Equals" }, - "OpType": "Equals" + "OpType": "Or" }, - "OpType": "Or" - }, - "Expr": { - "Groups": [ - { - "GroupType": "OrdinaryGroup", - "Dimensions": [ - { - "ColumnName": "l_orderkey", - "OpType": "Column" - } - ] - } - ], "Expr": { - "Relations": [ + "Groups": [ { - "TableName": "lineitem", - "OpType": "Relation" + "GroupType": "OrdinaryGroup", + "Dimensions": [ + { + "ColumnName": "l_orderkey", + "OpType": "Column" + } + ] } ], - "OpType": "RelationList" + "Expr": { + "Relations": [ + { + "TableName": "lineitem", + "OpType": "Relation" + } + ], + "OpType": "RelationList" + }, + "OpType": "Aggregation" }, - "OpType": "Aggregation" + "OpType": "Having" }, - "OpType": "Having" + "OpType": "Sort" }, "OpType": "Projection" -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql index 1bb7c7f..fd1e5fb 100644 --- a/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql +++ b/tajo-core/src/test/resources/queries/TestGroupByQuery/testGroupbyWithLimit3.sql @@ -1 +1 @@ -select l_orderkey, sum(l_linenumber) from lineitem group by l_orderkey limit 1; \ No newline at end of file +select l_orderkey, sum(l_linenumber) from lineitem group by l_orderkey order by l_orderkey limit 1; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql b/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql index 057ba05..c19e69e 100644 --- a/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql +++ b/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.sql @@ -4,4 +4,6 @@ SELECT FROM tweets GROUP BY + user.name +order by user.name; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result index 366b76e..81f1bfd 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithJson.result @@ -1,5 +1,5 @@ l_orderkey,total,num ------------------------------- -3,2.5,3 +1,1.0,3 2,2.0,1 -1,1.0,3 \ No newline at end of file +3,2.5,3 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result index cb1e141..e882158 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithLimit3.result @@ -1,3 +1,3 @@ l_orderkey,?sum ------------------------------- -3,3 \ No newline at end of file +1,3 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result index 2a5fb8a..1e8821c 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc.result @@ -1,7 +1,7 @@ ?count ------------------------------- 4 +4 5 5 5 -4 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result index 08561fd..1413e41 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testGroupbyWithPythonFunc2.result @@ -1,7 +1,7 @@ n_regionkey,cnt ------------------------------- 0,5 +1,5 +2,5 3,5 4,5 -2,5 -1,5 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result index b8369d2..fcd604e 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testHavingWithAggFunction.result @@ -1,4 +1,4 @@ l_orderkey,total,num ------------------------------- +2,2.0,1 3,2.5,3 -2,2.0,1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result b/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result index 0607720..7338988 100644 --- a/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result +++ b/tajo-core/src/test/resources/results/TestGroupByQuery/testPythonUdaf3.result @@ -1,5 +1,5 @@ ?avgpy,?countpy_1,?avg_2,?count_3 ------------------------------- +173665.47,1,173665.47,1 193846.25,1,193846.25,1 46929.18,1,46929.18,1 -173665.47,1,173665.47,1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result index debf06e..d881f87 100644 --- a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result +++ b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testNestedFieldAsGroupbyKey1.result @@ -1,6 +1,6 @@ user/name,total_retweet ------------------------------- Chaz Martenstein,2 -Thomas John Wakeman,3 +Marty Elmer,4 Sean Cummings,1 -Marty Elmer,4 \ No newline at end of file +Thomas John Wakeman,3 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result b/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result index c34577c..fffa2dd 100644 --- a/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result +++ b/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation4.result @@ -1,5 +1,5 @@ l_orderkey,cnt,row_num ------------------------------- -3,2,1 -1,2,2 +1,2,1 +3,2,2 2,1,3 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result b/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result index 7dd39ac..c2e02d5 100644 --- a/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result +++ b/tajo-core/src/test/resources/results/TestWindowQuery/testWindowWithAggregation6.result @@ -1,5 +1,5 @@ l_orderkey,cnt,row_num ------------------------------- -1,2,2 +1,2,1 2,1,3 -3,2,1 \ No newline at end of file +3,2,2 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java index 9ae1837..fad285c 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java @@ -93,6 +93,11 @@ public class MetaDataTuple implements Tuple { } @Override + public void clearOffset() { + throw new UnsupportedException("clearOffset"); + } + + @Override public Datum asDatum(int fieldId) { return values.get(fieldId); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index 5011d87..4a208c2 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -1299,7 +1299,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex QueryBlock block = context.queryBlock; ScanNode scanNode = block.getNodeFromExpr(expr); - updatePhysicalInfo(scanNode.getTableDesc()); + updatePhysicalInfo(context, scanNode.getTableDesc()); // Find expression which can be evaluated at this relation node. // Except for column references, additional expressions used in select list, where clause, order-by clauses @@ -1358,12 +1358,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex return targets; } - private void updatePhysicalInfo(TableDesc desc) { + private void updatePhysicalInfo(PlanContext planContext, TableDesc desc) { if (desc.getUri() != null && desc.getMeta().getStoreType() != "SYSTEM" && PlannerUtil.isFileStorageType(desc.getMeta().getStoreType())) { try { Path path = new Path(desc.getUri()); - FileSystem fs = path.getFileSystem(new Configuration()); + FileSystem fs = path.getFileSystem(planContext.queryContext.getConf()); FileStatus status = fs.getFileStatus(path); if (desc.getStats() != null && (status.isDirectory() || status.isFile())) { ContentSummary summary = fs.getContentSummary(path); http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java index 5576889..2fe0cbe 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/TablePropertyUtil.java @@ -44,8 +44,6 @@ public class TablePropertyUtil { if (storeType.equalsIgnoreCase("CSV") || storeType.equalsIgnoreCase("TEXT")) { setSessionToProperty(context, SessionVars.NULL_CHAR, property, StorageConstants.TEXT_NULL); } - - setSessionToProperty(context, SessionVars.TIMEZONE, property, StorageConstants.TIMEZONE); } private static void setSessionToProperty(OverridableConf context, http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java index ed53832..de39d08 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java @@ -125,6 +125,11 @@ public class FrameTuple implements Tuple, Cloneable { } @Override + public void clearOffset() { + throw new UnsupportedException(); + } + + @Override public void setOffset(long offset) { throw new UnsupportedException(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java index 7bfc166..d4597fa 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java @@ -29,6 +29,7 @@ import org.apache.tajo.util.datetime.TimeMeta; import java.util.Arrays; +@Deprecated public class LazyTuple implements Tuple, Cloneable { private long offset; private Datum[] values; @@ -122,6 +123,11 @@ public class LazyTuple implements Tuple, Cloneable { return get(fieldId).size(); } + @Override + public void clearOffset() { + this.offset = -1; + } + ////////////////////////////////////////////////////// // Getter ////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java index ba9f873..51cedb2 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -45,7 +45,7 @@ public class RowStoreUtil { } public static Tuple project(Tuple in, Tuple out, int[] targetIds) { - out.clear(); + out.clearOffset(); for (int idx = 0; idx < targetIds.length; idx++) { out.put(idx, in.asDatum(targetIds[idx])); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java index 1f43ef8..8f36b35 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java @@ -65,6 +65,10 @@ public class HeapTuple implements Tuple { return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); } + @Override + public void clearOffset() { + } + public ByteBuffer nioBuffer() { return ByteBuffer.wrap(data); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java index e7bd2aa..4ccba7b 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java @@ -186,6 +186,10 @@ public abstract class UnSafeTuple implements Tuple { } @Override + public void clearOffset() { + } + + @Override public void setOffset(long offset) { } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java index 11851ec..1626526 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java @@ -85,6 +85,8 @@ public class HBaseScanner implements Scanner { private int[] rowKeyFieldIndexes; private char rowKeyDelimiter; + private Tuple outTuple; + public HBaseScanner (Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException { Preconditions.checkNotNull(conf); Preconditions.checkNotNull(schema); @@ -119,6 +121,8 @@ public class HBaseScanner implements Scanner { targets = schema.toArray(); } + outTuple = new VTuple(targets.length); + columnMapping = new ColumnMapping(schema, meta.getOptions()); targetIndexes = new int[targets.length]; int index = 0; @@ -206,12 +210,11 @@ public class HBaseScanner implements Scanner { } Result result = scanResults[scanResultIndex++]; - Tuple resultTuple = new VTuple(targetIndexes.length); for (int i = 0; i < targetIndexes.length; i++) { - resultTuple.put(i, getDatum(result, targetIndexes[i])); + outTuple.put(i, getDatum(result, targetIndexes[i])); } numRows++; - return resultTuple; + return outTuple; } private Datum getDatum(Result result, int fieldId) throws IOException { http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java index 8b8ca76..e55e34b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -18,12 +18,14 @@ package org.apache.tajo.storage; +import io.netty.buffer.ByteBuf; import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.*; @@ -40,8 +42,10 @@ import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.exception.AlreadyExistsStorageException; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; +import org.apache.tajo.storage.text.ByteBufLineReader; +import org.apache.tajo.storage.text.DelimitedTextFile; +import org.apache.tajo.storage.text.TextLineDeserializer; import org.apache.tajo.util.Bytes; -import org.apache.tajo.util.BytesUtils; import java.io.*; import java.util.ArrayList; @@ -50,7 +54,6 @@ import java.util.Arrays; public class CSVFile { public static final byte LF = '\n'; - public static int EOF = -1; private static final Log LOG = LogFactory.getLog(CSVFile.class); @@ -262,26 +265,9 @@ public class CSVFile { if (codec == null || codec instanceof SplittableCompressionCodec) { splittable = true; } - - //Delimiter - this.delimiter = StringEscapeUtils.unescapeJava( - meta.getOption(StorageConstants.TEXT_DELIMITER, - meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))) - .getBytes(Bytes.UTF8_CHARSET); - - String nullCharacters = StringEscapeUtils.unescapeJava( - meta.getOption(StorageConstants.TEXT_NULL, - meta.getOption(StorageConstants.CSVFILE_NULL, NullDatum.DEFAULT_TEXT))); - - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(Bytes.UTF8_CHARSET); - } } private final static int DEFAULT_PAGE_SIZE = 256 * 1024; - private byte[] delimiter; private FileSystem fs; private FSDataInputStream fis; private InputStream is; //decompressd stream @@ -294,13 +280,14 @@ public class CSVFile { private int currentIdx = 0, validIdx = 0, recordCount = 0; private int[] targetColumnIndexes; private boolean eof = false; - private final byte[] nullChars; private SplitLineReader reader; private ArrayList<Long> fileOffsets; private ArrayList<Integer> rowLengthList; private ArrayList<Integer> startOffsets; private NonSyncByteArrayOutputStream buffer; - private SerializerDeserializer serde; + private Tuple outTuple; + private TextLineDeserializer deserializer; + private ByteBuf byteBuf = BufferPool.directBuffer(ByteBufLineReader.DEFAULT_BUFFER); @Override public void init() throws IOException { @@ -347,22 +334,15 @@ public class CSVFile { targets = schema.toArray(); } + outTuple = new VTuple(targets.length); + deserializer = DelimitedTextFile.getLineSerde(meta).createDeserializer(schema, meta, targets); + deserializer.init(); + targetColumnIndexes = new int[targets.length]; for (int i = 0; i < targets.length; i++) { targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); } - try { - //FIXME - String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE, - TextSerializerDeserializer.class.getName()); - serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); - serde.init(schema); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } - super.init(); Arrays.sort(targetColumnIndexes); if (LOG.isDebugEnabled()) { @@ -396,7 +376,7 @@ public class CSVFile { } private void page() throws IOException { -// // Index initialization + // Index initialization currentIdx = 0; validIdx = 0; int currentBufferPos = 0; @@ -473,15 +453,13 @@ public class CSVFile { } } - long offset = -1; - if(!isCompress()){ - offset = fileOffsets.get(currentIdx); - } + byteBuf.clear(); + byteBuf.writeBytes(buffer.getData(), startOffsets.get(currentIdx), rowLengthList.get(currentIdx)); + + deserializer.deserialize(byteBuf, outTuple); - byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx), - rowLengthList.get(currentIdx), delimiter, targetColumnIndexes, schema.size()); currentIdx++; - return new LazyTuple(schema, cells, offset, nullChars, serde); + return outTuple; } catch (Throwable t) { LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t); LOG.error("Tuple list current index: " + currentIdx, t); @@ -523,12 +501,16 @@ public class CSVFile { CodecPool.returnDecompressor(decompressor); decompressor = null; } + outTuple = null; + if (this.byteBuf.refCnt() > 0) { + this.byteBuf.release(); + } } } @Override public boolean isProjectable() { - return false; + return true; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index 3b655be..e3594f9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -58,7 +58,7 @@ public class RawFile { private ByteBuffer buffer; private ByteBuf buf; - private Tuple tuple; + private Tuple outTuple; private int headerSize = 0; // Header size of a tuple private BitArray nullFlags; @@ -105,7 +105,7 @@ public class RawFile { columnTypes[i] = schema.getColumn(i).getDataType(); } - tuple = new VTuple(columnTypes.length); + outTuple = new VTuple(columnTypes.length); nullFlags = new BitArray(schema.size()); headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize @@ -274,51 +274,51 @@ public class RawFile { for (int i = 0; i < columnTypes.length; i++) { // check if the i'th column is null if (nullFlags.get(i)) { - tuple.put(i, DatumFactory.createNullDatum()); + outTuple.put(i, DatumFactory.createNullDatum()); continue; } switch (columnTypes[i].getType()) { case BOOLEAN : - tuple.put(i, DatumFactory.createBool(buffer.get())); + outTuple.put(i, DatumFactory.createBool(buffer.get())); break; case BIT : - tuple.put(i, DatumFactory.createBit(buffer.get())); + outTuple.put(i, DatumFactory.createBit(buffer.get())); break; case CHAR : int realLen = readRawVarint32(); byte[] buf = new byte[realLen]; buffer.get(buf); - tuple.put(i, DatumFactory.createChar(buf)); + outTuple.put(i, DatumFactory.createChar(buf)); break; case INT2 : - tuple.put(i, DatumFactory.createInt2(buffer.getShort())); + outTuple.put(i, DatumFactory.createInt2(buffer.getShort())); break; case INT4 : - tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32()))); + outTuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32()))); break; case INT8 : - tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64()))); + outTuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64()))); break; case FLOAT4 : - tuple.put(i, DatumFactory.createFloat4(buffer.getFloat())); + outTuple.put(i, DatumFactory.createFloat4(buffer.getFloat())); break; case FLOAT8 : - tuple.put(i, DatumFactory.createFloat8(buffer.getDouble())); + outTuple.put(i, DatumFactory.createFloat8(buffer.getDouble())); break; case TEXT : { int len = readRawVarint32(); byte [] strBytes = new byte[len]; buffer.get(strBytes); - tuple.put(i, DatumFactory.createText(strBytes)); + outTuple.put(i, DatumFactory.createText(strBytes)); break; } @@ -326,7 +326,7 @@ public class RawFile { int len = readRawVarint32(); byte [] rawBytes = new byte[len]; buffer.get(rawBytes); - tuple.put(i, DatumFactory.createBlob(rawBytes)); + outTuple.put(i, DatumFactory.createBlob(rawBytes)); break; } @@ -338,22 +338,22 @@ public class RawFile { ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]); Message.Builder builder = factory.newBuilder(); builder.mergeFrom(rawBytes); - tuple.put(i, factory.createDatum(builder.build())); + outTuple.put(i, factory.createDatum(builder.build())); break; } case INET4 : byte [] ipv4Bytes = new byte[4]; buffer.get(ipv4Bytes); - tuple.put(i, DatumFactory.createInet4(ipv4Bytes)); + outTuple.put(i, DatumFactory.createInet4(ipv4Bytes)); break; case DATE: { int val = buffer.getInt(); if (val < Integer.MIN_VALUE + 1) { - tuple.put(i, DatumFactory.createNullDatum()); + outTuple.put(i, DatumFactory.createNullDatum()); } else { - tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val)); + outTuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val)); } break; } @@ -361,14 +361,14 @@ public class RawFile { case TIMESTAMP: { long val = buffer.getLong(); if (val < Long.MIN_VALUE + 1) { - tuple.put(i, DatumFactory.createNullDatum()); + outTuple.put(i, DatumFactory.createNullDatum()); } else { - tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val)); + outTuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val)); } break; } case NULL_TYPE: - tuple.put(i, NullDatum.get()); + outTuple.put(i, NullDatum.get()); break; default: @@ -380,7 +380,7 @@ public class RawFile { if(filePosition - buffer.remaining() >= endOffset){ eos = true; } - return new VTuple(tuple); + return outTuple; } private void reSizeBuffer(int writableBytes){ http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java index 2be2ec0..ef597ea 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java @@ -119,6 +119,8 @@ public class RowFile { buffer.flip(); } + tuple = new VTuple(schema.size()); + super.init(); } @@ -182,7 +184,6 @@ public class RowFile { } int i; - tuple = new VTuple(schema.size()); int nullFlagSize = buffer.getShort(); byte[] nullFlagBytes = new byte[nullFlagSize]; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java index 729c237..217acba 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java @@ -54,6 +54,7 @@ public class AvroScanner extends FileScanner { private List<Schema.Field> avroFields; private DataFileReader<GenericRecord> dataFileReader; private int[] projectionMap; + private Tuple outTuple; /** * Creates a new AvroScanner. @@ -78,6 +79,7 @@ public class AvroScanner extends FileScanner { targets = schema.toArray(); } prepareProjection(targets); + outTuple = new VTuple(projectionMap.length); avroSchema = AvroUtil.getAvroSchema(meta, conf); avroFields = avroSchema.getFields(); @@ -176,13 +178,12 @@ public class AvroScanner extends FileScanner { return null; } - Tuple tuple = new VTuple(projectionMap.length); GenericRecord record = dataFileReader.next(); for (int i = 0; i < projectionMap.length; ++i) { int columnIndex = projectionMap[i]; Object value = record.get(columnIndex); if (value == null) { - tuple.put(i, NullDatum.get()); + outTuple.put(i, NullDatum.get()); continue; } @@ -197,28 +198,28 @@ public class AvroScanner extends FileScanner { TajoDataTypes.Type tajoType = dataType.getType(); switch (avroType) { case NULL: - tuple.put(i, NullDatum.get()); + outTuple.put(i, NullDatum.get()); break; case BOOLEAN: - tuple.put(i, DatumFactory.createBool((Boolean)value)); + outTuple.put(i, DatumFactory.createBool((Boolean) value)); break; case INT: - tuple.put(i, convertInt(value, tajoType)); + outTuple.put(i, convertInt(value, tajoType)); break; case LONG: - tuple.put(i, DatumFactory.createInt8((Long)value)); + outTuple.put(i, DatumFactory.createInt8((Long) value)); break; case FLOAT: - tuple.put(i, DatumFactory.createFloat4((Float)value)); + outTuple.put(i, DatumFactory.createFloat4((Float) value)); break; case DOUBLE: - tuple.put(i, DatumFactory.createFloat8((Double)value)); + outTuple.put(i, DatumFactory.createFloat8((Double) value)); break; case BYTES: - tuple.put(i, convertBytes(value, tajoType, dataType)); + outTuple.put(i, convertBytes(value, tajoType, dataType)); break; case STRING: - tuple.put(i, convertString(value, tajoType)); + outTuple.put(i, convertString(value, tajoType)); break; case RECORD: throw new RuntimeException("Avro RECORD not supported."); @@ -229,13 +230,13 @@ public class AvroScanner extends FileScanner { case UNION: throw new RuntimeException("Avro UNION not supported."); case FIXED: - tuple.put(i, new BlobDatum(((GenericFixed)value).bytes())); + outTuple.put(i, new BlobDatum(((GenericFixed) value).bytes())); break; default: throw new RuntimeException("Unknown type."); } } - return tuple; + return outTuple; } /** @@ -253,6 +254,7 @@ public class AvroScanner extends FileScanner { if (dataFileReader != null) { dataFileReader.close(); } + outTuple = null; } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java index 81a1ffd..ba81c3e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java @@ -126,15 +126,21 @@ public class BSTIndex implements IndexMethod { } @Override - public void write(Tuple key, long offset) throws IOException { - if (firstKey == null || compartor.compare(key, firstKey) < 0) { - firstKey = key; + public void write(final Tuple key, final long offset) throws IOException { + Tuple keyTuple; + try { + keyTuple = key.clone(); + } catch (CloneNotSupportedException e) { + throw new IOException(e); + } + if (firstKey == null || compartor.compare(keyTuple, firstKey) < 0) { + firstKey = keyTuple; } - if (lastKey == null || compartor.compare(lastKey, key) < 0) { - lastKey = key; + if (lastKey == null || compartor.compare(lastKey, keyTuple) < 0) { + lastKey = keyTuple; } - collector.put(key, offset); + collector.put(keyTuple, offset); } public TupleComparator getComparator() { @@ -253,7 +259,7 @@ public class BSTIndex implements IndexMethod { map = new TreeMap<Tuple, LinkedList<Long>>(comparator); } - public void put(Tuple key, long offset) { + public void put(final Tuple key, final long offset) { if (map.containsKey(key)) { map.get(key).add(offset); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index 286ee3a..47d0267 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -1182,6 +1182,8 @@ public class RCFile { private byte[] nullChars; private SerializerDeserializer serde; + private Tuple outTuple; + public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) throws IOException { super(conf, schema, meta, fragment); @@ -1214,6 +1216,8 @@ public class RCFile { targets = schema.toArray(); } + outTuple = new VTuple(targets.length); + targetColumnIndexes = new int[targets.length]; for (int i = 0; i < targets.length; i++) { targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); @@ -1641,9 +1645,8 @@ public class RCFile { return null; } - Tuple tuple = new VTuple(targets.length); - getCurrentRow(tuple); - return tuple; + getCurrentRow(outTuple); + return outTuple; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index 340e2fa..bac8779 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage.sequencefile; +import io.netty.buffer.ByteBuf; import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -35,7 +36,10 @@ import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.BytesUtils; +import org.apache.tajo.storage.text.ByteBufLineReader; +import org.apache.tajo.storage.text.DelimitedTextFile; +import org.apache.tajo.storage.text.TextLineDeserializer; +import org.apache.tajo.storage.text.TextLineParsingError; import java.io.IOException; @@ -73,6 +77,11 @@ public class SequenceFileScanner extends FileScanner { private Writable EMPTY_KEY; + private TextLineDeserializer deserializer; + private ByteBuf byteBuf = BufferPool.directBuffer(ByteBufLineReader.DEFAULT_BUFFER); + + private Tuple outTuple; + public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException { super(conf, schema, meta, fragment); } @@ -109,6 +118,9 @@ public class SequenceFileScanner extends FileScanner { targets = schema.toArray(); } + outTuple = new VTuple(targets.length); + deserializer = DelimitedTextFile.getLineSerde(meta).createDeserializer(schema, meta, targets); + deserializer.init(); fieldIsNull = new boolean[schema.getRootColumns().size()]; fieldStart = new int[schema.getRootColumns().size()]; @@ -149,6 +161,8 @@ public class SequenceFileScanner extends FileScanner { } } + Text text = new Text(); + @Override public Tuple next() throws IOException { if (!more) return null; @@ -163,24 +177,25 @@ public class SequenceFileScanner extends FileScanner { } if (more) { - Tuple tuple = null; - byte[][] cells; - if (hasBinarySerDe) { BytesWritable bytesWritable = new BytesWritable(); reader.getCurrentValue(bytesWritable); - tuple = makeTuple(bytesWritable); + makeTuple(bytesWritable); totalBytes += (long)bytesWritable.getBytes().length; } else { - Text text = new Text(); reader.getCurrentValue(text); - cells = BytesUtils.splitPreserveAllTokens(text.getBytes(), - delimiter, projectionMap, schema.getRootColumns().size()); - totalBytes += (long)text.getBytes().length; - tuple = new LazyTuple(schema, cells, 0, nullChars, serde); + + byteBuf.clear(); + byteBuf.writeBytes(text.getBytes(), 0, text.getLength()); + + try { + deserializer.deserialize(byteBuf, outTuple); + } catch (TextLineParsingError e) { + throw new IOException(e); + } } currentIdx++; - return tuple; + return outTuple; } else { return null; } @@ -200,8 +215,6 @@ public class SequenceFileScanner extends FileScanner { * So, tajo must make a tuple after parsing hive style BinarySerDe. */ private Tuple makeTuple(BytesWritable value) throws IOException{ - Tuple tuple = new VTuple(schema.getRootColumns().size()); - int start = 0; int length = value.getLength(); @@ -229,7 +242,7 @@ public class SequenceFileScanner extends FileScanner { for (int j = 0; j < projectionMap.length; j++) { if (projectionMap[j] == i) { Datum datum = serde.deserialize(i, bytes, fieldStart[i], fieldLength[i], nullChars); - tuple.put(i, datum); + outTuple.put(i, datum); } } } @@ -247,7 +260,7 @@ public class SequenceFileScanner extends FileScanner { } } - return tuple; + return outTuple; } /** @@ -321,11 +334,16 @@ public class SequenceFileScanner extends FileScanner { tableStats.setReadBytes(totalBytes); tableStats.setNumRows(currentIdx); } + + outTuple = null; + if (this.byteBuf.refCnt() > 0) { + this.byteBuf.release(); + } } @Override public boolean isProjectable() { - return false; + return true; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java index e23e8f8..3c667bf 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java @@ -29,7 +29,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; public class ByteBufLineReader implements Closeable { - private static int DEFAULT_BUFFER = 64 * 1024; + public static int DEFAULT_BUFFER = 64 * 1024; private int bufferSize; private long readBytes; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java index 21984f2..3445676 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java @@ -21,6 +21,7 @@ package org.apache.tajo.storage.text; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.storage.FieldSerializerDeserializer; +import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.Tuple; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index fdeba4e..c68690a 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -286,6 +286,8 @@ public class DelimitedTextFile { /** How many errors have occurred? */ private int errorNum; + private VTuple outTuple; + public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) throws IOException { @@ -321,6 +323,8 @@ public class DelimitedTextFile { targets = schema.toArray(); } + outTuple = new VTuple(targets.length); + super.init(); if (LOG.isDebugEnabled()) { LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset); @@ -375,7 +379,6 @@ public class DelimitedTextFile { @Override public Tuple next() throws IOException { - VTuple tuple; if (!reader.isReadable()) { return null; @@ -400,11 +403,10 @@ public class DelimitedTextFile { return EmptyTuple.get(); } - tuple = new VTuple(targets.length); - tuple.setOffset(offset); + outTuple.setOffset(offset); try { - deserializer.deserialize(buf, tuple); + deserializer.deserialize(buf, outTuple); // if a line is read normally, it exits this loop. break; @@ -429,7 +431,7 @@ public class DelimitedTextFile { // recordCount means the number of actual read records. We increment the count here. recordCount++; - return tuple; + return outTuple; } catch (Throwable t) { LOG.error(t); @@ -459,6 +461,7 @@ public class DelimitedTextFile { } finally { IOUtils.cleanup(LOG, reader); reader = null; + outTuple = null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java index c09a83b..4cdf7df 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java @@ -42,7 +42,12 @@ public abstract class TextLineSerDe { public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta); public static ByteBuf getNullChars(TableMeta meta) { - byte[] nullCharByteArray = getNullCharsAsBytes(meta); + byte[] nullCharByteArray; + if (meta.getStoreType().equals("SEQUENCEFILE")) { + nullCharByteArray = getNullCharsAsBytes(meta, StorageConstants.SEQUENCEFILE_NULL, "\\"); + } else { + nullCharByteArray = getNullCharsAsBytes(meta); + } ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length); nullChars.writeBytes(nullCharByteArray); @@ -51,10 +56,13 @@ public abstract class TextLineSerDe { } public static byte [] getNullCharsAsBytes(TableMeta meta) { + return getNullCharsAsBytes(meta, StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT); + } + + public static byte[] getNullCharsAsBytes(TableMeta meta, String key, String defaultVal) { byte [] nullChars; - String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, - NullDatum.DEFAULT_TEXT)); + String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(key, defaultVal)); if (StringUtils.isEmpty(nullCharacters)) { nullChars = NullDatum.get().asTextBytes(); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java index 35a5ea4..0ba75dd 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java @@ -109,8 +109,7 @@ public class TestMergeScanner { TableMeta meta = CatalogUtil.newTableMeta(storeType, options); meta.setOptions(CatalogUtil.newDefaultProperty(storeType)); if (storeType.equalsIgnoreCase("AVRO")) { - meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, - TEST_MULTIPLE_FILES_AVRO_SCHEMA); + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_MULTIPLE_FILES_AVRO_SCHEMA); } Path table1Path = new Path(testDir, storeType + "_1.data"); @@ -201,6 +200,8 @@ public class TestMergeScanner { private static boolean isProjectableStorage(String type) { if (type.equalsIgnoreCase("RCFILE") || type.equalsIgnoreCase("PARQUET") || + type.equalsIgnoreCase("CSV") || + type.equalsIgnoreCase("SEQUENCEFILE") || type.equalsIgnoreCase("AVRO")) { return true; } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index b53dbec..c3f31a0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -369,6 +369,7 @@ public class TestStorages { DatumFactory.createInet4("192.168.0.1"), NullDatum.get() }); + if (handleProtobuf) { tuple.put(11, factory.createDatum(queryid.getProto())); } @@ -420,8 +421,7 @@ public class TestStorages { meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\"); if (storeType.equalsIgnoreCase("AVRO")) { - meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, - TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA); + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA); } Path tablePath = new Path(testDir, "testVariousTypes.data");
