This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch tmp-ec
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 9d676e6fc4f3d3173a4e30c07c263299ac9cf6f7
Author: Viraj Jasani <[email protected]>
AuthorDate: Sun Mar 15 22:57:43 2026 -0700

    addendum - multi-tenant
---
 .../phoenix/hbase/index/IndexCDCConsumer.java      | 250 +++++++--
 .../MultiTenantEventualIndexGenerateIT.java        |  56 ++
 .../end2end/MultiTenantEventualIndexIT.java        | 624 +++++++++++++++++++++
 3 files changed, 893 insertions(+), 37 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
index 5616887c46..2fd66bae54 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.generated.IndexMutationsProtos;
@@ -45,9 +46,13 @@ import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.types.IndexConsistency;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
@@ -144,6 +149,45 @@ public class IndexCDCConsumer implements Runnable {
   private boolean hasParentPartitions = false;
   private PTable cachedDataTable;
 
+  private boolean tenantInit = false;
+  private boolean isMultiTenant = false;
+  private String tenantIdColName;
+  private PDataType<?> tenantIdDataType;
+  private TenantScanInfo ownRegionScanInfo;
+
+  private final Map<String, TenantScanInfo> ancestorScanInfoCache = new 
HashMap<>();
+
+  private static class TenantScanInfo {
+
+    private static final TenantScanInfo EMPTY = new TenantScanInfo("", "", 
null, null, null);
+
+    private final String filter;
+    private final String orderBy;
+    private final Object startValue;
+    private final Object endValue;
+    private final PDataType<?> dataType;
+
+    TenantScanInfo(String filter, String orderBy, Object startValue, Object 
endValue,
+      PDataType<?> dataType) {
+      this.filter = filter;
+      this.orderBy = orderBy;
+      this.startValue = startValue;
+      this.endValue = endValue;
+      this.dataType = dataType;
+    }
+
+    int bindParams(PreparedStatement ps, int startIndex) throws SQLException {
+      int idx = startIndex;
+      if (startValue != null) {
+        ps.setObject(idx++, startValue, dataType.getSqlType());
+      }
+      if (endValue != null) {
+        ps.setObject(idx++, endValue, dataType.getSqlType());
+      }
+      return idx;
+    }
+  }
+
   /**
    * Creates a new IndexCDCConsumer for the given region with configurable 
serialization mode.
    * @param env                   region coprocessor environment.
@@ -224,6 +268,125 @@ public class IndexCDCConsumer implements Runnable {
     cachedDataTable = conn.getTable(dataTableName);
   }
 
+  private void initTenantInfo(PhoenixConnection conn) throws SQLException {
+    if (tenantInit) {
+      return;
+    }
+    PTable dataTable = getDataTable(conn);
+    isMultiTenant = dataTable.isMultiTenant();
+    if (!isMultiTenant) {
+      ownRegionScanInfo = TenantScanInfo.EMPTY;
+      tenantInit = true;
+      return;
+    }
+    int tenantColIndex = dataTable.getBucketNum() != null ? 1 : 0;
+    PColumn tenantCol = dataTable.getPKColumns().get(tenantColIndex);
+    tenantIdColName = tenantCol.getName().getString();
+    tenantIdDataType = tenantCol.getDataType();
+
+    byte[] regionStartKey = env.getRegion().getRegionInfo().getStartKey();
+    byte[] regionEndKey = env.getRegion().getRegionInfo().getEndKey();
+    ownRegionScanInfo = buildTenantScanInfo(regionStartKey, regionEndKey, 
dataTable);
+    LOG.debug(
+      "Initialized multi-tenant scan for table {} region {}:"
+        + " tenantCol {}, startTenant {}, endTenant {}",
+      dataTableName, encodedRegionName, tenantIdColName, 
ownRegionScanInfo.startValue,
+      ownRegionScanInfo.endValue);
+    tenantInit = true;
+  }
+
+  private TenantScanInfo buildTenantScanInfo(byte[] startKey, byte[] endKey, 
PTable dataTable) {
+    Object startVal = extractTenantIdFromRegionKey(startKey, dataTable);
+    Object endVal = extractTenantIdFromRegionKey(endKey, dataTable);
+    StringBuilder sb = new StringBuilder();
+    if (startVal != null) {
+      sb.append("\"").append(tenantIdColName).append("\" >= ? AND ");
+    }
+    if (endVal != null) {
+      sb.append("\"").append(tenantIdColName).append("\" <= ? AND ");
+    }
+    String filter = sb.toString();
+    String orderBy = filter.isEmpty() ? "" : "\"" + tenantIdColName + "\" 
ASC,";
+    return new TenantScanInfo(filter, orderBy, startVal, endVal, 
tenantIdDataType);
+  }
+
+  private Object extractTenantIdFromRegionKey(byte[] regionKey, PTable 
dataTable) {
+    if (regionKey == null || regionKey.length == 0) {
+      return null;
+    }
+    final RowKeySchema schema = dataTable.getRowKeySchema();
+    int pkPos = dataTable.getBucketNum() != null ? 1 : 0;
+    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+    int maxOffset = schema.iterator(regionKey, 0, regionKey.length, ptr);
+    for (int i = 0; i <= pkPos; i++) {
+      Boolean hasValue = schema.next(ptr, i, maxOffset);
+      if (!Boolean.TRUE.equals(hasValue)) {
+        return null;
+      }
+    }
+    byte[] tenantBytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+    PColumn tenantCol = dataTable.getPKColumns().get(pkPos);
+    return tenantCol.getDataType().toObject(tenantBytes, 0, tenantBytes.length,
+      tenantCol.getDataType(), tenantCol.getSortOrder(), 
tenantCol.getMaxLength(),
+      tenantCol.getScale());
+  }
+
+  private byte[][] lookupPartitionKeys(String partitionId) throws 
InterruptedException {
+    int retryCount = 0;
+    final String query = "SELECT PARTITION_START_KEY, PARTITION_END_KEY FROM "
+      + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME
+      + " WHERE TABLE_NAME = ? AND PARTITION_ID = ? LIMIT 1";
+    while (!stopped) {
+      try (
+        PhoenixConnection conn =
+          
QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
+        PreparedStatement ps = conn.prepareStatement(query)) {
+        ps.setString(1, dataTableName);
+        ps.setString(2, partitionId);
+        try (ResultSet rs = ps.executeQuery()) {
+          if (rs.next()) {
+            byte[] startKey = rs.getBytes(1);
+            byte[] endKey = rs.getBytes(2);
+            return new byte[][] { startKey == null ? new byte[0] : startKey,
+              endKey == null ? new byte[0] : endKey };
+          }
+        }
+        LOG.error("No CDC_STREAM entry found for partition {} table {}. This 
should not happen.",
+          partitionId, dataTableName);
+        return new byte[][] { new byte[0], new byte[0] };
+      } catch (SQLException e) {
+        long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount);
+        LOG.warn(
+          "Error while retrieving partition keys from CDC_STREAM for partition 
{} table {}. "
+            + "Retry #{}, sleeping {} ms before retrying...",
+          partitionId, dataTableName, retryCount, sleepTime, e);
+        sleepIfNotStopped(sleepTime);
+      }
+    }
+    return null;
+  }
+
+  private TenantScanInfo getPartitionTenantScanInfo(String partitionId)
+    throws InterruptedException {
+    if (!isMultiTenant) {
+      return TenantScanInfo.EMPTY;
+    }
+    if (partitionId.equals(encodedRegionName)) {
+      return ownRegionScanInfo;
+    }
+    TenantScanInfo cached = ancestorScanInfoCache.get(partitionId);
+    if (cached != null) {
+      return cached;
+    }
+    byte[][] keys = lookupPartitionKeys(partitionId);
+    if (keys == null) {
+      return TenantScanInfo.EMPTY;
+    }
+    TenantScanInfo info = buildTenantScanInfo(keys[0], keys[1], 
cachedDataTable);
+    ancestorScanInfoCache.put(partitionId, info);
+    return info;
+  }
+
   @Override
   public void run() {
     try {
@@ -730,19 +893,23 @@ public class IndexCDCConsumer implements Runnable {
       dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
     try (PhoenixConnection conn =
       QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) 
{
+      initTenantInfo(conn);
       String cdcObjectName = getCdcObjectName(conn);
+      TenantScanInfo scanInfo = getPartitionTenantScanInfo(partitionId);
       String cdcQuery;
       if (isParentReplay) {
-        cdcQuery = String
-          .format("SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ 
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
-            + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > 
? "
-            + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT 
?", cdcObjectName);
+        cdcQuery = String.format(
+          "SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ PHOENIX_ROW_TIMESTAMP(), 
\"CDC JSON\" "
+            + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() 
> ? "
+            + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC 
LIMIT ?",
+          cdcObjectName, scanInfo.filter, scanInfo.orderBy);
       } else {
-        cdcQuery = String
-          .format("SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ 
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
-            + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > 
? "
+        cdcQuery = String.format(
+          "SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ PHOENIX_ROW_TIMESTAMP(), 
\"CDC JSON\" "
+            + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() 
> ? "
             + "AND PHOENIX_ROW_TIMESTAMP() < ? "
-            + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT 
?", cdcObjectName);
+            + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC 
LIMIT ?",
+          cdcObjectName, scanInfo.filter, scanInfo.orderBy);
       }
       List<Pair<Long, IndexMutationsProtos.IndexMutations>> batchMutations = 
new ArrayList<>();
       long newLastTimestamp = lastProcessedTimestamp;
@@ -750,7 +917,7 @@ public class IndexCDCConsumer implements Runnable {
       int retryCount = 0;
       while (hasMoreRows && batchMutations.isEmpty()) {
         try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
-          setStatementParams(partitionId, isParentReplay, newLastTimestamp, 
ps);
+          setStatementParams(scanInfo, partitionId, isParentReplay, 
newLastTimestamp, ps);
           Pair<Long, Boolean> result =
             getMutationsAndTimestamp(ps, newLastTimestamp, batchMutations);
           hasMoreRows = result.getSecond();
@@ -765,15 +932,17 @@ public class IndexCDCConsumer implements Runnable {
       // With predefined LIMIT, there might be more rows with the same 
timestamp that were not
       // included in this batch.
       if (newLastTimestamp > lastProcessedTimestamp) {
-        String sameTimestampQuery = String
-          .format("SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ 
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
-            + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() = 
? "
-            + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC", 
cdcObjectName);
+        String sameTimestampQuery = String.format(
+          "SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ PHOENIX_ROW_TIMESTAMP(), 
\"CDC JSON\" "
+            + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() 
= ? "
+            + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC",
+          cdcObjectName, scanInfo.filter, scanInfo.orderBy);
         final long timestampToRefetch = newLastTimestamp;
         batchMutations.removeIf(pair -> pair.getFirst() == timestampToRefetch);
         try (PreparedStatement ps = conn.prepareStatement(sameTimestampQuery)) 
{
-          ps.setString(1, partitionId);
-          ps.setDate(2, new Date(newLastTimestamp));
+          int idx = scanInfo.bindParams(ps, 1);
+          ps.setString(idx++, partitionId);
+          ps.setDate(idx, new Date(newLastTimestamp));
           Pair<Long, Boolean> result =
             getMutationsAndTimestamp(ps, newLastTimestamp, batchMutations);
           newLastTimestamp = result.getFirst();
@@ -832,19 +1001,23 @@ public class IndexCDCConsumer implements Runnable {
       dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp);
     try (PhoenixConnection conn =
       QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) 
{
+      initTenantInfo(conn);
       String cdcObjectName = getCdcObjectName(conn);
+      TenantScanInfo scanInfo = getPartitionTenantScanInfo(partitionId);
       String cdcQuery;
       if (isParentReplay) {
-        cdcQuery = String
-          .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ 
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
-            + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > 
? "
-            + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT 
?", cdcObjectName);
+        cdcQuery = String.format(
+          "SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), 
\"CDC JSON\" "
+            + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() 
> ? "
+            + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC 
LIMIT ?",
+          cdcObjectName, scanInfo.filter, scanInfo.orderBy);
       } else {
-        cdcQuery = String
-          .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ 
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
-            + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > 
? "
+        cdcQuery = String.format(
+          "SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), 
\"CDC JSON\" "
+            + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() 
> ? "
             + "AND PHOENIX_ROW_TIMESTAMP() < ? "
-            + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT 
?", cdcObjectName);
+            + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC 
LIMIT ?",
+          cdcObjectName, scanInfo.filter, scanInfo.orderBy);
       }
 
       List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates = new 
ArrayList<>();
@@ -854,7 +1027,7 @@ public class IndexCDCConsumer implements Runnable {
       int retryCount = 0;
       while (hasMoreRows && batchStates.isEmpty()) {
         try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) {
-          setStatementParams(partitionId, isParentReplay, newLastTimestamp, 
ps);
+          setStatementParams(scanInfo, partitionId, isParentReplay, 
newLastTimestamp, ps);
           Pair<Long, Boolean> result =
             getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, 
lastScannedTimestamp);
           hasMoreRows = result.getSecond();
@@ -878,15 +1051,17 @@ public class IndexCDCConsumer implements Runnable {
         }
       }
       if (newLastTimestamp > lastProcessedTimestamp) {
-        String sameTimestampQuery = String
-          .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ 
PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" "
-            + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() = 
? "
-            + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC", 
cdcObjectName);
+        String sameTimestampQuery = String.format(
+          "SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), 
\"CDC JSON\" "
+            + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() 
= ? "
+            + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC",
+          cdcObjectName, scanInfo.filter, scanInfo.orderBy);
         final long timestampToRefetch = newLastTimestamp;
         batchStates.removeIf(pair -> pair.getFirst() == timestampToRefetch);
         try (PreparedStatement ps = conn.prepareStatement(sameTimestampQuery)) 
{
-          ps.setString(1, partitionId);
-          ps.setDate(2, new Date(newLastTimestamp));
+          int idx = scanInfo.bindParams(ps, 1);
+          ps.setString(idx++, partitionId);
+          ps.setDate(idx, new Date(newLastTimestamp));
           Pair<Long, Boolean> result =
             getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, 
lastScannedTimestamp);
           newLastTimestamp = result.getFirst();
@@ -908,16 +1083,17 @@ public class IndexCDCConsumer implements Runnable {
     }
   }
 
-  private void setStatementParams(String partitionId, boolean isParentReplay, 
long newLastTimestamp,
-    PreparedStatement ps) throws SQLException {
-    ps.setString(1, partitionId);
-    ps.setDate(2, new Date(newLastTimestamp));
+  private void setStatementParams(TenantScanInfo scanInfo, String partitionId,
+    boolean isParentReplay, long newLastTimestamp, PreparedStatement ps) 
throws SQLException {
+    int idx = scanInfo.bindParams(ps, 1);
+    ps.setString(idx++, partitionId);
+    ps.setDate(idx++, new Date(newLastTimestamp));
     if (isParentReplay) {
-      ps.setInt(3, batchSize);
+      ps.setInt(idx, batchSize);
     } else {
       long currentTime = EnvironmentEdgeManager.currentTimeMillis() - 
timestampBufferMs;
-      ps.setDate(3, new Date(currentTime));
-      ps.setInt(4, batchSize);
+      ps.setDate(idx++, new Date(currentTime));
+      ps.setInt(idx, batchSize);
     }
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexGenerateIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexGenerateIT.java
new file mode 100644
index 0000000000..857abf5e81
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexGenerateIT.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE;
+
+import java.util.Map;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class MultiTenantEventualIndexGenerateIT extends 
MultiTenantEventualIndexIT {
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(12);
+    
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
+    
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+      Integer.toString(MAX_LOOKBACK_AGE));
+    props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(3500));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(4000));
+    props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Long.toString(10));
+    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    props.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, 
Boolean.TRUE.toString());
+    props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+    props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString());
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1));
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
+}
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java
new file mode 100644
index 0000000000..33b7bd7465
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE;
+import static 
org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class MultiTenantEventualIndexIT extends ParallelStatsDisabledIT {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MultiTenantEventualIndexIT.class);
+  protected static final int MAX_LOOKBACK_AGE = 1000000;
+  private static final long WAIT_MS = 25000;
+  private static final int ROWS_PER_TENANT_PER_PHASE = 10;
+  private static final String[] TENANT_PREFIXES = { "AA_", "BB_", "CC_", 
"DD_", "EE_", "FF_" };
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(12);
+    
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
+    
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+      Integer.toString(MAX_LOOKBACK_AGE));
+    props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(3500));
+    props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(4000));
+    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 
Long.toString(2));
+    props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(1));
+    props.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, 
Boolean.TRUE.toString());
+    props.put("hbase.coprocessor.master.classes", 
PhoenixMasterObserver.class.getName());
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
+  private Connection getTenantConnection(String tenantId) throws SQLException {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+    return DriverManager.getConnection(getUrl(), props);
+  }
+
+  private Connection getGlobalConnection() throws SQLException {
+    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+    return DriverManager.getConnection(getUrl(), props);
+  }
+
+  private void waitForEventualConsistency() throws InterruptedException {
+    Thread.sleep(WAIT_MS);
+  }
+
+  private String[] createTenants() {
+    String[] tenants = new String[TENANT_PREFIXES.length];
+    for (int i = 0; i < TENANT_PREFIXES.length; i++) {
+      tenants[i] = TENANT_PREFIXES[i] + generateUniqueName();
+    }
+    return tenants;
+  }
+
+  private void insertRows(String[] tenants, String tableName, String phase, 
int startRow,
+    int endRow) throws Exception {
+    for (String tenant : tenants) {
+      try (Connection conn = getTenantConnection(tenant)) {
+        for (int j = startRow; j <= endRow; j++) {
+          conn.createStatement().execute(
+            String.format("UPSERT INTO %s(PK2, V1, V2) VALUES ('%s_r%d', 
'%s_v%d', '%s_d%d')",
+              tableName, phase, j, phase, j, phase, j));
+        }
+        conn.commit();
+      }
+    }
+  }
+
+  private void updateRows(String[] tenants, String tableName, String phase, 
int startRow,
+    int endRow, String suffix) throws Exception {
+    for (String tenant : tenants) {
+      try (Connection conn = getTenantConnection(tenant)) {
+        for (int j = startRow; j <= endRow; j++) {
+          conn.createStatement()
+            .execute(String.format(
+              "UPSERT INTO %s(PK2, V1, V2) VALUES ('%s_r%d', '%s_v%d_%s', 
'%s_d%d_%s')", tableName,
+              phase, j, phase, j, suffix, phase, j, suffix));
+        }
+        conn.commit();
+      }
+    }
+  }
+
+  private void deleteRows(String[] tenants, String tableName, String phase, 
int startRow,
+    int endRow) throws Exception {
+    for (String tenant : tenants) {
+      try (Connection conn = getTenantConnection(tenant)) {
+        for (int j = startRow; j <= endRow; j++) {
+          conn.createStatement()
+            .execute(String.format("DELETE FROM %s WHERE PK2 = '%s_r%d'", 
tableName, phase, j));
+        }
+        conn.commit();
+      }
+    }
+  }
+
+  private void verifyRowCount(String[] tenants, String tableName, int 
expectedCount, String phase)
+    throws Exception {
+    for (String tenant : tenants) {
+      try (Connection conn = getTenantConnection(tenant)) {
+        ResultSet rs = conn.createStatement()
+          .executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE V1 IS 
NOT NULL");
+        assertTrue(rs.next());
+        assertEquals(phase + ": tenant " + tenant, expectedCount, 
rs.getInt(1));
+      }
+    }
+  }
+
+  private void verifyIndexLookup(String[] tenants, String tableName, String 
v1Value,
+    String expectedPk2, String expectedV2) throws Exception {
+    for (String tenant : tenants) {
+      try (Connection conn = getTenantConnection(tenant)) {
+        ResultSet rs = conn.createStatement()
+          .executeQuery("SELECT PK2, V2 FROM " + tableName + " WHERE V1 = '" + 
v1Value + "'");
+        assertTrue("Row with V1=" + v1Value + " not found for " + tenant, 
rs.next());
+        assertEquals(expectedPk2, rs.getString(1));
+        if (expectedV2 != null) {
+          assertEquals(expectedV2, rs.getString(2));
+        }
+        assertFalse(rs.next());
+      }
+    }
+  }
+
+  private void verifyNoResult(String[] tenants, String tableName, String 
v1Value, String message)
+    throws Exception {
+    for (String tenant : tenants) {
+      try (Connection conn = getTenantConnection(tenant)) {
+        ResultSet rs = conn.createStatement()
+          .executeQuery("SELECT PK2 FROM " + tableName + " WHERE V1 = '" + 
v1Value + "'");
+        assertFalse(message + " for " + tenant, rs.next());
+      }
+    }
+  }
+
+  private int getRegionCount(Connection conn, String tableName) throws 
Exception {
+    List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, 
tableName);
+    return regions.size();
+  }
+
  /**
   * Basic eventual-consistency check on a multi-tenant table: two tenants upsert
   * rows with overlapping PK2 values through tenant-scoped connections; after the
   * consistency wait each tenant's index lookups return only its own rows, and
   * the global index table holds all four entries.
   */
  @Test
  public void testBasicMultiTenantEventualIndex() throws Exception {
    String tableName = generateUniqueName();
    String indexName = generateUniqueName();
    String tenantA = "TENANT_A_" + generateUniqueName();
    String tenantB = "TENANT_B_" + generateUniqueName();

    // Create the multi-tenant table plus an eventually-consistent covered index on V1.
    try (Connection globalConn = getGlobalConnection()) {
      globalConn.createStatement()
        .execute("CREATE TABLE " + tableName
          + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR"
          // NOTE(review): no comma before CONSTRAINT in the assembled DDL — confirm
          // Phoenix's grammar accepts this form.
          + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2))"
          + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0");
      globalConn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName
        + "(V1) INCLUDE (V2) CONSISTENCY=EVENTUAL");
    }

    // Tenant A writes two rows through its tenant-scoped connection.
    try (Connection connA = getTenantConnection(tenantA)) {
      connA.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'a1', 'x1')");
      connA.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r2', 'a2', 'x2')");
      connA.commit();
    }

    // Tenant B reuses the same PK2 values; tenant isolation must keep the rows apart.
    try (Connection connB = getTenantConnection(tenantB)) {
      connB.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'b1', 'y1')");
      connB.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r2', 'b2', 'y2')");
      connB.commit();
    }

    waitForEventualConsistency();

    // Tenant A's index lookups return exactly its own rows.
    try (Connection connA = getTenantConnection(tenantA)) {
      ResultSet rs = connA.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a1'");
      assertTrue(rs.next());
      assertEquals("r1", rs.getString(1));
      assertEquals("a1", rs.getString(2));
      assertEquals("x1", rs.getString(3));
      assertFalse(rs.next());

      rs = connA.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a2'");
      assertTrue(rs.next());
      assertEquals("r2", rs.getString(1));
      assertEquals("a2", rs.getString(2));
      assertEquals("x2", rs.getString(3));
      assertFalse(rs.next());
    }

    // Tenant B sees its own version of PK2='r1', not tenant A's.
    try (Connection connB = getTenantConnection(tenantB)) {
      ResultSet rs = connB.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'b1'");
      assertTrue(rs.next());
      assertEquals("r1", rs.getString(1));
      assertEquals("b1", rs.getString(2));
      assertEquals("y1", rs.getString(3));
      assertFalse(rs.next());
    }

    // Global view of the index table: 2 rows per tenant = 4 total.
    try (Connection globalConn = getGlobalConnection()) {
      long rowCount = verifyIndexTable(tableName, indexName, globalConn);
      assertEquals(4, rowCount);
    }
  }
+
  /**
   * Verifies that deletes and updates to rows sharing the same PK2 stay isolated
   * per tenant: tenant A deletes 'r1' while tenant B updates its own 'r1'; after
   * the consistency wait the index reflects the delete for A, the new value for
   * B, and the global index holds exactly the two surviving rows.
   */
  @Test
  public void testMultiTenantDeleteAndUpsert() throws Exception {
    String tableName = generateUniqueName();
    String indexName = generateUniqueName();
    String tenantA = "TENANT_A_" + generateUniqueName();
    String tenantB = "TENANT_B_" + generateUniqueName();

    // Multi-tenant table with an eventually-consistent covered index on V1.
    try (Connection globalConn = getGlobalConnection()) {
      globalConn.createStatement()
        .execute("CREATE TABLE " + tableName
          + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR"
          + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2))"
          + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0");
      globalConn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName
        + "(V1) INCLUDE (V2) CONSISTENCY=EVENTUAL");
    }

    // Initial data: A has r1+r2, B has r1 (same PK2 as A's r1).
    try (Connection connA = getTenantConnection(tenantA)) {
      connA.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'a1', 'x1')");
      connA.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r2', 'a2', 'x2')");
      connA.commit();
    }

    try (Connection connB = getTenantConnection(tenantB)) {
      connB.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'b1', 'y1')");
      connB.commit();
    }

    waitForEventualConsistency();

    // Tenant A deletes its 'r1' — must not affect tenant B's 'r1'.
    try (Connection connA = getTenantConnection(tenantA)) {
      connA.createStatement().execute("DELETE FROM " + tableName + " WHERE PK2 = 'r1'");
      connA.commit();
    }

    // Tenant B updates its 'r1' in place with new indexed and covered values.
    try (Connection connB = getTenantConnection(tenantB)) {
      connB.createStatement().execute(
        "UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'b1_updated', 'y1_updated')");
      connB.commit();
    }

    waitForEventualConsistency();

    // A: deleted row gone from the index, surviving row still found.
    try (Connection connA = getTenantConnection(tenantA)) {
      ResultSet rs = connA.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a1'");
      assertFalse("Deleted row should not be visible", rs.next());

      rs = connA.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a2'");
      assertTrue(rs.next());
      assertEquals("r2", rs.getString(1));
      assertFalse(rs.next());
    }

    // B: lookup on the updated V1 value finds the row with the updated V2.
    try (Connection connB = getTenantConnection(tenantB)) {
      ResultSet rs = connB.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'b1_updated'");
      assertTrue(rs.next());
      assertEquals("r1", rs.getString(1));
      assertEquals("y1_updated", rs.getString(3));
      assertFalse(rs.next());
    }

    // Index table total: A's r2 + B's r1 = 2 rows.
    try (Connection globalConn = getGlobalConnection()) {
      long rowCount = verifyIndexTable(tableName, indexName, globalConn);
      assertEquals(2, rowCount);
    }
  }
+
  /**
   * Same multi-tenant isolation checks as the basic test, but with an UNCOVERED
   * eventually-consistent index: the SELECTs project V2, which is not stored in
   * the index, so the query path must fetch it from the data table. Also checks
   * that tenant A's delete does not remove tenant B's row with the same PK2.
   */
  @Test
  public void testMultiTenantUncoveredEventualIndex() throws Exception {
    String tableName = generateUniqueName();
    String indexName = generateUniqueName();
    String tenantA = "TENANT_A_" + generateUniqueName();
    String tenantB = "TENANT_B_" + generateUniqueName();

    try (Connection globalConn = getGlobalConnection()) {
      globalConn.createStatement()
        .execute("CREATE TABLE " + tableName
          + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR"
          + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2))"
          + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0");
      // Uncovered: only V1 is indexed; V2 must come from the base table.
      globalConn.createStatement().execute(
        "CREATE UNCOVERED INDEX " + indexName + " ON " + tableName + "(V1) CONSISTENCY=EVENTUAL");
    }

    // Tenant A: two rows; tenant B: one row sharing PK2 'r1' with A.
    try (Connection connA = getTenantConnection(tenantA)) {
      connA.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'a1', 'x1')");
      connA.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r2', 'a2', 'x2')");
      connA.commit();
    }

    try (Connection connB = getTenantConnection(tenantB)) {
      connB.createStatement()
        .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'b1', 'y1')");
      connB.commit();
    }

    waitForEventualConsistency();

    // Each tenant's lookup returns its own row, including the uncovered V2 column.
    try (Connection connA = getTenantConnection(tenantA)) {
      ResultSet rs = connA.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a1'");
      assertTrue(rs.next());
      assertEquals("r1", rs.getString(1));
      assertEquals("a1", rs.getString(2));
      assertEquals("x1", rs.getString(3));
      assertFalse(rs.next());
    }

    try (Connection connB = getTenantConnection(tenantB)) {
      ResultSet rs = connB.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'b1'");
      assertTrue(rs.next());
      assertEquals("r1", rs.getString(1));
      assertEquals("b1", rs.getString(2));
      assertEquals("y1", rs.getString(3));
      assertFalse(rs.next());
    }

    // Tenant A deletes its 'r1'; B's 'r1' must survive.
    try (Connection connA = getTenantConnection(tenantA)) {
      connA.createStatement().execute("DELETE FROM " + tableName + " WHERE PK2 = 'r1'");
      connA.commit();
    }

    waitForEventualConsistency();

    try (Connection connA = getTenantConnection(tenantA)) {
      ResultSet rs = connA.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a1'");
      assertFalse("Deleted row should not appear for tenant A", rs.next());
    }

    try (Connection connB = getTenantConnection(tenantB)) {
      ResultSet rs = connB.createStatement()
        .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'b1'");
      assertTrue("Tenant B row should still be visible", rs.next());
      assertEquals("r1", rs.getString(1));
      assertFalse(rs.next());
    }

    // Index table total after A's delete: A's r2 + B's r1 = 2 rows.
    try (Connection globalConn = getGlobalConnection()) {
      long rowCount = verifyIndexTable(tableName, indexName, globalConn);
      assertEquals(2, rowCount);
    }
  }
+
  /**
   * Exercises the covered eventual index across region splits: rows are inserted
   * in phases, with a manual table split between phases, then updates and
   * deletes verify the index stays correct for every tenant as region boundaries
   * change.
   */
  @Test
  public void testMultiTenantCoveredIndexWithSplits() throws Exception {
    String tableName = generateUniqueName();
    String indexName = generateUniqueName();
    String[] tenants = createTenants();

    try (Connection globalConn = getGlobalConnection()) {
      globalConn.createStatement()
        .execute("CREATE TABLE " + tableName
          + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR"
          + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2))"
          + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0");
      globalConn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName
        + "(V1) INCLUDE (V2) CONSISTENCY=EVENTUAL");
    }

    // Phase 1: Insert 10 rows per tenant (60 total), single region
    insertRows(tenants, tableName, "p1", 1, ROWS_PER_TENANT_PER_PHASE);
    waitForEventualConsistency();
    verifyRowCount(tenants, tableName, ROWS_PER_TENANT_PER_PHASE, "Phase 1");
    verifyIndexLookup(tenants, tableName, "p1_v5", "p1_r5", "p1_d5");

    // Split the data table mid-test; split point "CC" falls between tenant-id prefixes.
    try (Connection globalConn = getGlobalConnection()) {
      TestUtil.splitTable(globalConn, tableName, Bytes.toBytes("CC"));
      assertEquals("Region count after split 1", 2, getRegionCount(globalConn, tableName));
    }

    // Phase 2: Insert 10 more rows per tenant after first split
    insertRows(tenants, tableName, "p2", 1, ROWS_PER_TENANT_PER_PHASE);
    waitForEventualConsistency();
    verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE, "Phase 2");
    // Pre-split data must still be reachable through the index after the split.
    verifyIndexLookup(tenants, tableName, "p1_v3", "p1_r3", "p1_d3");
    verifyIndexLookup(tenants, tableName, "p2_v7", "p2_r7", "p2_d7");

    // Second split, yielding three regions.
    try (Connection globalConn = getGlobalConnection()) {
      TestUtil.splitTable(globalConn, tableName, Bytes.toBytes("EE"));
      assertEquals("Region count after split 2", 3, getRegionCount(globalConn, tableName));
    }

    // Phase 3: new inserts plus in-place updates of the first five p1 rows.
    insertRows(tenants, tableName, "p3", 1, 5);
    updateRows(tenants, tableName, "p1", 1, 5, "upd");
    waitForEventualConsistency();
    // 10 (p1, 5 updated) + 10 (p2) + 5 (p3) = 25 unique PKs
    verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE + 5, "Phase 3");

    // Updated V1 values resolve; the old pre-update index entries must be gone.
    verifyIndexLookup(tenants, tableName, "p1_v1_upd", "p1_r1", "p1_d1_upd");
    verifyIndexLookup(tenants, tableName, "p1_v5_upd", "p1_r5", "p1_d5_upd");
    verifyNoResult(tenants, tableName, "p1_v1", "Old p1_v1 should not exist");
    verifyNoResult(tenants, tableName, "p1_v5", "Old p1_v5 should not exist");
    verifyIndexLookup(tenants, tableName, "p1_v6", "p1_r6", "p1_d6");

    // Phase 4: delete two p2 rows per tenant and verify index cleanup.
    deleteRows(tenants, tableName, "p2", 1, 2);
    waitForEventualConsistency();
    verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE + 3, "Phase 4");
    verifyNoResult(tenants, tableName, "p2_v1", "Deleted p2_r1 should not be visible via index");
    verifyNoResult(tenants, tableName, "p2_v2", "Deleted p2_r2 should not be visible via index");
    verifyIndexLookup(tenants, tableName, "p2_v3", "p2_r3", "p2_d3");

    // Final index total: 25 - 2 deleted = 23 rows per tenant.
    try (Connection globalConn = getGlobalConnection()) {
      long rowCount = verifyIndexTable(tableName, indexName, globalConn);
      assertEquals(23 * tenants.length, rowCount);
    }
  }
+
  /**
   * Same phased insert/split/update/delete sequence as the covered-index split
   * test, but against an UNCOVERED eventually-consistent index, so every
   * verification query must also fetch V2 from the base table.
   */
  @Test
  public void testMultiTenantUncoveredIndexWithSplits() throws Exception {
    String tableName = generateUniqueName();
    String indexName = generateUniqueName();
    String[] tenants = createTenants();

    try (Connection globalConn = getGlobalConnection()) {
      globalConn.createStatement()
        .execute("CREATE TABLE " + tableName
          + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR"
          + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2))"
          + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0");
      globalConn.createStatement().execute(
        "CREATE UNCOVERED INDEX " + indexName + " ON " + tableName + "(V1) CONSISTENCY=EVENTUAL");
    }

    // Phase 1: baseline rows in a single region.
    insertRows(tenants, tableName, "p1", 1, ROWS_PER_TENANT_PER_PHASE);
    waitForEventualConsistency();
    verifyRowCount(tenants, tableName, ROWS_PER_TENANT_PER_PHASE, "Phase 1");
    verifyIndexLookup(tenants, tableName, "p1_v5", "p1_r5", "p1_d5");

    // First split of the data table.
    try (Connection globalConn = getGlobalConnection()) {
      TestUtil.splitTable(globalConn, tableName, Bytes.toBytes("CC"));
      assertEquals(2, getRegionCount(globalConn, tableName));
    }

    // Phase 2: more rows after the split.
    insertRows(tenants, tableName, "p2", 1, ROWS_PER_TENANT_PER_PHASE);
    waitForEventualConsistency();
    verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE, "Phase 2");

    // Second split, yielding three regions.
    try (Connection globalConn = getGlobalConnection()) {
      TestUtil.splitTable(globalConn, tableName, Bytes.toBytes("EE"));
      assertEquals(3, getRegionCount(globalConn, tableName));
    }

    // Phase 3: inserts plus in-place updates; old index entries must vanish.
    insertRows(tenants, tableName, "p3", 1, 5);
    updateRows(tenants, tableName, "p1", 1, 5, "upd");
    waitForEventualConsistency();
    verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE + 5, "Phase 3");
    verifyIndexLookup(tenants, tableName, "p1_v3_upd", "p1_r3", "p1_d3_upd");
    verifyNoResult(tenants, tableName, "p1_v3", "Old value should not exist after update");

    // Phase 4: deletes; verify cleanup and that untouched rows remain reachable.
    deleteRows(tenants, tableName, "p2", 1, 2);
    waitForEventualConsistency();
    verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE + 3, "Phase 4");
    verifyNoResult(tenants, tableName, "p2_v1", "Deleted row should not be visible");
    verifyIndexLookup(tenants, tableName, "p2_v5", "p2_r5", "p2_d5");

    // Final index total: 25 - 2 deleted = 23 rows per tenant.
    try (Connection globalConn = getGlobalConnection()) {
      long rowCount = verifyIndexTable(tableName, indexName, globalConn);
      assertEquals(23 * tenants.length, rowCount);
    }
  }
+
  /**
   * Stress test (ignored by default): 8 threads concurrently upsert random data
   * through tenant connections onto a table with five eventually-consistent
   * indexes, while a background thread splits the table. Afterwards two randomly
   * chosen indexes are verified against the expected unique-row count.
   */
  @Test(timeout = 1800000)
  @Ignore("too aggressive for jenkins builds")
  public void testConcurrentUpsertsWithTableSplits() throws Exception {
    int nThreads = 8;
    final int batchSize = 100;
    final int nRows = 777;
    final int nIndexValues = 23;
    final int nSplits = 3;
    final int totalUpserts = 10000;
    final String tableName = generateUniqueName();
    final String indexName1 = generateUniqueName();
    final String indexName2 = generateUniqueName();
    final String indexName3 = generateUniqueName();
    final String indexName4 = generateUniqueName();
    final String indexName5 = generateUniqueName();
    final String[] tenants = createTenants();

    // Table plus a mix of covered, uncovered, and composite eventual indexes.
    try (Connection globalConn = getGlobalConnection()) {
      globalConn.createStatement()
        .execute("CREATE TABLE " + tableName
          + "(TENANT_ID VARCHAR NOT NULL, K2 INTEGER NOT NULL, V1 INTEGER, V2 INTEGER,"
          + " V3 INTEGER, V4 INTEGER," + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, K2))"
          + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0");
      globalConn.createStatement().execute("CREATE INDEX " + indexName1 + " ON " + tableName
        + "(V1) INCLUDE (V2, V3) CONSISTENCY=EVENTUAL");
      globalConn.createStatement().execute(
        "CREATE UNCOVERED INDEX " + indexName2 + " ON " + tableName + "(V2) CONSISTENCY=EVENTUAL");
      globalConn.createStatement().execute("CREATE INDEX " + indexName3 + " ON " + tableName
        + "(V3) INCLUDE (V1, V2) CONSISTENCY=EVENTUAL");
      globalConn.createStatement().execute(
        "CREATE UNCOVERED INDEX " + indexName4 + " ON " + tableName + "(V4) CONSISTENCY=EVENTUAL");
      globalConn.createStatement().execute("CREATE INDEX " + indexName5 + " ON " + tableName
        + "(V1, V2) INCLUDE (V3, V4) CONSISTENCY=EVENTUAL");
    }

    final CountDownLatch doneSignal = new CountDownLatch(nThreads);
    Runnable[] runnables = new Runnable[nThreads];
    // NOTE(review): purpose of this settle delay before starting writers is not
    // evident from the code — presumably lets index creation finish; confirm.
    Thread.sleep(3000);
    long startTime = EnvironmentEdgeManager.currentTimeMillis();

    for (int i = 0; i < nThreads; i++) {
      // Threads are spread round-robin across the tenants.
      final String tenant = tenants[i % tenants.length];
      runnables[i] = () -> {
        try (Connection conn = getTenantConnection(tenant)) {
          ThreadLocalRandom rand = ThreadLocalRandom.current();
          for (int j = 0; j < totalUpserts; j++) {
            // K2 cycles over nRows so rows are repeatedly overwritten; indexed
            // columns randomly flip between NULL and random ints to churn the
            // index maintenance path.
            conn.createStatement()
              .execute("UPSERT INTO " + tableName + " VALUES (" + (j % nRows) + ", "
                + (rand.nextBoolean() ? null : (rand.nextInt() % nIndexValues)) + ", "
                + (rand.nextBoolean() ? null : rand.nextInt()) + ", "
                + (rand.nextBoolean() ? null : rand.nextInt()) + ", "
                + (rand.nextBoolean() ? null : rand.nextInt()) + ")");
            if ((j % batchSize) == 0) {
              conn.commit();
            }
          }
          conn.commit();
        } catch (SQLException e) {
          // Best-effort stress writer: failures are logged, not rethrown, so the
          // latch is always counted down. Final verification catches data loss.
          LOG.warn("Exception during concurrent upsert for tenant {}", tenant, e);
        } finally {
          doneSignal.countDown();
        }
      };
    }

    // Split the table nSplits times in the background while writers run.
    Thread splitThread = new Thread(() -> TestUtil.splitTable(getUrl(), tableName, nSplits, 8000));
    splitThread.start();
    for (int i = 0; i < nThreads; i++) {
      // Stagger the last four writer threads so upserts overlap the splits.
      if (i >= (nThreads - 4)) {
        Thread.sleep(12000);
      }
      Thread t = new Thread(runnables[i]);
      t.start();
    }
    assertTrue("Ran out of time", doneSignal.await(350, TimeUnit.SECONDS));
    splitThread.join(10000);
    LOG.info("Total upsert time: {} ms", EnvironmentEdgeManager.currentTimeMillis() - startTime);

    // Each tenant converges to nRows distinct K2 values.
    int expectedTotal = tenants.length * nRows;
    // Verify two of the five indexes, chosen at random, to bound runtime.
    List<String> allIndexes =
      new ArrayList<>(Arrays.asList(indexName1, indexName2, indexName3, indexName4, indexName5));
    Collections.shuffle(allIndexes, ThreadLocalRandom.current());
    LOG.info("Randomly selected indexes to verify: {}, {}", allIndexes.get(0), allIndexes.get(1));
    try (Connection globalConn = getGlobalConnection()) {
      // NOTE(review): 500 000 ms (~8.3 min) hard sleep before verification —
      // looks like a debugging leftover; confirm whether waitForEventualConsistency()
      // or a shorter wait suffices here.
      Thread.sleep(500000);
      long rowCount = verifyIndexTable(tableName, allIndexes.get(0), globalConn, false);
      assertEquals("Index " + allIndexes.get(0), expectedTotal, rowCount);
      rowCount = verifyIndexTable(tableName, allIndexes.get(1), globalConn, false);
      assertEquals("Index " + allIndexes.get(1), expectedTotal, rowCount);
    }
  }
+
+}

Reply via email to