This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new d3ba58889e PHOENIX-7375 CQSI connection init from regionserver hosting SYSTEM.CATALOG does not require RPC calls to system tables (#1950)
d3ba58889e is described below

commit d3ba58889eea9360e75f46aa828f3bee0e003100
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue Aug 13 18:04:54 2024 -0700

    PHOENIX-7375 CQSI connection init from regionserver hosting SYSTEM.CATALOG does not require RPC calls to system tables (#1950)
---
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  91 ++++++++----
 .../end2end/MetadataServerConnectionsIT.java       | 164 +++++++++++++++++++++
 2 files changed, 226 insertions(+), 29 deletions(-)
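
The gist of the change: every server-side metadata path that previously opened a
connection via QueryUtil.getConnectionOnServer now routes through a new private
helper that pre-sets SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, so connection init on
the regionserver hosting SYSTEM.CATALOG no longer issues RPCs to system tables.
A minimal before/after sketch of the call-site pattern (names taken from the
diff below; surrounding method bodies elided):

    // Before: connection init could issue RPCs to system tables from the
    // very regionserver that hosts SYSTEM.CATALOG.
    try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(
            env.getConfiguration()).unwrap(PhoenixConnection.class)) {
        // ... metadata work ...
    }

    // After: the helper skips the system table existence check during init.
    try (PhoenixConnection connection = getServerConnectionForMetaData(
            env.getConfiguration()).unwrap(PhoenixConnection.class)) {
        // ... metadata work ...
    }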

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index b54fd8c1e9..9497ce01af 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -104,6 +104,7 @@ import static org.apache.phoenix.util.ViewUtil.getSystemTableForChildLinks;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -258,6 +259,7 @@ import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
@@ -722,7 +724,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             if (request.getClientVersion() < MIN_SPLITTABLE_SYSTEM_CATALOG
                     && table.getType() == PTableType.VIEW
                     && table.getViewType() != MAPPED) {
-                try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+                try (PhoenixConnection connection = getServerConnectionForMetaData(
+                        env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                     PTable pTable = connection.getTableNoCache(table.getParentName().getString());
                     table = ViewUtil.addDerivedColumnsFromParent(connection, table, pTable);
                 }
@@ -1577,7 +1580,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                             // Hence, it is recommended to scan SYSTEM.CATALOG table again using
                             // separate CQSI connection as SYSTEM.CATALOG is splittable so the
                             // PTable with famName might be available on different region.
-                            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+                            try (PhoenixConnection connection = getServerConnectionForMetaData(
+                                    env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                                 parentTable = connection.getTableNoCache(famName.getString());
                             } catch (TableNotFoundException e) {
                                 // It is ok to swallow this exception since this could be a view index and _IDX_ table is not there.
@@ -2542,7 +2546,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 // tableMetadata and set the view statement and partition column correctly
                 if (parentTable != null && parentTable.getAutoPartitionSeqName() != null) {
                     long autoPartitionNum = 1;
-                    try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+                    try (PhoenixConnection connection = getServerConnectionForMetaData(
+                            env.getConfiguration()).unwrap(PhoenixConnection.class);
                          Statement stmt = connection.createStatement()) {
                         String seqName = parentTable.getAutoPartitionSeqName();
                         // Not going through the standard route of using statement.execute() as that code path
@@ -2616,7 +2621,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 Long indexId = null;
                 if (request.hasAllocateIndexId() && request.getAllocateIndexId()) {
                     String tenantIdStr = tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes);
-                    try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+                    try (PhoenixConnection connection = getServerConnectionForMetaData(
+                            env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                         PName physicalName = parentTable.getPhysicalName();
                         long seqValue = getViewIndexSequenceValue(connection, tenantIdStr, parentTable);
                         Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
@@ -3022,9 +3028,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                             && (clientVersion >= MIN_SPLITTABLE_SYSTEM_CATALOG ||
                             SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES,
                                     env.getConfiguration()).equals(hTable.getName()))) {
-                        try (PhoenixConnection conn =
-                                QueryUtil.getConnectionOnServer(env.getConfiguration())
-                                    .unwrap(PhoenixConnection.class)) {
+                        try (PhoenixConnection conn = getServerConnectionForMetaData(
+                                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                             ServerTask.addTask(new SystemTaskParams.SystemTaskParamsBuilder()
                                 .setConn(conn)
                                 .setTaskType(PTable.TaskType.DROP_CHILD_VIEWS)
@@ -3169,9 +3174,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 User.runAsLoginUser(new PrivilegedExceptionAction<Object>() {
                     @Override
                     public Object run() throws Exception {
-                        try (PhoenixConnection connection =
-                                     QueryUtil.getConnectionOnServer(env.getConfiguration())
-                                             .unwrap(PhoenixConnection.class)) {
+                        try (PhoenixConnection connection = getServerConnectionForMetaData(
+                                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                             try {
                                 MetaDataUtil.deleteFromStatsTable(connection, deletedTable,
                                         physicalTableNames, sharedTableStates);
@@ -3344,7 +3348,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         }
 
         if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && tableType == PTableType.VIEW) {
-            try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(
+            try (PhoenixConnection connection = getServerConnectionForMetaData(
                     env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                 PTable pTable = connection.getTableNoCache(table.getParentName().getString());
                 table = ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table, pTable);
@@ -3544,9 +3548,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
                         props.setProperty("CurrentSCN", 
Long.toString(clientTimeStamp));
                     }
-                    try (PhoenixConnection connection =
-                                 QueryUtil.getConnectionOnServer(props, 
env.getConfiguration())
-                                         .unwrap(PhoenixConnection.class)) {
+                    try (PhoenixConnection connection = 
getServerConnectionForMetaData(props,
+                            
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                         table = 
ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table,
                                 parentTable);
                     }
@@ -3722,7 +3725,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                         invalidateAllChildTablesAndIndexes(table, childViews);
                     }
                     if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && type == PTableType.VIEW) {
-                        try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(
+                        try (PhoenixConnection connection = getServerConnectionForMetaData(
                                 env.getConfiguration()).unwrap(PhoenixConnection.class)) {
                             PTable pTable = connection.getTableNoCache(
                                     table.getParentName().getString());
@@ -3759,11 +3762,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     + " phoenix.metadata.invalidate.cache.enabled is set to 
false");
             return;
         }
-        Properties properties = new Properties();
-        // Skip checking of system table existence since the system tables should have created
-        // by now.
-        properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true");
-        try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(properties,
+        try (PhoenixConnection connection = getServerConnectionForMetaData(
                 env.getConfiguration()).unwrap(PhoenixConnection.class)) {
             ConnectionQueryServices queryServices = connection.getQueryServices();
             queryServices.invalidateServerMetadataCache(requests);
@@ -3805,9 +3804,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
             props.setProperty("CurrentSCN", Long.toString(clientTimeStamp));
         }
-        try (PhoenixConnection connection =
-                     QueryUtil.getConnectionOnServer(props, env.getConfiguration())
-                             .unwrap(PhoenixConnection.class)) {
+        try (PhoenixConnection connection = getServerConnectionForMetaData(props,
+                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
             ConnectionQueryServices queryServices = connection.getQueryServices();
             queryServices.clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY, schemaName, tableName,
                     clientTimeStamp);
@@ -4070,9 +4068,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         // index and then invalidate it
         // Covered columns are deleted from the index by the client
         Region region = env.getRegion();
-        PhoenixConnection connection =
-                table.getIndexes().isEmpty() ? null : QueryUtil.getConnectionOnServer(
-                        env.getConfiguration()).unwrap(PhoenixConnection.class);
+        PhoenixConnection connection = table.getIndexes().isEmpty() ? null :
+                getServerConnectionForMetaData(env.getConfiguration()).unwrap(
+                        PhoenixConnection.class);
         for (PTable index : table.getIndexes()) {
             // ignore any indexes derived from ancestors
             if (index.getName().getString().contains(
@@ -4158,9 +4156,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the
         // index and then invalidate it
         // Covered columns are deleted from the index by the client
-        PhoenixConnection connection =
-                table.getIndexes().isEmpty() ? null : QueryUtil.getConnectionOnServer(
-                        env.getConfiguration()).unwrap(PhoenixConnection.class);
+        PhoenixConnection connection = table.getIndexes().isEmpty() ? null :
+                getServerConnectionForMetaData(env.getConfiguration()).unwrap(
+                        PhoenixConnection.class);
         for (PTable index : table.getIndexes()) {
             byte[] tenantId = index.getTenantId() == null ? ByteUtil.EMPTY_BYTE_ARRAY : index.getTenantId().getBytes();
             IndexMaintainer indexMaintainer = index.getIndexMaintainer(table, connection);
@@ -5138,4 +5136,39 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                                         table.getTableName(), table.isNamespaceMapped())
                                 .getBytes());
     }
+
+    /**
+     * Get the server side connection by skipping system table existence check.
+     *
+     * @param config The configuration object.
+     * @return Connection object.
+     * @throws SQLException If the Connection could not be retrieved.
+     */
+    private static Connection getServerConnectionForMetaData(final Configuration config)
+            throws SQLException {
+        Preconditions.checkNotNull(config, "The configs must not be null");
+        return getServerConnectionForMetaData(new Properties(), config);
+    }
+
+    /**
+     * Get the server side connection by skipping system table existence check.
+     *
+     * @param props The properties to be used while retrieving the Connection. Adds skipping of
+     * the system table existence check to the properties.
+     * @param config The configuration object.
+     * @return Connection object.
+     * @throws SQLException If the Connection could not be retrieved.
+     */
+    private static Connection getServerConnectionForMetaData(final Properties props,
+                                                             final Configuration config)
+            throws SQLException {
+        Preconditions.checkNotNull(props, "The properties must not be null");
+        Preconditions.checkNotNull(config, "The configs must not be null");
+        // No need to check for system table existence as the coproc is already running,
+        // hence the system tables are already created.
+        // Similarly, no need to check for client - server version compatibility as
+        // this is already server hosting SYSTEM.CATALOG region(s).
+        props.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, Boolean.TRUE.toString());
+        return QueryUtil.getConnectionOnServer(props, config);
+    }
 }
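
For reference, a hedged usage sketch of the two new overloads above (the
SCN-carrying variant mirrors the clearTableFromCache path in this diff;
clientTimeStamp and env are assumed to be in scope as in the surrounding code):

    // Default properties:
    try (PhoenixConnection conn = getServerConnectionForMetaData(
            env.getConfiguration()).unwrap(PhoenixConnection.class)) {
        // connection init performs no system table existence RPCs
    }

    // Carrying an SCN through caller-supplied Properties:
    Properties props = new Properties();
    props.setProperty("CurrentSCN", Long.toString(clientTimeStamp));
    try (PhoenixConnection conn = getServerConnectionForMetaData(props,
            env.getConfiguration()).unwrap(PhoenixConnection.class)) {
        ConnectionQueryServices queryServices = conn.getQueryServices();
        // ... cache invalidation, etc. ...
    }
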
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
new file mode 100644
index 0000000000..63fd4e7c43
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+
+import static org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
+
+/**
+ * Tests to ensure connection creation by metadata coproc does not need to make
+ * RPC calls to the metadata coproc internally.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class MetadataServerConnectionsIT extends BaseTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MetadataServerConnectionsIT.class);
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+                Long.toString(Long.MAX_VALUE));
+        props.put(DISABLE_VIEW_SUBTREE_VALIDATION, "true");
+        setUpTestDriver(new ReadOnlyProps(props));
+    }
+
+    public static class TestMetaDataEndpointImpl extends MetaDataEndpointImpl {
+
+        @Override
+        public void getVersion(RpcController controller, MetaDataProtos.GetVersionRequest request,
+                               RpcCallback<MetaDataProtos.GetVersionResponse> done) {
+            MetaDataProtos.GetVersionResponse.Builder builder =
+                    MetaDataProtos.GetVersionResponse.newBuilder();
+            LOGGER.error("This is unexpected");
+            ProtobufUtil.setControllerException(controller,
+                    ClientUtil.createIOException(
+                            SchemaUtil.getPhysicalTableName(
+                                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, false)
+                                    .toString(),
+                            new DoNotRetryIOException("Not allowed")));
+            builder.setVersion(-1);
+            done.run(builder.build());
+        }
+    }
+
+    @Test
+    public void testConnectionFromMetadataServer() throws Throwable {
+        final String tableName = generateUniqueName();
+        final String view01 = "v01_" + tableName;
+        final String view02 = "v02_" + tableName;
+        final String index_view01 = "idx_v01_" + tableName;
+        final String index_view02 = "idx_v02_" + tableName;
+        final String index_view03 = "idx_v03_" + tableName;
+        final String index_view04 = "idx_v04_" + tableName;
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            final Statement stmt = conn.createStatement();
+
+            stmt.execute("CREATE TABLE " + tableName
+                    + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3 
VARCHAR,"
+                    + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))"
+                    + " UPDATE_CACHE_FREQUENCY=ALWAYS");
+            stmt.execute("CREATE VIEW " + view01
+                    + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM " + 
tableName
+                    + " WHERE COL1 = 'col1'");
+            stmt.execute("CREATE VIEW " + view02 + " (VCOL2 CHAR(10), COL6 
VARCHAR)"
+                    + " AS SELECT * FROM " + view01 + " WHERE VCOL1 = 
'vcol1'");
+
+            conn.commit();
+
+            TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+            TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class);
+
+            stmt.execute("CREATE INDEX " + index_view01 + " ON " + view01 + " 
(COL5) INCLUDE "
+                    + "(COL1, COL2, COL3)");
+            stmt.execute("CREATE INDEX " + index_view02 + " ON " + view02 + " 
(COL6) INCLUDE "
+                    + "(COL1, COL2, COL3)");
+            stmt.execute("CREATE INDEX " + index_view03 + " ON " + view01 + " 
(COL5) INCLUDE "
+                    + "(COL2, COL1)");
+            stmt.execute("CREATE INDEX " + index_view04 + " ON " + view02 + " 
(COL6) INCLUDE "
+                    + "(COL2, COL1)");
+
+            stmt.execute("UPSERT INTO " + view02
+                    + " (col2, vcol2, col5, col6) values ('0001', 'vcol2_01', 
'col5_01', " +
+                    "'col6_01')");
+            stmt.execute("UPSERT INTO " + view02
+                    +
+                    " (col2, vcol2, col5, col6) values ('0002', 'vcol2_02', 
'col5_02', 'col6_02')");
+            stmt.execute("UPSERT INTO " + view02
+                    +
+                    " (col2, vcol2, col5, col6) values ('0003', 'vcol2_03', 
'col5_03', 'col6_03')");
+            stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, 
col5) values "
+                    + "('0004', 'vcol2', 'col3_04', 'col4_04', 'col5_04')");
+            stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, 
col5) values "
+                    + "('0005', 'vcol-2', 'col3_05', 'col4_05', 'col5_05')");
+            stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, 
col5) values "
+                    + "('0006', 'vcol-1', 'col3_06', 'col4_06', 'col5_06')");
+            stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4, 
col5) values "
+                    + "('0007', 'vcol1', 'col3_07', 'col4_07', 'col5_07')");
+            stmt.execute("UPSERT INTO " + view02
+                    +
+                    " (col2, vcol2, col5, col6) values ('0008', 'vcol2_08', 
'col5_08', 'col6_02')");
+            conn.commit();
+
+            final Statement statement = conn.createStatement();
+            ResultSet rs =
+                    statement.executeQuery(
+                            "SELECT COL2, VCOL1, VCOL2, COL5, COL6 FROM " + 
view02);
+            // No need to verify each row result as the primary focus of this test is to ensure
+            // no RPC call from MetaDataEndpointImpl to MetaDataEndpointImpl is done while
+            // creating server side connection.
+            Assert.assertTrue(rs.next());
+            Assert.assertTrue(rs.next());
+            Assert.assertTrue(rs.next());
+            Assert.assertTrue(rs.next());
+            Assert.assertTrue(rs.next());
+            Assert.assertFalse(rs.next());
+
+            TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class);
+            TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+        }
+    }
+
+}
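
The verification trick in the IT above is failure injection:
TestMetaDataEndpointImpl overrides getVersion so that any RPC back into the
metadata coprocessor fails fast. A short restatement of the key step (calls as
they appear in the test; conn is the test's JDBC connection):

    // Swap the real coprocessor for the failing one on SYSTEM.CATALOG.
    TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
    TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class);
    // Any server-side CQSI init that still issued a getVersion RPC would now
    // surface DoNotRetryIOException("Not allowed"), so the CREATE INDEX and
    // UPSERT statements that follow pass only if no such RPC is made.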
