Repository: tajo Updated Branches: refs/heads/master 9d59e06b1 -> b68329101
http://git-wip-us.apache.org/repos/asf/tajo/blob/b6832910/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 94e5e71..a41812a 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -433,8 +433,15 @@ public class TestTablePartitions extends QueryTestCaseBase { assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3)); } - res = executeString("SELECT col1, col2, col3 FROM " + tableName); + res = executeString("select * from " + tableName + " WHERE (col1 ='1' or col1 = '100') and col3 > 20"); String result = resultSetToString(res); + String expectedResult = "col4,col1,col2,col3\n" + + "-------------------------------\n" + + "N,1,1,36.0\n"; + res.close(); + assertEquals(expectedResult, result); + + res = executeString("SELECT col1, col2, col3 FROM " + tableName); res.close(); verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, @@ -589,7 +596,7 @@ public class TestTablePartitions extends QueryTestCaseBase { } private final void verifyKeptExistingData(ResultSet res, String tableName) throws Exception { - res = executeString("select * from " + tableName + " where col2 = 1"); + res = executeString("select * from " + tableName + " where col2 = 1 order by col4, col1, col2, col3"); String resultSetData = resultSetToString(res); res.close(); String expected = "col4,col1,col2,col3\n" + @@ -1253,7 +1260,7 @@ public class TestTablePartitions extends QueryTestCaseBase { } query.append(" ").append(partitionColumn); } - query.append(" FROM ").append(tableName); + query.append(" FROM 
").append(databaseName).append(".").append(tableName); ResultSet res = executeString(query.toString()); StringBuilder partitionName = new StringBuilder(); @@ -1313,7 +1320,7 @@ public class TestTablePartitions extends QueryTestCaseBase { // If duplicated partitions had been removed, partitions just will contain 'KEY=N' partition and 'KEY=R' // partition. In previous Query and Stage, duplicated partitions were not deleted because they had been in List. // If you want to verify duplicated partitions, you need to use List instead of Set with DerbyStore. - List<PartitionDescProto> partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, tableName); + List<PartitionDescProto> partitions = catalog.getAllPartitions(DEFAULT_DATABASE_NAME, tableName); assertEquals(2, partitions.size()); PartitionDescProto firstPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=N"); @@ -1325,4 +1332,421 @@ public class TestTablePartitions extends QueryTestCaseBase { executeString("DROP TABLE " + tableName + " PURGE"); } } + + @Test + public final void testPatternMatchingPredicatesAndStringFunctions() throws Exception { + ResultSet res = null; + String tableName = CatalogUtil.normalizeIdentifier("testPatternMatchingPredicatesAndStringFunctions"); + String expectedResult; + + if (nodeType == NodeType.INSERT) { + executeString("create table " + tableName + + " (col1 int4, col2 int4) partition by column(l_shipdate text, l_returnflag text) ").close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(4, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + executeString( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_shipdate, l_returnflag from lineitem"); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(l_shipdate text, l_returnflag 
text) " + + " as select l_orderkey, l_partkey, l_shipdate, l_returnflag from lineitem"); + } + + assertTrue(client.existTable(tableName)); + + // Like + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate LIKE '1996%' and l_returnflag = 'N' order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "1,1,1996-03-13,N\n" + + "1,1,1996-04-12,N\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Not like + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate NOT LIKE '1996%' and l_returnflag IN ('R') order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "3,3,1993-11-09,R\n" + + "3,2,1994-02-02,R\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // In + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate IN ('1993-11-09', '1994-02-02', '1997-01-28') AND l_returnflag = 'R' order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "3,3,1993-11-09,R\n" + + "3,2,1994-02-02,R\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Similar to + res = executeString("SELECT * FROM " + tableName + " WHERE l_shipdate similar to '1993%' order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "3,3,1993-11-09,R\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Regular expression + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate regexp '[1-2][0-9][0-9][3-9]-[0-1][0-9]-[0-3][0-9]' " + + " AND l_returnflag <> 'N' ORDER BY l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "3,3,1993-11-09,R\n" + + "3,2,1994-02-02,R\n"; + + 
assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Concatenate + res = executeString("SELECT * FROM " + tableName + + " WHERE l_shipdate = ( '1996' || '-' || '03' || '-' || '13' ) order by l_shipdate"); + + expectedResult = "col1,col2,l_shipdate,l_returnflag\n" + + "-------------------------------\n" + + "1,1,1996-03-13,N\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + res.close(); + } + + @Test + public final void testDatePartitionColumn() throws Exception { + ResultSet res = null; + String tableName = CatalogUtil.normalizeIdentifier("testDatePartitionColumn"); + String expectedResult; + + if (nodeType == NodeType.INSERT) { + executeString("create table " + tableName + " (col1 int4, col2 int4) partition by column(key date) ").close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + executeString( + "insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_shipdate from lineitem"); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key date) " + + " as select l_orderkey, l_partkey, l_shipdate::date from lineitem"); + } + + assertTrue(client.existTable(tableName)); + + // LessThanOrEquals + res = executeString("SELECT * FROM " + tableName + " WHERE key <= date '1995-09-01' order by col1, col2, key"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // LessThan and GreaterThan + res = executeString("SELECT * FROM " + tableName + + " WHERE key > to_date('1993-01-01', 'YYYY-MM-DD') " + + " and key < 
to_date('1996-01-01', 'YYYY-MM-DD') order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Between + res = executeString("SELECT * FROM " + tableName + + " WHERE key between date '1993-01-01' and date '1997-01-01' order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,1996-04-12\n" + + "1,1,1996-03-13\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Cast + res = executeString("SELECT * FROM " + tableName + + " WHERE key > '1993-01-01'::date " + + " and key < '1997-01-01'::timestamp order by col1, col2, key "); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,1996-03-13\n" + + "1,1,1996-04-12\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Interval + res = executeString("SELECT * FROM " + tableName + + " WHERE key > '1993-01-01'::date " + + " and key < date '1994-01-01' + interval '1 year' order by col1, col2, key "); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02\n" + + "3,3,1993-11-09\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // DateTime Function #1 + res = executeString("SELECT * FROM " + tableName + + " WHERE key > '1993-01-01'::date " + + " and key < add_months(date '1994-01-01', 12) order by col1, col2, key "); + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // DateTime Function #2 + res = executeString("SELECT * FROM " + tableName + + " WHERE key > '1993-01-01'::date " + + " and key < add_months('1994-01-01'::timestamp, 12) order by col1, col2, key "); + + assertEquals(expectedResult, 
resultSetToString(res)); + res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + res.close(); + } + + @Test + public final void testTimestampPartitionColumn() throws Exception { + ResultSet res = null; + String tableName = CatalogUtil.normalizeIdentifier("testTimestampPartitionColumn"); + String expectedResult; + + if (nodeType == NodeType.INSERT) { + executeString("create table " + tableName + + " (col1 int4, col2 int4) partition by column(key timestamp) ").close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + executeString( + "insert overwrite into " + tableName + + " select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem"); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key timestamp) " + + " as select l_orderkey, l_partkey, to_timestamp(l_shipdate, 'YYYY-MM-DD') from lineitem"); + } + + assertTrue(client.existTable(tableName)); + + // LessThanOrEquals + res = executeString("SELECT * FROM " + tableName + + " WHERE key <= to_timestamp('1995-09-01', 'YYYY-MM-DD') order by col1, col2, key"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02 00:00:00\n" + + "3,3,1993-11-09 00:00:00\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // LessThan and GreaterThan + res = executeString("SELECT * FROM " + tableName + + " WHERE key > to_timestamp('1993-01-01', 'YYYY-MM-DD') and " + + "key < to_timestamp('1996-01-01', 'YYYY-MM-DD') order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "3,2,1994-02-02 00:00:00\n" + + "3,3,1993-11-09 00:00:00\n"; + + assertEquals(expectedResult, resultSetToString(res)); + 
res.close(); + + // Between + res = executeString("SELECT * FROM " + tableName + + " WHERE key between to_timestamp('1993-01-01', 'YYYY-MM-DD') " + + "and to_timestamp('1997-01-01', 'YYYY-MM-DD') order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,1996-04-12 00:00:00\n" + + "1,1,1996-03-13 00:00:00\n" + + "3,2,1994-02-02 00:00:00\n" + + "3,3,1993-11-09 00:00:00\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + res.close(); + } + + @Test + public final void testTimePartitionColumn() throws Exception { + ResultSet res = null; + String tableName = CatalogUtil.normalizeIdentifier("testTimePartitionColumn"); + String expectedResult; + + if (nodeType == NodeType.INSERT) { + executeString("create table " + tableName + + " (col1 int4, col2 int4) partition by column(key time) ").close(); + + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + assertEquals(2, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName).getLogicalSchema().size()); + + executeString( + "insert overwrite into " + tableName + + " select l_orderkey, l_partkey " + + " , CASE l_shipdate WHEN '1996-03-13' THEN cast ('11:20:40' as time) " + + " WHEN '1997-01-28' THEN cast ('12:10:20' as time) " + + " WHEN '1994-02-02' THEN cast ('12:10:30' as time) " + + " ELSE cast ('00:00:00' as time) END " + + " from lineitem"); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key time) " + + " as select l_orderkey, l_partkey " + + " , CASE l_shipdate WHEN '1996-03-13' THEN cast ('11:20:40' as time) " + + " WHEN '1997-01-28' THEN cast ('12:10:20' as time) " + + " WHEN '1994-02-02' THEN cast ('12:10:30' as time) " + + " ELSE cast ('00:00:00' as time) END " + + " from lineitem"); + } + + 
assertTrue(client.existTable(tableName)); + // LessThanOrEquals + res = executeString("SELECT * FROM " + tableName + + " WHERE key <= cast('12:10:20' as time) order by col1, col2, key"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,00:00:00\n" + + "1,1,11:20:40\n" + + "2,2,12:10:20\n" + + "3,3,00:00:00\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // LessThan and GreaterThan + res = executeString("SELECT * FROM " + tableName + + " WHERE key > cast('00:00:00' as time) and " + + "key < cast('12:10:00' as time) order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,11:20:40\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + // Between + res = executeString("SELECT * FROM " + tableName + + " WHERE key between cast('11:00:00' as time) " + + "and cast('13:00:00' as time) order by col1, col2, key desc"); + + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,1,11:20:40\n" + + "2,2,12:10:20\n" + + "3,2,12:10:30\n"; + + assertEquals(expectedResult, resultSetToString(res)); + res.close(); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + res.close(); + } + + @Test + public final void testRemainPartitionPath() throws Exception { + ResultSet res = null; + executeString("create database test_partition").close(); + + String databaseName = "test_partition"; + String tableName = CatalogUtil.normalizeIdentifier("part"); + + ClientProtos.SubmitQueryResponse response; + if (nodeType == NodeType.INSERT) { + res = executeString( + "create table " + databaseName + "." 
+ tableName + " (col1 int4, col2 int4) partition by column(key float8) "); + res.close(); + + assertTrue(catalog.existsTable(databaseName, tableName)); + assertEquals(2, catalog.getTableDesc(databaseName, tableName).getSchema().size()); + assertEquals(3, catalog.getTableDesc(databaseName, tableName).getLogicalSchema().size()); + + response = client.executeQuery( + "insert overwrite into " + databaseName + "." + tableName + " select l_orderkey, l_partkey, " + + "l_quantity from lineitem"); + } else { + response = client.executeQuery( + "create table "+ databaseName + "." + tableName + "(col1 int4, col2 int4) partition by column(key float8) " + + " as select l_orderkey, l_partkey, l_quantity from lineitem"); + } + + QueryId queryId = new QueryId(response.getQueryId()); + testingCluster.waitForQuerySubmitted(queryId, 10); + QueryMasterTask queryMasterTask = testingCluster.getQueryMasterTask(queryId); + assertNotNull(queryMasterTask); + TajoClientUtil.waitCompletion(client, queryId); + + MasterPlan plan = queryMasterTask.getQuery().getPlan(); + + ExecutionBlock rootEB = plan.getRoot(); + + assertEquals(1, plan.getChildCount(rootEB.getId())); + + ExecutionBlock insertEB = plan.getChild(rootEB.getId(), 0); + assertNotNull(insertEB); + + assertEquals(nodeType, insertEB.getPlan().getType()); + assertEquals(1, plan.getChildCount(insertEB.getId())); + + ExecutionBlock scanEB = plan.getChild(insertEB.getId(), 0); + + List<DataChannel> list = plan.getOutgoingChannels(scanEB.getId()); + assertEquals(1, list.size()); + DataChannel channel = list.get(0); + assertNotNull(channel); + assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType()); + assertEquals(1, channel.getShuffleKeys().length); + + TableDesc tableDesc = catalog.getTableDesc(databaseName, tableName); + verifyPartitionDirectoryFromCatalog(databaseName, tableName, new String[]{"key"}, + tableDesc.getStats().getNumRows()); + + executeString("DROP TABLE " + databaseName + "." 
+ tableName + " PURGE").close(); + executeString("DROP database " + databaseName).close(); + } + } http://git-wip-us.apache.org/repos/asf/tajo/blob/b6832910/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index a8a1c78..832b8d3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -38,6 +38,7 @@ import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.*; import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.StringUtils; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -173,17 +174,18 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder(); keyBuilder.setColumnName(split[0]); - keyBuilder.setPartitionValue(split[1]); + // Partition path have been escaped to avoid URISyntaxException. But partition value of partition keys table + // need to contain unescaped value for comparing filter conditions in select statement. + keyBuilder.setPartitionValue(StringUtils.unescapePathName(split[1])); builder.addPartitionKeys(keyBuilder.build()); } if (this.plan.getUri() == null) { - // In CTAS, the uri would be null. So, - String[] split = CatalogUtil.splitTableName(plan.getTableName()); - int endIndex = storeTablePath.toString().indexOf(split[1]) + split[1].length(); + // In CTAS, the uri would be null. So, it get the uri from staging directory. 
+ int endIndex = storeTablePath.toString().indexOf(FileTablespace.TMP_STAGING_DIR_PREFIX); String outputPath = storeTablePath.toString().substring(0, endIndex); - builder.setPath(outputPath + "/" + partition); + builder.setPath(outputPath + partition); } else { builder.setPath(this.plan.getUri().toString() + "/" + partition); } http://git-wip-us.apache.org/repos/asf/tajo/blob/b6832910/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index cd43add..6160cdb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -970,7 +970,7 @@ public class TajoMasterClientService extends AbstractService { tableName = request.getValue(); } - List<PartitionDescProto> partitions = catalog.getPartitions(databaseName, tableName); + List<PartitionDescProto> partitions = catalog.getAllPartitions(databaseName, tableName); return PartitionListResponse.newBuilder() .setState(OK) .addAllPartition(partitions) http://git-wip-us.apache.org/repos/asf/tajo/blob/b6832910/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java index d06c1d3..e61d361 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AlgebraicUtil.java @@ -18,15 +18,19 @@ package org.apache.tajo.plan.expr; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import org.apache.tajo.algebra.*; import 
org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.exception.TajoException; import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor; +import org.apache.tajo.util.TUtil; import java.util.*; public class AlgebraicUtil { - + /** * Transpose a given comparison expression into the expression * where the variable corresponding to the target is placed @@ -493,4 +497,190 @@ public class AlgebraicUtil { return super.visitTimeLiteral(ctx, stack, expr); } } + + /** + * Find the top expr matched to type from the given expr + * + * @param expr start expr + * @param type to find + * @return a found expr + */ + public static <T extends Expr> T findTopExpr(Expr expr, OpType type) throws TajoException { + Preconditions.checkNotNull(expr); + Preconditions.checkNotNull(type); + + ExprFinder finder = new ExprFinder(type); + finder.visit(null, new Stack<Expr>(), expr); + + if (finder.getFoundExprs().size() == 0) { + return null; + } + return (T) finder.getFoundExprs().get(0); + } + + private static class ExprFinder extends SimpleAlgebraVisitor<Object, Expr> { + private List<Expr> list = new ArrayList<Expr>(); + private final OpType[] tofind; + private boolean topmost = false; + private boolean finished = false; + + public ExprFinder(OpType... 
type) { + + this.tofind = type; + } + + public ExprFinder(OpType[] type, boolean topmost) { + this(type); + this.topmost = topmost; + } + + @Override + public Expr visit(Object ctx, Stack<Expr> stack, Expr expr) throws TajoException { + if (!finished) { + for (OpType type : tofind) { + if (expr.getType() == type) { + list.add(expr); + } + if (topmost && list.size() > 0) { + finished = true; + } + } + } + return super.visit(ctx, stack, expr); + } + + public List<Expr> getFoundExprs() { + return list; + } + + } + + public static Expr[] toConjunctiveNormalFormArray(Expr expr) { + List<Expr> list = new ArrayList<Expr>(); + toConjunctiveNormalFormArrayRecursive(expr, list); + return list.toArray(new Expr[list.size()]); + } + + private static void toConjunctiveNormalFormArrayRecursive(Expr node, List<Expr> found) { + if (node.getType() == OpType.And) { + toConjunctiveNormalFormArrayRecursive(((BinaryOperator) node).getLeft(), found); + toConjunctiveNormalFormArrayRecursive(((BinaryOperator) node).getRight(), found); + } else { + found.add(node); + } + } + + /** + * It finds unique columns from a Expr. 
+ */ + public static LinkedHashSet<ColumnReferenceExpr> findUniqueColumnReferences(Expr expr) throws TajoException { + UniqueColumnReferenceFinder finder = new UniqueColumnReferenceFinder(); + finder.visit(null, new Stack<Expr>(), expr); + return finder.getColumnRefs(); + } + + private static class UniqueColumnReferenceFinder extends SimpleAlgebraVisitor<Object, Expr> { + private LinkedHashSet<ColumnReferenceExpr> columnSet = Sets.newLinkedHashSet(); + private ColumnReferenceExpr field = null; + + @Override + public Expr visit(Object ctx, Stack<Expr> stack, Expr expr) throws TajoException { + if (expr.getType() == OpType.Column) { + field = (ColumnReferenceExpr) expr; + columnSet.add(field); + } + return super.visit(ctx, stack, expr); + } + + public LinkedHashSet<ColumnReferenceExpr> getColumnRefs() { + return this.columnSet; + } + + } + + /** + * Build Exprs for all columns with a list of filter conditions. + * + * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3). + * Then, this methods will create three Exprs for (col1), (col2), (col3). + * + * Assume that an user gives a condition WHERE col1 ='A' and col3 = 'C'. + * There is no filter condition corresponding to col2. + * Then, the path filter conditions are corresponding to the followings: + * + * The first Expr: col1 = 'A' + * The second Expr: col2 IS NOT NULL + * The third Expr: col3 = 'C' + * + * 'IS NOT NULL' predicate is always true against the partition path. 
+ * + * + * @param partitionColumns + * @param conjunctiveForms + * @return + */ + public static Expr[] getAccumulatedFiltersByExpr(String tableName, + List<CatalogProtos.ColumnProto> partitionColumns, Expr[] conjunctiveForms) throws TajoException { + Expr[] filters = new Expr[partitionColumns.size()]; + Column target; + + for (int i = 0; i < partitionColumns.size(); i++) { + List<Expr> accumulatedFilters = TUtil.newList(); + target = new Column(partitionColumns.get(i)); + ColumnReferenceExpr columnReference = new ColumnReferenceExpr(tableName, target.getSimpleName()); + + if (conjunctiveForms == null) { + accumulatedFilters.add(new IsNullPredicate(true, columnReference)); + } else { + for (Expr expr : conjunctiveForms) { + if (AlgebraicUtil.findUniqueColumnReferences(expr).contains(columnReference)) { + // Accumulate one qual per level + accumulatedFilters.add(expr); + } + } + + if (accumulatedFilters.size() == 0) { + accumulatedFilters.add(new IsNullPredicate(true, columnReference)); + } + } + + Expr filterPerLevel = AlgebraicUtil.createSingletonExprFromCNFByExpr( + accumulatedFilters.toArray(new Expr[accumulatedFilters.size()])); + filters[i] = filterPerLevel; + } + + return filters; + } + + public static Expr createSingletonExprFromCNFByExpr(Collection<Expr> cnfExprs) { + return createSingletonExprFromCNFByExpr(cnfExprs.toArray(new Expr[cnfExprs.size()])); + } + + /** + * Convert a list of conjunctive normal forms into a singleton expression. + * + * @param cnfExprs + * @return The EvalNode object that merges all CNF-formed expressions. + */ + public static Expr createSingletonExprFromCNFByExpr(Expr... 
cnfExprs) { + if (cnfExprs.length == 1) { + return cnfExprs[0]; + } + + return createSingletonExprFromCNFRecursiveByExpr(cnfExprs, 0); + } + + private static Expr createSingletonExprFromCNFRecursiveByExpr(Expr[] exprs, int idx) { + if (idx >= exprs.length) { + throw new ArrayIndexOutOfBoundsException("index " + idx + " is exceeded the maximum length ("+ + exprs.length+") of EvalNode"); + } + + if (idx == exprs.length - 2) { + return new BinaryOperator(OpType.And, exprs[idx], exprs[idx + 1]); + } else { + return new BinaryOperator(OpType.And, exprs[idx], createSingletonExprFromCNFRecursiveByExpr(exprs, idx + 1)); + } + } + } http://git-wip-us.apache.org/repos/asf/tajo/blob/b6832910/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index b5cd42b..47e2c6c 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -24,19 +24,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.*; import org.apache.tajo.OverridableConf; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionsByAlgebraProto; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.exception.TajoException; -import org.apache.tajo.exception.TajoInternalError; 
+import org.apache.tajo.exception.*; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext; +import org.apache.tajo.plan.util.EvalNodeToExprConverter; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.storage.Tuple; @@ -44,11 +44,11 @@ import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.StringUtils; import java.io.IOException; -import java.util.List; -import java.util.Set; -import java.util.Stack; +import java.util.*; public class PartitionedTableRewriter implements LogicalPlanRewriteRule { + private CatalogService catalog; + private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class); private static final String NAME = "Partitioned Table Rewriter"; @@ -78,6 +78,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException { LogicalPlan plan = context.getPlan(); LogicalPlan.QueryBlock rootBlock = plan.getRootBlock(); + this.catalog = context.getCatalog(); rewriter.visit(context.getQueryContext(), plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>()); return plan; } @@ -118,32 +119,137 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { * @return * @throws IOException */ - private Path [] findFilteredPaths(OverridableConf queryContext, Schema partitionColumns, EvalNode [] conjunctiveForms, - Path tablePath) - throws IOException { + private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) + throws IOException, UndefinedDatabaseException, UndefinedTableException, + UndefinedPartitionMethodException, UndefinedOperatorException { + Path [] 
filteredPaths = null; FileSystem fs = tablePath.getFileSystem(queryContext.getConf()); + String [] splits = CatalogUtil.splitFQTableName(tableName); + List<PartitionDescProto> partitions = null; - PathFilter [] filters; - if (conjunctiveForms == null) { - filters = buildAllAcceptingPathFilters(partitionColumns); - } else { - filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms); + try { + if (conjunctiveForms == null) { + partitions = catalog.getAllPartitions(splits[0], splits[1]); + } else { + PartitionsByAlgebraProto request = getPartitionsAlgebraProto(splits[0], splits[1], conjunctiveForms); + partitions = catalog.getPartitionsByAlgebra(request); + } + // If catalog returns list of table partitions successfully, build path lists for scanning table data. + if (partitions != null) { + filteredPaths = new Path[partitions.size()]; + for (int i = 0; i < partitions.size(); i++) { + filteredPaths[i] = new Path(partitions.get(i).getPath()); + } + } + } catch (TajoInternalError e) { + LOG.error(e.getMessage(), e); } - // loop from one to the number of partition columns - Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0])); + // If we should fail to build path lists with catalog, we need to path lists using getting an array of FileStatus + // objects with the path-filter. + if (partitions == null || filteredPaths == null) { + PathFilter [] filters; + if (conjunctiveForms == null) { + filters = buildAllAcceptingPathFilters(partitionColumns); + } else { + filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms); + } + + // loop from one to the number of partition columns + filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0])); - for (int i = 1; i < partitionColumns.size(); i++) { - // Get all file status matched to a ith level path filter. 
- filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i])); + for (int i = 1; i < partitionColumns.size(); i++) { + // Get all file status matched to a ith level path filter. + filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i])); + } } LOG.info("Filtered directory or files: " + filteredPaths.length); + return filteredPaths; } /** + * This will build algebra expressions for querying partitions and partition keys in CatalogStore. + * + * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3). + * Assume that an user gives a condition WHERE (col1 ='1' or col1 = '100') and col3 > 20 . + * + * Then, the algebra expression would be generated as following: + * + * { + * "LeftExpr": { + * "LeftExpr": { + * "Qualifier": "default.table1", + * "ColumnName": "col3", + * "OpType": "Column" + * }, + * "RightExpr": { + * "Value": "20.0", + * "ValueType": "Unsigned_Integer", + * "OpType": "Literal" + * }, + * "OpType": "GreaterThan" + * }, + * "RightExpr": { + * "LeftExpr": { + * "LeftExpr": { + * "Qualifier": "default.table1", + * "ColumnName": "col1", + * "OpType": "Column" + * }, + * "RightExpr": { + * "Value": "1", + * "ValueType": "String", + * "OpType": "Literal" + * }, + * "OpType": "Equals" + * }, + * "RightExpr": { + * "LeftExpr": { + * "Qualifier": "default.table1", + * "ColumnName": "col1", + * "OpType": "Column" + * }, + * "RightExpr": { + * "Value": "100", + * "ValueType": "String", + * "OpType": "Literal" + * }, + * "OpType": "Equals" + * }, + * "OpType": "Or" + * }, + * "OpType": "And" + *} + * + * @param databaseName + * @param tableName + * @param conjunctiveForms + * @return + */ + public static PartitionsByAlgebraProto getPartitionsAlgebraProto( + String databaseName, String tableName, EvalNode [] conjunctiveForms) { + + PartitionsByAlgebraProto.Builder request = PartitionsByAlgebraProto.newBuilder(); + request.setDatabaseName(databaseName); + request.setTableName(tableName); + + if 
(conjunctiveForms != null) { + EvalNode evalNode = AlgebraicUtil.createSingletonExprFromCNF(conjunctiveForms); + EvalNodeToExprConverter convertor = new EvalNodeToExprConverter(databaseName + "." + tableName); + convertor.visit(null, evalNode, new Stack<EvalNode>()); + request.setAlgebra(convertor.getResult().toJson()); + } else { + request.setAlgebra(""); + } + + return request.build(); + } + + /** * Build path filters for all levels with a list of filter conditions. * * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3). @@ -191,6 +297,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()])); filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel); } + return filters; } @@ -222,7 +329,9 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { return paths; } - private Path [] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException { + public Path [] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException, + UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + UndefinedOperatorException { TableDesc table = scanNode.getTableDesc(); PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod(); @@ -261,10 +370,10 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { } if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates - return findFilteredPaths(queryContext, paritionValuesSchema, + return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getUri())); } else { // otherwise, we will get all partition paths. 
- return findFilteredPaths(queryContext, paritionValuesSchema, null, new Path(table.getUri())); + return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, null, new Path(table.getUri())); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b6832910/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java new file mode 100644 index 0000000..7510bcd --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/EvalNodeToExprConverter.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.plan.util; + +import org.apache.tajo.algebra.*; +import org.apache.tajo.datum.DateDatum; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TimeDatum; +import org.apache.tajo.datum.TimestampDatum; +import org.apache.tajo.plan.expr.*; + +import java.util.Stack; + +/** + * This converts EvalNode tree to Expr tree. 
+ * + */ +public class EvalNodeToExprConverter extends SimpleEvalNodeVisitor<Object> { + private Stack<Expr> exprs = new Stack<Expr>(); + + private String tableName; + + public EvalNodeToExprConverter(String tableName) { + this.tableName = tableName; + } + + public Expr getResult() { + return exprs.pop(); + } + + @Override + protected EvalNode visitBinaryEval(Object o, Stack<EvalNode> stack, BinaryEval binaryEval) { + stack.push(binaryEval); + visit(o, binaryEval.getLeftExpr(), stack); + Expr left = exprs.pop(); + + visit(o, binaryEval.getRightExpr(), stack); + Expr right = exprs.pop(); + + Expr expr = null; + switch (binaryEval.getType()) { + // Arithmetic expression + case PLUS: + expr = new BinaryOperator(OpType.Plus, left, right); + break; + case MINUS: + expr = new BinaryOperator(OpType.Minus, left, right); + break; + case MULTIPLY: + expr = new BinaryOperator(OpType.Multiply, left, right); + break; + case DIVIDE: + expr = new BinaryOperator(OpType.Divide, left, right); + break; + case MODULAR: + expr = new BinaryOperator(OpType.Modular, left, right); + break; + + // Logical Predicates + case AND: + expr = new BinaryOperator(OpType.And, left, right); + break; + case OR: + expr = new BinaryOperator(OpType.Or, left, right); + break; + case NOT: + expr = new BinaryOperator(OpType.Not, left, right); + break; + + // Comparison Predicates + case EQUAL: + expr = new BinaryOperator(OpType.Equals, left, right); + break; + case NOT_EQUAL: + expr = new BinaryOperator(OpType.NotEquals, left, right); + break; + case LTH: + expr = new BinaryOperator(OpType.LessThan, left, right); + break; + case LEQ: + expr = new BinaryOperator(OpType.LessThanOrEquals, left, right); + break; + case GTH: + expr = new BinaryOperator(OpType.GreaterThan, left, right); + break; + case GEQ: + expr = new BinaryOperator(OpType.GreaterThanOrEquals, left, right); + break; + + // SQL standard predicates + case IS_NULL: + expr = new BinaryOperator(OpType.IsNullPredicate, left, right); + break; + case 
CASE: + expr = new BinaryOperator(OpType.CaseWhen, left, right); + break; + case IN: + InEval inEval = (InEval) binaryEval; + expr = new InPredicate(left, right, inEval.isNot()); + break; + + // String operators and Pattern match predicates + case LIKE: + LikePredicateEval likePredicateEval = (LikePredicateEval) binaryEval; + expr = new PatternMatchPredicate(OpType.LikePredicate, likePredicateEval.isNot(), left, right); + break; + case SIMILAR_TO: + SimilarToPredicateEval similarToPredicateEval = (SimilarToPredicateEval) binaryEval; + expr = new PatternMatchPredicate(OpType.SimilarToPredicate, similarToPredicateEval.isNot(), left, right); + break; + case REGEX: + RegexPredicateEval regexPredicateEval = (RegexPredicateEval) binaryEval; + expr = new PatternMatchPredicate(OpType.Regexp, regexPredicateEval.isNot(), left, right); + break; + case CONCATENATE: + default: + throw new RuntimeException("Unsupported type: " + binaryEval.getType().name()); + } + + if (expr != null) { + exprs.push(expr); + } + + stack.pop(); + return null; + } + + @Override + protected EvalNode visitConst(Object o, ConstEval evalNode, Stack<EvalNode> stack) { + Expr value = null; + DateValue dateValue; + TimeValue timeValue; + + switch (evalNode.getValueType().getType()) { + case NULL_TYPE: + value = new NullLiteral(); + break; + case BOOLEAN: + value = new LiteralValue(evalNode.getValue().asChars(), LiteralValue.LiteralType.Boolean); + break; + case INT1: + case INT2: + case INT4: + value = new LiteralValue(evalNode.getValue().asChars(), LiteralValue.LiteralType.Unsigned_Integer); + break; + case INT8: + value = new LiteralValue(evalNode.getValue().asChars(), LiteralValue.LiteralType.Unsigned_Large_Integer); + break; + case FLOAT4: + case FLOAT8: + value = new LiteralValue(evalNode.getValue().asChars(), LiteralValue.LiteralType.Unsigned_Float); + break; + case TEXT: + value = new LiteralValue(evalNode.getValue().asChars(), LiteralValue.LiteralType.String); + break; + case DATE: + DateDatum 
dateDatum = (DateDatum) evalNode.getValue(); + + dateValue = new DateValue(""+dateDatum.getYear(), + ""+dateDatum.getMonthOfYear(), ""+dateDatum.getDayOfMonth()); + value = new DateLiteral(dateValue); + + break; + case TIMESTAMP: + TimestampDatum timestampDatum = (TimestampDatum) evalNode.getValue(); + + dateValue = new DateValue(""+timestampDatum.getYear(), + ""+timestampDatum.getMonthOfYear(), ""+timestampDatum.getDayOfMonth()); + + timeValue = new TimeValue(""+timestampDatum.getHourOfDay() + , ""+timestampDatum.getMinuteOfHour(), ""+timestampDatum.getSecondOfMinute()); + + value = new TimestampLiteral(dateValue, timeValue); + break; + case TIME: + TimeDatum timeDatum = (TimeDatum) evalNode.getValue(); + timeValue = new TimeValue(""+timeDatum.getHourOfDay() + , ""+timeDatum.getMinuteOfHour(), ""+timeDatum.getSecondOfMinute()); + + value = new TimeLiteral(timeValue); + break; + default: + throw new RuntimeException("Unsupported type: " + evalNode.getValueType().getType().name()); + } + exprs.push(value); + + return super.visitConst(o, evalNode, stack); + } + + @Override + protected EvalNode visitRowConstant(Object o, RowConstantEval evalNode, Stack<EvalNode> stack) { + Expr[] values = new Expr[evalNode.getValues().length]; + for (int i = 0; i < evalNode.getValues().length; i++) { + Datum datum = evalNode.getValues()[i]; + LiteralValue value; + switch (datum.type()) { + case BOOLEAN: + value = new LiteralValue(datum.asChars(), LiteralValue.LiteralType.Boolean); + break; + case TEXT: + value = new LiteralValue(datum.asChars(), LiteralValue.LiteralType.String); + break; + case INT1: + case INT2: + case INT4: + value = new LiteralValue(datum.asChars(), LiteralValue.LiteralType.Unsigned_Integer); + break; + case INT8: + value = new LiteralValue(datum.asChars(), LiteralValue.LiteralType.Unsigned_Large_Integer); + break; + case FLOAT4: + case FLOAT8: + value = new LiteralValue(datum.asChars(), LiteralValue.LiteralType.Unsigned_Float); + break; + default: + throw new 
RuntimeException("Unsupported type: " + datum.type().name()); + } + values[i] = value; + } + ValueListExpr expr = new ValueListExpr(values); + exprs.push(expr); + + return super.visitRowConstant(o, evalNode, stack); + } + + @Override + protected EvalNode visitField(Object o, Stack<EvalNode> stack, FieldEval evalNode) { + ColumnReferenceExpr expr = new ColumnReferenceExpr(tableName, evalNode.getColumnName()); + exprs.push(expr); + return super.visitField(o, stack, evalNode); + } + + @Override + protected EvalNode visitBetween(Object o, BetweenPredicateEval evalNode, Stack<EvalNode> stack) { + stack.push(evalNode); + + visit(o, evalNode.getPredicand(), stack); + Expr predicand = exprs.pop(); + + visit(o, evalNode.getBegin(), stack); + Expr begin = exprs.pop(); + + visit(o, evalNode.getEnd(), stack); + Expr end = exprs.pop(); + + Expr expr = new BetweenPredicate(evalNode.isNot(), evalNode.isSymmetric(), predicand, begin, end); + exprs.push(expr); + + stack.pop(); + + return null; + } + + @Override + protected EvalNode visitCaseWhen(Object o, CaseWhenEval evalNode, Stack<EvalNode> stack) { + stack.push(evalNode); + + CaseWhenPredicate caseWhenPredicate = new CaseWhenPredicate(); + + for (CaseWhenEval.IfThenEval ifThenEval : evalNode.getIfThenEvals()) { + visit(o, ifThenEval.getCondition(), stack); + Expr condition = exprs.pop(); + visit(o, ifThenEval.getResult(), stack); + Expr result = exprs.pop(); + + caseWhenPredicate.addWhen(condition, result); + } + + if (evalNode.hasElse()) { + visit(o, evalNode.getElse(), stack); + Expr elseResult = exprs.pop(); + caseWhenPredicate.setElseResult(elseResult); + } + + exprs.push(caseWhenPredicate); + + stack.pop(); + + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/b6832910/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java ---------------------------------------------------------------------- diff --git 
a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java new file mode 100644 index 0000000..72fd939 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PartitionFilterAlgebraVisitor.java @@ -0,0 +1,573 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.tajo.plan.util; + +import org.apache.tajo.algebra.*; +import org.apache.tajo.catalog.CatalogConstants; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes.*; +import org.apache.tajo.datum.TimeDatum; +import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.plan.ExprAnnotator; +import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor; +import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.util.datetime.DateTimeUtil; +import org.apache.tajo.util.datetime.TimeMeta; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.List; +import java.util.Stack; +import java.util.TimeZone; + +/** + * This build SQL statements for getting partitions informs on CatalogStore with algebra expressions. + * This visitor assumes that all columns of algebra expressions are reserved for one table. + * + */ +public class PartitionFilterAlgebraVisitor extends SimpleAlgebraVisitor<Object, Expr> { + private String tableAlias; + private Column column; + private boolean isHiveCatalog = false; + + private Stack<String> queries = new Stack(); + private List<Pair<Type, Object>> parameters = TUtil.newList(); + + public String getTableAlias() { + return tableAlias; + } + + public void setTableAlias(String tableAlias) { + this.tableAlias = tableAlias; + } + + public Column getColumn() { + return column; + } + + public void setColumn(Column column) { + this.column = column; + } + + public boolean isHiveCatalog() { + return isHiveCatalog; + } + + public void setIsHiveCatalog(boolean isHiveCatalog) { + this.isHiveCatalog = isHiveCatalog; + } + + public List<Pair<Type, Object>> getParameters() { + return parameters; + } + + public void setParameters(List<Pair<Type, Object>> parameters) { + this.parameters = parameters; + } + + public void clearParameters() { + this.parameters.clear(); + } + + 
public String getResult() { + return queries.pop(); + } + + @Override + public Expr visit(Object ctx, Stack<Expr> stack, Expr expr) throws TajoException { + if (expr.getType() == OpType.LikePredicate) { + return visitLikePredicate(ctx, stack, (PatternMatchPredicate) expr); + } else if (expr.getType() == OpType.SimilarToPredicate) { + return visitSimilarToPredicate(ctx, stack, (PatternMatchPredicate) expr); + } else if (expr.getType() == OpType.Regexp) { + return visitRegexpPredicate(ctx, stack, (PatternMatchPredicate) expr); + } + return super.visit(ctx, stack, expr); + } + + @Override + public Expr visitDateLiteral(Object ctx, Stack<Expr> stack, DateLiteral expr) throws TajoException { + StringBuilder sb = new StringBuilder(); + + if (!isHiveCatalog) { + sb.append("?").append(" )"); + parameters.add(new Pair(Type.DATE, Date.valueOf(expr.toString()))); + } else { + sb.append("\"").append(expr.toString()).append("\""); + } + queries.push(sb.toString()); + return expr; + } + + @Override + public Expr visitTimestampLiteral(Object ctx, Stack<Expr> stack, TimestampLiteral expr) throws TajoException { + StringBuilder sb = new StringBuilder(); + + if (!isHiveCatalog) { + DateValue dateValue = expr.getDate(); + TimeValue timeValue = expr.getTime(); + + int [] dates = ExprAnnotator.dateToIntArray(dateValue.getYears(), + dateValue.getMonths(), + dateValue.getDays()); + int [] times = ExprAnnotator.timeToIntArray(timeValue.getHours(), + timeValue.getMinutes(), + timeValue.getSeconds(), + timeValue.getSecondsFraction()); + + long julianTimestamp; + if (timeValue.hasSecondsFraction()) { + julianTimestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2], + times[3] * 1000); + } else { + julianTimestamp = DateTimeUtil.toJulianTimestamp(dates[0], dates[1], dates[2], times[0], times[1], times[2], 0); + } + + TimeMeta tm = new TimeMeta(); + DateTimeUtil.toJulianTimeMeta(julianTimestamp, tm); + + TimeZone tz = TimeZone.getDefault(); + 
DateTimeUtil.toUTCTimezone(tm, tz); + + sb.append("?").append(" )"); + Timestamp timestamp = new Timestamp(DateTimeUtil.julianTimeToJavaTime(DateTimeUtil.toJulianTimestamp(tm))); + parameters.add(new Pair(Type.TIMESTAMP, timestamp)); + } else { + sb.append("\"").append(expr.toString()).append("\""); + } + queries.push(sb.toString()); + + return expr; + } + + @Override + public Expr visitTimeLiteral(Object ctx, Stack<Expr> stack, TimeLiteral expr) throws TajoException { + StringBuilder sb = new StringBuilder(); + + if (!isHiveCatalog) { + TimeValue timeValue = expr.getTime(); + + int [] times = ExprAnnotator.timeToIntArray(timeValue.getHours(), + timeValue.getMinutes(), + timeValue.getSeconds(), + timeValue.getSecondsFraction()); + + long time; + if (timeValue.hasSecondsFraction()) { + time = DateTimeUtil.toTime(times[0], times[1], times[2], times[3] * 1000); + } else { + time = DateTimeUtil.toTime(times[0], times[1], times[2], 0); + } + TimeDatum timeDatum = new TimeDatum(time); + TimeMeta tm = timeDatum.asTimeMeta(); + + TimeZone tz = TimeZone.getDefault(); + DateTimeUtil.toUTCTimezone(tm, tz); + + sb.append("?").append(" )"); + parameters.add(new Pair(Type.TIME, new Time(DateTimeUtil.toJavaTime(tm.hours, tm.minutes, tm.secs, tm.fsecs)))); + } else { + sb.append("\"").append(expr.toString()).append("\""); + } + queries.push(sb.toString()); + + return expr; + } + + @Override + public Expr visitLiteral(Object ctx, Stack<Expr> stack, LiteralValue expr) throws TajoException { + StringBuilder sb = new StringBuilder(); + + if (!isHiveCatalog) { + sb.append("?").append(" )"); + switch (expr.getValueType()) { + case Boolean: + parameters.add(new Pair(Type.BOOLEAN, Boolean.valueOf(expr.getValue()))); + break; + case Unsigned_Float: + parameters.add(new Pair(Type.FLOAT8, Double.valueOf(expr.getValue()))); + break; + case String: + parameters.add(new Pair(Type.TEXT, expr.getValue())); + break; + default: + parameters.add(new Pair(Type.INT8, Long.valueOf(expr.getValue()))); + 
break; + } + } else { + switch (expr.getValueType()) { + case String: + sb.append("\"").append(expr.getValue()).append("\""); + break; + default: + sb.append(expr.getValue()); + break; + } + } + queries.push(sb.toString()); + + return expr; + } + + @Override + public Expr visitValueListExpr(Object ctx, Stack<Expr> stack, ValueListExpr expr) throws TajoException { + StringBuilder sb = new StringBuilder(); + + if (!isHiveCatalog) { + sb.append("("); + for(int i = 0; i < expr.getValues().length; i++) { + if (i > 0) { + sb.append(", "); + } + sb.append("?"); + stack.push(expr.getValues()[i]); + visit(ctx, stack, expr.getValues()[i]); + stack.pop(); + } + sb.append(")"); + sb.append(" )"); + queries.push(sb.toString()); + } else { + throw new UnsupportedException("IN Operator"); + } + + return expr; + } + + @Override + public Expr visitColumnReference(Object ctx, Stack<Expr> stack, ColumnReferenceExpr expr) throws TajoException { + StringBuilder sb = new StringBuilder(); + + if (!isHiveCatalog) { + sb.append("( ").append(tableAlias).append(".").append(CatalogConstants.COL_COLUMN_NAME) + .append(" = '").append(expr.getName()).append("'") + .append(" AND ").append(tableAlias).append(".").append(CatalogConstants.COL_PARTITION_VALUE); + } else { + sb.append(expr.getName()); + } + queries.push(sb.toString()); + return expr; + } + + + @Override + public Expr visitUnaryOperator(Object ctx, Stack<Expr> stack, UnaryOperator expr) throws TajoException { + stack.push(expr); + Expr child = visit(ctx, stack, expr.getChild()); + stack.pop(); + + if (child.getType() == OpType.Literal) { + return new NullLiteral(); + } + + String childSql = queries.pop(); + + StringBuilder sb = new StringBuilder(); + if (expr.getType() == OpType.IsNullPredicate) { + IsNullPredicate isNullPredicate = (IsNullPredicate) expr; + sb.append(childSql); + sb.append(" IS "); + if (isNullPredicate.isNot()) { + sb.append("NOT NULL"); + } else { + sb.append("NULL"); + } + } + + if (!isHiveCatalog) { + sb.append(" 
)"); + } + queries.push(sb.toString()); + + return expr; + } + + /** + * Builds the SQL fragment for a BETWEEN predicate and pushes it onto the query stack. + * For non-Hive catalogs a trailing ")" on the begin and end operands is stripped so that + * a single closing parenthesis can be appended after the whole predicate. + */ + @Override + public Expr visitBetween(Object ctx, Stack<Expr> stack, BetweenPredicate expr) throws TajoException { + stack.push(expr); + + visit(ctx, stack, expr.predicand()); + String predicandSql = queries.pop(); + + visit(ctx, stack, expr.begin()); + String beginSql= queries.pop(); + if (!isHiveCatalog && beginSql.endsWith(")")) { + beginSql = beginSql.substring(0, beginSql.length()-1); + } + + visit(ctx, stack, expr.end()); + String endSql = queries.pop(); + if (!isHiveCatalog && endSql.endsWith(")")) { + // Trim the end operand itself, not the begin operand. + endSql = endSql.substring(0, endSql.length()-1); + } + + StringBuilder sb = new StringBuilder(); + sb.append(predicandSql); + sb.append(" BETWEEN "); + sb.append(beginSql); + sb.append(" AND "); + sb.append(endSql); + + if (!isHiveCatalog) { + sb.append(")"); + } + + queries.push(sb.toString()); + + stack.pop(); + return expr; + } + + @Override + public Expr visitCaseWhen(Object ctx, Stack<Expr> stack, CaseWhenPredicate expr) throws TajoException { + stack.push(expr); + + StringBuilder sb = new StringBuilder(); + sb.append("CASE "); + + String condition, result; + + for (CaseWhenPredicate.WhenExpr when : expr.getWhens()) { + visit(ctx, stack, when.getCondition()); + condition = queries.pop(); + visit(ctx, stack, when.getResult()); + result = queries.pop(); + + String whenSql = condition + " " + result; + if (!isHiveCatalog && whenSql.endsWith(")")) { + whenSql = whenSql.substring(0, whenSql.length()-1); + } + + sb.append(whenSql).append(" "); + } + + if (expr.hasElseResult()) { + visit(ctx, stack, expr.getElseResult()); + String elseSql = queries.pop(); + if (!isHiveCatalog && elseSql.endsWith(")")) { + elseSql = elseSql.substring(0, elseSql.length()-1); + } + + sb.append("ELSE ").append(elseSql).append(" END"); + } + + if (!isHiveCatalog) { + sb.append(")"); + } + + queries.push(sb.toString()); + + stack.pop(); + return expr; + } + + @Override + public Expr visitBinaryOperator(Object ctx, Stack<Expr> stack, 
BinaryOperator expr) throws TajoException { + stack.push(expr); + Expr lhs = visit(ctx, stack, expr.getLeft()); + String leftSql = queries.pop(); + Expr rhs = visit(ctx, stack, expr.getRight()); + String rightSql = queries.pop(); + stack.pop(); + + if (!expr.getLeft().equals(lhs)) { + expr.setLeft(lhs); + } + if (!expr.getRight().equals(rhs)) { + expr.setRight(rhs); + } + + if (lhs.getType() == OpType.Literal && rhs.getType() == OpType.Literal) { + return new NullLiteral(); + } + + StringBuilder sb = new StringBuilder(); + sb.append(leftSql); + sb.append(" ").append(getOperator(expr.getType())).append(" "); + sb.append(rightSql); + queries.push(sb.toString()); + + return expr; + } + + private String getOperator(OpType type) { + String operator; + switch (type) { + case Not: + operator = "!"; + break; + case And: + operator = "AND"; + break; + case Or: + operator = "OR"; + break; + case Equals: + operator = "="; + break; + case IsNullPredicate: + operator = "IS NULL"; + break; + case NotEquals: + operator = "<>"; + break; + case LessThan: + operator = "<"; + break; + case LessThanOrEquals: + operator = "<="; + break; + case GreaterThan: + operator = ">"; + break; + case GreaterThanOrEquals: + operator = ">="; + break; + case Plus: + operator = "+"; + break; + case Minus: + operator = "-"; + break; + case Modular: + operator = "%"; + break; + case Multiply: + operator = "*"; + break; + case Divide: + operator = "/"; + break; + case LikePredicate: + operator = "LIKE"; + break; + case SimilarToPredicate: + operator = "([.])"; + break; + case InPredicate: + operator = "IN"; + break; + case Asterisk: + operator = "*"; + break; + //TODO: need to check more types. 
+ default: + operator = type.name(); + break; + } + + return operator; + } + + @Override + public Expr visitLikePredicate(Object ctx, Stack<Expr> stack, PatternMatchPredicate expr) throws TajoException { + stack.push(expr); + + visit(ctx, stack, expr.getPredicand()); + String predicand = queries.pop(); + visit(ctx, stack, expr.getPattern()); + String pattern = queries.pop(); + stack.pop(); + + if(isHiveCatalog) { + if (pattern.startsWith("%") || pattern.endsWith("%")) { + throw new UnsupportedException("LIKE Operator with '%'"); + } + } + StringBuilder sb = new StringBuilder(); + sb.append(predicand); + + if (expr.isNot()) { + sb.append(" NOT "); + } + + if (expr.isCaseInsensitive()) { + sb.append(" ILIKE "); + } else { + sb.append(" LIKE "); + } + + + sb.append(pattern); + queries.push(sb.toString()); + + return expr; + } + + @Override + public Expr visitSimilarToPredicate(Object ctx, Stack<Expr> stack, PatternMatchPredicate expr) throws TajoException { + if (isHiveCatalog) { + throw new UnsupportedException("SIMILAR TO Operator"); + } + + stack.push(expr); + + visit(ctx, stack, expr.getPredicand()); + String predicand = queries.pop(); + visit(ctx, stack, expr.getPattern()); + String pattern = queries.pop(); + stack.pop(); + + StringBuilder sb = new StringBuilder(); + sb.append(predicand); + + if (expr.isNot()) { + sb.append(" NOT "); + } + + sb.append(" SIMILAR TO "); + + sb.append(pattern); + queries.push(sb.toString()); + + return expr; + } + + @Override + public Expr visitRegexpPredicate(Object ctx, Stack<Expr> stack, PatternMatchPredicate expr) throws TajoException { + if (isHiveCatalog) { + throw new UnsupportedException("REGEXP Operator"); + } + + stack.push(expr); + + visit(ctx, stack, expr.getPredicand()); + String predicand = queries.pop(); + visit(ctx, stack, expr.getPattern()); + String pattern = queries.pop(); + stack.pop(); + + StringBuilder sb = new StringBuilder(); + sb.append(predicand); + + if (expr.isNot()) { + sb.append(" NOT "); + } + 
sb.append(" REGEXP "); + + sb.append(pattern); + queries.push(sb.toString()); + + return expr; + } + +} \ No newline at end of file
