Repository: phoenix Updated Branches: refs/heads/txn 99a180205 -> d10a52659
Inner Join with any table or view with Multi_Tenant=true causes "could not find hash cache for joinId" error Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c75d2202 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c75d2202 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c75d2202 Branch: refs/heads/txn Commit: c75d2202ba4bb18b2abfd61fb9b4713f3de88d3a Parents: ef2fa43 Author: maryannxue <[email protected]> Authored: Thu Nov 12 11:25:07 2015 -0500 Committer: maryannxue <[email protected]> Committed: Thu Nov 12 11:25:07 2015 -0500 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/HashJoinIT.java | 63 ++++++++++++++++++++ .../apache/phoenix/cache/ServerCacheClient.java | 14 ++++- .../apache/phoenix/execute/BaseQueryPlan.java | 18 +++--- .../java/org/apache/phoenix/util/ScanUtil.java | 7 +++ 4 files changed, 89 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c75d2202/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java index 88e03ca..433f74f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java @@ -34,6 +34,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.sql.Array; import java.sql.Connection; import java.sql.Date; import java.sql.DriverManager; @@ -3867,6 +3868,68 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT { conn.close(); } } + + // PHOENIX-2381 + @Test + public void testJoinWithMultiTenancy() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + conn.createStatement().execute("CREATE TABLE INVENTORY (" + + " TENANTID UNSIGNED_INT NOT NULL" + + ",ID UNSIGNED_INT NOT NULL" + + ",FOO UNSIGNED_INT NOT NULL" + + ",TIMESTAMP UNSIGNED_LONG NOT NULL" + + ",CODES INTEGER ARRAY[] NOT NULL" + + ",V UNSIGNED_LONG" + + " CONSTRAINT pk PRIMARY KEY (TENANTID, ID, FOO, TIMESTAMP, CODES))" + + " DEFAULT_COLUMN_FAMILY ='E'," + + " MULTI_TENANT=true"); + PreparedStatement upsertStmt = conn.prepareStatement( + "upsert into INVENTORY " + + "(tenantid, id, foo, timestamp, codes) " + + "values (?, ?, ?, ?, ?)"); + upsertStmt.setInt(1, 15); + upsertStmt.setInt(2, 5); + upsertStmt.setInt(3, 0); + upsertStmt.setLong(4, 6); + Array array = conn.createArrayOf("INTEGER", new Object[] {1, 2}); + upsertStmt.setArray(5, array); + upsertStmt.executeUpdate(); + conn.commit(); + + conn.createStatement().execute("CREATE TABLE PRODUCT_IDS (" + + " PRODUCT_ID UNSIGNED_INT NOT NULL" + + ",PRODUCT_NAME VARCHAR" + + " CONSTRAINT pk PRIMARY KEY (PRODUCT_ID))" + + " DEFAULT_COLUMN_FAMILY ='NAME'"); + upsertStmt = conn.prepareStatement( + "upsert into PRODUCT_IDS " + + "(product_id, product_name) " + + "values (?, ?)"); + upsertStmt.setInt(1, 5); + upsertStmt.setString(2, "DUMMY"); + upsertStmt.executeUpdate(); + conn.commit(); + conn.close(); + + // Create a tenant-specific connection. + props.setProperty("TenantId", "15"); + conn = DriverManager.getConnection(getUrl(), props); + ResultSet rs = conn.createStatement().executeQuery( + "SELECT * FROM INVENTORY INNER JOIN PRODUCT_IDS ON (PRODUCT_ID = INVENTORY.ID)"); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 5); + assertFalse(rs.next()); + rs = conn.createStatement().executeQuery( + "SELECT * FROM INVENTORY RIGHT JOIN PRODUCT_IDS ON (PRODUCT_ID = INVENTORY.ID)"); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 5); + assertFalse(rs.next()); + } finally { + conn.close(); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c75d2202/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 9ad9ef5..3cfdd71 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -161,7 +161,7 @@ public class ServerCacheClient { ExecutorService executor = services.getExecutor(); List<Future<Boolean>> futures = Collections.emptyList(); try { - PTable cacheUsingTable = cacheUsingTableRef.getTable(); + final PTable cacheUsingTable = cacheUsingTableRef.getTable(); List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes()); int nRegions = locations.size(); // Size these based on worst case @@ -196,7 +196,17 @@ public class ServerCacheClient { new BlockingRpcCallback<AddServerCacheResponse>(); AddServerCacheRequest.Builder builder = AddServerCacheRequest.newBuilder(); if(connection.getTenantId() != null){ - builder.setTenantId(ByteStringer.wrap(connection.getTenantId().getBytes())); + try { + byte[] tenantIdBytes = + ScanUtil.getTenantIdBytes( + cacheUsingTable.getRowKeySchema(), + cacheUsingTable.getBucketNum()!=null, + connection.getTenantId(), + cacheUsingTable.isMultiTenant()); + builder.setTenantId(ByteStringer.wrap(tenantIdBytes)); + } catch (SQLException e) { + new IOException(e); + } } builder.setCacheId(ByteStringer.wrap(cacheId)); builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c75d2202/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index e873df7..1768621 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -225,17 +225,13 @@ public abstract class BaseQueryPlan implements QueryPlan { } catch (IOException e) { throw new RuntimeException(e); } - byte[] tenantIdBytes; - if( table.isMultiTenant() == true ) { - tenantIdBytes = connection.getTenantId() == null ? null : - ScanUtil.getTenantIdBytes( - table.getRowKeySchema(), - table.getBucketNum()!=null, - connection.getTenantId()); - } else { - tenantIdBytes = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); - } - + byte[] tenantIdBytes = connection.getTenantId() == null ? + null + : ScanUtil.getTenantIdBytes( + table.getRowKeySchema(), + table.getBucketNum()!=null, + connection.getTenantId(), + table.isMultiTenant()); ScanUtil.setTenantId(scan, tenantIdBytes); String customAnnotations = LogUtil.customAnnotationsToString(connection); ScanUtil.setCustomAnnotations(scan, customAnnotations == null ? null : customAnnotations.getBytes()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c75d2202/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 641398f..183bfa7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -692,6 +692,13 @@ public class ScanUtil { return Bytes.compareTo(key, 0, nBytesToCheck, ZERO_BYTE_ARRAY, 0, nBytesToCheck) != 0; } + public static byte[] getTenantIdBytes(RowKeySchema schema, boolean isSalted, PName tenantId, boolean isMultiTenantTable) + throws SQLException { + return isMultiTenantTable ? + getTenantIdBytes(schema, isSalted, tenantId) + : tenantId.getBytes(); + } + public static byte[] getTenantIdBytes(RowKeySchema schema, boolean isSalted, PName tenantId) throws SQLException { int pkPos = isSalted ? 1 : 0;
