Repository: phoenix
Updated Branches:
  refs/heads/txn 99a180205 -> d10a52659


Inner Join with any table or view with Multi_Tenant=true causes "could not find 
hash cache for joinId" error


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c75d2202
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c75d2202
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c75d2202

Branch: refs/heads/txn
Commit: c75d2202ba4bb18b2abfd61fb9b4713f3de88d3a
Parents: ef2fa43
Author: maryannxue <[email protected]>
Authored: Thu Nov 12 11:25:07 2015 -0500
Committer: maryannxue <[email protected]>
Committed: Thu Nov 12 11:25:07 2015 -0500

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 63 ++++++++++++++++++++
 .../apache/phoenix/cache/ServerCacheClient.java | 14 ++++-
 .../apache/phoenix/execute/BaseQueryPlan.java   | 18 +++---
 .../java/org/apache/phoenix/util/ScanUtil.java  |  7 +++
 4 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c75d2202/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 88e03ca..433f74f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -34,6 +34,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.sql.Array;
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.DriverManager;
@@ -3867,6 +3868,68 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
             conn.close();
         }
     }
+    
+    // PHOENIX-2381
+    @Test
+    public void testJoinWithMultiTenancy() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.createStatement().execute("CREATE TABLE INVENTORY (" +
+                            " TENANTID UNSIGNED_INT NOT NULL" +
+                            ",ID UNSIGNED_INT NOT NULL" +
+                            ",FOO UNSIGNED_INT NOT NULL" +
+                            ",TIMESTAMP UNSIGNED_LONG NOT NULL" +
+                            ",CODES INTEGER ARRAY[] NOT NULL" +
+                            ",V UNSIGNED_LONG" +
+                            " CONSTRAINT pk PRIMARY KEY (TENANTID, ID, FOO, 
TIMESTAMP, CODES))" +
+                            " DEFAULT_COLUMN_FAMILY ='E'," +
+                            " MULTI_TENANT=true");
+            PreparedStatement upsertStmt = conn.prepareStatement(
+                    "upsert into INVENTORY "
+                    + "(tenantid, id, foo, timestamp, codes) "
+                    + "values (?, ?, ?, ?, ?)");
+            upsertStmt.setInt(1, 15);
+            upsertStmt.setInt(2, 5);
+            upsertStmt.setInt(3, 0);
+            upsertStmt.setLong(4, 6);
+            Array array = conn.createArrayOf("INTEGER", new Object[] {1, 2});
+            upsertStmt.setArray(5, array);
+            upsertStmt.executeUpdate();
+            conn.commit();
+            
+            conn.createStatement().execute("CREATE TABLE PRODUCT_IDS (" +
+                            " PRODUCT_ID UNSIGNED_INT NOT NULL" +
+                            ",PRODUCT_NAME VARCHAR" +
+                            " CONSTRAINT pk PRIMARY KEY (PRODUCT_ID))" +
+                            " DEFAULT_COLUMN_FAMILY ='NAME'");
+            upsertStmt = conn.prepareStatement(
+                    "upsert into PRODUCT_IDS "
+                    + "(product_id, product_name) "
+                    + "values (?, ?)");
+            upsertStmt.setInt(1, 5);
+            upsertStmt.setString(2, "DUMMY");
+            upsertStmt.executeUpdate();
+            conn.commit();
+            conn.close();            
+
+            // Create a tenant-specific connection.
+            props.setProperty("TenantId", "15");
+            conn = DriverManager.getConnection(getUrl(), props);
+            ResultSet rs = conn.createStatement().executeQuery(
+                    "SELECT * FROM INVENTORY INNER JOIN PRODUCT_IDS ON 
(PRODUCT_ID = INVENTORY.ID)");
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 5);
+            assertFalse(rs.next());
+            rs = conn.createStatement().executeQuery(
+                    "SELECT * FROM INVENTORY RIGHT JOIN PRODUCT_IDS ON 
(PRODUCT_ID = INVENTORY.ID)");
+            assertTrue(rs.next());
+            assertEquals(rs.getInt(1), 5);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c75d2202/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 9ad9ef5..3cfdd71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -161,7 +161,7 @@ public class ServerCacheClient {
         ExecutorService executor = services.getExecutor();
         List<Future<Boolean>> futures = Collections.emptyList();
         try {
-            PTable cacheUsingTable = cacheUsingTableRef.getTable();
+            final PTable cacheUsingTable = cacheUsingTableRef.getTable();
             List<HRegionLocation> locations = 
services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes());
             int nRegions = locations.size();
             // Size these based on worst case
@@ -196,7 +196,17 @@ public class ServerCacheClient {
                                                             new 
BlockingRpcCallback<AddServerCacheResponse>();
                                                     
AddServerCacheRequest.Builder builder = AddServerCacheRequest.newBuilder();
                                                     
if(connection.getTenantId() != null){
-                                                        
builder.setTenantId(ByteStringer.wrap(connection.getTenantId().getBytes()));
+                                                        try {
+                                                            byte[] 
tenantIdBytes =
+                                                                    
ScanUtil.getTenantIdBytes(
+                                                                            
cacheUsingTable.getRowKeySchema(),
+                                                                            
cacheUsingTable.getBucketNum()!=null,
+                                                                            
connection.getTenantId(),
+                                                                            
cacheUsingTable.isMultiTenant());
+                                                            
builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
+                                                        } catch (SQLException 
e) {
+                                                            new IOException(e);
+                                                        }
                                                     }
                                                     
builder.setCacheId(ByteStringer.wrap(cacheId));
                                                     
builder.setCachePtr(org.apache.phoenix.protobuf.ProtobufUtil.toProto(cachePtr));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c75d2202/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index e873df7..1768621 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -225,17 +225,13 @@ public abstract class BaseQueryPlan implements QueryPlan {
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        byte[] tenantIdBytes;
-        if( table.isMultiTenant() == true ) {
-            tenantIdBytes = connection.getTenantId() == null ? null :
-                    ScanUtil.getTenantIdBytes(
-                            table.getRowKeySchema(),
-                            table.getBucketNum()!=null,
-                            connection.getTenantId());
-        } else {
-            tenantIdBytes = connection.getTenantId() == null ? null : 
connection.getTenantId().getBytes();
-        }
-
+        byte[] tenantIdBytes = connection.getTenantId() == null ?
+                  null
+                : ScanUtil.getTenantIdBytes(
+                        table.getRowKeySchema(),
+                        table.getBucketNum()!=null,
+                        connection.getTenantId(),
+                        table.isMultiTenant());
         ScanUtil.setTenantId(scan, tenantIdBytes);
         String customAnnotations = 
LogUtil.customAnnotationsToString(connection);
         ScanUtil.setCustomAnnotations(scan, customAnnotations == null ? null : 
customAnnotations.getBytes());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c75d2202/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 641398f..183bfa7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -692,6 +692,13 @@ public class ScanUtil {
         return Bytes.compareTo(key, 0, nBytesToCheck, ZERO_BYTE_ARRAY, 0, 
nBytesToCheck) != 0;
     }
 
+    public static byte[] getTenantIdBytes(RowKeySchema schema, boolean 
isSalted, PName tenantId, boolean isMultiTenantTable)
+            throws SQLException {
+        return isMultiTenantTable ?
+                  getTenantIdBytes(schema, isSalted, tenantId)
+                : tenantId.getBytes();
+    }
+
     public static byte[] getTenantIdBytes(RowKeySchema schema, boolean 
isSalted, PName tenantId)
             throws SQLException {
         int pkPos = isSalted ? 1 : 0;

Reply via email to