This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new fa19e20b2b PHOENIX-7727 Eliminate IndexMetadataCache RPCs by
leveraging server PTable cache (#2321)
fa19e20b2b is described below
commit fa19e20b2b990752edfa9c50871bc3d3ce1baf83
Author: Viraj Jasani <[email protected]>
AuthorDate: Wed Dec 17 16:59:40 2025 +0530
PHOENIX-7727 Eliminate IndexMetadataCache RPCs by leveraging server PTable
cache (#2321)
---
.../phoenix/index/IndexMetaDataCacheClient.java | 61 +++++-
.../org/apache/phoenix/query/QueryServices.java | 1 +
.../apache/phoenix/query/QueryServicesOptions.java | 3 +
.../java/org/apache/phoenix/schema/PTable.java | 17 ++
.../phoenix/index/PhoenixIndexMetaDataBuilder.java | 140 +++++++++++---
.../phoenix/end2end/ConcurrentMutationsIT.java | 4 +-
.../phoenix/end2end/UCFWithDisabledIndexIT.java | 2 +
.../UCFWithDisabledIndexWithDDLValidationIT.java | 3 +
.../phoenix/end2end/UCFWithServerMetadataIT.java | 213 +++++++++++++++++++++
9 files changed, 419 insertions(+), 25 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index aa03f4a76c..53a833bbe0 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -18,6 +18,9 @@
package org.apache.phoenix.index;
import static
org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.sql.SQLException;
@@ -32,15 +35,21 @@ import
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.join.MaxServerCacheSizeExceededException;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class IndexMetaDataCacheClient {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IndexMetaDataCacheClient.class);
+
private final ServerCacheClient serverCache;
private PTable cacheUsingTable;
@@ -120,10 +129,60 @@ public class IndexMetaDataCacheClient {
txState = connection.getMutationState().encodeTransaction();
}
boolean hasIndexMetaData = indexMetaDataPtr.getLength() > 0;
+ ReadOnlyProps props = connection.getQueryServices().getProps();
if (hasIndexMetaData) {
- if (
+ List<PTable> indexes = table.getIndexes();
+ boolean sendIndexMaintainers = false;
+ if (indexes != null) {
+ for (PTable index : indexes) {
+ if (IndexMaintainer.sendIndexMaintainer(index)) {
+ sendIndexMaintainers = true;
+ break;
+ }
+ }
+ }
+ boolean useServerMetadata =
props.getBoolean(INDEX_USE_SERVER_METADATA_ATTRIB,
+ QueryServicesOptions.DEFAULT_INDEX_USE_SERVER_METADATA)
+ && props.getBoolean(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
+ QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED);
+ boolean serverSideImmutableIndexes =
+ props.getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+ DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED);
+ boolean useServerCacheRpc =
useIndexMetadataCache(connection, mutations,
indexMetaDataPtr.getLength() + txState.length)
+ && sendIndexMaintainers;
+ long updateCacheFreq = table.getUpdateCacheFrequency();
+ // PHOENIX-7727 Eliminate IndexMetadataCache RPCs by leveraging server
PTable cache and
+ // retrieve IndexMaintainer objects for each active index from the
PTable object.
+ // To optimize rpc calls, use it only when all of these conditions are
met:
+ // 1. Use server metadata feature is enabled (enabled by default).
+ // 2. New index design is used (IndexRegionObserver coproc).
+ // 3. Table is not of type System.
+ // 4. Either table has mutable indexes or server side handling of
immutable indexes is
+ // enabled.
+ // 5. Table's UPDATE_CACHE_FREQUENCY is not ALWAYS. This ensures
IndexRegionObserver
+ // does not have to make additional getTable() rpc call with each
batchMutate() rpc call.
+ // 6. Table's UPDATE_CACHE_FREQUENCY is ALWAYS but addServerCache() rpc
call is needed
+ // due to the size of mutations. Unless expensive addServerCache() rpc
call is required,
+ // client can attach index maintainer mutation attribute so that
IndexRegionObserver
+ // does not have to make additional getTable() rpc call with each
batchMutate() rpc call
+ // with small mutation size (size <
phoenix.index.mutableBatchSizeThreshold value).
+ // If (a) above conditions do not match, (b) the mutation size is
greater than
+ // "phoenix.index.mutableBatchSizeThreshold" value, and (c) data table
needs to send index
+ // mutation with the data table mutation, we can use expensive
addServerCache() rpc call.
+ // However, (a) above conditions do not match, (b) the mutation size is
greater than
+ // "phoenix.index.mutableBatchSizeThreshold" value, and (c) data table
mutation does not need
+ // to send index mutation (because all indexes are only in any of
DISABLE, CREATE_DISABLE,
+ // PENDING_ACTIVE states), we can avoid expensive addServerCache() rpc
call.
+ if (
+ useServerMetadata && table.getType() != PTableType.SYSTEM
+ && (!table.isImmutableRows() || serverSideImmutableIndexes)
+ && (updateCacheFreq > 0 || useServerCacheRpc)
) {
+ LOGGER.trace("Using server-side metadata for table {}, not sending
IndexMaintainer or UUID",
+ table.getTableName());
+ uuidValue = ByteUtil.EMPTY_BYTE_ARRAY;
+ } else if (useServerCacheRpc) {
IndexMetaDataCacheClient client = new
IndexMetaDataCacheClient(connection, table);
cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr,
txState);
uuidValue = cache.getId();
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 66b8fe5379..dc5d042e46 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -115,6 +115,7 @@ public interface QueryServices extends SQLCloseable {
public static final String IMMUTABLE_ROWS_ATTRIB =
"phoenix.mutate.immutableRows";
public static final String INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB =
"phoenix.index.mutableBatchSizeThreshold";
+ public static final String INDEX_USE_SERVER_METADATA_ATTRIB =
"phoenix.index.useServerMetadata";
public static final String DROP_METADATA_ATTRIB =
"phoenix.schema.dropMetaData";
public static final String GROUPBY_SPILLABLE_ATTRIB =
"phoenix.groupby.spillable";
public static final String GROUPBY_SPILL_FILES_ATTRIB =
"phoenix.groupby.spillFiles";
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 5e345f4901..2af299aa2c 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -66,6 +66,7 @@ import static
org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE;
import static
org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.INDEX_POPULATION_SLEEP_TIME;
import static
org.apache.phoenix.query.QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY;
+import static
org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.IS_NAMESPACE_MAPPING_ENABLED;
import static
org.apache.phoenix.query.QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE;
import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB;
@@ -202,6 +203,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 *
1024 * 1; // 1 Mb
public static final int DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE = 1024 * 1024
* 1; // 1 Mb
public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 3;
+ public static final boolean DEFAULT_INDEX_USE_SERVER_METADATA = true;
public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
// Only the first chunked batches are fetched in parallel, so this default
// should be on the relatively bigger side of things. Bigger means more
@@ -551,6 +553,7 @@ public class QueryServicesOptions {
.setIfUnset(IMMUTABLE_ROWS_ATTRIB, DEFAULT_IMMUTABLE_ROWS)
.setIfUnset(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB,
DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD)
+ .setIfUnset(INDEX_USE_SERVER_METADATA_ATTRIB,
DEFAULT_INDEX_USE_SERVER_METADATA)
.setIfUnset(MAX_SPOOL_TO_DISK_BYTES_ATTRIB,
DEFAULT_MAX_SPOOL_TO_DISK_BYTES)
.setIfUnset(DROP_METADATA_ATTRIB, DEFAULT_DROP_METADATA)
.setIfUnset(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index 2baa7b7e1c..36a9ab739f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -882,9 +882,26 @@ public interface PTable extends PMetaDataEntity {
boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection
connection)
throws SQLException;
+ /**
+ * Retrieves the IndexMaintainer for this index table. This method caches
the IndexMaintainer
+ * object in the PTable. This is only relevant for PTable objects of type
INDEX.
+ * @param dataTable data table associated with the index.
+ * @param connection PhoenixConnection object.
+ * @return the IndexMaintainer for this index table.
+ * @throws SQLException if an error occurs during IndexMaintainer creation.
+ */
IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection
connection)
throws SQLException;
+ /**
+ * Retrieves the IndexMaintainer for this index table with CDC context. This
method caches the
+ * IndexMaintainer object in the PTable. This is only relevant for PTable
objects of type INDEX.
+ * @param dataTable data table associated with the index.
+ * @param cdcTable CDC table.
+ * @param connection PhoenixConnection object.
+ * @return the IndexMaintainer for this index table.
+ * @throws SQLException if an error occurs during IndexMaintainer creation.
+ */
IndexMaintainer getIndexMaintainer(PTable dataTable, PTable cdcTable,
PhoenixConnection connection) throws SQLException;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
index 45f70d3bbb..3d38a5a394 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java
@@ -18,9 +18,12 @@
package org.apache.phoenix.index;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -32,14 +35,24 @@ import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PhoenixIndexMetaDataBuilder {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PhoenixIndexMetaDataBuilder.class);
+
private final RegionCoprocessorEnvironment env;
PhoenixIndexMetaDataBuilder(RegionCoprocessorEnvironment env) {
@@ -63,6 +76,13 @@ public class PhoenixIndexMetaDataBuilder {
if (uuid == null) {
return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
}
+ boolean useServerMetadata = uuid.length == 0;
+ if (useServerMetadata) {
+ IndexMetaDataCache cacheFromPTable =
getIndexMetaDataCacheFromPTable(env, attributes);
+ if (cacheFromPTable != null) {
+ return cacheFromPTable;
+ }
+ }
byte[] md = attributes.get(PhoenixIndexCodec.INDEX_PROTO_MD);
if (md == null) {
md = attributes.get(PhoenixIndexCodec.INDEX_MD);
@@ -77,28 +97,7 @@ public class PhoenixIndexMetaDataBuilder {
: Bytes.toInt(clientVersionBytes);
final PhoenixTransactionContext txnContext =
TransactionFactory.getTransactionContext(txState, clientVersion);
- return new IndexMetaDataCache() {
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public List<IndexMaintainer> getIndexMaintainers() {
- return indexMaintainers;
- }
-
- @Override
- public PhoenixTransactionContext getTransactionContext() {
- return txnContext;
- }
-
- @Override
- public int getClientVersion() {
- return clientVersion;
- }
-
- };
+ return getIndexMetaDataCache(clientVersion, txnContext,
indexMaintainers);
} else {
byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
ImmutableBytesPtr tenantId =
@@ -117,4 +116,101 @@ public class PhoenixIndexMetaDataBuilder {
}
}
+
+ /**
+ * Get IndexMetaDataCache by looking up PTable using table metadata
attributes attached to the
+ * mutation.
+ * @param env RegionCoprocessorEnvironment.
+ * @param attributes Mutation attributes.
+ * @return IndexMetaDataCache or null if table metadata not found in
attributes.
+ */
+ private static IndexMetaDataCache getIndexMetaDataCacheFromPTable(
+ RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) {
+ try {
+ byte[] schemaBytes =
+
attributes.get(MutationState.MutationMetadataType.SCHEMA_NAME.toString());
+ byte[] tableBytes =
+
attributes.get(MutationState.MutationMetadataType.LOGICAL_TABLE_NAME.toString());
+ if (schemaBytes == null || tableBytes == null) {
+ LOGGER.error("Table metadata for table name and schema name not found
in mutation "
+ + "attributes, falling back to GlobalCache lookup");
+ return null;
+ }
+ byte[] tenantIdBytes =
+
attributes.get(MutationState.MutationMetadataType.TENANT_ID.toString());
+ byte[] txState =
attributes.get(BaseScannerRegionObserverConstants.TX_STATE);
+ byte[] clientVersionBytes =
attributes.get(BaseScannerRegionObserverConstants.CLIENT_VERSION);
+
+ final int clientVersion = clientVersionBytes == null
+ ? ScanUtil.UNKNOWN_CLIENT_VERSION
+ : Bytes.toInt(clientVersionBytes);
+ final PhoenixTransactionContext txnContext =
+ TransactionFactory.getTransactionContext(txState, clientVersion);
+
+ String fullTableName = SchemaUtil.getTableName(schemaBytes, tableBytes);
+ String tenantId =
+ tenantIdBytes == null || tenantIdBytes.length == 0 ? null :
Bytes.toString(tenantIdBytes);
+ Properties props = new Properties();
+ if (tenantId != null) {
+ props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
+ try (Connection conn = QueryUtil.getConnectionOnServer(props,
env.getConfiguration())) {
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+ PTable dataTable = pconn.getTable(tenantId, fullTableName);
+ final List<IndexMaintainer> indexMaintainers =
+ buildIndexMaintainersFromPTable(dataTable, pconn);
+ if (indexMaintainers.isEmpty()) {
+ LOGGER.debug("No active indexes found for table {}", fullTableName);
+ return null;
+ }
+ return getIndexMetaDataCache(clientVersion, txnContext,
indexMaintainers);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get PTable from CQSI cache, falling back to
GlobalCache lookup", e);
+ return null;
+ }
+ }
+
+ private static IndexMetaDataCache getIndexMetaDataCache(int clientVersion,
+ PhoenixTransactionContext txnContext, List<IndexMaintainer>
indexMaintainers) {
+ return new IndexMetaDataCache() {
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public List<IndexMaintainer> getIndexMaintainers() {
+ return indexMaintainers;
+ }
+
+ @Override
+ public PhoenixTransactionContext getTransactionContext() {
+ return txnContext;
+ }
+
+ @Override
+ public int getClientVersion() {
+ return clientVersion;
+ }
+ };
+ }
+
+ /**
+ * Build List of IndexMaintainer for each active index.
+ * @param dataTable PTable of the data table.
+ * @param connection PhoenixConnection.
+ * @return List of IndexMaintainer objects for active indexes.
+ */
+ private static List<IndexMaintainer> buildIndexMaintainersFromPTable(PTable
dataTable,
+ PhoenixConnection connection) throws SQLException {
+ List<IndexMaintainer> indexMaintainers = new ArrayList<>();
+ List<PTable> indexes = dataTable.getIndexes();
+ for (PTable index : indexes) {
+ if (IndexMaintainer.sendIndexMaintainer(index)) {
+ IndexMaintainer maintainer = index.getIndexMaintainer(dataTable,
connection);
+ indexMaintainers.add(maintainer);
+ }
+ }
+ return indexMaintainers;
+ }
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index a20a266871..f18bc11fe1 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -225,7 +225,7 @@ public class ConcurrentMutationsIT extends
ParallelStatsDisabledIT {
conn.close();
Timestamp expectedTimestamp;
- ts = 1040;
+ ts = clock.time + 1;
clock.time = ts;
conn = DriverManager.getConnection(getUrl(), props);
stmt = conn.prepareStatement("UPSERT INTO " + tableName + "
VALUES('aa','aa',?, null)");
@@ -239,7 +239,7 @@ public class ConcurrentMutationsIT extends
ParallelStatsDisabledIT {
conn.commit();
conn.close();
- ts = 1050;
+ ts = clock.time + 1;
clock.time = ts;
conn = DriverManager.getConnection(getUrl(), props);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexIT.java
index d061f16110..a1372be549 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.end2end;
import static
org.apache.phoenix.query.QueryServices.DISABLE_VIEW_SUBTREE_VALIDATION;
+import static
org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
@@ -75,6 +76,7 @@ public class UCFWithDisabledIndexIT extends BaseTest {
props.put(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED,
Boolean.toString(false));
props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
props.put(DISABLE_VIEW_SUBTREE_VALIDATION, "true");
+ props.put(INDEX_USE_SERVER_METADATA_ATTRIB, Boolean.toString(false));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexWithDDLValidationIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexWithDDLValidationIT.java
index ef2636e2ec..8c63fd90d8 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexWithDDLValidationIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithDisabledIndexWithDDLValidationIT.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.end2end;
+import static
org.apache.phoenix.query.QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB;
+
import java.util.Map;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.query.QueryServices;
@@ -43,6 +45,7 @@ public class UCFWithDisabledIndexWithDDLValidationIT extends
UCFWithDisabledInde
props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED,
Boolean.toString(true));
props.put(QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED,
Boolean.toString(true));
props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
+ props.put(INDEX_USE_SERVER_METADATA_ATTRIB, Boolean.toString(false));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithServerMetadataIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithServerMetadataIT.java
new file mode 100644
index 0000000000..6a19bec2e6
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UCFWithServerMetadataIT.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.phoenix.coprocessor.MetaDataEndpointImpl;
+import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
+import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetTableRequest;
+import
org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
+import
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
+import
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheResponse;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test GetTable rpc calls for the combination of UCF and useServerMetadata.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class UCFWithServerMetadataIT extends BaseTest {
+
+ private final String updateCacheFrequency;
+ private final boolean useServerMetadata;
+ private final boolean singleRowUpdate;
+ private static final AtomicLong getTableCallCount = new AtomicLong(0);
+ private static final AtomicLong addServerCacheCallCount = new AtomicLong(0);
+
+ public UCFWithServerMetadataIT(String updateCacheFrequency, boolean
useServerMetadata,
+ boolean singleRowUpdate) {
+ this.updateCacheFrequency = updateCacheFrequency;
+ this.useServerMetadata = useServerMetadata;
+ this.singleRowUpdate = singleRowUpdate;
+ }
+
+ @Parameters(name = "UpdateCacheFrequency={0}, UseServerMetadata={1},
SingleRowUpdate={2}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] { { "60000", true, true }, { "60000",
true, false },
+ { "60000", false, true }, { "60000", false, false }, { "ALWAYS", true,
true },
+ { "ALWAYS", true, false }, { "ALWAYS", false, true }, { "ALWAYS", false,
false } });
+ }
+
+ public static class TrackingMetaDataEndpointImpl extends
MetaDataEndpointImpl {
+
+ @Override
+ public void getTable(RpcController controller, GetTableRequest request,
+ RpcCallback<MetaDataResponse> done) {
+ getTableCallCount.incrementAndGet();
+ super.getTable(controller, request, done);
+ }
+ }
+
+ public static class TrackingServerCachingEndpointImpl extends
ServerCachingEndpointImpl {
+
+ @Override
+ public void addServerCache(RpcController controller, AddServerCacheRequest
request,
+ RpcCallback<AddServerCacheResponse> done) {
+ addServerCacheCallCount.incrementAndGet();
+ super.addServerCache(controller, request, done);
+ }
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = new HashMap<>(1);
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
+ setUpTestDriver(new ReadOnlyProps(props));
+ }
+
+ @Before
+ public void setUp() {
+ getTableCallCount.set(0);
+ addServerCacheCallCount.set(0);
+ }
+
+ @Test
+ public void testUpdateCacheFrequency() throws Exception {
+ String dataTableName = generateUniqueName();
+ String coveredIndex1 = "CI1_" + generateUniqueName();
+ String coveredIndex2 = "CI2_" + generateUniqueName();
+ String uncoveredIndex1 = "UI1_" + generateUniqueName();
+ String uncoveredIndex2 = "UI2_" + generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ props.setProperty(QueryServices.INDEX_USE_SERVER_METADATA_ATTRIB,
+ Boolean.toString(useServerMetadata));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String createTableDDL =
+ String.format("CREATE TABLE %s (id INTEGER PRIMARY KEY, name
VARCHAR(50), "
+ + "age INTEGER, city VARCHAR(50), salary INTEGER, department
VARCHAR(50)"
+ + ") UPDATE_CACHE_FREQUENCY=%s", dataTableName,
updateCacheFrequency);
+ conn.createStatement().execute(createTableDDL);
+ attachCustomCoprocessor(conn, dataTableName);
+ conn.createStatement().execute(String
+ .format("CREATE INDEX %s ON %s (name) INCLUDE (age, city)",
coveredIndex1, dataTableName));
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (city) INCLUDE (salary, department)",
coveredIndex2, dataTableName));
+ conn.createStatement()
+ .execute(String.format("CREATE INDEX %s ON %s (age)", uncoveredIndex1,
dataTableName));
+ conn.createStatement()
+ .execute(String.format("CREATE INDEX %s ON %s (salary)",
uncoveredIndex2, dataTableName));
+ String upsertSQL = String.format(
+ "UPSERT INTO %s (id, name, age, city, salary, department) VALUES (?,
?, ?, ?, ?, ?)",
+ dataTableName);
+ int totalRows = 52;
+ int batchSize = 8;
+ PreparedStatement stmt = conn.prepareStatement(upsertSQL);
+ long startGetTableCalls = getTableCallCount.get();
+ for (int i = 1; i <= totalRows; i++) {
+ stmt.setInt(1, i);
+ stmt.setString(2, "Name" + i);
+ stmt.setInt(3, 20 + (i % 40));
+ stmt.setString(4, "City" + (i % 10));
+ stmt.setInt(5, 30000 + (i * 1000));
+ stmt.setString(6, "Dept" + (i % 5));
+ stmt.executeUpdate();
+ if (singleRowUpdate) {
+ conn.commit();
+ } else if (i % batchSize == 0) {
+ conn.commit();
+ }
+ }
+ if (!singleRowUpdate) {
+ conn.commit();
+ }
+ long actualGetTableCalls = getTableCallCount.get() - startGetTableCalls;
+ int expectedCalls;
+ String caseKey = updateCacheFrequency + "_" + useServerMetadata + "_" +
singleRowUpdate;
+ switch (caseKey) {
+ case "60000_true_true":
+ case "60000_true_false":
+ expectedCalls = 1;
+ break;
+ case "60000_false_true":
+ case "60000_false_false":
+ expectedCalls = 0;
+ break;
+ case "ALWAYS_true_true":
+ expectedCalls = totalRows;
+ break;
+ case "ALWAYS_true_false":
+ expectedCalls = (int) Math.ceil((double) totalRows / batchSize) * 2;
+ break;
+ case "ALWAYS_false_true":
+ expectedCalls = totalRows;
+ break;
+ case "ALWAYS_false_false":
+ expectedCalls = (int) Math.ceil((double) totalRows / batchSize);
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected test case: " +
caseKey);
+ }
+ assertEquals("Expected exact number of getTable() calls for case: " +
caseKey, expectedCalls,
+ actualGetTableCalls);
+ long actualAddServerCacheCalls = addServerCacheCallCount.get();
+ int expectedAddServerCacheCalls;
+ switch (caseKey) {
+ case "60000_false_false":
+ case "ALWAYS_false_false":
+ expectedAddServerCacheCalls = (int) Math.ceil((double) totalRows /
batchSize);
+ break;
+ default:
+ expectedAddServerCacheCalls = 0;
+ break;
+ }
+ assertEquals("Expected exact number of addServerCache() calls for case:
" + caseKey,
+ expectedAddServerCacheCalls, actualAddServerCacheCalls);
+ }
+ }
+
+ private void attachCustomCoprocessor(Connection conn, String dataTableName)
throws Exception {
+ TestUtil.removeCoprocessor(conn, "SYSTEM.CATALOG",
MetaDataEndpointImpl.class);
+ TestUtil.addCoprocessor(conn, "SYSTEM.CATALOG",
TrackingMetaDataEndpointImpl.class);
+ TestUtil.removeCoprocessor(conn, dataTableName,
ServerCachingEndpointImpl.class);
+ TestUtil.addCoprocessor(conn, dataTableName,
TrackingServerCachingEndpointImpl.class);
+ }
+
+}