Repository: phoenix
Updated Branches:
  refs/heads/master 26a6f8812 -> 1c2b9b0e7
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bc9974ba/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
new file mode 100644
index 0000000..d9016d0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Integration tests for queries hinted with {@code USE_SORT_MERGE_JOIN}: joins between salted and
+ * unsalted tables, a join on a dynamic column, and a join whose subqueries match no rows.
+ */
+public class SortMergeJoinMoreIT extends BaseHBaseManagedTimeIT {
+
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
+        // Forces server cache to be used
+        props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
+        // Must update config before starting server
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    /**
+     * Advances {@code rs} by one row and asserts that its leading columns, read with
+     * {@code getInt}, equal {@code expected} in order: column 1 against {@code expected[0]},
+     * column 2 against {@code expected[1]}, and so on.
+     */
+    private static void assertNextRow(ResultSet rs, int... expected) throws SQLException {
+        assertTrue("expected another row", rs.next());
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], rs.getInt(i + 1));
+        }
+    }
+
+    /**
+     * Drops each named table if it exists. {@code IF EXISTS} keeps cleanup from throwing (and
+     * masking the real test failure) when a test fails before all of its tables were created.
+     */
+    private static void dropTablesIfExist(Connection conn, String... tableNames) throws SQLException {
+        Statement statement = conn.createStatement();
+        try {
+            for (String tableName : tableNames) {
+                statement.executeUpdate("DROP TABLE IF EXISTS " + tableName);
+            }
+        } finally {
+            statement.close();
+        }
+    }
+
+    /**
+     * Sort-merge joins an unsalted table with a salted one (SALT_BUCKETS=4) in both join orders,
+     * and the salted table with itself, checking every row of each result.
+     */
+    @Test
+    public void testJoinOverSaltedTables() throws Exception {
+        String tempTableNoSalting = "TEMP_TABLE_NO_SALTING";
+        String tempTableWithSalting = "TEMP_TABLE_WITH_SALTING";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.createStatement().execute("CREATE TABLE " + tempTableNoSalting
+                    + " (mypk INTEGER NOT NULL PRIMARY KEY, "
+                    + " col1 INTEGER)");
+            conn.createStatement().execute("CREATE TABLE " + tempTableWithSalting
+                    + " (mypk INTEGER NOT NULL PRIMARY KEY, "
+                    + " col1 INTEGER) SALT_BUCKETS=4");
+
+            // Unsalted rows (mypk, col1): (1, 3), (2, 2), (3, 1).
+            PreparedStatement upsertStmt = conn.prepareStatement(
+                    "upsert into " + tempTableNoSalting + "(mypk, col1) " + "values (?, ?)");
+            for (int i = 0; i < 3; i++) {
+                upsertStmt.setInt(1, i + 1);
+                upsertStmt.setInt(2, 3 - i);
+                upsertStmt.execute();
+            }
+            conn.commit();
+
+            // Salted rows (mypk, col1): (1, 3), (2, 2), (3, 1), (4, 3), (5, 2), (6, 1).
+            upsertStmt = conn.prepareStatement(
+                    "upsert into " + tempTableWithSalting + "(mypk, col1) " + "values (?, ?)");
+            for (int i = 0; i < 6; i++) {
+                upsertStmt.setInt(1, i + 1);
+                upsertStmt.setInt(2, 3 - (i % 3));
+                upsertStmt.execute();
+            }
+            conn.commit();
+
+            // LHS=unsalted JOIN RHS=salted
+            String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ lhs.mypk, lhs.col1, rhs.mypk, rhs.col1 FROM "
+                    + tempTableNoSalting + " lhs JOIN "
+                    + tempTableWithSalting + " rhs ON rhs.mypk = lhs.col1 ORDER BY lhs.mypk";
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertNextRow(rs, 1, 3, 3, 1);
+            assertNextRow(rs, 2, 2, 2, 2);
+            assertNextRow(rs, 3, 1, 1, 3);
+            assertFalse(rs.next());
+
+            // LHS=salted JOIN RHS=unsalted
+            query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ lhs.mypk, lhs.col1, rhs.mypk, rhs.col1 FROM "
+                    + tempTableWithSalting + " lhs JOIN "
+                    + tempTableNoSalting + " rhs ON rhs.mypk = lhs.col1 ORDER BY lhs.mypk";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertNextRow(rs, 1, 3, 3, 1);
+            assertNextRow(rs, 2, 2, 2, 2);
+            assertNextRow(rs, 3, 1, 1, 3);
+            assertNextRow(rs, 4, 3, 3, 1);
+            assertNextRow(rs, 5, 2, 2, 2);
+            assertNextRow(rs, 6, 1, 1, 3);
+            assertFalse(rs.next());
+
+            // LHS=salted JOIN RHS=salted
+            query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ lhs.mypk, lhs.col1, rhs.mypk, rhs.col1 FROM "
+                    + tempTableWithSalting + " lhs JOIN "
+                    + tempTableWithSalting + " rhs ON rhs.mypk = (lhs.col1 + 3) ORDER BY lhs.mypk";
+            statement = conn.prepareStatement(query);
+            rs = statement.executeQuery();
+            assertNextRow(rs, 1, 3, 6, 1);
+            assertNextRow(rs, 2, 2, 5, 2);
+            assertNextRow(rs, 3, 1, 4, 3);
+            assertNextRow(rs, 4, 3, 6, 1);
+            assertNextRow(rs, 5, 2, 5, 2);
+            assertNextRow(rs, 6, 1, 4, 3);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * Upserts a dynamic column {@code pkA} into TABLEB via UPSERT SELECT from TABLEA, then
+     * sort-merge joins TABLEB to TABLEA on that dynamic column and expects all five TABLEA rows.
+     */
+    @Test
+    public void testJoinOnDynamicColumns() throws Exception {
+        String tableA = "tableA";
+        String tableB = "tableB";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            String ddlA = "CREATE TABLE " + tableA + " (pkA INTEGER NOT NULL, " + " colA1 INTEGER, "
+                    + " colA2 VARCHAR " + "CONSTRAINT PK PRIMARY KEY" + "(pkA)" + ")";
+
+            String ddlB = "CREATE TABLE " + tableB + " (pkB INTEGER NOT NULL PRIMARY KEY, " + " colB INTEGER)";
+            stmt = conn.prepareStatement(ddlA);
+            stmt.execute();
+            stmt.close();
+
+            stmt = conn.prepareStatement(ddlB);
+            stmt.execute();
+            stmt.close();
+
+            String upsertA = "UPSERT INTO TABLEA (pkA, colA1, colA2) VALUES(?, ?, ?)";
+            stmt = conn.prepareStatement(upsertA);
+            int i = 0;
+            for (i = 0; i < 5; i++) {
+                stmt.setInt(1, i);
+                stmt.setInt(2, i + 10);
+                stmt.setString(3, "00" + i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+            stmt.close();
+
+            // upsert select dynamic columns in tableB
+            conn.createStatement().execute("CREATE SEQUENCE SEQB");
+            String upsertBSelectA = "UPSERT INTO TABLEB (pkB, pkA INTEGER) "
+                    + "SELECT NEXT VALUE FOR SEQB, pkA FROM TABLEA";
+            stmt = conn.prepareStatement(upsertBSelectA);
+            stmt.executeUpdate();
+            stmt.close();
+            conn.commit();
+            conn.createStatement().execute("DROP SEQUENCE SEQB");
+
+            // perform a join between tableB and tableA by joining on the dynamic column that we upserted in
+            // tableB. This join should return all the rows from table A.
+            String joinSql = "SELECT /*+ USE_SORT_MERGE_JOIN*/ A.pkA, A.COLA1, A.colA2 FROM TABLEB B(pkA INTEGER) JOIN TABLEA A ON a.pkA = b.pkA";
+            stmt = conn.prepareStatement(joinSql);
+            ResultSet rs = stmt.executeQuery();
+            i = 0;
+            while (rs.next()) {
+                // check that we get back all the rows that we upserted for tableA above.
+                assertEquals(i, rs.getInt(1));
+                assertEquals(i + 10, rs.getInt(2));
+                assertEquals("00" + i, rs.getString(3));
+                i++;
+            }
+            assertEquals(5, i);
+        } finally {
+            if (stmt != null) {
+                stmt.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Runs a sort-merge join with ALL/ANY subqueries over tables this test creates but never
+     * populates, and expects an empty result.
+     */
+    @Test
+    public void testSubqueryWithoutData() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+
+        try {
+            String grammarTableDdl = "CREATE TABLE IF NOT EXISTS GRAMMAR_TABLE (ID INTEGER PRIMARY KEY, " +
+                    "unsig_id UNSIGNED_INT, big_id BIGINT, unsig_long_id UNSIGNED_LONG, tiny_id TINYINT," +
+                    "unsig_tiny_id UNSIGNED_TINYINT, small_id SMALLINT, unsig_small_id UNSIGNED_SMALLINT," +
+                    "float_id FLOAT, unsig_float_id UNSIGNED_FLOAT, double_id DOUBLE, unsig_double_id UNSIGNED_DOUBLE," +
+                    "decimal_id DECIMAL, boolean_id BOOLEAN, time_id TIME, date_id DATE, timestamp_id TIMESTAMP," +
+                    "unsig_time_id TIME, unsig_date_id DATE, unsig_timestamp_id TIMESTAMP, varchar_id VARCHAR (30)," +
+                    "char_id CHAR (30), binary_id BINARY (100), varbinary_id VARBINARY (100))";
+
+            String largeTableDdl = "CREATE TABLE IF NOT EXISTS LARGE_TABLE (ID INTEGER PRIMARY KEY, " +
+                    "unsig_id UNSIGNED_INT, big_id BIGINT, unsig_long_id UNSIGNED_LONG, tiny_id TINYINT," +
+                    "unsig_tiny_id UNSIGNED_TINYINT, small_id SMALLINT, unsig_small_id UNSIGNED_SMALLINT," +
+                    "float_id FLOAT, unsig_float_id UNSIGNED_FLOAT, double_id DOUBLE, unsig_double_id UNSIGNED_DOUBLE," +
+                    "decimal_id DECIMAL, boolean_id BOOLEAN, time_id TIME, date_id DATE, timestamp_id TIMESTAMP," +
+                    "unsig_time_id TIME, unsig_date_id DATE, unsig_timestamp_id TIMESTAMP, varchar_id VARCHAR (30)," +
+                    "char_id CHAR (30), binary_id BINARY (100), varbinary_id VARBINARY (100))";
+
+            String secondaryLargeTableDdl = "CREATE TABLE IF NOT EXISTS SECONDARY_LARGE_TABLE (SEC_ID INTEGER PRIMARY KEY," +
+                    "sec_unsig_id UNSIGNED_INT, sec_big_id BIGINT, sec_usnig_long_id UNSIGNED_LONG, sec_tiny_id TINYINT," +
+                    "sec_unsig_tiny_id UNSIGNED_TINYINT, sec_small_id SMALLINT, sec_unsig_small_id UNSIGNED_SMALLINT," +
+                    "sec_float_id FLOAT, sec_unsig_float_id UNSIGNED_FLOAT, sec_double_id DOUBLE, sec_unsig_double_id UNSIGNED_DOUBLE," +
+                    "sec_decimal_id DECIMAL, sec_boolean_id BOOLEAN, sec_time_id TIME, sec_date_id DATE," +
+                    "sec_timestamp_id TIMESTAMP, sec_unsig_time_id TIME, sec_unsig_date_id DATE, sec_unsig_timestamp_id TIMESTAMP," +
+                    "sec_varchar_id VARCHAR (30), sec_char_id CHAR (30), sec_binary_id BINARY (100), sec_varbinary_id VARBINARY (100))";
+            createTestTable(getUrl(), grammarTableDdl);
+            createTestTable(getUrl(), largeTableDdl);
+            createTestTable(getUrl(), secondaryLargeTableDdl);
+
+            String query = "SELECT /*+USE_SORT_MERGE_JOIN*/ * FROM (SELECT ID, BIG_ID, DATE_ID FROM LARGE_TABLE AS A WHERE (A.ID % 5) = 0) AS A " +
+                    "INNER JOIN (SELECT SEC_ID, SEC_TINY_ID, SEC_UNSIG_FLOAT_ID FROM SECONDARY_LARGE_TABLE AS B WHERE (B.SEC_ID % 5) = 0) AS B " +
+                    "ON A.ID=B.SEC_ID WHERE A.DATE_ID > ALL (SELECT SEC_DATE_ID FROM SECONDARY_LARGE_TABLE LIMIT 100) " +
+                    "AND B.SEC_UNSIG_FLOAT_ID = ANY (SELECT sec_unsig_float_id FROM SECONDARY_LARGE_TABLE " +
+                    "WHERE SEC_ID > ALL (SELECT MIN (ID) FROM GRAMMAR_TABLE WHERE UNSIG_ID IS NULL) AND " +
+                    "SEC_UNSIG_ID < ANY (SELECT DISTINCT(UNSIG_ID) FROM LARGE_TABLE WHERE UNSIG_ID<2500) LIMIT 1000) " +
+                    "AND A.ID < 10000";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+        } finally {
+            try {
+                dropTablesIfExist(conn, "GRAMMAR_TABLE", "LARGE_TABLE", "SECONDARY_LARGE_TABLE");
+            } finally {
+                conn.close();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bc9974ba/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 8a9abc2..ad65c1c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -285,23 +285,28 @@ public class QueryCompiler { JoinType[] joinTypes = new JoinType[count]; PTable[] tables = new PTable[count]; int[] fieldPositions = new int[count]; - HashSubPlan[] subPlans = new HashSubPlan[count]; + StatementContext[] subContexts = new StatementContext[count]; + QueryPlan[] subPlans = new QueryPlan[count]; + HashSubPlan[] hashPlans = new HashSubPlan[count]; fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size(); for (int i = 0; i < count; i++) { JoinSpec joinSpec = joinSpecs.get(i); Scan subScan = ScanUtil.newScan(originalScan); - StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); - QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true, true, null); + subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement)); + subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null); boolean hasPostReference = joinSpec.getJoinTable().hasPostReference(); if (hasPostReference) { - tables[i] = subContext.getResolver().getTables().get(0).getTable(); + tables[i] = subContexts[i].getResolver().getTables().get(0).getTable(); projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType()); } else { tables[i] = null; } + } + for (int i = 0; i < count; i++) { + JoinSpec joinSpec = joinSpecs.get(i); context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, 
context.getConnection(), query.getUdfParseNodes())); joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder - Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContext, true); + Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], true); joinExpressions[i] = joinConditions.getFirst(); List<Expression> hashExpressions = joinConditions.getSecond(); Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null); @@ -312,7 +317,7 @@ public class QueryCompiler { if (i < count - 1) { fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size())); } - subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression); + hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression); } TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true); @@ -322,7 +327,7 @@ public class QueryCompiler { limit = plan.getLimit(); } HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit); - return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, subPlans); + return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans); } JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
