This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new d3ba58889e PHOENIX-7375 CQSI connection init from regionserver hosting
SYSTEM.CATALOG does not require RPC calls to system tables (#1950)
d3ba58889e is described below
commit d3ba58889eea9360e75f46aa828f3bee0e003100
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue Aug 13 18:04:54 2024 -0700
PHOENIX-7375 CQSI connection init from regionserver hosting SYSTEM.CATALOG
does not require RPC calls to system tables (#1950)
---
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 91 ++++++++----
.../end2end/MetadataServerConnectionsIT.java | 164 +++++++++++++++++++++
2 files changed, 226 insertions(+), 29 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index b54fd8c1e9..9497ce01af 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -104,6 +104,7 @@ import static
org.apache.phoenix.util.ViewUtil.getSystemTableForChildLinks;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
+import java.sql.Connection;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
@@ -258,6 +259,7 @@ import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PTinyint;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
@@ -722,7 +724,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (request.getClientVersion() < MIN_SPLITTABLE_SYSTEM_CATALOG
&& table.getType() == PTableType.VIEW
&& table.getViewType() != MAPPED) {
- try (PhoenixConnection connection =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class))
{
+ try (PhoenixConnection connection =
getServerConnectionForMetaData(
+
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
PTable pTable =
connection.getTableNoCache(table.getParentName().getString());
table = ViewUtil.addDerivedColumnsFromParent(connection,
table, pTable);
}
@@ -1577,7 +1580,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// Hence, it is recommended to scan SYSTEM.CATALOG
table again using
// separate CQSI connection as SYSTEM.CATALOG is
splittable so the
// PTable with famName might be available on
different region.
- try (PhoenixConnection connection =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class))
{
+ try (PhoenixConnection connection =
getServerConnectionForMetaData(
+
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
parentTable =
connection.getTableNoCache(famName.getString());
} catch (TableNotFoundException e) {
// It is ok to swallow this exception since
this could be a view index and _IDX_ table is not there.
@@ -2542,7 +2546,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// tableMetadata and set the view statement and partition
column correctly
if (parentTable != null &&
parentTable.getAutoPartitionSeqName() != null) {
long autoPartitionNum = 1;
- try (PhoenixConnection connection =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+ try (PhoenixConnection connection =
getServerConnectionForMetaData(
+
env.getConfiguration()).unwrap(PhoenixConnection.class);
Statement stmt = connection.createStatement()) {
String seqName = parentTable.getAutoPartitionSeqName();
// Not going through the standard route of using
statement.execute() as that code path
@@ -2616,7 +2621,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
Long indexId = null;
if (request.hasAllocateIndexId() &&
request.getAllocateIndexId()) {
String tenantIdStr = tenantIdBytes.length == 0 ? null :
Bytes.toString(tenantIdBytes);
- try (PhoenixConnection connection =
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class))
{
+ try (PhoenixConnection connection =
getServerConnectionForMetaData(
+
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
PName physicalName = parentTable.getPhysicalName();
long seqValue = getViewIndexSequenceValue(connection,
tenantIdStr, parentTable);
Put tableHeaderPut =
MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
@@ -3022,9 +3028,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
&& (clientVersion >= MIN_SPLITTABLE_SYSTEM_CATALOG
||
SchemaUtil.getPhysicalTableName(SYSTEM_CHILD_LINK_NAME_BYTES,
env.getConfiguration()).equals(hTable.getName()))) {
- try (PhoenixConnection conn =
-
QueryUtil.getConnectionOnServer(env.getConfiguration())
- .unwrap(PhoenixConnection.class)) {
+ try (PhoenixConnection conn =
getServerConnectionForMetaData(
+
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
ServerTask.addTask(new
SystemTaskParams.SystemTaskParamsBuilder()
.setConn(conn)
.setTaskType(PTable.TaskType.DROP_CHILD_VIEWS)
@@ -3169,9 +3174,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
User.runAsLoginUser(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
- try (PhoenixConnection connection =
-
QueryUtil.getConnectionOnServer(env.getConfiguration())
- .unwrap(PhoenixConnection.class))
{
+ try (PhoenixConnection connection =
getServerConnectionForMetaData(
+
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
try {
MetaDataUtil.deleteFromStatsTable(connection,
deletedTable,
physicalTableNames, sharedTableStates);
@@ -3344,7 +3348,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
}
if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && tableType ==
PTableType.VIEW) {
- try (PhoenixConnection connection =
QueryUtil.getConnectionOnServer(
+ try (PhoenixConnection connection = getServerConnectionForMetaData(
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
PTable pTable =
connection.getTableNoCache(table.getParentName().getString());
table =
ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table, pTable);
@@ -3544,9 +3548,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
props.setProperty("CurrentSCN",
Long.toString(clientTimeStamp));
}
- try (PhoenixConnection connection =
- QueryUtil.getConnectionOnServer(props,
env.getConfiguration())
- .unwrap(PhoenixConnection.class)) {
+ try (PhoenixConnection connection =
getServerConnectionForMetaData(props,
+
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
table =
ViewUtil.addDerivedColumnsAndIndexesFromParent(connection, table,
parentTable);
}
@@ -3722,7 +3725,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
invalidateAllChildTablesAndIndexes(table, childViews);
}
if (clientVersion < MIN_SPLITTABLE_SYSTEM_CATALOG && type
== PTableType.VIEW) {
- try (PhoenixConnection connection =
QueryUtil.getConnectionOnServer(
+ try (PhoenixConnection connection =
getServerConnectionForMetaData(
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
PTable pTable = connection.getTableNoCache(
table.getParentName().getString());
@@ -3759,11 +3762,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
+ " phoenix.metadata.invalidate.cache.enabled is set to
false");
return;
}
- Properties properties = new Properties();
- // Skip checking of system table existence since the system tables
should have created
- // by now.
- properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true");
- try (PhoenixConnection connection =
QueryUtil.getConnectionOnServer(properties,
+ try (PhoenixConnection connection = getServerConnectionForMetaData(
env.getConfiguration()).unwrap(PhoenixConnection.class)) {
ConnectionQueryServices queryServices =
connection.getQueryServices();
queryServices.invalidateServerMetadataCache(requests);
@@ -3805,9 +3804,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
if (clientTimeStamp != HConstants.LATEST_TIMESTAMP) {
props.setProperty("CurrentSCN", Long.toString(clientTimeStamp));
}
- try (PhoenixConnection connection =
- QueryUtil.getConnectionOnServer(props,
env.getConfiguration())
- .unwrap(PhoenixConnection.class)) {
+ try (PhoenixConnection connection =
getServerConnectionForMetaData(props,
+ env.getConfiguration()).unwrap(PhoenixConnection.class)) {
ConnectionQueryServices queryServices =
connection.getQueryServices();
queryServices.clearTableFromCache(ByteUtil.EMPTY_BYTE_ARRAY,
schemaName, tableName,
clientTimeStamp);
@@ -4070,9 +4068,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// index and then invalidate it
// Covered columns are deleted from the index by the client
Region region = env.getRegion();
- PhoenixConnection connection =
- table.getIndexes().isEmpty() ? null :
QueryUtil.getConnectionOnServer(
-
env.getConfiguration()).unwrap(PhoenixConnection.class);
+ PhoenixConnection connection = table.getIndexes().isEmpty() ? null :
+ getServerConnectionForMetaData(env.getConfiguration()).unwrap(
+ PhoenixConnection.class);
for (PTable index : table.getIndexes()) {
// ignore any indexes derived from ancestors
if (index.getName().getString().contains(
@@ -4158,9 +4156,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// Look for columnToDelete in any indexes. If found as PK column, get
lock and drop the
// index and then invalidate it
// Covered columns are deleted from the index by the client
- PhoenixConnection connection =
- table.getIndexes().isEmpty() ? null :
QueryUtil.getConnectionOnServer(
-
env.getConfiguration()).unwrap(PhoenixConnection.class);
+ PhoenixConnection connection = table.getIndexes().isEmpty() ? null :
+ getServerConnectionForMetaData(env.getConfiguration()).unwrap(
+ PhoenixConnection.class);
for (PTable index : table.getIndexes()) {
byte[] tenantId = index.getTenantId() == null ?
ByteUtil.EMPTY_BYTE_ARRAY : index.getTenantId().getBytes();
IndexMaintainer indexMaintainer = index.getIndexMaintainer(table,
connection);
@@ -5138,4 +5136,39 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
table.getTableName(),
table.isNamespaceMapped())
.getBytes());
}
+
+ /**
+ * Get the server side connection by skipping system table existence check.
+ *
+ * @param config The configuration object.
+ * @return Connection object.
+ * @throws SQLException If the Connection could not be retrieved.
+ */
+ private static Connection getServerConnectionForMetaData(final
Configuration config)
+ throws SQLException {
+ Preconditions.checkNotNull(config, "The configs must not be null");
+ return getServerConnectionForMetaData(new Properties(), config);
+ }
+
+ /**
+ * Get the server side connection by skipping system table existence check.
+ *
+ * @param props The properties to be used while retrieving the Connection.
Adds skipping of
+ * the system table existence check to the properties.
+ * @param config The configuration object.
+ * @return Connection object.
+ * @throws SQLException If the Connection could not be retrieved.
+ */
+ private static Connection getServerConnectionForMetaData(final Properties
props,
+ final
Configuration config)
+ throws SQLException {
+ Preconditions.checkNotNull(props, "The properties must not be null");
+ Preconditions.checkNotNull(config, "The configs must not be null");
+ // No need to check for system table existence as the coproc is
already running,
+ // hence the system tables are already created.
+ // Similarly, no need to check for client - server version
compatibility as
+ // this is already server hosting SYSTEM.CATALOG region(s).
+ props.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK,
Boolean.TRUE.toString());
+ return QueryUtil.getConnectionOnServer(props, config);
+ }
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
new file mode 100644
index 0000000000..63fd4e7c43
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+
+import static
org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
+
+/**
+ * Tests to ensure connection creation by metadata coproc does not need to make
+ * RPC call to metada coproc internally.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class MetadataServerConnectionsIT extends BaseTest {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MetadataServerConnectionsIT.class);
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+ Long.toString(Long.MAX_VALUE));
+ props.put(DISABLE_VIEW_SUBTREE_VALIDATION, "true");
+ setUpTestDriver(new ReadOnlyProps(props));
+ }
+
+ public static class TestMetaDataEndpointImpl extends MetaDataEndpointImpl {
+
+ @Override
+ public void getVersion(RpcController controller,
MetaDataProtos.GetVersionRequest request,
+ RpcCallback<MetaDataProtos.GetVersionResponse>
done) {
+ MetaDataProtos.GetVersionResponse.Builder builder =
+ MetaDataProtos.GetVersionResponse.newBuilder();
+ LOGGER.error("This is unexpected");
+ ProtobufUtil.setControllerException(controller,
+ ClientUtil.createIOException(
+ SchemaUtil.getPhysicalTableName(
+
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, false)
+ .toString(),
+ new DoNotRetryIOException("Not allowed")));
+ builder.setVersion(-1);
+ done.run(builder.build());
+ }
+ }
+
+ @Test
+ public void testConnectionFromMetadataServer() throws Throwable {
+ final String tableName = generateUniqueName();
+ final String view01 = "v01_" + tableName;
+ final String view02 = "v02_" + tableName;
+ final String index_view01 = "idx_v01_" + tableName;
+ final String index_view02 = "idx_v02_" + tableName;
+ final String index_view03 = "idx_v03_" + tableName;
+ final String index_view04 = "idx_v04_" + tableName;
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ final Statement stmt = conn.createStatement();
+
+ stmt.execute("CREATE TABLE " + tableName
+ + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3
VARCHAR,"
+ + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))"
+ + " UPDATE_CACHE_FREQUENCY=ALWAYS");
+ stmt.execute("CREATE VIEW " + view01
+ + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM " +
tableName
+ + " WHERE COL1 = 'col1'");
+ stmt.execute("CREATE VIEW " + view02 + " (VCOL2 CHAR(10), COL6
VARCHAR)"
+ + " AS SELECT * FROM " + view01 + " WHERE VCOL1 =
'vcol1'");
+
+ conn.commit();
+
+ TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG",
MetaDataEndpointImpl.class);
+ TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG",
TestMetaDataEndpointImpl.class);
+
+ stmt.execute("CREATE INDEX " + index_view01 + " ON " + view01 + "
(COL5) INCLUDE "
+ + "(COL1, COL2, COL3)");
+ stmt.execute("CREATE INDEX " + index_view02 + " ON " + view02 + "
(COL6) INCLUDE "
+ + "(COL1, COL2, COL3)");
+ stmt.execute("CREATE INDEX " + index_view03 + " ON " + view01 + "
(COL5) INCLUDE "
+ + "(COL2, COL1)");
+ stmt.execute("CREATE INDEX " + index_view04 + " ON " + view02 + "
(COL6) INCLUDE "
+ + "(COL2, COL1)");
+
+ stmt.execute("UPSERT INTO " + view02
+ + " (col2, vcol2, col5, col6) values ('0001', 'vcol2_01',
'col5_01', " +
+ "'col6_01')");
+ stmt.execute("UPSERT INTO " + view02
+ +
+ " (col2, vcol2, col5, col6) values ('0002', 'vcol2_02',
'col5_02', 'col6_02')");
+ stmt.execute("UPSERT INTO " + view02
+ +
+ " (col2, vcol2, col5, col6) values ('0003', 'vcol2_03',
'col5_03', 'col6_03')");
+ stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4,
col5) values "
+ + "('0004', 'vcol2', 'col3_04', 'col4_04', 'col5_04')");
+ stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4,
col5) values "
+ + "('0005', 'vcol-2', 'col3_05', 'col4_05', 'col5_05')");
+ stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4,
col5) values "
+ + "('0006', 'vcol-1', 'col3_06', 'col4_06', 'col5_06')");
+ stmt.execute("UPSERT INTO " + view01 + " (col2, vcol1, col3, col4,
col5) values "
+ + "('0007', 'vcol1', 'col3_07', 'col4_07', 'col5_07')");
+ stmt.execute("UPSERT INTO " + view02
+ +
+ " (col2, vcol2, col5, col6) values ('0008', 'vcol2_08',
'col5_08', 'col6_02')");
+ conn.commit();
+
+ final Statement statement = conn.createStatement();
+ ResultSet rs =
+ statement.executeQuery(
+ "SELECT COL2, VCOL1, VCOL2, COL5, COL6 FROM " +
view02);
+ // No need to verify each row result as the primary focus of this
test is to ensure
+ // no RPC call from MetaDataEndpointImpl to MetaDataEndpointImpl
is done while
+ // creating server side connection.
+ Assert.assertTrue(rs.next());
+ Assert.assertTrue(rs.next());
+ Assert.assertTrue(rs.next());
+ Assert.assertTrue(rs.next());
+ Assert.assertTrue(rs.next());
+ Assert.assertFalse(rs.next());
+
+ TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG",
TestMetaDataEndpointImpl.class);
+ TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG",
MetaDataEndpointImpl.class);
+ }
+ }
+
+}