This is an automated email from the ASF dual-hosted git repository. jisaac pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 9380d8ef7c PHOENIX-7638 Creating a large number of views leads to OS thread exhaustion (#2273) 9380d8ef7c is described below commit 9380d8ef7c2a05079917711f4a402c727db8048a Author: Jacob Isaac <jacobpisaa...@gmail.com> AuthorDate: Wed Aug 20 10:11:58 2025 -0700 PHOENIX-7638 Creating a large number of views leads to OS thread exhaustion (#2273) --- .../org/apache/phoenix/schema/MetaDataClient.java | 6 +- .../java/org/apache/phoenix/util/TupleUtil.java | 7 +- .../coprocessor/PhoenixRegionServerEndpoint.java | 3 + .../java/org/apache/phoenix/util/ServerUtil.java | 13 +- .../end2end/MetadataServerConnectionsIT.java | 181 +++++++++++++++++++++ .../org/apache/phoenix/end2end/UpsertValuesIT.java | 4 +- 6 files changed, 193 insertions(+), 21 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index ae2248b86f..fdaeed259d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -4745,9 +4745,9 @@ public class MetaDataClient { /** * To check if TTL is defined at any of the child below we are checking it at * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, ColumnMutator, int, PTable, PTable, boolean)} - * level where in function {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# - * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], - * byte[], List, int)} we are already traversing through allDescendantViews. + * level where in function + * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], byte[], List, int)} + * we are already traversing through allDescendantViews. */ } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java index b8f6a769d1..91e3da6e6d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java @@ -33,7 +33,6 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -237,9 +236,9 @@ public class TupleUtil { * @throws SQLException If any SQL operation fails. */ @SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE", - justification = "Tge statement object needs to be kept open for the returned RS to be " - + "valid, however this is acceptable as not callingPhoenixStatement.close() " - + "causes no resource leak") + justification = "Tge statement object needs to be kept open for the returned RS to be " + + "valid, however this is acceptable as not callingPhoenixStatement.close() " + + "causes no resource leak") public static ResultSet getResultSet(Tuple toProject, TableName tableName, Connection conn, boolean withPrefetch) throws SQLException { if (tableName == null) { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java index bdddf0b10b..4444118583 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java @@ -41,6 +41,7 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.util.ClientUtil; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.ServerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,10 +67,12 @@ public class PhoenixRegionServerEndpoint extends @Override public void stop(CoprocessorEnvironment env) throws IOException { + RegionServerCoprocessor.super.stop(env); if (uncoveredIndexThreadPool != null) { uncoveredIndexThreadPool .stop("PhoenixRegionServerEndpoint is stopping. Shutting down uncovered index threadpool."); } + ServerUtil.ConnectionFactory.shutdown(); } @Override diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java index c46fe7f977..4a409ac69e 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; -import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.hbase.index.write.IndexWriterUtils; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; @@ -53,14 +52,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ServerUtil { - private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6"); private static final Logger LOGGER = LoggerFactory.getLogger(ServerUtil.class); private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,"; - private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment env) { - return (VersionUtil.encodeVersion(env.getHBaseVersion()) >= COPROCESSOR_SCAN_WORKS); - } - public static boolean hasCoprocessor(RegionCoprocessorEnvironment env, String CoprocessorClassName) { Collection<CoprocessorDescriptor> coprocessors = @@ -99,17 +93,11 @@ public class ServerUtil { public static Table getHTableForCoprocessorScan(RegionCoprocessorEnvironment env, Table writerTable) throws IOException { - if (coprocessorScanWorks(env)) { - return writerTable; - } return getTableFromSingletonPool(env, writerTable.getName()); } public static Table getHTableForCoprocessorScan(RegionCoprocessorEnvironment env, TableName tableName) throws IOException { - if (coprocessorScanWorks(env)) { - return env.getConnection().getTable(tableName); - } return getTableFromSingletonPool(env, tableName); } @@ -222,6 +210,7 @@ public class ServerUtil { public static void shutdown() { synchronized (ConnectionFactory.class) { + LOGGER.info("Closing ServerUtil.ConnectionFactory connections"); for (Connection connection : connections.values()) { try { connection.close(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java index f24a3ba860..ecebf38016 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java @@ -18,15 +18,34 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION; +import static org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_CORE_SIZE; +import static org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_MAX_SIZE; +import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; +import static org.junit.Assert.assertEquals; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; +import java.io.IOException; +import java.lang.reflect.Field; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionImplementation; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.CoprocessorException; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.phoenix.coprocessor.MetaDataEndpointImpl; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; @@ -34,8 +53,10 @@ import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.ClientUtil; +import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Assert; import org.junit.BeforeClass; @@ -64,6 +85,105 @@ public class MetadataServerConnectionsIT extends BaseTest { } public static class TestMetaDataEndpointImpl extends MetaDataEndpointImpl { + private RegionCoprocessorEnvironment env; + + public static void setTestCreateView(boolean testCreateView) { + TestMetaDataEndpointImpl.testCreateView = testCreateView; + } + + private static volatile boolean testCreateView = false; + + @Override + public void start(CoprocessorEnvironment env) throws IOException { + super.start(env); + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment) env; + } else { + throw new CoprocessorException("Must be loaded on a table region!"); + } + } + + @Override + public void createTable(RpcController controller, MetaDataProtos.CreateTableRequest request, + RpcCallback<MetaDataProtos.MetaDataResponse> done) { + // Invoke the actual create table routine + super.createTable(controller, request, done); + + byte[][] rowKeyMetaData = new byte[3][]; + byte[] schemaName = null; + byte[] tableName = null; + String fullTableName = null; + + // Get the singleton connection for testing + org.apache.hadoop.hbase.client.Connection conn = ServerUtil.ConnectionFactory + .getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env); + try { + // Get the current table creation details + List<Mutation> tableMetadata = ProtobufUtil.getMutations(request); + MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData); + schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; + tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX]; + fullTableName = SchemaUtil.getTableName(schemaName, tableName); + + ThreadPoolExecutor ctpe = null; + ThreadPoolExecutor htpe = null; + + // Get the thread pool executor from the connection. + if (conn instanceof ConnectionImplementation) { + ConnectionImplementation connImpl = ((ConnectionImplementation) conn); + Field props = null; + props = ConnectionImplementation.class.getDeclaredField("batchPool"); + props.setAccessible(true); + ctpe = (ThreadPoolExecutor) props.get(connImpl); + LOGGER.debug("ConnectionImplementation Thread pool info :" + ctpe.toString()); + + } + + // Get the thread pool executor from the HTable. + Table hTable = + ServerUtil.getHTableForCoprocessorScan(env, TableName.valueOf(fullTableName)); + if (hTable instanceof HTable) { + HTable testTable = (HTable) hTable; + Field props = testTable.getClass().getDeclaredField("pool"); + props.setAccessible(true); + htpe = ((ThreadPoolExecutor) props.get(hTable)); + LOGGER.debug("HTable Thread pool info :" + htpe.toString()); + // Assert the HTable thread pool config match the Connection pool configs. + // Since we are not overriding any defaults, it should match the defaults. + assertEquals(htpe.getMaximumPoolSize(), DEFAULT_HCONNECTION_POOL_MAX_SIZE); + assertEquals(htpe.getCorePoolSize(), DEFAULT_HCONNECTION_POOL_CORE_SIZE); + LOGGER.debug("HTable threadpool info {}, {}, {}, {}", htpe.getCorePoolSize(), + htpe.getMaximumPoolSize(), htpe.getQueue().remainingCapacity(), + htpe.getKeepAliveTime(TimeUnit.SECONDS)); + + int count = Thread.activeCount(); + Thread[] th = new Thread[count]; + // returns the number of threads put into the array + Thread.enumerate(th); + long hTablePoolCount = + Arrays.stream(th).filter(s -> s.getName().equals("htable-pool-0")).count(); + // Assert no default HTable threadpools are created. + assertEquals(0, hTablePoolCount); + LOGGER.debug("htable-pool-0 threads {}", hTablePoolCount); + } + // Assert that the threadpool from Connection and HTable are the same. + assertEquals(ctpe, htpe); + } catch (RuntimeException | NoSuchFieldException | IllegalAccessException | IOException t) { + // handle cases that an IOE is wrapped inside a RuntimeException + // like HTableInterface#createHTableInterface + MetaDataProtos.MetaDataResponse.Builder builder = + MetaDataProtos.MetaDataResponse.newBuilder(); + + LOGGER.error("This is unexpected"); + ProtobufUtil.setControllerException(controller, + ClientUtil.createIOException(SchemaUtil + .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, false) + .toString(), new DoNotRetryIOException("Not allowed"))); + done.run(builder.build()); + + } + + } @Override public void getVersion(RpcController controller, MetaDataProtos.GetVersionRequest request, @@ -81,6 +201,67 @@ public class MetadataServerConnectionsIT extends BaseTest { } } + @Test + public void testViewCreationAndServerConnections() throws Throwable { + final String tableName = generateUniqueName(); + final String view01 = "v01_" + tableName; + final String view02 = "v02_" + tableName; + final String index_view01 = "idx_v01_" + tableName; + final String index_view02 = "idx_v02_" + tableName; + final String index_view03 = "idx_v03_" + tableName; + final String index_view04 = "idx_v04_" + tableName; + final int NUM_VIEWS = 50; + + TestMetaDataEndpointImpl.setTestCreateView(true); + try (Connection conn = DriverManager.getConnection(getUrl())) { + TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class); + TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class); + + final Statement stmt = conn.createStatement(); + + stmt.execute("CREATE TABLE " + tableName + + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3 VARCHAR," + + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))" + + " UPDATE_CACHE_FREQUENCY=ALWAYS, MULTI_TENANT=true"); + conn.commit(); + + for (int i = 0; i < NUM_VIEWS; i++) { + Properties props = new Properties(); + String viewTenantId = String.format("00T%012d", i); + props.setProperty(TENANT_ID_ATTRIB, viewTenantId); + // Create multilevel tenant views + try (Connection tConn = DriverManager.getConnection(getUrl(), props)) { + final Statement viewStmt = tConn.createStatement(); + viewStmt + .execute("CREATE VIEW " + view01 + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM " + + tableName + " WHERE COL2 = 'col2'"); + + viewStmt.execute("CREATE VIEW " + view02 + " (VCOL2 CHAR(10), COL6 VARCHAR)" + + " AS SELECT * FROM " + view01 + " WHERE VCOL1 = 'vcol1'"); + tConn.commit(); + + // Create multilevel tenant indexes + final Statement indexStmt = tConn.createStatement(); + indexStmt.execute("CREATE INDEX " + index_view01 + " ON " + view01 + " (COL5) INCLUDE " + + "(COL1, COL2, COL3)"); + indexStmt.execute("CREATE INDEX " + index_view02 + " ON " + view02 + " (COL6) INCLUDE " + + "(COL1, COL2, COL3)"); + indexStmt.execute( + "CREATE INDEX " + index_view03 + " ON " + view01 + " (COL5) INCLUDE " + "(COL2, COL1)"); + indexStmt.execute( + "CREATE INDEX " + index_view04 + " ON " + view02 + " (COL6) INCLUDE " + "(COL2, COL1)"); + + tConn.commit(); + + } + + } + + TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class); + TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class); + } + } + @Test public void testConnectionFromMetadataServer() throws Throwable { final String tableName = generateUniqueName(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java index e2a6ac2cee..08da6be7ae 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java @@ -125,8 +125,8 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT { assertEquals("a", rs.getString(1)); assertEquals("b", rs.getString(2)); assertFalse(rs.next()); - stmt = conn.prepareStatement("UPSERT INTO " + tableName - + " (inst,host,\"DATE\") VALUES(?,'b',CURRENT_DATE())"); + stmt = conn.prepareStatement( + "UPSERT INTO " + tableName + " (inst,host,\"DATE\") VALUES(?,'b',CURRENT_DATE())"); stmt.setString(1, "a"); stmt.execute(); rs = stmt.getResultSet();