This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.16 by this push:
     new 2605cc2  PHOENIX-6506 : Tenant Connection is not able to 
access/validate Global Sequences (#1270) (#1261)
2605cc2 is described below

commit 2605cc233e206678a72e02322a76b4b0bbf180f7
Author: Lokesh Khurana <khuranalokes...@gmail.com>
AuthorDate: Thu Jul 29 21:20:33 2021 +0530

    PHOENIX-6506 : Tenant Connection is not able to access/validate Global 
Sequences (#1270) (#1261)
    
    Signed-off-by: Viraj Jasani <vjas...@apache.org>
---
 .../org/apache/phoenix/end2end/UpsertValuesIT.java | 148 ++++++++++++++++++++-
 .../phoenix/query/ConnectionQueryServicesImpl.java |  40 +++++-
 2 files changed, 186 insertions(+), 2 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index e89cc58..32662ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -44,11 +44,15 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -670,5 +674,147 @@ public class UpsertValuesIT extends 
ParallelStatsDisabledIT {
             assertTrue(next.containsColumn(Bytes.toBytes("CF2"), 
PInteger.INSTANCE.toBytes(3)));
         }
     }
-    
+
+
+    @Test
+    public void testUpsertValueWithDiffSequenceAndConnections() throws 
Exception {
+        String tableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            PreparedStatement createTableStatement = 
conn.prepareStatement(String.format("CREATE TABLE IF NOT EXISTS " +
+                    "%s (SERVICE VARCHAR NOT NULL, SEQUENCE_NUMBER BIGINT NOT 
NULL , " +
+                    "CONSTRAINT PK PRIMARY KEY (SERVICE, SEQUENCE_NUMBER)) 
MULTI_TENANT = TRUE", tableName));
+            createTableStatement.execute();
+        }
+
+        testGlobalSequenceUpsertWithTenantConnection(tableName);
+        testGlobalSequenceUpsertWithGlobalConnection(tableName);
+        testTenantSequenceUpsertWithSameTenantConnection(tableName);
+        testTenantSequenceUpsertWithDifferentTenantConnection(tableName);
+        testTenantSequenceUpsertWithGlobalConnection(tableName);
+
+    }
+
+    private void testTenantSequenceUpsertWithGlobalConnection(String 
tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("PHOENIX")) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = 
conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+        }
+
+        try (Connection tenantConn = DriverManager.getConnection(getUrl())) {
+            tenantConn.setAutoCommit(true);
+            Statement executeUpdateStatement = tenantConn.createStatement();
+            try {
+                executeUpdateStatement.execute(String.format("UPSERT INTO %s ( 
SERVICE, SEQUENCE_NUMBER) VALUES " +
+                        "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, 
sequenceName));
+                Assert.fail();
+            } catch (SequenceNotFoundException e) {
+                assertTrue(true);
+            } catch (Exception e) {
+                Assert.fail();
+            }
+        }
+    }
+
+    private void testTenantSequenceUpsertWithDifferentTenantConnection(String 
tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("PHOENIX")) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = 
conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+        }
+
+        try (Connection tenantConn = getTenantConnection("HBASE")) {
+            tenantConn.setAutoCommit(true);
+
+            Statement executeUpdateStatement = tenantConn.createStatement();
+            try {
+                executeUpdateStatement.execute(String.format("UPSERT INTO %s ( 
SEQUENCE_NUMBER) VALUES " +
+                        "( NEXT VALUE FOR %s)", tableName, sequenceName));
+                Assert.fail();
+            } catch (SequenceNotFoundException e) {
+                assertTrue(true);
+            } catch (Exception e) {
+                Assert.fail();
+            }
+        }
+    }
+
+    private void testTenantSequenceUpsertWithSameTenantConnection(String 
tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = getTenantConnection("ZOOKEEPER")) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = 
conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+            Statement executeUpdateStatement = conn.createStatement();
+            executeUpdateStatement.execute(String.format("UPSERT INTO %s ( 
SEQUENCE_NUMBER) VALUES " +
+                    "( NEXT VALUE FOR %s)", tableName, sequenceName));
+            ResultSet rs = executeUpdateStatement.executeQuery("select * from 
" + tableName);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertFalse(rs.next());
+        }
+
+    }
+
+    private void testGlobalSequenceUpsertWithGlobalConnection(String 
tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = 
conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+            Statement executeUpdateStatement = conn.createStatement();
+            executeUpdateStatement.execute(String.format("UPSERT INTO %s ( 
SERVICE, SEQUENCE_NUMBER) VALUES " +
+                    "( 'PHOENIX', NEXT VALUE FOR %s)", tableName, 
sequenceName));
+            ResultSet rs = executeUpdateStatement.executeQuery("select * from 
" + tableName);
+            assertTrue(rs.next());
+            assertEquals("HBASE", rs.getString(1));
+            assertEquals("1", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("PHOENIX", rs.getString(1));
+            assertEquals("1", rs.getString(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    private void testGlobalSequenceUpsertWithTenantConnection(String 
tableName) throws Exception {
+        String sequenceName = generateUniqueSequenceName();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            PreparedStatement createSequenceStatement = 
conn.prepareStatement(String.format("CREATE SEQUENCE " +
+                    "IF NOT EXISTS %s", sequenceName));
+            createSequenceStatement.execute();
+        }
+
+        try (Connection tenantConn = getTenantConnection("HBASE")) {
+            tenantConn.setAutoCommit(true);
+
+            Statement executeUpdateStatement = tenantConn.createStatement();
+            executeUpdateStatement.execute(String.format("UPSERT INTO %s ( 
SEQUENCE_NUMBER) VALUES " +
+                    "( NEXT VALUE FOR %s)", tableName, sequenceName));
+
+            ResultSet rs = executeUpdateStatement.executeQuery("select * from 
" + tableName);
+            assertTrue(rs.next());
+            assertEquals("1", rs.getString(1));
+            assertFalse(rs.next());
+
+        }
+    }
+
+    private static Connection getTenantConnection(String tenantId) throws 
Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(getUrl(), props);
+    }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 7fdffdd..64ae6a6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4949,7 +4949,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
             SequenceKey key = sequenceAllocation.getSequenceKey();
             Sequence newSequences = new Sequence(key);
-            Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
+            Sequence sequence = getSequence(sequenceAllocation);
             if (sequence == null) {
                 sequence = newSequences;
             }
@@ -5022,6 +5022,44 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
+    /**
+     * checks if sequenceAllocation's sequence there in sequenceMap, also 
returns Global Sequences
+     * from Tenant sequenceAllocations
+     * @param sequenceAllocation
+     * @return
+     */
+
+    private Sequence getSequence(SequenceAllocation sequenceAllocation) {
+        SequenceKey key = sequenceAllocation.getSequenceKey();
+        if (key.getTenantId() == null) {
+            return sequenceMap.putIfAbsent(key, new Sequence(key));
+        } else {
+            Sequence sequence = sequenceMap.get(key);
+            if (sequence == null) {
+                for (Map.Entry<SequenceKey,Sequence> entry : 
sequenceMap.entrySet()) {
+                    if (compareSequenceKeysWithoutTenant(key, entry.getKey())) 
{
+                        return entry.getValue();
+                    }
+                }
+            } else {
+                return sequence;
+            }
+        }
+        return null;
+    }
+
+    private boolean compareSequenceKeysWithoutTenant(SequenceKey keyToCompare, 
SequenceKey availableKey) {
+        if (availableKey.getTenantId() != null) {
+            return false;
+        }
+        boolean sameSchema = keyToCompare.getSchemaName() == null ? 
availableKey.getSchemaName() == null :
+                
keyToCompare.getSchemaName().equals(availableKey.getSchemaName());
+        if (!sameSchema) {
+            return false;
+        }
+        return 
keyToCompare.getSequenceName().equals(availableKey.getSequenceName());
+    }
+
     @Override
     public void clearTableFromCache(final byte[] tenantId, final byte[] 
schemaName, final byte[] tableName,
             final long clientTS) throws SQLException {

Reply via email to