This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1493190e0d PHOENIX-7727 Eliminate IndexMetadataCache RPCs by leveraging server PTable cache (#2321)
1493190e0d is described below

commit 1493190e0d69fa9ed95e35aab55888204fe3c63d
Author: Viraj Jasani <[email protected]>
AuthorDate: Wed Dec 17 16:59:40 2025 +0530

    PHOENIX-7727 Eliminate IndexMetadataCache RPCs by leveraging server PTable cache (#2321)
---
 .../phoenix/index/IndexMetaDataCacheClient.java    |  61 +++++-
 .../org/apache/phoenix/query/QueryServices.java    |   1 +
 .../apache/phoenix/query/QueryServicesOptions.java |   3 +
 .../java/org/apache/phoenix/schema/PTable.java     |  17 ++
 .../phoenix/index/PhoenixIndexMetaDataBuilder.java | 140 +++++++++++---
 .../phoenix/end2end/ConcurrentMutationsIT.java     |   4 +-
 .../phoenix/end2end/UCFWithDisabledIndexIT.java    |   2 +
 .../UCFWithDisabledIndexWithDDLValidationIT.java   |   3 +
 .../phoenix/end2end/UCFWithServerMetadataIT.java   | 213 +++++++++++++++++++++
 9 files changed, 419 insertions(+), 25 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index aa03f4a76c..53a833bbe0 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -18,6 +18,9 @@
 package org.apache.phoenix.index;
 
 import static 
org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.sql.SQLException;
@@ -32,15 +35,21 @@ import 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.join.MaxServerCacheSizeExceededException;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IndexMetaDataCacheClient {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexMetaDataCacheClient.class);
+
   private final ServerCacheClient serverCache;
   private PTable cacheUsingTable;
 
@@ -120,10 +129,60 @@ public class IndexMetaDataCacheClient {
       txState = connection.getMutationState().encodeTransaction();
     }
     boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+    ReadOnlyProps props = connection.getQueryServices().getProps();
     if (hasIndexMetaData) {
-      if (
+      List<PTable> indexes = table.getIndexes();
+      boolean sendIndexMaintainers = false;
+      if (indexes != null) {
+        for (PTable index : indexes) {
+          if (IndexMaintainer.sendIndexMaintainer(index)) {
+            sendIndexMaintainers = true;
+            break;
+          }
+        }
+      }
+      boolean useServerMetadata = 
props.getBoolean(INDEX_USE_SERVER_METADATA_ATTRIB,
+        QueryServicesOptions.DEFAULT_INDEX_USE_SERVER_METADATA)
+        && props.getBoolean(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
+          QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED);
+      boolean serverSideImmutableIndexes =
+        props.getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+          DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED);
+      boolean useServerCacheRpc =
         useIndexMetadataCache(connection, mutations, 
indexMetaDataPtr.getLength() + txState.length)
+          && sendIndexMaintainers;
+      long updateCacheFreq = table.getUpdateCacheFrequency();
+      // PHOENIX-7727 Eliminate IndexMetadataCache RPCs by leveraging server 
PTable cache and
+      // retrieve IndexMaintainer objects for each active index from the 
PTable object.
+      // To optimize rpc calls, use it only when conditions 1-4 and either 5 or 6 are met:
+      // 1. Use server metadata feature is enabled (enabled by default).
+      // 2. New index design is used (IndexRegionObserver coproc).
+      // 3. Table is not of type System.
+      // 4. Either table has mutable indexes or server side handling of 
immutable indexes is
+      // enabled.
+      // 5. Table's UPDATE_CACHE_FREQUENCY is not ALWAYS. This ensures 
IndexRegionObserver
+      // does not have to make additional getTable() rpc call with each 
batchMutate() rpc call.
+      // 6. Table's UPDATE_CACHE_FREQUENCY is ALWAYS but addServerCache() rpc 
call is needed
+      // due to the size of mutations. Unless expensive addServerCache() rpc 
call is required,
+      // client can attach index maintainer mutation attribute so that 
IndexRegionObserver
+      // does not have to make additional getTable() rpc call with each 
batchMutate() rpc call
+      // with small mutation size (size < 
phoenix.index.mutableBatchSizeThreshold value).
+      // If (a) above conditions do not match, (b) the mutation size is 
greater than
+      // "phoenix.index.mutableBatchSizeThreshold" value, and (c) data table 
needs to send index
+      // mutation with the data table mutation, we can use expensive 
addServerCache() rpc call.
+      // However, if (a) above conditions do not match, (b) the mutation size is greater than
+      // "phoenix.index.mutableBatchSizeThreshold" value, and (c) data table 
mutation does not need
+      // to send index mutation (because all indexes are only in any of 
DISABLE, CREATE_DISABLE,
+      // PENDING_ACTIVE states), we can avoid expensive addServerCache() rpc 
call.
+      if (
+        useServerMetadata && table.getType() != PTableType.SYSTEM
+          && (!table.isImmutableRows() || serverSideImmutableIndexes)
+          && (updateCacheFreq > 0 || useServerCacheRpc)
       ) {
+        LOGGER.trace("Using server-side metadata for table {}, not sending 
IndexMaintainer or UUID",
+          table.getTableName());
+        uuidValue = ByteUtil.EMPTY_BYTE_ARRAY;
+      } else if (useServerCacheRpc) {
         IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, table);
         cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, 
txState);
         uuidValue = cache.getId();
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 66b8fe5379..dc5d042e46 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -115,6 +115,7 @@ public interface QueryServices extends SQLCloseable {
   public static final String IMMUTABLE_ROWS_ATTRIB = 
"phoenix.mutate.immutableRows";
   public static final String INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB =
     "phoenix.index.mutableBatchSizeThreshold";
+  public static final String INDEX_USE_SERVER_METADATA_ATTRIB = 
"phoenix.index.useServerMetadata";
   public static final String DROP_METADATA_ATTRIB = 
"phoenix.schema.dropMetaData";
   public static final String GROUPBY_SPILLABLE_ATTRIB = 
"phoenix.groupby.spillable";
   public static final String GROUPBY_SPILL_FILES_ATTRIB = 
"phoenix.groupby.spillFiles";
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 5e345f4901..2af299aa2c 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -66,6 +66,7 @@ import static 
org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE;
 import static 
org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.INDEX_POPULATION_SLEEP_TIME;
 import static 
org.apache.phoenix.query.QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY;
+import static 
org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.IS_NAMESPACE_MAPPING_ENABLED;
 import static 
org.apache.phoenix.query.QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE;
 import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
@@ -202,6 +203,7 @@ public class QueryServicesOptions {
   public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 
1024 * 1; // 1 Mb
   public static final int DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE = 1024 * 1024 
* 1; // 1 Mb
   public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 3;
+  public static final boolean DEFAULT_INDEX_USE_SERVER_METADATA = true;
   public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
   // Only the first chunked batches are fetched in parallel, so this default
   // should be on the relatively bigger side of things. Bigger means more
@@ -551,6 +553,7 @@ public class QueryServicesOptions {
       .setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS)
       .setIfUnset(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB,
         DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD)
+      .setIfUnset(INDEX_USE_SERVER_METADATA_ATTRIB, 
DEFAULT_INDEX_USE_SERVER_METADATA)
       .setIfUnset(MAX_SPOOL_TO_DISK_BYTES_ATTRIB, 
DEFAULT_MAX_SPOOL_TO_DISK_BYTES)
       .setIfUnset(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA)
       .setIfUnset(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE)
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index 2baa7b7e1c..36a9ab739f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -882,9 +882,26 @@ public interface PTable extends PMetaDataEntity {
   boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection 
connection)
     throws SQLException;
 
+  /**
+   * Retrieves the IndexMaintainer for this index table. This method caches 
the IndexMaintainer
+   * object in the PTable. This is only relevant for PTable objects of type 
INDEX.
+   * @param dataTable  data table associated with the index.
+   * @param connection PhoenixConnection object.
+   * @return the IndexMaintainer for this index table.
+   * @throws SQLException if an error occurs during IndexMaintainer creation.
+   */
   IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection 
connection)
     throws SQLException;
 
+  /**
+   * Retrieves the IndexMaintainer for this index table with CDC context. This 
method caches the
+   * IndexMaintainer object in the PTable. This is only relevant for PTable 
objects of type INDEX.
+   * @param dataTable  data table associated with the index.
+   * @param cdcTable   CDC table.
+   * @param connection PhoenixConnection object.
+   * @return the IndexMaintainer for this index table.
+   * @throws SQLException if an error occurs during IndexMaintainer creation.
+   */
   IndexMaintainer getIndexMaintainer(PTable dataTable, PTable cdcTable,
     PhoenixConnection connection) throws SQLException;
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
index 45f70d3bbb..3d38a5a394 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
@@ -18,9 +18,12 @@
 package org.apache.phoenix.index;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -32,14 +35,24 @@ import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PhoenixIndexMetaDataBuilder {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixIndexMetaDataBuilder.class);
+
   private final RegionCoprocessorEnvironment env;
 
   PhoenixIndexMetaDataBuilder(RegionCoprocessorEnvironment env) {
@@ -63,6 +76,13 @@ public class PhoenixIndexMetaDataBuilder {
     if (uuid == null) {
       return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
     }
+    boolean useServerMetadata = uuid.length == 0;
+    if (useServerMetadata) {
+      IndexMetaDataCache cacheFromPTable = 
getIndexMetaDataCacheFromPTable(env, attributes);
+      if (cacheFromPTable != null) {
+        return cacheFromPTable;
+      }
+    }
     byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD);
     if (md == null) {
       md = attributes.get(PhoenixIndexCodec.INDEX_MD);
@@ -77,28 +97,7 @@ public class PhoenixIndexMetaDataBuilder {
         : Bytes.toInt(clientVersionBytes);
       final PhoenixTransactionContext txnContext =
         TransactionFactory.getTransactionContext(txState, clientVersion);
-      return new IndexMetaDataCache() {
-
-        @Override
-        public void close() throws IOException {
-        }
-
-        @Override
-        public List<IndexMaintainer> getIndexMaintainers() {
-          return indexMaintainers;
-        }
-
-        @Override
-        public PhoenixTransactionContext getTransactionContext() {
-          return txnContext;
-        }
-
-        @Override
-        public int getClientVersion() {
-          return clientVersion;
-        }
-
-      };
+      return getIndexMetaDataCache(clientVersion, txnContext, 
indexMaintainers);
     } else {
       byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
       ImmutableBytesPtr tenantId =
@@ -117,4 +116,101 @@ public class PhoenixIndexMetaDataBuilder {
     }
 
   }
+
+  /**
+   * Get IndexMetaDataCache by looking up PTable using table metadata 
attributes attached to the
+   * mutation.
+   * @param env        RegionCoprocessorEnvironment.
+   * @param attributes Mutation attributes.
+   * @return IndexMetaDataCache, or null if the schema/table name attributes are missing, no
+   *         IndexMaintainer is built for the table, or the lookup throws.
+   */
+  private static IndexMetaDataCache getIndexMetaDataCacheFromPTable(
+    RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) {
+    try {
+      byte[] schemaBytes =
+        
attributes.get(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
+      byte[] tableBytes =
+        
attributes.get(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
+      if (schemaBytes == null || tableBytes == null) {
+        LOGGER.error("Table metadata for table name and schema name not found 
in mutation "
+          + "attributes, falling back to GlobalCache lookup");
+        return null;
+      }
+      byte[] tenantIdBytes =
+        
attributes.get(MutationState.MutationMetadataType.TENANT_ID.toString());
+      byte[] txState = 
attributes.get(BaseScannerRegionObserverConstants.TX_STATE);
+      byte[] clientVersionBytes = 
attributes.get(BaseScannerRegionObserverConstants.CLIENT_VERSION);
+
+      final int clientVersion = clientVersionBytes == null
+        ? ScanUtil.UNKNOWN_CLIENT_VERSION
+        : Bytes.toInt(clientVersionBytes);
+      final PhoenixTransactionContext txnContext =
+        TransactionFactory.getTransactionContext(txState, clientVersion);
+
+      String fullTableName = SchemaUtil.getTableName(schemaBytes, tableBytes);
+      String tenantId =
+        tenantIdBytes == null || tenantIdBytes.length == 0 ? null : 
Bytes.toString(tenantIdBytes);
+      Properties props = new Properties();
+      if (tenantId != null) {
+        props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+      }
+      try (Connection conn = QueryUtil.getConnectionOnServer(props, 
env.getConfiguration())) {
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        PTable dataTable = pconn.getTable(tenantId, fullTableName);
+        final List<IndexMaintainer> indexMaintainers =
+          buildIndexMaintainersFromPTable(dataTable, pconn);
+        if (indexMaintainers.isEmpty()) {
+          LOGGER.debug("No active indexes found for table {}", fullTableName);
+          return null;
+        }
+        return getIndexMetaDataCache(clientVersion, txnContext, 
indexMaintainers);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Failed to get PTable from CQSI cache, falling back to 
GlobalCache lookup", e);
+      return null;
+    }
+  }
+
+  private static IndexMetaDataCache getIndexMetaDataCache(int clientVersion,
+    PhoenixTransactionContext txnContext, List<IndexMaintainer> 
indexMaintainers) {
+    return new IndexMetaDataCache() {
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public List<IndexMaintainer> getIndexMaintainers() {
+        return indexMaintainers;
+      }
+
+      @Override
+      public PhoenixTransactionContext getTransactionContext() {
+        return txnContext;
+      }
+
+      @Override
+      public int getClientVersion() {
+        return clientVersion;
+      }
+    };
+  }
+
+  /**
+   * Build List of IndexMaintainer for each active index.
+   * @param dataTable  PTable of the data table.
+   * @param connection PhoenixConnection.
+   * @return List of IndexMaintainer objects for active indexes.
+   */
+  private static List<IndexMaintainer> buildIndexMaintainersFromPTable(PTable 
dataTable,
+    PhoenixConnection connection) throws SQLException {
+    List<IndexMaintainer> indexMaintainers = new ArrayList<>();
+    List<PTable> indexes = dataTable.getIndexes();
+    for (PTable index : indexes) {
+      if (IndexMaintainer.sendIndexMaintainer(index)) {
+        IndexMaintainer maintainer = index.getIndexMaintainer(dataTable, 
connection);
+        indexMaintainers.add(maintainer);
+      }
+    }
+    return indexMaintainers;
+  }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index a20a266871..f18bc11fe1 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -225,7 +225,7 @@ public class ConcurrentMutationsIT extends 
ParallelStatsDisabledIT {
       conn.close();
 
       Timestamp expectedTimestamp;
-      ts = 1040;
+      ts = clock.time + 1;
       clock.time = ts;
       conn = DriverManager.getConnection(getUrl(), props);
       stmt = conn.prepareStatement("UPSERT INTO " + tableName + " 
VALUES('aa','aa',?, null)");
@@ -239,7 +239,7 @@ public class ConcurrentMutationsIT extends 
ParallelStatsDisabledIT {
       conn.commit();
       conn.close();
 
-      ts = 1050;
+      ts = clock.time + 1;
       clock.time = ts;
       conn = DriverManager.getConnection(getUrl(), props);
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexIT.java
index d061f16110..a1372be549 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import static 
org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
+import static 
org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -75,6 +76,7 @@ public class UCFWithDisabledIndexIT extends BaseTest {
     props.put(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, 
Boolean.toString(false));
     props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(Long.MAX_VALUE));
     props.put(DISABLE_VIEW_SUBTREE_VALIDATION, "true");
+    props.put(INDEX_USE_SERVER_METADATA_ATTRIB, Boolean.toString(false));
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexWithDDLValidationIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexWithDDLValidationIT.java
index ef2636e2ec..8c63fd90d8 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexWithDDLValidationIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexWithDDLValidationIT.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.end2end;
 
+import static 
org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
+
 import java.util.Map;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -43,6 +45,7 @@ public class UCFWithDisabledIndexWithDDLValidationIT extends 
UCFWithDisabledInde
     props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, 
Boolean.toString(true));
     props.put(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, 
Boolean.toString(true));
     props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(Long.MAX_VALUE));
+    props.put(INDEX_USE_SERVER_METADATA_ATTRIB, Boolean.toString(false));
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithServerMetadataIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithServerMetadataIT.java
new file mode 100644
index 0000000000..6a19bec2e6
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithServerMetadataIT.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
+import 
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
+import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheResponse;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test GetTable rpc calls for the combination of UCF and useServerMetadata.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class UCFWithServerMetadataIT extends BaseTest {
+
+  private final String updateCacheFrequency;
+  private final boolean useServerMetadata;
+  private final boolean singleRowUpdate;
+  private static final AtomicLong getTableCallCount = new AtomicLong(0);
+  private static final AtomicLong addServerCacheCallCount = new AtomicLong(0);
+
+  public UCFWithServerMetadataIT(String updateCacheFrequency, boolean 
useServerMetadata,
+    boolean singleRowUpdate) {
+    this.updateCacheFrequency = updateCacheFrequency;
+    this.useServerMetadata = useServerMetadata;
+    this.singleRowUpdate = singleRowUpdate;
+  }
+
+  @Parameters(name = "UpdateCacheFrequency={0}, UseServerMetadata={1}, 
SingleRowUpdate={2}")
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] { { "60000", true, true }, { "60000", 
true, false },
+      { "60000", false, true }, { "60000", false, false }, { "ALWAYS", true, 
true },
+      { "ALWAYS", true, false }, { "ALWAYS", false, true }, { "ALWAYS", false, 
false } });
+  }
+
+  public static class TrackingMetaDataEndpointImpl extends 
MetaDataEndpointImpl {
+
+    @Override
+    public void getTable(RpcController controller, GetTableRequest request,
+      RpcCallback<MetaDataResponse> done) {
+      getTableCallCount.incrementAndGet();
+      super.getTable(controller, request, done);
+    }
+  }
+
+  public static class TrackingServerCachingEndpointImpl extends 
ServerCachingEndpointImpl {
+
+    @Override
+    public void addServerCache(RpcController controller, AddServerCacheRequest 
request,
+      RpcCallback<AddServerCacheResponse> done) {
+      addServerCacheCallCount.incrementAndGet();
+      super.addServerCache(controller, request, done);
+    }
+  }
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = new HashMap<>(1);
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(Long.MAX_VALUE));
+    setUpTestDriver(new ReadOnlyProps(props));
+  }
+
+  @Before
+  public void setUp() {
+    getTableCallCount.set(0);
+    addServerCacheCallCount.set(0);
+  }
+
+  @Test
+  public void testUpdateCacheFrequency() throws Exception {
+    String dataTableName = generateUniqueName();
+    String coveredIndex1 = "CI1_" + generateUniqueName();
+    String coveredIndex2 = "CI2_" + generateUniqueName();
+    String uncoveredIndex1 = "UI1_" + generateUniqueName();
+    String uncoveredIndex2 = "UI2_" + generateUniqueName();
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    props.setProperty(QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB,
+      Boolean.toString(useServerMetadata));
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      String createTableDDL =
+        String.format("CREATE TABLE %s (id INTEGER PRIMARY KEY, name 
VARCHAR(50), "
+          + "age INTEGER, city VARCHAR(50), salary INTEGER, department 
VARCHAR(50)"
+          + ") UPDATE_CACHE_FREQUENCY=%s", dataTableName, 
updateCacheFrequency);
+      conn.createStatement().execute(createTableDDL);
+      attachCustomCoprocessor(conn, dataTableName);
+      conn.createStatement().execute(String
+        .format("CREATE INDEX %s ON %s (name) INCLUDE (age, city)", 
coveredIndex1, dataTableName));
+      conn.createStatement().execute(String.format(
+        "CREATE INDEX %s ON %s (city) INCLUDE (salary, department)", 
coveredIndex2, dataTableName));
+      conn.createStatement()
+        .execute(String.format("CREATE INDEX %s ON %s (age)", uncoveredIndex1, 
dataTableName));
+      conn.createStatement()
+        .execute(String.format("CREATE INDEX %s ON %s (salary)", 
uncoveredIndex2, dataTableName));
+      String upsertSQL = String.format(
+        "UPSERT INTO %s (id, name, age, city, salary, department) VALUES (?, 
?, ?, ?, ?, ?)",
+        dataTableName);
+      int totalRows = 52;
+      int batchSize = 8;
+      PreparedStatement stmt = conn.prepareStatement(upsertSQL);
+      long startGetTableCalls = getTableCallCount.get();
+      for (int i = 1; i <= totalRows; i++) {
+        stmt.setInt(1, i);
+        stmt.setString(2, "Name" + i);
+        stmt.setInt(3, 20 + (i % 40));
+        stmt.setString(4, "City" + (i % 10));
+        stmt.setInt(5, 30000 + (i * 1000));
+        stmt.setString(6, "Dept" + (i % 5));
+        stmt.executeUpdate();
+        if (singleRowUpdate) {
+          conn.commit();
+        } else if (i % batchSize == 0) {
+          conn.commit();
+        }
+      }
+      if (!singleRowUpdate) {
+        conn.commit();
+      }
+      long actualGetTableCalls = getTableCallCount.get() - startGetTableCalls;
+      int expectedCalls;
+      String caseKey = updateCacheFrequency + "_" + useServerMetadata + "_" + 
singleRowUpdate;
+      switch (caseKey) {
+        case "60000_true_true":
+        case "60000_true_false":
+          expectedCalls = 1;
+          break;
+        case "60000_false_true":
+        case "60000_false_false":
+          expectedCalls = 0;
+          break;
+        case "ALWAYS_true_true":
+          expectedCalls = totalRows;
+          break;
+        case "ALWAYS_true_false":
+          expectedCalls = (int) Math.ceil((double) totalRows / batchSize) * 2;
+          break;
+        case "ALWAYS_false_true":
+          expectedCalls = totalRows;
+          break;
+        case "ALWAYS_false_false":
+          expectedCalls = (int) Math.ceil((double) totalRows / batchSize);
+          break;
+        default:
+          throw new IllegalArgumentException("Unexpected test case: " + 
caseKey);
+      }
+      assertEquals("Expected exact number of getTable() calls for case: " + 
caseKey, expectedCalls,
+        actualGetTableCalls);
+      long actualAddServerCacheCalls = addServerCacheCallCount.get();
+      int expectedAddServerCacheCalls;
+      switch (caseKey) {
+        case "60000_false_false":
+        case "ALWAYS_false_false":
+          expectedAddServerCacheCalls = (int) Math.ceil((double) totalRows / 
batchSize);
+          break;
+        default:
+          expectedAddServerCacheCalls = 0;
+          break;
+      }
+      assertEquals("Expected exact number of addServerCache() calls for case: 
" + caseKey,
+        expectedAddServerCacheCalls, actualAddServerCacheCalls);
+    }
+  }
+
+  private void attachCustomCoprocessor(Connection conn, String dataTableName) 
throws Exception {
+    TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG", 
MetaDataEndpointImpl.class);
+    TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG", 
TrackingMetaDataEndpointImpl.class);
+    TestUtil.removeCoprocessor(conn, dataTableName, 
ServerCachingEndpointImpl.class);
+    TestUtil.addCoprocessor(conn, dataTableName, 
TrackingServerCachingEndpointImpl.class);
+  }
+
+}

Reply via email to