PHOENIX-1535 Secondary local index causes Undefined column error with queries involving joins (Maryann Xue)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f9ca8816 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f9ca8816 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f9ca8816 Branch: refs/heads/master Commit: f9ca8816d6356b64368d3e1b2b34eee8696f8cc2 Parents: 01cef51 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Mon Dec 22 11:07:39 2014 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Mon Dec 22 11:07:39 2014 +0530 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/HashJoinIT.java | 251 +----------------- .../apache/phoenix/end2end/SortMergeJoinIT.java | 251 +----------------- .../org/apache/phoenix/end2end/SubqueryIT.java | 251 +----------------- .../apache/phoenix/compile/JoinCompiler.java | 114 +++++--- .../apache/phoenix/compile/QueryCompiler.java | 10 +- .../coprocessor/BaseScannerRegionObserver.java | 191 ++++++++++++++ .../GroupedAggregateRegionObserver.java | 64 ++--- .../phoenix/coprocessor/ScanRegionObserver.java | 174 +----------- .../UngroupedAggregateRegionObserver.java | 32 ++- .../java/org/apache/phoenix/query/BaseTest.java | 262 +++++++++++++++++++ 10 files changed, 593 insertions(+), 1007 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java index f8c5899..23d6c13 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java @@ -89,7 +89,7 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { 
@Before public void initTable() throws Exception { - initTableValues(); + initJoinTableValues(getUrl(), null, null); if (indexDDL != null && indexDDL.length > 0) { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -1216,255 +1216,6 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { } - protected void initTableValues() throws Exception { - ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME); - ensureTableCreated(getUrl(), JOIN_ITEM_TABLE_FULL_NAME); - ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME); - ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME); - - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - conn.createStatement().execute("CREATE SEQUENCE my.seq"); - // Insert into customer table - PreparedStatement stmt = conn.prepareStatement( - "upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME + - " (\"customer_id\", " + - " NAME, " + - " PHONE, " + - " ADDRESS, " + - " LOC_ID, " + - " DATE) " + - "values (?, ?, ?, ?, ?, ?)"); - stmt.setString(1, "0000000001"); - stmt.setString(2, "C1"); - stmt.setString(3, "999-999-1111"); - stmt.setString(4, "101 XXX Street"); - stmt.setString(5, "10001"); - stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000002"); - stmt.setString(2, "C2"); - stmt.setString(3, "999-999-2222"); - stmt.setString(4, "202 XXX Street"); - stmt.setString(5, null); - stmt.setDate(6, new Date(format.parse("2013-11-25 16:45:07").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000003"); - stmt.setString(2, "C3"); - stmt.setString(3, "999-999-3333"); - stmt.setString(4, "303 XXX Street"); - stmt.setString(5, null); - stmt.setDate(6, new Date(format.parse("2013-11-25 10:06:29").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000004"); - stmt.setString(2, "C4"); - 
stmt.setString(3, "999-999-4444"); - stmt.setString(4, "404 XXX Street"); - stmt.setString(5, "10004"); - stmt.setDate(6, new Date(format.parse("2013-11-22 14:22:56").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000005"); - stmt.setString(2, "C5"); - stmt.setString(3, "999-999-5555"); - stmt.setString(4, "505 XXX Street"); - stmt.setString(5, "10005"); - stmt.setDate(6, new Date(format.parse("2013-11-27 09:37:50").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000006"); - stmt.setString(2, "C6"); - stmt.setString(3, "999-999-6666"); - stmt.setString(4, "606 XXX Street"); - stmt.setString(5, "10001"); - stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime())); - stmt.execute(); - - // Insert into item table - stmt = conn.prepareStatement( - "upsert into " + JOIN_ITEM_TABLE_FULL_NAME + - " (\"item_id\", " + - " NAME, " + - " PRICE, " + - " DISCOUNT1, " + - " DISCOUNT2, " + - " \"supplier_id\", " + - " DESCRIPTION) " + - "values (?, ?, ?, ?, ?, ?, ?)"); - stmt.setString(1, "0000000001"); - stmt.setString(2, "T1"); - stmt.setInt(3, 100); - stmt.setInt(4, 5); - stmt.setInt(5, 10); - stmt.setString(6, "0000000001"); - stmt.setString(7, "Item T1"); - stmt.execute(); - - stmt.setString(1, "0000000002"); - stmt.setString(2, "T2"); - stmt.setInt(3, 200); - stmt.setInt(4, 5); - stmt.setInt(5, 8); - stmt.setString(6, "0000000001"); - stmt.setString(7, "Item T2"); - stmt.execute(); - - stmt.setString(1, "0000000003"); - stmt.setString(2, "T3"); - stmt.setInt(3, 300); - stmt.setInt(4, 8); - stmt.setInt(5, 12); - stmt.setString(6, "0000000002"); - stmt.setString(7, "Item T3"); - stmt.execute(); - - stmt.setString(1, "0000000004"); - stmt.setString(2, "T4"); - stmt.setInt(3, 400); - stmt.setInt(4, 6); - stmt.setInt(5, 10); - stmt.setString(6, "0000000002"); - stmt.setString(7, "Item T4"); - stmt.execute(); - - stmt.setString(1, "0000000005"); - stmt.setString(2, "T5"); - stmt.setInt(3, 500); - stmt.setInt(4, 8); - stmt.setInt(5, 15); - 
stmt.setString(6, "0000000005"); - stmt.setString(7, "Item T5"); - stmt.execute(); - - stmt.setString(1, "0000000006"); - stmt.setString(2, "T6"); - stmt.setInt(3, 600); - stmt.setInt(4, 8); - stmt.setInt(5, 15); - stmt.setString(6, "0000000006"); - stmt.setString(7, "Item T6"); - stmt.execute(); - - stmt.setString(1, "invalid001"); - stmt.setString(2, "INVALID-1"); - stmt.setInt(3, 0); - stmt.setInt(4, 0); - stmt.setInt(5, 0); - stmt.setString(6, "0000000000"); - stmt.setString(7, "Invalid item for join test"); - stmt.execute(); - - // Insert into supplier table - stmt = conn.prepareStatement( - "upsert into " + JOIN_SUPPLIER_TABLE_FULL_NAME + - " (\"supplier_id\", " + - " NAME, " + - " PHONE, " + - " ADDRESS, " + - " LOC_ID) " + - "values (?, ?, ?, ?, ?)"); - stmt.setString(1, "0000000001"); - stmt.setString(2, "S1"); - stmt.setString(3, "888-888-1111"); - stmt.setString(4, "101 YYY Street"); - stmt.setString(5, "10001"); - stmt.execute(); - - stmt.setString(1, "0000000002"); - stmt.setString(2, "S2"); - stmt.setString(3, "888-888-2222"); - stmt.setString(4, "202 YYY Street"); - stmt.setString(5, "10002"); - stmt.execute(); - - stmt.setString(1, "0000000003"); - stmt.setString(2, "S3"); - stmt.setString(3, "888-888-3333"); - stmt.setString(4, "303 YYY Street"); - stmt.setString(5, null); - stmt.execute(); - - stmt.setString(1, "0000000004"); - stmt.setString(2, "S4"); - stmt.setString(3, "888-888-4444"); - stmt.setString(4, "404 YYY Street"); - stmt.setString(5, null); - stmt.execute(); - - stmt.setString(1, "0000000005"); - stmt.setString(2, "S5"); - stmt.setString(3, "888-888-5555"); - stmt.setString(4, "505 YYY Street"); - stmt.setString(5, "10005"); - stmt.execute(); - - stmt.setString(1, "0000000006"); - stmt.setString(2, "S6"); - stmt.setString(3, "888-888-6666"); - stmt.setString(4, "606 YYY Street"); - stmt.setString(5, "10006"); - stmt.execute(); - - // Insert into order table - stmt = conn.prepareStatement( - "upsert into " + JOIN_ORDER_TABLE_FULL_NAME 
+ - " (\"order_id\", " + - " \"customer_id\", " + - " \"item_id\", " + - " PRICE, " + - " QUANTITY," + - " DATE) " + - "values (?, ?, ?, ?, ?, ?)"); - stmt.setString(1, "000000000000001"); - stmt.setString(2, "0000000004"); - stmt.setString(3, "0000000001"); - stmt.setInt(4, 100); - stmt.setInt(5, 1000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-22 14:22:56").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000002"); - stmt.setString(2, "0000000003"); - stmt.setString(3, "0000000006"); - stmt.setInt(4, 552); - stmt.setInt(5, 2000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 10:06:29").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000003"); - stmt.setString(2, "0000000002"); - stmt.setString(3, "0000000002"); - stmt.setInt(4, 190); - stmt.setInt(5, 3000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 16:45:07").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000004"); - stmt.setString(2, "0000000004"); - stmt.setString(3, "0000000006"); - stmt.setInt(4, 510); - stmt.setInt(5, 4000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-26 13:26:04").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000005"); - stmt.setString(2, "0000000005"); - stmt.setString(3, "0000000003"); - stmt.setInt(4, 264); - stmt.setInt(5, 5000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-27 09:37:50").getTime())); - stmt.execute(); - - conn.commit(); - } finally { - conn.close(); - } - } - @Test public void testDefaultJoin() throws Exception { String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\""; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java index c75d49a..514664e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java @@ -88,7 +88,7 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT { @Before public void initTable() throws Exception { - initTableValues(); + initJoinTableValues(getUrl(), null, null); if (indexDDL != null && indexDDL.length > 0) { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -168,255 +168,6 @@ public class SortMergeJoinIT extends BaseHBaseManagedTimeIT { return testCases; } - - protected void initTableValues() throws Exception { - ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME); - ensureTableCreated(getUrl(), JOIN_ITEM_TABLE_FULL_NAME); - ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME); - ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME); - - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - try { - conn.createStatement().execute("CREATE SEQUENCE my.seq"); - // Insert into customer table - PreparedStatement stmt = conn.prepareStatement( - "upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME + - " (\"customer_id\", " + - " NAME, " + - " PHONE, " + - " ADDRESS, " + - " LOC_ID, " + - " DATE) " + - "values (?, ?, ?, ?, ?, ?)"); - stmt.setString(1, "0000000001"); - stmt.setString(2, "C1"); - stmt.setString(3, "999-999-1111"); - stmt.setString(4, "101 XXX Street"); - stmt.setString(5, "10001"); - stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000002"); - stmt.setString(2, "C2"); - 
stmt.setString(3, "999-999-2222"); - stmt.setString(4, "202 XXX Street"); - stmt.setString(5, null); - stmt.setDate(6, new Date(format.parse("2013-11-25 16:45:07").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000003"); - stmt.setString(2, "C3"); - stmt.setString(3, "999-999-3333"); - stmt.setString(4, "303 XXX Street"); - stmt.setString(5, null); - stmt.setDate(6, new Date(format.parse("2013-11-25 10:06:29").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000004"); - stmt.setString(2, "C4"); - stmt.setString(3, "999-999-4444"); - stmt.setString(4, "404 XXX Street"); - stmt.setString(5, "10004"); - stmt.setDate(6, new Date(format.parse("2013-11-22 14:22:56").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000005"); - stmt.setString(2, "C5"); - stmt.setString(3, "999-999-5555"); - stmt.setString(4, "505 XXX Street"); - stmt.setString(5, "10005"); - stmt.setDate(6, new Date(format.parse("2013-11-27 09:37:50").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000006"); - stmt.setString(2, "C6"); - stmt.setString(3, "999-999-6666"); - stmt.setString(4, "606 XXX Street"); - stmt.setString(5, "10001"); - stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime())); - stmt.execute(); - - // Insert into item table - stmt = conn.prepareStatement( - "upsert into " + JOIN_ITEM_TABLE_FULL_NAME + - " (\"item_id\", " + - " NAME, " + - " PRICE, " + - " DISCOUNT1, " + - " DISCOUNT2, " + - " \"supplier_id\", " + - " DESCRIPTION) " + - "values (?, ?, ?, ?, ?, ?, ?)"); - stmt.setString(1, "0000000001"); - stmt.setString(2, "T1"); - stmt.setInt(3, 100); - stmt.setInt(4, 5); - stmt.setInt(5, 10); - stmt.setString(6, "0000000001"); - stmt.setString(7, "Item T1"); - stmt.execute(); - - stmt.setString(1, "0000000002"); - stmt.setString(2, "T2"); - stmt.setInt(3, 200); - stmt.setInt(4, 5); - stmt.setInt(5, 8); - stmt.setString(6, "0000000001"); - stmt.setString(7, "Item T2"); - stmt.execute(); - - stmt.setString(1, "0000000003"); - 
stmt.setString(2, "T3"); - stmt.setInt(3, 300); - stmt.setInt(4, 8); - stmt.setInt(5, 12); - stmt.setString(6, "0000000002"); - stmt.setString(7, "Item T3"); - stmt.execute(); - - stmt.setString(1, "0000000004"); - stmt.setString(2, "T4"); - stmt.setInt(3, 400); - stmt.setInt(4, 6); - stmt.setInt(5, 10); - stmt.setString(6, "0000000002"); - stmt.setString(7, "Item T4"); - stmt.execute(); - - stmt.setString(1, "0000000005"); - stmt.setString(2, "T5"); - stmt.setInt(3, 500); - stmt.setInt(4, 8); - stmt.setInt(5, 15); - stmt.setString(6, "0000000005"); - stmt.setString(7, "Item T5"); - stmt.execute(); - - stmt.setString(1, "0000000006"); - stmt.setString(2, "T6"); - stmt.setInt(3, 600); - stmt.setInt(4, 8); - stmt.setInt(5, 15); - stmt.setString(6, "0000000006"); - stmt.setString(7, "Item T6"); - stmt.execute(); - - stmt.setString(1, "invalid001"); - stmt.setString(2, "INVALID-1"); - stmt.setInt(3, 0); - stmt.setInt(4, 0); - stmt.setInt(5, 0); - stmt.setString(6, "0000000000"); - stmt.setString(7, "Invalid item for join test"); - stmt.execute(); - - // Insert into supplier table - stmt = conn.prepareStatement( - "upsert into " + JOIN_SUPPLIER_TABLE_FULL_NAME + - " (\"supplier_id\", " + - " NAME, " + - " PHONE, " + - " ADDRESS, " + - " LOC_ID) " + - "values (?, ?, ?, ?, ?)"); - stmt.setString(1, "0000000001"); - stmt.setString(2, "S1"); - stmt.setString(3, "888-888-1111"); - stmt.setString(4, "101 YYY Street"); - stmt.setString(5, "10001"); - stmt.execute(); - - stmt.setString(1, "0000000002"); - stmt.setString(2, "S2"); - stmt.setString(3, "888-888-2222"); - stmt.setString(4, "202 YYY Street"); - stmt.setString(5, "10002"); - stmt.execute(); - - stmt.setString(1, "0000000003"); - stmt.setString(2, "S3"); - stmt.setString(3, "888-888-3333"); - stmt.setString(4, "303 YYY Street"); - stmt.setString(5, null); - stmt.execute(); - - stmt.setString(1, "0000000004"); - stmt.setString(2, "S4"); - stmt.setString(3, "888-888-4444"); - stmt.setString(4, "404 YYY Street"); - 
stmt.setString(5, null); - stmt.execute(); - - stmt.setString(1, "0000000005"); - stmt.setString(2, "S5"); - stmt.setString(3, "888-888-5555"); - stmt.setString(4, "505 YYY Street"); - stmt.setString(5, "10005"); - stmt.execute(); - - stmt.setString(1, "0000000006"); - stmt.setString(2, "S6"); - stmt.setString(3, "888-888-6666"); - stmt.setString(4, "606 YYY Street"); - stmt.setString(5, "10006"); - stmt.execute(); - - // Insert into order table - stmt = conn.prepareStatement( - "upsert into " + JOIN_ORDER_TABLE_FULL_NAME + - " (\"order_id\", " + - " \"customer_id\", " + - " \"item_id\", " + - " PRICE, " + - " QUANTITY," + - " DATE) " + - "values (?, ?, ?, ?, ?, ?)"); - stmt.setString(1, "000000000000001"); - stmt.setString(2, "0000000004"); - stmt.setString(3, "0000000001"); - stmt.setInt(4, 100); - stmt.setInt(5, 1000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-22 14:22:56").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000002"); - stmt.setString(2, "0000000003"); - stmt.setString(3, "0000000006"); - stmt.setInt(4, 552); - stmt.setInt(5, 2000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 10:06:29").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000003"); - stmt.setString(2, "0000000002"); - stmt.setString(3, "0000000002"); - stmt.setInt(4, 190); - stmt.setInt(5, 3000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 16:45:07").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000004"); - stmt.setString(2, "0000000004"); - stmt.setString(3, "0000000006"); - stmt.setInt(4, 510); - stmt.setInt(5, 4000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-26 13:26:04").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000005"); - stmt.setString(2, "0000000005"); - stmt.setString(3, "0000000003"); - stmt.setInt(4, 264); - stmt.setInt(5, 5000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-27 09:37:50").getTime())); - stmt.execute(); - - 
conn.commit(); - } finally { - conn.close(); - } - } @Test public void testDefaultJoin() throws Exception { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java index 470ba9c..2d02970 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java @@ -35,13 +35,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.sql.Connection; -import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Timestamp; -import java.text.SimpleDateFormat; import java.util.Collection; import java.util.List; import java.util.Map; @@ -67,7 +64,6 @@ import com.google.common.collect.Maps; @RunWith(Parameterized.class) public class SubqueryIT extends BaseHBaseManagedTimeIT { - private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private String[] indexDDL; private String[] plans; @@ -88,7 +84,8 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { @Before public void initTable() throws Exception { - initTableValues(); + initJoinTableValues(getUrl(), null, null); + initCoItemTableValues(); if (indexDDL != null && indexDDL.length > 0) { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); @@ -319,254 +316,14 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT { } - protected void initTableValues() throws Exception { - ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME); - ensureTableCreated(getUrl(), JOIN_ITEM_TABLE_FULL_NAME); - 
ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME); - ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME); + protected void initCoItemTableValues() throws Exception { ensureTableCreated(getUrl(), JOIN_COITEM_TABLE_FULL_NAME); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); try { - conn.createStatement().execute("CREATE SEQUENCE my.seq"); - // Insert into customer table - PreparedStatement stmt = conn.prepareStatement( - "upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME + - " (\"customer_id\", " + - " NAME, " + - " PHONE, " + - " ADDRESS, " + - " LOC_ID, " + - " DATE) " + - "values (?, ?, ?, ?, ?, ?)"); - stmt.setString(1, "0000000001"); - stmt.setString(2, "C1"); - stmt.setString(3, "999-999-1111"); - stmt.setString(4, "101 XXX Street"); - stmt.setString(5, "10001"); - stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000002"); - stmt.setString(2, "C2"); - stmt.setString(3, "999-999-2222"); - stmt.setString(4, "202 XXX Street"); - stmt.setString(5, null); - stmt.setDate(6, new Date(format.parse("2013-11-25 16:45:07").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000003"); - stmt.setString(2, "C3"); - stmt.setString(3, "999-999-3333"); - stmt.setString(4, "303 XXX Street"); - stmt.setString(5, null); - stmt.setDate(6, new Date(format.parse("2013-11-25 10:06:29").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000004"); - stmt.setString(2, "C4"); - stmt.setString(3, "999-999-4444"); - stmt.setString(4, "404 XXX Street"); - stmt.setString(5, "10004"); - stmt.setDate(6, new Date(format.parse("2013-11-22 14:22:56").getTime())); - stmt.execute(); - - stmt.setString(1, "0000000005"); - stmt.setString(2, "C5"); - stmt.setString(3, "999-999-5555"); - stmt.setString(4, "505 XXX Street"); - stmt.setString(5, "10005"); - stmt.setDate(6, new Date(format.parse("2013-11-27 09:37:50").getTime())); - 
stmt.execute(); - - stmt.setString(1, "0000000006"); - stmt.setString(2, "C6"); - stmt.setString(3, "999-999-6666"); - stmt.setString(4, "606 XXX Street"); - stmt.setString(5, "10001"); - stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime())); - stmt.execute(); - - // Insert into item table - stmt = conn.prepareStatement( - "upsert into " + JOIN_ITEM_TABLE_FULL_NAME + - " (\"item_id\", " + - " NAME, " + - " PRICE, " + - " DISCOUNT1, " + - " DISCOUNT2, " + - " \"supplier_id\", " + - " DESCRIPTION) " + - "values (?, ?, ?, ?, ?, ?, ?)"); - stmt.setString(1, "0000000001"); - stmt.setString(2, "T1"); - stmt.setInt(3, 100); - stmt.setInt(4, 5); - stmt.setInt(5, 10); - stmt.setString(6, "0000000001"); - stmt.setString(7, "Item T1"); - stmt.execute(); - - stmt.setString(1, "0000000002"); - stmt.setString(2, "T2"); - stmt.setInt(3, 200); - stmt.setInt(4, 5); - stmt.setInt(5, 8); - stmt.setString(6, "0000000001"); - stmt.setString(7, "Item T2"); - stmt.execute(); - - stmt.setString(1, "0000000003"); - stmt.setString(2, "T3"); - stmt.setInt(3, 300); - stmt.setInt(4, 8); - stmt.setInt(5, 12); - stmt.setString(6, "0000000002"); - stmt.setString(7, "Item T3"); - stmt.execute(); - - stmt.setString(1, "0000000004"); - stmt.setString(2, "T4"); - stmt.setInt(3, 400); - stmt.setInt(4, 6); - stmt.setInt(5, 10); - stmt.setString(6, "0000000002"); - stmt.setString(7, "Item T4"); - stmt.execute(); - - stmt.setString(1, "0000000005"); - stmt.setString(2, "T5"); - stmt.setInt(3, 500); - stmt.setInt(4, 8); - stmt.setInt(5, 15); - stmt.setString(6, "0000000005"); - stmt.setString(7, "Item T5"); - stmt.execute(); - - stmt.setString(1, "0000000006"); - stmt.setString(2, "T6"); - stmt.setInt(3, 600); - stmt.setInt(4, 8); - stmt.setInt(5, 15); - stmt.setString(6, "0000000006"); - stmt.setString(7, "Item T6"); - stmt.execute(); - - stmt.setString(1, "invalid001"); - stmt.setString(2, "INVALID-1"); - stmt.setInt(3, 0); - stmt.setInt(4, 0); - stmt.setInt(5, 0); - stmt.setString(6, 
"0000000000"); - stmt.setString(7, "Invalid item for join test"); - stmt.execute(); - - // Insert into supplier table - stmt = conn.prepareStatement( - "upsert into " + JOIN_SUPPLIER_TABLE_FULL_NAME + - " (\"supplier_id\", " + - " NAME, " + - " PHONE, " + - " ADDRESS, " + - " LOC_ID) " + - "values (?, ?, ?, ?, ?)"); - stmt.setString(1, "0000000001"); - stmt.setString(2, "S1"); - stmt.setString(3, "888-888-1111"); - stmt.setString(4, "101 YYY Street"); - stmt.setString(5, "10001"); - stmt.execute(); - - stmt.setString(1, "0000000002"); - stmt.setString(2, "S2"); - stmt.setString(3, "888-888-2222"); - stmt.setString(4, "202 YYY Street"); - stmt.setString(5, "10002"); - stmt.execute(); - - stmt.setString(1, "0000000003"); - stmt.setString(2, "S3"); - stmt.setString(3, "888-888-3333"); - stmt.setString(4, "303 YYY Street"); - stmt.setString(5, null); - stmt.execute(); - - stmt.setString(1, "0000000004"); - stmt.setString(2, "S4"); - stmt.setString(3, "888-888-4444"); - stmt.setString(4, "404 YYY Street"); - stmt.setString(5, null); - stmt.execute(); - - stmt.setString(1, "0000000005"); - stmt.setString(2, "S5"); - stmt.setString(3, "888-888-5555"); - stmt.setString(4, "505 YYY Street"); - stmt.setString(5, "10005"); - stmt.execute(); - - stmt.setString(1, "0000000006"); - stmt.setString(2, "S6"); - stmt.setString(3, "888-888-6666"); - stmt.setString(4, "606 YYY Street"); - stmt.setString(5, "10006"); - stmt.execute(); - - // Insert into order table - stmt = conn.prepareStatement( - "upsert into " + JOIN_ORDER_TABLE_FULL_NAME + - " (\"order_id\", " + - " \"customer_id\", " + - " \"item_id\", " + - " PRICE, " + - " QUANTITY," + - " DATE) " + - "values (?, ?, ?, ?, ?, ?)"); - stmt.setString(1, "000000000000001"); - stmt.setString(2, "0000000004"); - stmt.setString(3, "0000000001"); - stmt.setInt(4, 100); - stmt.setInt(5, 1000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-22 14:22:56").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000002"); 
- stmt.setString(2, "0000000003"); - stmt.setString(3, "0000000006"); - stmt.setInt(4, 552); - stmt.setInt(5, 2000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 10:06:29").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000003"); - stmt.setString(2, "0000000002"); - stmt.setString(3, "0000000002"); - stmt.setInt(4, 190); - stmt.setInt(5, 3000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 16:45:07").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000004"); - stmt.setString(2, "0000000004"); - stmt.setString(3, "0000000006"); - stmt.setInt(4, 510); - stmt.setInt(5, 4000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-26 13:26:04").getTime())); - stmt.execute(); - - stmt.setString(1, "000000000000005"); - stmt.setString(2, "0000000005"); - stmt.setString(3, "0000000003"); - stmt.setInt(4, 264); - stmt.setInt(5, 5000); - stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-27 09:37:50").getTime())); - stmt.execute(); - - conn.commit(); - // Insert into coitem table - stmt = conn.prepareStatement( + PreparedStatement stmt = conn.prepareStatement( "upsert into " + JOIN_COITEM_TABLE_FULL_NAME + " (item_id, " + " item_name, " + http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index f90cef8..ecc66dd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -41,6 +41,7 @@ import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import 
org.apache.phoenix.expression.function.CountAggregateFunction; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.AndParseNode; @@ -69,26 +70,29 @@ import org.apache.phoenix.parse.WildcardParseNode; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; -import org.apache.phoenix.schema.types.PDate; -import org.apache.phoenix.schema.types.PDecimal; -import org.apache.phoenix.schema.types.PBoolean; -import org.apache.phoenix.schema.types.PDouble; -import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.MetaDataEntityNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; -import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; -import org.apache.phoenix.schema.types.PSmallint; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PBoolean; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.schema.types.PDecimal; +import org.apache.phoenix.schema.types.PDouble; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PSmallint; import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; -import org.apache.phoenix.schema.TableRef; +import 
org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; import com.google.common.collect.ArrayListMultimap; @@ -129,9 +133,9 @@ public class JoinCompiler { joinTable.addFilter(select.getWhere()); } - ColumnRefParseNodeVisitor generalRefVisitor = new ColumnRefParseNodeVisitor(resolver); - ColumnRefParseNodeVisitor joinLocalRefVisitor = new ColumnRefParseNodeVisitor(resolver); - ColumnRefParseNodeVisitor prefilterRefVisitor = new ColumnRefParseNodeVisitor(resolver); + ColumnRefParseNodeVisitor generalRefVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection()); + ColumnRefParseNodeVisitor joinLocalRefVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection()); + ColumnRefParseNodeVisitor prefilterRefVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection()); joinTable.pushDownColumnRefVisitors(generalRefVisitor, joinLocalRefVisitor, prefilterRefVisitor); @@ -315,7 +319,7 @@ public class JoinCompiler { WhereNodeVisitor visitor = new WhereNodeVisitor(origResolver, table, postFilters, Collections.<TableRef>singletonList(table.getTableRef()), - isPrefilterAccepted, prefilterAcceptedTables); + isPrefilterAccepted, prefilterAcceptedTables, statement.getConnection()); filter.accept(visitor); } @@ -438,7 +442,7 @@ public class JoinCompiler { } } - public static class JoinSpec { + public class JoinSpec { private final JoinType type; private final List<EqualParseNode> onConditions; private final JoinTable joinTable; @@ -453,7 +457,7 @@ public class JoinCompiler { this.joinTable = joinTable; this.singleValueOnly = singleValueOnly; this.dependencies = new HashSet<TableRef>(); - this.onNodeVisitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable); + this.onNodeVisitor = new OnNodeVisitor(resolver, onConditions, dependencies, joinTable, statement.getConnection()); if (onNode != null) { onNode.accept(this.onNodeVisitor); } @@ -711,13 +715,14 @@ public class JoinCompiler { } for (ColumnRef 
columnRef : columnRefs.keySet()) { if (columnRef.getTableRef().equals(tableRef) - && !SchemaUtil.isPKColumn(columnRef.getColumn())) { + && !SchemaUtil.isPKColumn(columnRef.getColumn()) + && !(columnRef instanceof LocalIndexColumnRef)) { scan.addColumn(columnRef.getColumn().getFamilyName().getBytes(), columnRef.getColumn().getName().getBytes()); } } } - public ProjectedPTableWrapper createProjectedTable(boolean retainPKColumns) throws SQLException { + public ProjectedPTableWrapper createProjectedTable(boolean retainPKColumns, StatementContext context) throws SQLException { assert(!isSubselect()); List<PColumn> projectedColumns = new ArrayList<PColumn>(); List<Expression> sourceExpressions = new ArrayList<Expression>(); @@ -727,14 +732,14 @@ public class JoinCompiler { if (retainPKColumns) { for (PColumn column : table.getPKColumns()) { addProjectedColumn(projectedColumns, sourceExpressions, columnNameMap, - column, column.getFamilyName(), hasSaltingColumn); + column, column.getFamilyName(), hasSaltingColumn, false, context); } } if (isWildCardSelect()) { for (PColumn column : table.getColumns()) { if (!retainPKColumns || !SchemaUtil.isPKColumn(column)) { addProjectedColumn(projectedColumns, sourceExpressions, columnNameMap, - column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn); + column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn, false, context); } } } else { @@ -745,7 +750,8 @@ public class JoinCompiler { && (!retainPKColumns || !SchemaUtil.isPKColumn(columnRef.getColumn()))) { PColumn column = columnRef.getColumn(); addProjectedColumn(projectedColumns, sourceExpressions, columnNameMap, - column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn); + column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn, + columnRef instanceof LocalIndexColumnRef, context); } } } @@ -758,7 +764,8 @@ public class JoinCompiler { } private void 
addProjectedColumn(List<PColumn> projectedColumns, List<Expression> sourceExpressions, - ListMultimap<String, String> columnNameMap, PColumn sourceColumn, PName familyName, boolean hasSaltingColumn) + ListMultimap<String, String> columnNameMap, PColumn sourceColumn, PName familyName, boolean hasSaltingColumn, + boolean isLocalIndexColumnRef, StatementContext context) throws SQLException { if (sourceColumn == SALTING_COLUMN) return; @@ -767,7 +774,7 @@ public class JoinCompiler { PTable table = tableRef.getTable(); String schemaName = table.getSchemaName().getString(); String tableName = table.getTableName().getString(); - String colName = sourceColumn.getName().getString(); + String colName = isLocalIndexColumnRef ? IndexUtil.getIndexColumnName(sourceColumn) : sourceColumn.getName().getString(); String fullName = getProjectedColumnName(schemaName, tableName, colName); String aliasedName = tableRef.getTableAlias() == null ? fullName : getProjectedColumnName(null, tableRef.getTableAlias(), colName); @@ -780,7 +787,9 @@ public class JoinCompiler { PColumnImpl column = new PColumnImpl(name, familyName, sourceColumn.getDataType(), sourceColumn.getMaxLength(), sourceColumn.getScale(), sourceColumn.isNullable(), position, sourceColumn.getSortOrder(), sourceColumn.getArraySize(), sourceColumn.getViewConstant(), sourceColumn.isViewReferenced()); - Expression sourceExpression = new ColumnRef(tableRef, sourceColumn.getPosition()).newColumnExpression(); + Expression sourceExpression = isLocalIndexColumnRef ? 
+ NODE_FACTORY.column(TableName.create(schemaName, tableName), "\"" + colName + "\"", null).accept(new ExpressionCompiler(context)) + : new ColumnRef(tableRef, sourceColumn.getPosition()).newColumnExpression(); projectedColumns.add(column); sourceExpressions.add(sourceExpression); } @@ -818,13 +827,13 @@ public class JoinCompiler { public WhereNodeVisitor(ColumnResolver resolver, Table table, List<ParseNode> postFilters, List<TableRef> selfTableRefs, boolean isPrefilterAccepted, - List<JoinSpec> prefilterAcceptedTables) { + List<JoinSpec> prefilterAcceptedTables, PhoenixConnection connection) { this.table = table; this.postFilters = postFilters; this.selfTableRefs = selfTableRefs; this.isPrefilterAccepted = isPrefilterAccepted; this.prefilterAcceptedTables = prefilterAcceptedTables; - this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver); + this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver, connection); } @Override @@ -919,11 +928,11 @@ public class JoinCompiler { private ColumnRefParseNodeVisitor columnRefVisitor; public OnNodeVisitor(ColumnResolver resolver, List<EqualParseNode> onConditions, - Set<TableRef> dependencies, JoinTable joinTable) { + Set<TableRef> dependencies, JoinTable joinTable, PhoenixConnection connection) { this.onConditions = onConditions; this.dependencies = dependencies; this.joinTable = joinTable; - this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver); + this.columnRefVisitor = new ColumnRefParseNodeVisitor(resolver, connection); } @Override protected boolean enterBooleanNode(ParseNode node) throws SQLException { @@ -1005,18 +1014,35 @@ public class JoinCompiler { throw new SQLExceptionInfo.Builder(SQLExceptionCode.AMBIGUOUS_JOIN_CONDITION).build().buildException(); } } + + private static class LocalIndexColumnRef extends ColumnRef { + private final TableRef indexTableRef; + + public LocalIndexColumnRef(TableRef tableRef, String familyName, + String columnName, TableRef indexTableRef) throws 
MetaDataEntityNotFoundException { + super(tableRef, familyName, columnName); + this.indexTableRef = indexTableRef; + } + + @Override + public TableRef getTableRef() { + return indexTableRef; + } + } private static class ColumnRefParseNodeVisitor extends StatelessTraverseAllParseNodeVisitor { public enum ColumnRefType {NONE, SELF_ONLY, FOREIGN_ONLY, COMPLEX}; - private ColumnResolver resolver; + private final ColumnResolver resolver; + private final PhoenixConnection connection; private final Set<TableRef> tableRefSet; private final Map<ColumnRef, ColumnParseNode> columnRefMap; - public ColumnRefParseNodeVisitor(ColumnResolver resolver) { + public ColumnRefParseNodeVisitor(ColumnResolver resolver, PhoenixConnection connection) { this.resolver = resolver; this.tableRefSet = new HashSet<TableRef>(); this.columnRefMap = new HashMap<ColumnRef, ColumnParseNode>(); + this.connection = connection; } public void reset() { @@ -1026,7 +1052,27 @@ public class JoinCompiler { @Override public Void visit(ColumnParseNode node) throws SQLException { - ColumnRef columnRef = resolver.resolveColumn(node.getSchemaName(), node.getTableName(), node.getName()); + ColumnRef columnRef = null; + try { + columnRef = resolver.resolveColumn(node.getSchemaName(), node.getTableName(), node.getName()); + } catch (ColumnNotFoundException e) { + // This could be a LocalIndexDataColumnRef. If so, the table name must have + // been appended by the IndexStatementRewriter, and we can convert it into a LocalIndexColumnRef.
+ TableRef tableRef = resolver.resolveTable(node.getSchemaName(), node.getTableName()); + if (tableRef.getTable().getIndexType() == IndexType.LOCAL) { + TableRef parentTableRef = FromCompiler.getResolver( + NODE_FACTORY.namedTable(null, TableName.create(tableRef.getTable() + .getSchemaName().getString(), tableRef.getTable() + .getParentTableName().getString())), connection).resolveTable( + tableRef.getTable().getSchemaName().getString(), + tableRef.getTable().getParentTableName().getString()); + columnRef = new LocalIndexColumnRef(parentTableRef, + IndexUtil.getDataColumnFamilyName(node.getName()), + IndexUtil.getDataColumnName(node.getName()), tableRef); + } else { + throw e; + } + } columnRefMap.put(columnRef, node); tableRefSet.add(columnRef.getTableRef()); return null; @@ -1092,9 +1138,9 @@ public class JoinCompiler { return NODE_FACTORY.and(nodes); } - private static List<AliasedNode> extractFromSelect(List<AliasedNode> select, TableRef tableRef, ColumnResolver resolver) throws SQLException { + private List<AliasedNode> extractFromSelect(List<AliasedNode> select, TableRef tableRef, ColumnResolver resolver) throws SQLException { List<AliasedNode> ret = new ArrayList<AliasedNode>(); - ColumnRefParseNodeVisitor visitor = new ColumnRefParseNodeVisitor(resolver); + ColumnRefParseNodeVisitor visitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection()); for (AliasedNode aliasedNode : select) { ParseNode node = aliasedNode.getNode(); if (node instanceof TableWildcardParseNode) { @@ -1145,7 +1191,7 @@ public class JoinCompiler { TableRef groupByTableRef = null; TableRef orderByTableRef = null; if (select.getGroupBy() != null && !select.getGroupBy().isEmpty()) { - ColumnRefParseNodeVisitor groupByVisitor = new ColumnRefParseNodeVisitor(resolver); + ColumnRefParseNodeVisitor groupByVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection()); for (ParseNode node : select.getGroupBy()) { node.accept(groupByVisitor); } @@ -1154,7 +1200,7 @@ 
public class JoinCompiler { groupByTableRef = set.iterator().next(); } } else if (select.getOrderBy() != null && !select.getOrderBy().isEmpty()) { - ColumnRefParseNodeVisitor orderByVisitor = new ColumnRefParseNodeVisitor(resolver); + ColumnRefParseNodeVisitor orderByVisitor = new ColumnRefParseNodeVisitor(resolver, statement.getConnection()); for (OrderByNode node : select.getOrderBy()) { node.getNode().accept(orderByVisitor); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 3f98ddc..014e73a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -191,9 +191,9 @@ public class QueryCompiler { Table table = joinTable.getTable(); SelectStatement subquery = table.getAsSubquery(orderBy); if (!table.isSubselect()) { - ProjectedPTableWrapper projectedTable = table.createProjectedTable(!projectPKColumns); - TupleProjector.serializeProjectorIntoScan(context.getScan(), projectedTable.createTupleProjector()); context.setCurrentTable(table.getTableRef()); + ProjectedPTableWrapper projectedTable = table.createProjectedTable(!projectPKColumns, context); + TupleProjector.serializeProjectorIntoScan(context.getScan(), projectedTable.createTupleProjector()); context.setResolver(projectedTable.createColumnResolver()); table.projectColumns(context.getScan()); return compileSingleQuery(context, subquery, binds, asSubquery, !asSubquery); @@ -211,7 +211,8 @@ public class QueryCompiler { TableRef tableRef; SelectStatement query; if (!table.isSubselect()) { - initialProjectedTable = table.createProjectedTable(!projectPKColumns); + 
context.setCurrentTable(table.getTableRef()); + initialProjectedTable = table.createProjectedTable(!projectPKColumns, context); tableRef = table.getTableRef(); table.projectColumns(context.getScan()); query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery); @@ -300,7 +301,8 @@ public class QueryCompiler { TableRef rhsTableRef; SelectStatement rhs; if (!rhsTable.isSubselect()) { - rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns); + context.setCurrentTable(rhsTable.getTableRef()); + rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns, context); rhsTableRef = rhsTable.getTableRef(); rhsTable.projectColumns(context.getScan()); rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 68fa3d1..2e179a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -18,23 +18,43 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; +import java.util.List; +import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Scan; import 
org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; +import org.apache.phoenix.schema.ValueBitSet; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.cloudera.htrace.Span; +import com.google.common.collect.ImmutableList; + abstract public class BaseScannerRegionObserver extends BaseRegionObserver { @@ -163,4 +183,175 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { return null; // impossible } } + + /** + * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and + * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query + * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do + * the same from a custom filter. + * @param offset starting position in the rowkey. 
+ * @param scan + * @param tupleProjector + * @param dataRegion + * @param indexMaintainer + * @param viewConstants + */ + protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + final RegionScanner s, final int offset, final Scan scan, + final ColumnReference[] dataColumns, final TupleProjector tupleProjector, + final HRegion dataRegion, final IndexMaintainer indexMaintainer, + final byte[][] viewConstants, final ImmutableBytesWritable ptr) { + return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, + dataRegion, indexMaintainer, viewConstants, null, null, ptr); + } + + /** + * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and + * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query + * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do + * the same from a custom filter. + * @param arrayFuncRefs + * @param arrayKVRefs + * @param offset starting position in the rowkey. 
+ * @param scan + * @param tupleProjector + * @param dataRegion + * @param indexMaintainer + * @param viewConstants + */ + protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs, + final Expression[] arrayFuncRefs, final int offset, final Scan scan, + final ColumnReference[] dataColumns, final TupleProjector tupleProjector, + final HRegion dataRegion, final IndexMaintainer indexMaintainer, + final byte[][] viewConstants, final KeyValueSchema kvSchema, + final ValueBitSet kvSchemaBitSet, final ImmutableBytesWritable ptr) { + return new RegionScanner() { + + @Override + public boolean next(List<Cell> results) throws IOException { + try { + return s.next(results); + } catch (Throwable t) { + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + return false; // impossible + } + } + + @Override + public boolean next(List<Cell> result, int limit) throws IOException { + try { + return s.next(result, limit); + } catch (Throwable t) { + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + return false; // impossible + } + } + + @Override + public void close() throws IOException { + s.close(); + } + + @Override + public HRegionInfo getRegionInfo() { + return s.getRegionInfo(); + } + + @Override + public boolean isFilterDone() throws IOException { + return s.isFilterDone(); + } + + @Override + public boolean reseek(byte[] row) throws IOException { + return s.reseek(row); + } + + @Override + public long getMvccReadPoint() { + return s.getMvccReadPoint(); + } + + @Override + public boolean nextRaw(List<Cell> result) throws IOException { + try { + boolean next = s.nextRaw(result); + if (result.size() == 0) { + return next; + } + if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { + replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); + } + if 
(ScanUtil.isLocalIndex(scan) && !ScanUtil.isAnalyzeTable(scan)) { + IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); + } + // There is a scanattribute set to retrieve the specific array element + return next; + } catch (Throwable t) { + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + return false; // impossible + } + } + + @Override + public boolean nextRaw(List<Cell> result, int limit) throws IOException { + try { + boolean next = s.nextRaw(result, limit); + if (result.size() == 0) { + return next; + } + if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { + replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); + } + if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) { + IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); + } + // There is a scanattribute set to retrieve the specific array element + return next; + } catch (Throwable t) { + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + return false; // impossible + } + } + + private void replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs, + final Expression[] arrayFuncRefs, List<Cell> result) { + // make a copy of the results array here, as we're modifying it below + MultiKeyValueTuple tuple = new MultiKeyValueTuple(ImmutableList.copyOf(result)); + // The size of both the arrays would be same? 
+ // Using KeyValueSchema to set and retrieve the value + // collect the first kv to get the row + Cell rowKv = result.get(0); + for (KeyValueColumnExpression kvExp : arrayKVRefs) { + if (kvExp.evaluate(tuple, ptr)) { + for (int idx = tuple.size() - 1; idx >= 0; idx--) { + Cell kv = tuple.getValue(idx); + if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length, + kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()) + && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length, + kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) { + // remove the kv that has the full array values. + result.remove(idx); + break; + } + } + } + } + byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs, + kvSchemaBitSet, ptr); + // Add a dummy kv with the exact value of the array index + result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(), + QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length, + QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0, + QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP, + Type.codeToType(rowKv.getTypeByte()), value, 0, value.length)); + } + + @Override + public long getMaxResultSize() { + return s.getMaxResultSize(); + } + }; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index ca21742..8b59b85 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -63,9 +63,9 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; @@ -123,23 +123,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { .getAttribute(BaseScannerRegionObserver.AGGREGATORS), c .getEnvironment().getConfiguration()); - final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); - final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); - long limit = Long.MAX_VALUE; - byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT); - if (limitBytes != null) { - limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault()); - } - RegionScanner innerScanner = s; - if (p != null || j != null) { - innerScanner = - new HashJoinRegionScanner(s, p, j, ScanUtil.getTenantId(scan), - c.getEnvironment()); - } + byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes); - boolean localIndexScan = ScanUtil.isLocalIndex(scan); TupleProjector tupleProjector = null; HRegion dataRegion = null; byte[][] viewConstants = null; @@ -150,17 +137,30 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { dataRegion = IndexUtil.getDataRegion(c.getEnvironment()); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); } + ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); + innerScanner = + getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, + dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, tempPtr); } + final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); + final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + if (p != null || j != null) { + innerScanner = + new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan), + c.getEnvironment()); + } + + long limit = Long.MAX_VALUE; + byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT); + if (limitBytes != null) { + limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault()); + } if (keyOrdered) { // Optimize by taking advantage that the rows are // already in the required group by key order - return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit, offset, - localIndexScan, dataColumns, tupleProjector, indexMaintainers, dataRegion, - viewConstants); + return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit); } else { // Otherwise, collect them all up in an in-memory map - return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit, offset, - localIndexScan, dataColumns, tupleProjector, indexMaintainers, dataRegion, - viewConstants); + return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit); } } @@ -376,10 +376,7 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver { */ private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, final RegionScanner s, final List<Expression> expressions, - final ServerAggregators aggregators, long limit, int offset, boolean localIndexScan, - ColumnReference[] dataColumns, TupleProjector tupleProjector, - List<IndexMaintainer> indexMaintainers, HRegion dataRegion, byte[][] viewConstants) - throws IOException { + final ServerAggregators aggregators, long limit) throws IOException { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); @@ -401,7 +398,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { GroupByCacheFactory.INSTANCE.newCache( env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan), aggregators, estDistVals); - ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); boolean success = false; try { boolean hasMore; @@ -423,11 +419,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { // ones returned hasMore = s.nextRaw(results); if (!results.isEmpty()) { - if (localIndexScan) { - IndexUtil.wrapResultUsingOffset(results, offset, dataColumns, tupleProjector, - dataRegion, indexMaintainers == null ? 
null : indexMaintainers.get(0), - viewConstants, tempPtr); - } result.setKeyValues(results); ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(result, expressions); @@ -463,16 +454,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { */ private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s, final List<Expression> expressions, - final ServerAggregators aggregators, final long limit, final int offset, - final boolean localIndexScan, final ColumnReference[] dataColumns, - final TupleProjector tupleProjector, final List<IndexMaintainer> indexMaintainers, - final HRegion dataRegion, final byte[][] viewConstants) throws IOException { + final ServerAggregators aggregators, final long limit) throws IOException { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); } - final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); return new BaseRegionScanner() { private long rowCount = 0; private ImmutableBytesWritable currentKey = null; @@ -510,11 +497,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { // ones returned hasMore = s.nextRaw(kvs); if (!kvs.isEmpty()) { - if (localIndexScan) { - IndexUtil.wrapResultUsingOffset(kvs, offset, dataColumns, tupleProjector, - dataRegion, indexMaintainers == null ? 
null : indexMaintainers.get(0), - viewConstants, tempPtr); - } result.setKeyValues(kvs); key = TupleUtil.getConcatenatedValue(result, expressions); aggBoundary = currentKey != null && currentKey.compareTo(key) != 0; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9ca8816/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 1672fd7..f0ae4a2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -28,17 +28,13 @@ import java.util.Set; import com.google.common.collect.Sets; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; @@ -54,17 +50,14 @@ import org.apache.phoenix.iterate.RegionScannerResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.KeyValueSchema; import 
org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.ValueBitSet; -import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -187,14 +180,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { ScanUtil.setRowKeyOffset(scan, offset); } - final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); - final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); - final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan); - RegionScanner innerScanner = s; - if (p != null || j != null) { - innerScanner = new HashJoinRegionScanner(s, p, j, tenantId, c.getEnvironment()); - } Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet(); Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan( @@ -214,7 +200,16 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, - dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants); + dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, + kvSchema, kvSchemaBitSet, ptr); + + final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); + final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan); + if (p != null || j != null) { + innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, c.getEnvironment()); + } + final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner); if (iterator == null) { return innerScanner; @@ -296,155 +291,6 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver { }; } - /** - * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and - * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query - * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do - * the same from a custom filter. - * @param arrayFuncRefs - * @param arrayKVRefs - * @param offset starting position in the rowkey. - * @param scan - * @param tupleProjector - * @param dataRegion - * @param indexMaintainer - * @param viewConstants - */ - private RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, - final RegionScanner s, final Set<KeyValueColumnExpression> arrayKVRefs, - final Expression[] arrayFuncRefs, final int offset, final Scan scan, - final ColumnReference[] dataColumns, final TupleProjector tupleProjector, - final HRegion dataRegion, final IndexMaintainer indexMaintainer, - final byte[][] viewConstants) { - return new RegionScanner() { - - @Override - public boolean next(List<Cell> results) throws IOException { - try { - return s.next(results); - } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); - return false; // impossible - } - } - - @Override - public boolean next(List<Cell> result, int limit) throws IOException { - try { - return s.next(result, limit); - } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); - return false; // impossible - } - } - - @Override - public void close() throws IOException { - s.close(); - } - - @Override - public HRegionInfo getRegionInfo() { - return s.getRegionInfo(); - } - - @Override - public boolean isFilterDone() throws IOException { - return s.isFilterDone(); - } - - @Override - public boolean reseek(byte[] row) throws IOException { - return s.reseek(row); - } - - @Override - public long getMvccReadPoint() { - return s.getMvccReadPoint(); - } - - 
@Override - public boolean nextRaw(List<Cell> result) throws IOException { - try { - boolean next = s.nextRaw(result); - if (result.size() == 0) { - return next; - } - if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { - replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); - } - if (ScanUtil.isLocalIndex(scan)) { - IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); - } - // There is a scanattribute set to retrieve the specific array element - return next; - } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); - return false; // impossible - } - } - - @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { - try { - boolean next = s.nextRaw(result, limit); - if (result.size() == 0) { - return next; - } - if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { - replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); - } - if (offset > 0 || ScanUtil.isLocalIndex(scan)) { - IndexUtil.wrapResultUsingOffset(result, offset, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); - } - // There is a scanattribute set to retrieve the specific array element - return next; - } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); - return false; // impossible - } - } - - private void replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs, - final Expression[] arrayFuncRefs, List<Cell> result) { - // make a copy of the results array here, as we're modifying it below - MultiKeyValueTuple tuple = new MultiKeyValueTuple(ImmutableList.copyOf(result)); - // The size of both the arrays would be same? 
- // Using KeyValueSchema to set and retrieve the value - // collect the first kv to get the row - Cell rowKv = result.get(0); - for (KeyValueColumnExpression kvExp : arrayKVRefs) { - if (kvExp.evaluate(tuple, ptr)) { - for (int idx = tuple.size() - 1; idx >= 0; idx--) { - Cell kv = tuple.getValue(idx); - if (Bytes.equals(kvExp.getColumnFamily(), 0, kvExp.getColumnFamily().length, - kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()) - && Bytes.equals(kvExp.getColumnName(), 0, kvExp.getColumnName().length, - kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())) { - // remove the kv that has the full array values. - result.remove(idx); - break; - } - } - } - } - byte[] value = kvSchema.toBytes(tuple, arrayFuncRefs, - kvSchemaBitSet, ptr); - // Add a dummy kv with the exact value of the array index - result.add(new KeyValue(rowKv.getRowArray(), rowKv.getRowOffset(), rowKv.getRowLength(), - QueryConstants.ARRAY_VALUE_COLUMN_FAMILY, 0, QueryConstants.ARRAY_VALUE_COLUMN_FAMILY.length, - QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0, - QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP, - Type.codeToType(rowKv.getTypeByte()), value, 0, value.length)); - } - - @Override - public long getMaxResultSize() { - return s.getMaxResultSize(); - } - }; - } - @Override protected boolean isRegionObserverFor(Scan scan) { return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;