This is an automated email from the ASF dual-hosted git repository.

jisaac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9380d8ef7c PHOENIX-7638 Creating a large number of views leads to OS thread exhaustion (#2273)
9380d8ef7c is described below

commit 9380d8ef7c2a05079917711f4a402c727db8048a
Author: Jacob Isaac <jacobpisaa...@gmail.com>
AuthorDate: Wed Aug 20 10:11:58 2025 -0700

    PHOENIX-7638 Creating a large number of views leads to OS thread exhaustion (#2273)
---
 .../org/apache/phoenix/schema/MetaDataClient.java  |   6 +-
 .../java/org/apache/phoenix/util/TupleUtil.java    |   7 +-
 .../coprocessor/PhoenixRegionServerEndpoint.java   |   3 +
 .../java/org/apache/phoenix/util/ServerUtil.java   |  13 +-
 .../end2end/MetadataServerConnectionsIT.java       | 181 +++++++++++++++++++++
 .../org/apache/phoenix/end2end/UpsertValuesIT.java |   4 +-
 6 files changed, 193 insertions(+), 21 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ae2248b86f..fdaeed259d 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -4745,9 +4745,9 @@ public class MetaDataClient {
           /**
            * To check if TTL is defined at any of the child below we are checking it at
            * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#mutateColumn(List, ColumnMutator, int, PTable, PTable, boolean)}
-           * level where in function {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl#
-           * validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[],
-           * byte[], List, int)} we are already traversing through allDescendantViews.
+           * level where in function
+           * {@link org.apache.phoenix.coprocessor.MetaDataEndpointImpl# validateIfMutationAllowedOnParent(PTable, List, PTableType, long, byte[], byte[], byte[], List, int)}
+           * we are already traversing through allDescendantViews.
            */
         }
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java
index b8f6a769d1..91e3da6e6d 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java
@@ -33,7 +33,6 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -237,9 +236,9 @@ public class TupleUtil {
    * @throws SQLException If any SQL operation fails.
    */
  @SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE",
-          justification = "Tge statement object needs to be kept open for the returned RS to be "
-          + "valid, however this is acceptable as not callingPhoenixStatement.close() "
-          + "causes no resource leak")
+      justification = "The statement object needs to be kept open for the returned RS to be "
+        + "valid; however, this is acceptable as not calling PhoenixStatement.close() "
+        + "causes no resource leak")
  public static ResultSet getResultSet(Tuple toProject, TableName tableName, Connection conn,
    boolean withPrefetch) throws SQLException {
     if (tableName == null) {
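
Background on the suppressed warning above: in JDBC, closing a Statement also closes any ResultSet it produced, which is why getResultSet must leave its PhoenixStatement open for the returned ResultSet to stay valid. A minimal sketch of that lifecycle; the URL and query are illustrative, not part of this patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class StatementLifetimeSketch {
      public static void main(String[] args) throws Exception {
        // Illustrative URL/query; any JDBC driver exhibits the same lifecycle.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
          Statement stmt = conn.createStatement();
          ResultSet rs = stmt.executeQuery("SELECT 1");
          stmt.close();                      // per JDBC, closing the statement also closes rs
          System.out.println(rs.isClosed()); // true: the RS is no longer valid
        }
      }
    }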
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index bdddf0b10b..4444118583 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,10 +67,12 @@ public class PhoenixRegionServerEndpoint extends
 
   @Override
   public void stop(CoprocessorEnvironment env) throws IOException {
+    RegionServerCoprocessor.super.stop(env);
     if (uncoveredIndexThreadPool != null) {
       uncoveredIndexThreadPool
         .stop("PhoenixRegionServerEndpoint is stopping. Shutting down 
uncovered index threadpool.");
     }
+    ServerUtil.ConnectionFactory.shutdown();
   }
 
   @Override
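
The two lines added to stop() are the server-side core of the fix: the endpoint now runs the coprocessor interface's default cleanup and then tears down the shared connection pool, so a stopping region server no longer leaks pooled threads. A minimal sketch of the same stop-ordering pattern, using illustrative names that are not Phoenix APIs:

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Illustrative names only; just the stop-ordering mirrors the patch.
    interface LifecycleEndpoint {
      default void stop() throws IOException {
        // default interface cleanup, analogous to RegionServerCoprocessor.super.stop(env)
      }
    }

    class EndpointSketch implements LifecycleEndpoint {
      private final ExecutorService pool = Executors.newFixedThreadPool(4);

      @Override
      public void stop() throws IOException {
        LifecycleEndpoint.super.stop(); // run inherited cleanup first
        pool.shutdown();                // then release endpoint-owned executors
        try {
          pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        // last, close shared pooled connections (cf. ServerUtil.ConnectionFactory.shutdown())
      }
    }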
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
index c46fe7f977..4a409ac69e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
@@ -53,14 +52,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ServerUtil {
-  private static final int COPROCESSOR_SCAN_WORKS = VersionUtil.encodeVersion("0.98.6");
   private static final Logger LOGGER = LoggerFactory.getLogger(ServerUtil.class);
   private static final String FORMAT_FOR_TIMESTAMP = ",serverTimestamp=%d,";
 
-  private static boolean coprocessorScanWorks(RegionCoprocessorEnvironment env) {
-    return (VersionUtil.encodeVersion(env.getHBaseVersion()) >= COPROCESSOR_SCAN_WORKS);
-  }
-
   public static boolean hasCoprocessor(RegionCoprocessorEnvironment env,
     String CoprocessorClassName) {
     Collection<CoprocessorDescriptor> coprocessors =
@@ -99,17 +93,11 @@ public class ServerUtil {
 
   public static Table getHTableForCoprocessorScan(RegionCoprocessorEnvironment env,
     Table writerTable) throws IOException {
-    if (coprocessorScanWorks(env)) {
-      return writerTable;
-    }
     return getTableFromSingletonPool(env, writerTable.getName());
   }
 
   public static Table getHTableForCoprocessorScan(RegionCoprocessorEnvironment env,
     TableName tableName) throws IOException {
-    if (coprocessorScanWorks(env)) {
-      return env.getConnection().getTable(tableName);
-    }
     return getTableFromSingletonPool(env, tableName);
   }
 
@@ -222,6 +210,7 @@ public class ServerUtil {
 
     public static void shutdown() {
       synchronized (ConnectionFactory.class) {
+        LOGGER.info("Closing ServerUtil.ConnectionFactory connections");
         for (Connection connection : connections.values()) {
           try {
             connection.close();
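
With the obsolete HBase 0.98.6 version gate removed, both getHTableForCoprocessorScan overloads always hand out tables backed by the singleton connection pool, so every coprocessor scan reuses one shared thread pool instead of spawning its own — the leak that exhausted OS threads in PHOENIX-7638. A minimal sketch of that singleton-cache idea; the generic types are illustrative stand-ins for HBase's Connection, not Phoenix API:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    // Generic stand-ins for HBase's Connection; nothing here is Phoenix API.
    final class ConnectionCacheSketch<K, C extends AutoCloseable> {
      private final Map<K, C> connections = new ConcurrentHashMap<>();
      private final Function<K, C> creator;

      ConnectionCacheSketch(Function<K, C> creator) {
        this.creator = creator;
      }

      C getConnection(K type) {
        // all callers of the same type share one connection (and its thread pool)
        return connections.computeIfAbsent(type, creator);
      }

      synchronized void shutdown() {
        for (C connection : connections.values()) {
          try {
            connection.close();
          } catch (Exception e) {
            // log and keep closing the remaining connections
          }
        }
        connections.clear();
      }
    }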
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
index f24a3ba860..ecebf38016 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MetadataServerConnectionsIT.java
@@ -18,15 +18,34 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
+import static org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_CORE_SIZE;
+import static org.apache.phoenix.query.QueryServicesTestImpl.DEFAULT_HCONNECTION_POOL_MAX_SIZE;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.junit.Assert.assertEquals;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
+import java.io.IOException;
+import java.lang.reflect.Field;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionImplementation;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -34,8 +53,10 @@ import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ClientUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -64,6 +85,105 @@ public class MetadataServerConnectionsIT extends BaseTest {
   }
 
   public static class TestMetaDataEndpointImpl extends MetaDataEndpointImpl {
+    private RegionCoprocessorEnvironment env;
+
+    public static void setTestCreateView(boolean testCreateView) {
+      TestMetaDataEndpointImpl.testCreateView = testCreateView;
+    }
+
+    private static volatile boolean testCreateView = false;
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+      super.start(env);
+      if (env instanceof RegionCoprocessorEnvironment) {
+        this.env = (RegionCoprocessorEnvironment) env;
+      } else {
+        throw new CoprocessorException("Must be loaded on a table region!");
+      }
+    }
+
+    @Override
+    public void createTable(RpcController controller, MetaDataProtos.CreateTableRequest request,
+      RpcCallback<MetaDataProtos.MetaDataResponse> done) {
+      // Invoke the actual create table routine
+      super.createTable(controller, request, done);
+
+      byte[][] rowKeyMetaData = new byte[3][];
+      byte[] schemaName = null;
+      byte[] tableName = null;
+      String fullTableName = null;
+
+      // Get the singleton connection for testing
+      org.apache.hadoop.hbase.client.Connection conn = ServerUtil.ConnectionFactory
+        .getConnection(ServerUtil.ConnectionType.DEFAULT_SERVER_CONNECTION, env);
+      try {
+        // Get the current table creation details
+        List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
+        MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
+        schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
+        tableName = rowKeyMetaData[PhoenixDatabaseMetaData.TABLE_NAME_INDEX];
+        fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+
+        ThreadPoolExecutor ctpe = null;
+        ThreadPoolExecutor htpe = null;
+
+        // Get the thread pool executor from the connection.
+        if (conn instanceof ConnectionImplementation) {
+          ConnectionImplementation connImpl = ((ConnectionImplementation) conn);
+          Field props = null;
+          props = ConnectionImplementation.class.getDeclaredField("batchPool");
+          props.setAccessible(true);
+          ctpe = (ThreadPoolExecutor) props.get(connImpl);
+          LOGGER.debug("ConnectionImplementation Thread pool info :" + 
ctpe.toString());
+
+        }
+
+        // Get the thread pool executor from the HTable.
+        Table hTable =
+          ServerUtil.getHTableForCoprocessorScan(env, TableName.valueOf(fullTableName));
+        if (hTable instanceof HTable) {
+          HTable testTable = (HTable) hTable;
+          Field props = testTable.getClass().getDeclaredField("pool");
+          props.setAccessible(true);
+          htpe = ((ThreadPoolExecutor) props.get(hTable));
+          LOGGER.debug("HTable Thread pool info :" + htpe.toString());
+          // Assert the HTable thread pool configs match the Connection pool configs.
+          // Since we are not overriding any defaults, they should match the defaults.
+          assertEquals(htpe.getMaximumPoolSize(), DEFAULT_HCONNECTION_POOL_MAX_SIZE);
+          assertEquals(htpe.getCorePoolSize(), DEFAULT_HCONNECTION_POOL_CORE_SIZE);
+          LOGGER.debug("HTable threadpool info {}, {}, {}, {}", htpe.getCorePoolSize(),
+            htpe.getMaximumPoolSize(), htpe.getQueue().remainingCapacity(),
+            htpe.getKeepAliveTime(TimeUnit.SECONDS));
+
+          int count = Thread.activeCount();
+          Thread[] th = new Thread[count];
+          // returns the number of threads put into the array
+          Thread.enumerate(th);
+          long hTablePoolCount =
+            Arrays.stream(th).filter(s -> s.getName().equals("htable-pool-0")).count();
+          // Assert no default HTable threadpools are created.
+          assertEquals(0, hTablePoolCount);
+          LOGGER.debug("htable-pool-0 threads {}", hTablePoolCount);
+        }
+        // Assert that the threadpools from the Connection and the HTable are the same.
+        assertEquals(ctpe, htpe);
+      } catch (RuntimeException | NoSuchFieldException | IllegalAccessException | IOException t) {
+        // handle cases that an IOE is wrapped inside a RuntimeException
+        // like HTableInterface#createHTableInterface
+        MetaDataProtos.MetaDataResponse.Builder builder =
+          MetaDataProtos.MetaDataResponse.newBuilder();
+
+        LOGGER.error("This is unexpected");
+        ProtobufUtil.setControllerException(controller,
+          ClientUtil.createIOException(SchemaUtil
+            
.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, false)
+            .toString(), new DoNotRetryIOException("Not allowed")));
+        done.run(builder.build());
+
+      }
+
+    }
 
     @Override
     public void getVersion(RpcController controller, MetaDataProtos.GetVersionRequest request,
@@ -81,6 +201,67 @@ public class MetadataServerConnectionsIT extends BaseTest {
     }
   }
 
+  @Test
+  public void testViewCreationAndServerConnections() throws Throwable {
+    final String tableName = generateUniqueName();
+    final String view01 = "v01_" + tableName;
+    final String view02 = "v02_" + tableName;
+    final String index_view01 = "idx_v01_" + tableName;
+    final String index_view02 = "idx_v02_" + tableName;
+    final String index_view03 = "idx_v03_" + tableName;
+    final String index_view04 = "idx_v04_" + tableName;
+    final int NUM_VIEWS = 50;
+
+    TestMetaDataEndpointImpl.setTestCreateView(true);
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+      TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class);
+
+      final Statement stmt = conn.createStatement();
+
+      stmt.execute("CREATE TABLE " + tableName
+        + " (COL1 CHAR(10) NOT NULL, COL2 CHAR(5) NOT NULL, COL3 VARCHAR,"
+        + " COL4 VARCHAR CONSTRAINT pk PRIMARY KEY(COL1, COL2))"
+        + " UPDATE_CACHE_FREQUENCY=ALWAYS, MULTI_TENANT=true");
+      conn.commit();
+
+      for (int i = 0; i < NUM_VIEWS; i++) {
+        Properties props = new Properties();
+        String viewTenantId = String.format("00T%012d", i);
+        props.setProperty(TENANT_ID_ATTRIB, viewTenantId);
+        // Create multilevel tenant views
+        try (Connection tConn = DriverManager.getConnection(getUrl(), props)) {
+          final Statement viewStmt = tConn.createStatement();
+          viewStmt
+            .execute("CREATE VIEW " + view01 + " (VCOL1 CHAR(8), COL5 VARCHAR) AS SELECT * FROM "
+              + tableName + " WHERE COL2 = 'col2'");
+
+          viewStmt.execute("CREATE VIEW " + view02 + " (VCOL2 CHAR(10), COL6 
VARCHAR)"
+            + " AS SELECT * FROM " + view01 + " WHERE VCOL1 = 'vcol1'");
+          tConn.commit();
+
+          // Create multilevel tenant indexes
+          final Statement indexStmt = tConn.createStatement();
+          indexStmt.execute("CREATE INDEX " + index_view01 + " ON " + view01 + 
" (COL5) INCLUDE "
+            + "(COL1, COL2, COL3)");
+          indexStmt.execute("CREATE INDEX " + index_view02 + " ON " + view02 + 
" (COL6) INCLUDE "
+            + "(COL1, COL2, COL3)");
+          indexStmt.execute(
+            "CREATE INDEX " + index_view03 + " ON " + view01 + " (COL5) 
INCLUDE " + "(COL2, COL1)");
+          indexStmt.execute(
+            "CREATE INDEX " + index_view04 + " ON " + view02 + " (COL6) 
INCLUDE " + "(COL2, COL1)");
+
+          tConn.commit();
+
+        }
+
+      }
+
+      TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", TestMetaDataEndpointImpl.class);
+      TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", MetaDataEndpointImpl.class);
+    }
+  }
+
   @Test
   public void testConnectionFromMetadataServer() throws Throwable {
     final String tableName = generateUniqueName();
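
The assertions in TestMetaDataEndpointImpl.createTable rely on reflection to reach the private batchPool/pool executors inside ConnectionImplementation and HTable. A minimal, self-contained sketch of that technique; ThreadPoolHolder is a hypothetical stand-in for those HBase classes:

    import java.lang.reflect.Field;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;

    public class ReflectPoolSketch {
      // Stand-in for ConnectionImplementation ("batchPool") or HTable ("pool").
      static class ThreadPoolHolder {
        private final ThreadPoolExecutor pool =
          (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
      }

      public static void main(String[] args) throws Exception {
        ThreadPoolHolder holder = new ThreadPoolHolder();
        Field f = ThreadPoolHolder.class.getDeclaredField("pool");
        f.setAccessible(true); // private access, acceptable in test-only code
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) f.get(holder);
        System.out.println("core=" + tpe.getCorePoolSize() + " max=" + tpe.getMaximumPoolSize());
        tpe.shutdown();
      }
    }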
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index e2a6ac2cee..08da6be7ae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -125,8 +125,8 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT {
     assertEquals("a", rs.getString(1));
     assertEquals("b", rs.getString(2));
     assertFalse(rs.next());
-    stmt = conn.prepareStatement("UPSERT INTO " + tableName
-            + " (inst,host,\"DATE\") VALUES(?,'b',CURRENT_DATE())");
+    stmt = conn.prepareStatement(
+      "UPSERT INTO " + tableName + " (inst,host,\"DATE\") 
VALUES(?,'b',CURRENT_DATE())");
     stmt.setString(1, "a");
     stmt.execute();
     rs = stmt.getResultSet();
