This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 26444e0ccc PHOENIX-7677 TTL_DELETE CDC event to use batch mutation (#2247)
26444e0ccc is described below

commit 26444e0cccb4bf13fa95ec130ecf07b7573fe635
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Tue Jul 29 21:51:39 2025 -0700

    PHOENIX-7677 TTL_DELETE CDC event to use batch mutation (#2247)
---
 .../org/apache/phoenix/query/QueryServices.java    |   6 +
 .../apache/phoenix/query/QueryServicesOptions.java |   8 +-
 .../phoenix/coprocessor/CDCCompactionUtil.java     | 452 ++++++++++++---------
 .../phoenix/coprocessor/CompactionScanner.java     |  36 +-
 .../org/apache/phoenix/end2end/TableTTLIT.java     | 126 ++++++
 .../phoenix/schema/ConditionalTTLExpressionIT.java |  20 +
 .../java/org/apache/phoenix/util/TestUtil.java     |  13 +-
 7 files changed, 454 insertions(+), 207 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 94f8574361..e3f494897d 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -503,6 +503,12 @@ public interface QueryServices extends SQLCloseable {
   // CDC TTL mutation retry configuration
   String CDC_TTL_MUTATION_MAX_RETRIES = "phoenix.cdc.ttl.mutation.max.retries";
 
+  // CDC TTL mutation batch size configuration
+  String CDC_TTL_MUTATION_BATCH_SIZE = "phoenix.cdc.ttl.mutation.batch.size";
+
+  // CDC TTL shared cache expiration time in seconds
+  String CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS = "phoenix.cdc.ttl.shared.cache.expiry.seconds";
+
   // This config is used to move (copy and delete) the child links from the SYSTEM.CATALOG to
   // SYSTEM.CHILD_LINK table.
   // As opposed to a copy and async (out of band) delete.
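
Both new keys are read from the server-side HBase configuration when compaction runs; the defaults added in QueryServicesOptions below are 50 mutations per batch and 1200 seconds. A minimal sketch of overriding them programmatically (the key names come from this patch; the class name and the values here are only illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CdcTtlTuningSketch {
      public static void main(String[] args) {
        // In practice these would normally be set in hbase-site.xml on the region servers.
        Configuration conf = HBaseConfiguration.create();
        // Write CDC index mutations in batches of 100 instead of the default 50.
        conf.setInt("phoenix.cdc.ttl.mutation.batch.size", 100);
        // Expire shared pre-image cache entries after 10 minutes instead of the default 20.
        conf.setInt("phoenix.cdc.ttl.shared.cache.expiry.seconds", 600);
        System.out.println(conf.getInt("phoenix.cdc.ttl.mutation.batch.size", 50));
      }
    }
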
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 16c1c28709..f6f44c23c1 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,7 +24,9 @@ import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE
 import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.CDC_TTL_MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.query.QueryServices.CDC_TTL_MUTATION_MAX_RETRIES;
+import static org.apache.phoenix.query.QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS;
 import static org.apache.phoenix.query.QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD;
 import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
 import static org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
@@ -493,6 +495,8 @@ public class QueryServicesOptions {
   public static final Boolean DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
   public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED = false;
   public static final int DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES = 5;
+  public static final int DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE = 50;
+  public static final int DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS = 1200;
 
   public static final long DEFAULT_PHOENIX_CDC_STREAM_PARTITION_EXPIRY_MIN_AGE_MS =
     30 * 60 * 60 * 1000; // 30 hours
@@ -613,7 +617,9 @@ public class QueryServicesOptions {
       .setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
         DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)
       .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)
-      .setIfUnset(CDC_TTL_MUTATION_MAX_RETRIES, DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
+      .setIfUnset(CDC_TTL_MUTATION_MAX_RETRIES, DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES)
+      .setIfUnset(CDC_TTL_MUTATION_BATCH_SIZE, DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE)
+      .setIfUnset(CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS, DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS);
 
     // HBase sets this to 1, so we reset it to something more appropriate.
     // Hopefully HBase will change this, because we can't know if a user set
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
index d078a3fae6..9921284591 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
@@ -20,18 +20,17 @@ package org.apache.phoenix.coprocessor;
 import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.CheckAndMutate;
-import org.apache.hadoop.hbase.client.CheckAndMutateResult;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -43,6 +42,8 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PDate;
@@ -53,19 +54,229 @@ import org.apache.phoenix.util.QueryUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+
 /**
  * Utility class for CDC (Change Data Capture) operations during compaction. This class contains
- * utilities for handling TTL row expiration events and generating CDC events with pre-image data
- * that are written directly to CDC index tables.
+ * utilities for handling TTL row expiration events and generating CDC events with pre-image data.
+ * CDC mutations are accumulated during compaction and written to CDC index tables in batches only
+ * when compaction completes.
  */
 public final class CDCCompactionUtil {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(CDCCompactionUtil.class);
 
+  // Shared cache for row images across all CompactionScanner instances in the JVM.
+  // Entries expire after 1200 seconds (20 minutes) by default.
+  // The JVM level cache helps merge the pre-image for the row with multiple CFs.
+  // The key of the cache contains (regionId + data table rowkey).
+  // The value contains pre-image that needs to be directly inserted in the CDC index.
+  private static volatile Cache<ImmutableBytesPtr, Map<String, Object>> sharedTtlImageCache;
+
   private CDCCompactionUtil() {
     // empty
   }
 
+  /**
+   * Gets the shared row image cache, initializing it lazily with configuration.
+   * @param config The Hadoop configuration to read cache expiry from
+   * @return the shared cache instance
+   */
+  static Cache<ImmutableBytesPtr, Map<String, Object>>
+    getSharedRowImageCache(Configuration config) {
+    if (sharedTtlImageCache == null) {
+      synchronized (CDCCompactionUtil.class) {
+        if (sharedTtlImageCache == null) {
+          int expirySeconds = config.getInt(QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS,
+            QueryServicesOptions.DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS);
+          sharedTtlImageCache =
+            CacheBuilder.newBuilder().expireAfterWrite(expirySeconds, TimeUnit.SECONDS).build();
+          LOGGER.info("Initialized shared CDC row image cache with expiry of {} seconds",
+            expirySeconds);
+        }
+      }
+    }
+    return sharedTtlImageCache;
+  }
+
+  /**
+   * Batch processor for CDC mutations during compaction. This class accumulates all mutations
+   * during the compaction operation and writes them to the CDC index in batches only when the
+   * compaction is complete.
+   */
+  public static class CDCBatchProcessor {
+
+    private final Map<ImmutableBytesPtr, Put> pendingMutations;
+    private final PTable cdcIndex;
+    private final PTable dataTable;
+    private final RegionCoprocessorEnvironment env;
+    private final Region region;
+    private final byte[] compactionTimeBytes;
+    private final long eventTimestamp;
+    private final String tableName;
+    private final int cdcTtlMutationMaxRetries;
+    private final int batchSize;
+    private final Configuration config;
+
+    public CDCBatchProcessor(PTable cdcIndex, PTable dataTable, RegionCoprocessorEnvironment env,
+      Region region, byte[] compactionTimeBytes, long eventTimestamp, String tableName,
+      int cdcTtlMutationMaxRetries, int batchSize) {
+      this.pendingMutations = new HashMap<>();
+      this.cdcIndex = cdcIndex;
+      this.dataTable = dataTable;
+      this.env = env;
+      this.region = region;
+      this.compactionTimeBytes = compactionTimeBytes;
+      this.eventTimestamp = eventTimestamp;
+      this.tableName = tableName;
+      this.cdcTtlMutationMaxRetries = cdcTtlMutationMaxRetries;
+      this.batchSize = batchSize;
+      this.config = env.getConfiguration();
+    }
+
+    /**
+     * Adds a CDC event for the specified expired row. If the row already exists in memory, merges
+     * the image with the existing image. Accumulates mutations in memory for batch processing
+     * during close() instead of immediately writing to the CDC index.
+     * @param expiredRow The expired row.
+     * @throws Exception If something goes wrong.
+     */
+    public void addCDCEvent(List<Cell> expiredRow) throws Exception {
+      Cell firstCell = expiredRow.get(0);
+      byte[] dataRowKey = CellUtil.cloneRow(firstCell);
+
+      Put expiredRowPut = new Put(dataRowKey);
+      for (Cell cell : expiredRow) {
+        expiredRowPut.add(cell);
+      }
+
+      IndexMaintainer cdcIndexMaintainer;
+      // rowKey for the Index mutation
+      byte[] rowKey;
+      try (PhoenixConnection serverConnection =
+        QueryUtil.getConnectionOnServer(new Properties(), env.getConfiguration())
+          .unwrap(PhoenixConnection.class)) {
+        cdcIndexMaintainer = cdcIndex.getIndexMaintainer(dataTable, serverConnection);
+
+        ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut);
+        ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(expiredRowPut.getRow());
+
+        Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+          dataRowVG, rowKeyPtr, eventTimestamp, null, null, false,
+          region.getRegionInfo().getEncodedNameAsBytes());
+
+        rowKey = cdcIndexPut.getRow().clone();
+        System.arraycopy(compactionTimeBytes, 0, rowKey, PartitionIdFunction.PARTITION_ID_LENGTH,
+          PDate.INSTANCE.getByteSize());
+      }
+
+      byte[] rowKeyWithoutTimestamp = new byte[rowKey.length - PDate.INSTANCE.getByteSize()];
+      // copy PARTITION_ID() from offset 0 to 31
+      System.arraycopy(rowKey, 0, rowKeyWithoutTimestamp, 0,
+        PartitionIdFunction.PARTITION_ID_LENGTH);
+      // copy data table rowkey from offset (32 + 8) to end of rowkey
+      System.arraycopy(rowKey,
+        PartitionIdFunction.PARTITION_ID_LENGTH + PDate.INSTANCE.getByteSize(),
+        rowKeyWithoutTimestamp, PartitionIdFunction.PARTITION_ID_LENGTH,
+        rowKeyWithoutTimestamp.length - PartitionIdFunction.PARTITION_ID_LENGTH);
+      ImmutableBytesPtr cacheKeyPtr = new ImmutableBytesPtr(rowKeyWithoutTimestamp);
+
+      // Check if we already have an image for this row in the shared cache, from other store
+      // compaction of the same region
+      Cache<ImmutableBytesPtr, Map<String, Object>> cache = getSharedRowImageCache(config);
+      Map<String, Object> existingPreImage = cache.getIfPresent(cacheKeyPtr);
+      if (existingPreImage == null) {
+        existingPreImage = new HashMap<>();
+        cache.put(cacheKeyPtr, existingPreImage);
+      }
+
+      // Create CDC event with merged pre-image
+      Map<String, Object> cdcEvent =
+        createTTLDeleteCDCEvent(expiredRowPut, dataTable, existingPreImage);
+      byte[] cdcEventBytes = JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent);
+      Put cdcIndexPut = buildCDCIndexPut(eventTimestamp, cdcEventBytes, rowKey, cdcIndexMaintainer);
+
+      pendingMutations.put(cacheKeyPtr, cdcIndexPut);
+    }
+
+    /**
+     * Flushes a specific list of mutations to the CDC index table.
+     * @param mutations List of mutations to flush
+     */
+    private void flushMutations(List<Put> mutations) throws Exception {
+      if (mutations.isEmpty()) {
+        return;
+      }
+
+      Exception lastException = null;
+      for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; retryCount++) {
+        try (Table cdcIndexTable =
+          env.getConnection().getTable(TableName.valueOf(cdcIndex.getPhysicalName().getBytes()))) {
+          cdcIndexTable.put(mutations);
+          lastException = null;
+          LOGGER.debug("Successfully flushed batch of {} CDC mutations for table {}",
+            mutations.size(), tableName);
+          break;
+        } catch (Exception e) {
+          lastException = e;
+          long backoffMs = 100;
+          LOGGER.warn("CDC batch mutation attempt {}/{} failed, retrying in {}ms. Batch size: {}",
+            retryCount + 1, cdcTtlMutationMaxRetries, backoffMs, mutations.size(), e);
+          try {
+            Thread.sleep(backoffMs);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted during CDC batch mutation retry", ie);
+          }
+        }
+      }
+
+      if (lastException != null) {
+        LOGGER.error(
+          "Failed to flush CDC batch after {} attempts for table {}, index {}. {} "
+            + "events are missed.",
+          cdcTtlMutationMaxRetries, tableName, cdcIndex.getPhysicalName().getString(),
+          mutations.size(), lastException);
+      }
+    }
+
+    /**
+     * Finalizes the batch processor by flushing all accumulated mutations in batches. This method
+     * processes all accumulated mutations and writes them to the CDC index in batches of the
+     * configured batch size.
+     */
+    public void close() throws Exception {
+      if (pendingMutations.isEmpty()) {
+        LOGGER.trace("No CDC mutations to flush for table {}", tableName);
+        return;
+      }
+
+      int totalMutations = pendingMutations.size();
+      LOGGER.info("Flushing {} accumulated CDC mutations for table {} in batches of {}",
+        totalMutations, tableName, batchSize);
+
+      List<Put> allMutations = new ArrayList<>(pendingMutations.values());
+
+      for (int i = 0; i < allMutations.size(); i += batchSize) {
+        int endIndex = Math.min(i + batchSize, allMutations.size());
+        List<Put> batch = allMutations.subList(i, endIndex);
+        flushMutations(batch);
+        LOGGER.debug("Flushed CDC batch {}/{} for table {} (mutations {}-{} of {})",
+          (i / batchSize) + 1, (allMutations.size() + batchSize - 1) / batchSize, tableName, i + 1,
+          endIndex, totalMutations);
+      }
+
+      pendingMutations.clear();
+
+      Cache<ImmutableBytesPtr, Map<String, Object>> cache = getSharedRowImageCache(config);
+      LOGGER.info(
+        "CDC batch processor closed for table {}. Processed {} mutations in {} batches."
+          + " Shared cache size: {}",
+        tableName, totalMutations, (totalMutations + batchSize - 1) / batchSize, cache.size());
+    }
+  }
+
   /**
    * Finds the column name for a given cell in the data table.
    * @param dataTable The data table
@@ -128,223 +339,80 @@ public final class CDCCompactionUtil {
 
   /**
    * Builds CDC index Put mutation.
-   * @param cdcIndex            The CDC index table
-   * @param expiredRowPut       The expired row data as a Put
-   * @param eventTimestamp      The timestamp for the CDC event
-   * @param cdcEventBytes       The CDC event data to store
-   * @param dataTable           The data table
-   * @param env                 The region coprocessor environment
-   * @param region              The HBase region
-   * @param compactionTimeBytes The compaction time as bytes
+   * @param eventTimestamp     The timestamp for the CDC event
+   * @param cdcEventBytes      The CDC event data to store
+   * @param rowKey             The rowKey of the CDC index mutation
+   * @param cdcIndexMaintainer The index maintainer object for the CDC index
    * @return The CDC index Put mutation
    */
-  private static Put buildCDCIndexPut(PTable cdcIndex, Put expiredRowPut, long eventTimestamp,
-    byte[] cdcEventBytes, PTable dataTable, RegionCoprocessorEnvironment env, Region region,
-    byte[] compactionTimeBytes) throws Exception {
+  private static Put buildCDCIndexPut(long eventTimestamp, byte[] cdcEventBytes, byte[] rowKey,
+    IndexMaintainer cdcIndexMaintainer) {
 
-    try (PhoenixConnection serverConnection =
-      QueryUtil.getConnectionOnServer(new Properties(), env.getConfiguration())
-        .unwrap(PhoenixConnection.class)) {
+    Put newCdcIndexPut = new Put(rowKey, eventTimestamp);
 
-      IndexMaintainer cdcIndexMaintainer = cdcIndex.getIndexMaintainer(dataTable, serverConnection);
+    newCdcIndexPut.addColumn(cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+      cdcIndexMaintainer.getEmptyKeyValueQualifier(), eventTimestamp,
+      QueryConstants.UNVERIFIED_BYTES);
 
-      ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut);
-      ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(expiredRowPut.getRow());
+    // Add CDC event data
+    newCdcIndexPut.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+      QueryConstants.CDC_IMAGE_CQ_BYTES, eventTimestamp, cdcEventBytes);
 
-      Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
-        dataRowVG, rowKeyPtr, eventTimestamp, null, null, false,
-        region.getRegionInfo().getEncodedNameAsBytes());
-
-      byte[] rowKey = cdcIndexPut.getRow().clone();
-      System.arraycopy(compactionTimeBytes, 0, rowKey, PartitionIdFunction.PARTITION_ID_LENGTH,
-        PDate.INSTANCE.getByteSize());
-      Put newCdcIndexPut = new Put(rowKey, eventTimestamp);
-
-      newCdcIndexPut.addColumn(cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
-        cdcIndexMaintainer.getEmptyKeyValueQualifier(), eventTimestamp,
-        QueryConstants.UNVERIFIED_BYTES);
-
-      // Add CDC event data
-      newCdcIndexPut.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-        QueryConstants.CDC_IMAGE_CQ_BYTES, eventTimestamp, cdcEventBytes);
-
-      return newCdcIndexPut;
-    }
+    return newCdcIndexPut;
   }
 
   /**
-   * Generates and applies a CDC index mutation for TTL expired row with retries if required.
-   * @param cdcIndex                 The CDC index table
+   * Creates a CDC batch processor for the given data table and configuration.
    * @param dataTable                The data table
-   * @param expiredRowPut            The expired row data as a Put
-   * @param eventTimestamp           The timestamp for the CDC event
-   * @param tableName                The table name for logging
    * @param env                      The region coprocessor environment
    * @param region                   The HBase region
    * @param compactionTimeBytes      The compaction time as bytes
-   * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations
-   */
-  private static void generateCDCIndexMutation(PTable cdcIndex, PTable dataTable, Put expiredRowPut,
-    long eventTimestamp, String tableName, RegionCoprocessorEnvironment env, Region region,
-    byte[] compactionTimeBytes, int cdcTtlMutationMaxRetries) throws Exception {
-    Map<String, Object> cdcEvent =
-      createTTLDeleteCDCEvent(expiredRowPut, dataTable, new HashMap<>());
-    byte[] cdcEventBytes = JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent);
-    Put cdcIndexPut = buildCDCIndexPut(cdcIndex, expiredRowPut, eventTimestamp, cdcEventBytes,
-      dataTable, env, region, compactionTimeBytes);
-
-    Exception lastException = null;
-    for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; retryCount++) {
-      try (Table cdcIndexTable =
-        env.getConnection().getTable(TableName.valueOf(cdcIndex.getPhysicalName().getBytes()))) {
-        CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(cdcIndexPut.getRow())
-          .ifNotExists(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-            QueryConstants.CDC_IMAGE_CQ_BYTES)
-          .build(cdcIndexPut);
-        CheckAndMutateResult result = cdcIndexTable.checkAndMutate(checkAndMutate);
-
-        if (result.isSuccess()) {
-          // Successfully inserted new CDC event - Single CF case
-          lastException = null;
-          break;
-        } else {
-          // Row already exists, need to retrieve existing pre-image and merge
-          // Likely to happen for multi CF case
-          Get get = new Get(cdcIndexPut.getRow());
-          get.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-            QueryConstants.CDC_IMAGE_CQ_BYTES);
-          Result existingResult = cdcIndexTable.get(get);
-
-          if (!existingResult.isEmpty()) {
-            Cell existingCell = existingResult.getColumnLatestCell(
-              QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.CDC_IMAGE_CQ_BYTES);
-
-            if (existingCell != null) {
-              byte[] existingCdcBytes = CellUtil.cloneValue(existingCell);
-              Map<String, Object> existingCdcEvent =
-                JacksonUtil.getObjectReader(HashMap.class).readValue(existingCdcBytes);
-              Map<String, Object> existingPreImage = (Map<String, Object>) existingCdcEvent
-                .getOrDefault(QueryConstants.CDC_PRE_IMAGE, new HashMap<>());
-
-              // Create new TTL delete event with merged pre-image
-              Map<String, Object> mergedCdcEvent =
-                createTTLDeleteCDCEvent(expiredRowPut, dataTable, existingPreImage);
-              byte[] mergedCdcEventBytes =
-                JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(mergedCdcEvent);
-
-              Put mergedCdcIndexPut = buildCDCIndexPut(cdcIndex, expiredRowPut, eventTimestamp,
-                mergedCdcEventBytes, dataTable, env, region, compactionTimeBytes);
-
-              cdcIndexTable.put(mergedCdcIndexPut);
-              lastException = null;
-              break;
-            } else {
-              LOGGER.warn("Rare event: Skipping CDC TTL mutation because other type"
-                + " of CDC event is recorded at time {}", eventTimestamp);
-              break;
-            }
-          } else {
-            LOGGER.warn("Rare event.. Skipping CDC TTL mutation because other type"
-              + " of CDC event is recorded at time {}", eventTimestamp);
-            break;
-          }
-        }
-      } catch (Exception e) {
-        lastException = e;
-        long backoffMs = 100;
-        LOGGER.warn("CDC mutation attempt {}/{} failed, retrying in {}ms", retryCount + 1,
-          cdcTtlMutationMaxRetries + 1, backoffMs, e);
-        try {
-          Thread.sleep(backoffMs);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted during CDC mutation retry", ie);
-        }
-      }
-    }
-    if (lastException != null) {
-      LOGGER.error(
-        "Failed to generate CDC mutation after {} attempts for table {}, index "
-          + "{}. The event update is missed.",
-        cdcTtlMutationMaxRetries + 1, tableName, cdcIndex.getPhysicalName().getString(),
-        lastException);
-    }
-  }
-
-  /**
-   * Generates CDC TTL delete event and writes it directly to CDC index tables. This bypasses the
-   * normal CDC update path since the row is being expired.
-   * @param expiredRow               The cells of the expired row
-   * @param tableName                The table name for logging
    * @param compactionTime           The compaction timestamp
-   * @param dataTable                The data table
-   * @param env                      The region coprocessor environment
-   * @param region                   The HBase region
-   * @param compactionTimeBytes      The compaction time as bytes
+   * @param tableName                The table name for logging
    * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations
+   * @param batchSize                The batch size for CDC mutations
+   * @return CDCBatchProcessor instance or null if no active CDC index
    */
-  private static void generateCDCTTLDeleteEvent(List<Cell> expiredRow, String tableName,
-    long compactionTime, PTable dataTable, RegionCoprocessorEnvironment env, Region region,
-    byte[] compactionTimeBytes, int cdcTtlMutationMaxRetries) {
-    try {
-      PTable cdcIndex = CDCUtil.getActiveCDCIndex(dataTable);
-      if (cdcIndex == null) {
-        LOGGER.warn("No active CDC index found for table {}", tableName);
-        return;
-      }
-      Cell firstCell = expiredRow.get(0);
-      byte[] dataRowKey = CellUtil.cloneRow(firstCell);
-      Put expiredRowPut = new Put(dataRowKey);
-
-      for (Cell cell : expiredRow) {
-        expiredRowPut.add(cell);
-      }
-
-      try {
-        generateCDCIndexMutation(cdcIndex, dataTable, expiredRowPut, compactionTime, tableName, env,
-          region, compactionTimeBytes, cdcTtlMutationMaxRetries);
-      } catch (Exception e) {
-        LOGGER.error("Failed to generate CDC mutation for index {}: {}",
-          cdcIndex.getName().getString(), e.getMessage(), e);
-      }
-    } catch (Exception e) {
-      LOGGER.error("Error generating CDC TTL delete event for table {}", tableName, e);
+  public static CDCBatchProcessor createBatchProcessor(PTable dataTable,
+    RegionCoprocessorEnvironment env, Region region, byte[] compactionTimeBytes,
+    long compactionTime, String tableName, int cdcTtlMutationMaxRetries, int batchSize) {
+    PTable cdcIndex = CDCUtil.getActiveCDCIndex(dataTable);
+    if (cdcIndex == null) {
+      LOGGER.warn("No active CDC index found for table {}", tableName);
+      return null;
     }
+    return new CDCBatchProcessor(cdcIndex, dataTable, env, region, compactionTimeBytes,
+      compactionTime, tableName, cdcTtlMutationMaxRetries, batchSize);
   }
 
   /**
-   * Handles TTL row expiration for CDC event generation. This method is called when a row is
-   * detected as expired during major compaction.
-   * @param expiredRow               The cells of the expired row
-   * @param expirationType           The type of TTL expiration
-   * @param tableName                The table name for logging purposes
-   * @param compactionTime           The timestamp when compaction started
-   * @param table                    The Phoenix data table metadata
-   * @param env                      The region coprocessor environment for accessing HBase
-   *                                 resources
-   * @param region                   The HBase region being compacted
-   * @param compactionTimeBytes      The compaction timestamp as byte array for CDC index row key
-   *                                 construction
-   * @param cdcTtlMutationMaxRetries Maximum number of retry attempts for CDC mutation operations
+   * Handles TTL row expiration for CDC event generation using batch processing. This method is
+   * called when a row is detected as expired during major compaction.
+   * @param expiredRow     The cells of the expired row
+   * @param expirationType The type of TTL expiration
+   * @param tableName      The table name for logging purposes
+   * @param batchProcessor The CDC batch processor instance
    */
   static void handleTTLRowExpiration(List<Cell> expiredRow, String expirationType, String tableName,
-    long compactionTime, PTable table, RegionCoprocessorEnvironment env, Region region,
-    byte[] compactionTimeBytes, int cdcTtlMutationMaxRetries) {
+    CDCBatchProcessor batchProcessor) {
+    if (batchProcessor == null) {
+      return;
+    }
+
     try {
       Cell firstCell = expiredRow.get(0);
       byte[] rowKey = CellUtil.cloneRow(firstCell);
 
-      LOGGER.info(
+      LOGGER.debug(
         "TTL row expiration detected: table={}, rowKey={}, expirationType={}, "
           + "cellCount={}, compactionTime={}",
-        tableName, Bytes.toStringBinary(rowKey), expirationType, expiredRow.size(), compactionTime);
-
-      // Generate CDC TTL delete event with pre-image data
-      generateCDCTTLDeleteEvent(expiredRow, tableName, compactionTime, table, env, region,
-        compactionTimeBytes, cdcTtlMutationMaxRetries);
+        tableName, Bytes.toStringBinary(rowKey), expirationType, expiredRow.size(),
+        batchProcessor.eventTimestamp);
 
+      batchProcessor.addCDCEvent(expiredRow);
     } catch (Exception e) {
       LOGGER.error("Error handling TTL row expiration for CDC: table {}", tableName, e);
     }
   }
+
 }
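
The row-key arithmetic in addCDCEvent above is easier to see in isolation: the CDC index row key is laid out as [32-byte PARTITION_ID][8-byte date][data table row key], the date portion gets overwritten with the compaction time, and the shared cache key is the same key with the date stripped so that compactions of different column-family stores of the same region merge into one pre-image entry. A self-contained sketch, assuming the 32-byte and 8-byte sizes noted in the patch comments (the class and method names below are illustrative only, not part of the patch):

    // Mirrors the System.arraycopy logic in CDCBatchProcessor.addCDCEvent.
    public class CdcRowKeyLayoutSketch {
      static final int PARTITION_ID_LENGTH = 32; // PartitionIdFunction.PARTITION_ID_LENGTH
      static final int DATE_BYTES = 8;           // PDate.INSTANCE.getByteSize()

      // Overwrite the 8-byte date portion of a CDC index row key with the compaction time.
      static byte[] withCompactionTime(byte[] cdcIndexRowKey, byte[] compactionTimeBytes) {
        byte[] rowKey = cdcIndexRowKey.clone();
        System.arraycopy(compactionTimeBytes, 0, rowKey, PARTITION_ID_LENGTH, DATE_BYTES);
        return rowKey;
      }

      // Shared cache key: [partition id][data table row key], i.e. the row key without the date.
      static byte[] cacheKey(byte[] cdcIndexRowKey) {
        byte[] key = new byte[cdcIndexRowKey.length - DATE_BYTES];
        System.arraycopy(cdcIndexRowKey, 0, key, 0, PARTITION_ID_LENGTH);
        System.arraycopy(cdcIndexRowKey, PARTITION_ID_LENGTH + DATE_BYTES, key, PARTITION_ID_LENGTH,
          key.length - PARTITION_ID_LENGTH);
        return key;
      }
    }
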
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index c82a421e9b..f12dc77f72 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -164,6 +164,7 @@ public class CompactionScanner implements InternalScanner {
   private final boolean isCdcTtlEnabled;
   private final PTable table;
   private final int cdcTtlMutationMaxRetries;
+  private CDCCompactionUtil.CDCBatchProcessor cdcBatchProcessor;
 
   // Only for forcing minor compaction while testing
   private static boolean forceMinorCompaction = false;
@@ -216,6 +217,15 @@ public class CompactionScanner implements InternalScanner {
       env.getConfiguration().getInt(QueryServices.CDC_TTL_MUTATION_MAX_RETRIES,
         QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
 
+    if (isCdcTtlEnabled) {
+      int cdcTtlMutationBatchSize =
+        env.getConfiguration().getInt(QueryServices.CDC_TTL_MUTATION_BATCH_SIZE,
+          QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE);
+      cdcBatchProcessor =
+        CDCCompactionUtil.createBatchProcessor(table, env, region, compactionTimeBytes,
+          compactionTime, tableName, cdcTtlMutationMaxRetries, cdcTtlMutationBatchSize);
+    }
+
     // Initialize the tracker that computes the TTL for the compacting table.
     // The TTL tracker can be
     // simple (one single TTL for the table) when the compacting table is not Partitioned
@@ -414,9 +424,9 @@ public class CompactionScanner implements InternalScanner {
     CompiledConditionalTTLExpression ttlExpr =
       (CompiledConditionalTTLExpression) rowContext.ttlExprForRow;
     if (ttlExpr.isExpired(result, true)) {
-      if (isCdcTtlEnabled && !result.isEmpty()) {
+      if (isCdcTtlEnabled && cdcBatchProcessor != null && !result.isEmpty()) {
         CDCCompactionUtil.handleTTLRowExpiration(result, "conditional_ttl", tableName,
-          compactionTime, table, env, region, compactionTimeBytes, cdcTtlMutationMaxRetries);
+          cdcBatchProcessor);
       }
       // If the row is expired, purge the row
       result.clear();
@@ -452,10 +462,23 @@ public class CompactionScanner implements InternalScanner {
     LOGGER.info("Closing CompactionScanner for table " + tableName + " store " + columnFamilyName
       + (major ? " major " : " not major ") + "compaction retained " + outputCellCount + " of "
       + inputCellCount + " cells" + (phoenixLevelOnly ? " phoenix level only" : ""));
+
     if (forceMinorCompaction) {
       forceMinorCompaction = false;
     }
     storeScanner.close();
+
+    // Flush any remaining CDC mutations in the batch
+    if (cdcBatchProcessor != null) {
+      try {
+        cdcBatchProcessor.close();
+      } catch (Exception e) {
+        LOGGER.error("Error closing CDC batch processor for table {}", tableName, e);
+        throw new IOException("Failed to close CDC batch processor", e);
+      } finally {
+        CDCCompactionUtil.getSharedRowImageCache(env.getConfiguration()).cleanUp();
+      }
+    }
   }
 
   enum MatcherType {
@@ -2427,9 +2450,9 @@ public class CompactionScanner implements InternalScanner {
         // Only do this check for major compaction as for minor compactions we don't expire cells.
         // The row version should not be visible via the max lookback window. Nothing to do
 
-        if (isCdcTtlEnabled && !lastRow.isEmpty()) {
+        if (isCdcTtlEnabled && cdcBatchProcessor != null && !lastRow.isEmpty()) {
           CDCCompactionUtil.handleTTLRowExpiration(lastRow, "time_based_ttl", tableName,
-            compactionTime, table, env, region, compactionTimeBytes, cdcTtlMutationMaxRetries);
+            cdcBatchProcessor);
         }
         return;
       }
@@ -2521,9 +2544,9 @@ public class CompactionScanner implements InternalScanner {
           // store is not the empty column family store.
           return false;
         }
-        if (isCdcTtlEnabled && !lastRowVersion.isEmpty()) {
+        if (isCdcTtlEnabled && cdcBatchProcessor != null && !lastRowVersion.isEmpty()) {
           CDCCompactionUtil.handleTTLRowExpiration(lastRowVersion, "max_lookback_ttl", tableName,
-            compactionTime, table, env, region, compactionTimeBytes, cdcTtlMutationMaxRetries);
+            cdcBatchProcessor);
         }
         return true;
       }
@@ -2575,7 +2598,6 @@ public class CompactionScanner implements InternalScanner {
           lastRowVersion.clear();
           lastRowVersion.addAll(trimmedRow);
           trimmedEmptyColumn.clear();
-          ;
           for (Cell cell : emptyColumn) {
             if (cell.getTimestamp() >= minTimestamp) {
               trimmedEmptyColumn.add(cell);
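
The CompactionScanner changes above boil down to a three-step lifecycle: create one batch processor per compaction, feed it every expired row version, and flush everything when the scanner closes. A condensed sketch of that call sequence (placed in the coprocessor package so the package-private handleTTLRowExpiration is visible; the expiredRows list stands in for the scanner's own row tracking, and the class and method names are illustrative):

    package org.apache.phoenix.coprocessor;

    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.Region;
    import org.apache.phoenix.query.QueryServices;
    import org.apache.phoenix.query.QueryServicesOptions;
    import org.apache.phoenix.schema.PTable;

    public class CdcTtlBatchFlowSketch {
      static void compactWithCdc(PTable dataTable, RegionCoprocessorEnvironment env, Region region,
        byte[] compactionTimeBytes, long compactionTime, String tableName,
        List<List<Cell>> expiredRows) throws Exception {
        int retries = env.getConfiguration().getInt(QueryServices.CDC_TTL_MUTATION_MAX_RETRIES,
          QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
        int batchSize = env.getConfiguration().getInt(QueryServices.CDC_TTL_MUTATION_BATCH_SIZE,
          QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE);
        // Returns null when the data table has no active CDC index.
        CDCCompactionUtil.CDCBatchProcessor processor = CDCCompactionUtil.createBatchProcessor(
          dataTable, env, region, compactionTimeBytes, compactionTime, tableName, retries, batchSize);
        // Each expired row becomes one in-memory Put, merged across column families through the
        // shared pre-image cache; nothing is written to the CDC index yet.
        for (List<Cell> expiredRow : expiredRows) {
          CDCCompactionUtil.handleTTLRowExpiration(expiredRow, "time_based_ttl", tableName, processor);
        }
        // close() writes the accumulated Puts to the CDC index in batches of batchSize.
        if (processor != null) {
          processor.close();
        }
      }
    }
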
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 97eecf2c42..b7f481d118 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -28,8 +28,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -782,6 +784,130 @@ public class TableTTLIT extends BaseTest {
     TestUtil.majorCompact(getUtility(), table);
   }
 
+  /**
+   * Test CDC batch mutations for TTL expired rows. This test creates a table with TTL and CDC
+   * index, inserts 182 rows so that the CDC mutations span multiple batches, lets the rows expire
+   * via TTL, and verifies that all 182 rows have CDC TTL_DELETE events recorded with correct
+   * pre-image data.
+   */
+  @Test
+  public void testCDCBatchMutationsForTTLExpiredRows() throws Exception {
+    final int maxLookbackAge =
+      tableLevelMaxLookback != null ? tableLevelMaxLookback : MAX_LOOKBACK_AGE;
+    final int numRows = 182;
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String tableName = generateUniqueName();
+      String cdcName = generateUniqueName();
+      ObjectMapper mapper = new ObjectMapper();
+
+      createTable(tableName);
+      conn.createStatement().execute("ALTER TABLE " + tableName
+        + " SET \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
+
+      String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (PRE, POST)";
+      conn.createStatement().execute(cdcSql);
+      conn.commit();
+
+      String cdcFullName = SchemaUtil.getTableName(null, cdcName);
+
+      long startTime = System.currentTimeMillis() + 1000;
+      startTime = (startTime / 1000) * 1000;
+      EnvironmentEdgeManager.injectEdge(injectEdge);
+      injectEdge.setValue(startTime);
+
+      // Track post-images for each row to verify against pre-images later
+      Map<String, Map<String, Object>> lastPostImages = new HashMap<>();
+
+      for (int i = 1; i <= numRows; i++) {
+        String rowId = "row" + i;
+        updateRow(conn, tableName, rowId);
+        injectEdge.incrementValue(100);
+      }
+
+      // Get the post-images from the UPSERT events
+      String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName;
+      try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) {
+        while (rs.next()) {
+          Map<String, Object> cdcEvent = mapper.readValue(rs.getString(3), HashMap.class);
+          assertEquals("Should be upsert event", CDC_UPSERT_EVENT_TYPE,
+            cdcEvent.get(CDC_EVENT_TYPE));
+
+          Map<String, Object> postImage = (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE);
+          String rowId = rs.getString(2);
+          lastPostImages.put(rowId, postImage);
+        }
+      }
+
+      assertEquals("Should have captured post-images for all " + numRows + " rows", numRows,
+        lastPostImages.size());
+
+      // Advance time past TTL to expire all rows
+      injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
+
+      EnvironmentEdgeManager.reset();
+      flush(TableName.valueOf(tableName));
+      EnvironmentEdgeManager.injectEdge(injectEdge);
+
+      Timestamp ts = new Timestamp(injectEdge.currentTime());
+      majorCompact(TableName.valueOf(tableName));
+
+      // Verify all rows are expired from data table
+      String dataQuery = "SELECT COUNT(*) FROM " + tableName;
+      try (ResultSet rs = conn.createStatement().executeQuery(dataQuery)) {
+        assertTrue("Should have count result", rs.next());
+        assertEquals("All rows should be expired from data table", 0, rs.getInt(1));
+      }
+
+      // Verify all TTL_DELETE CDC events were generated
+      String ttlDeleteQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName
+        + " WHERE PHOENIX_ROW_TIMESTAMP() >= ?";
+
+      Map<String, Map<String, Object>> ttlDeletePreImages = new HashMap<>();
+      int ttlDeleteEventCount = 0;
+
+      try (PreparedStatement pst = conn.prepareStatement(ttlDeleteQuery)) {
+        pst.setTimestamp(1, ts);
+
+        try (ResultSet rs = pst.executeQuery()) {
+          while (rs.next()) {
+            ttlDeleteEventCount++;
+            Map<String, Object> cdcEvent = mapper.readValue(rs.getString(3), HashMap.class);
+
+            assertEquals("Should be ttl_delete event", CDC_TTL_DELETE_EVENT_TYPE,
+              cdcEvent.get(CDC_EVENT_TYPE));
+
+            assertTrue("TTL delete should have pre-image", cdcEvent.containsKey(CDC_PRE_IMAGE));
+            Map<String, Object> preImage = (Map<String, Object>) cdcEvent.get(CDC_PRE_IMAGE);
+            assertNotNull("Pre-image should not be null", preImage);
+            assertFalse("Pre-image should not be empty", preImage.isEmpty());
+
+            String rowId = rs.getString(2);
+            ttlDeletePreImages.put(rowId, preImage);
+          }
+        }
+      }
+
+      assertEquals("Should have exactly " + numRows + " TTL_DELETE events", numRows,
+        ttlDeleteEventCount);
+      assertEquals("Should have pre-images for all " + numRows + " rows", numRows,
+        ttlDeletePreImages.size());
+
+      // Verify that pre-images in TTL_DELETE events match the last post-images from UPSERT events
+      for (String rowId : lastPostImages.keySet()) {
+        assertTrue("Should have TTL_DELETE pre-image for row " + rowId,
+          ttlDeletePreImages.containsKey(rowId));
+
+        Map<String, Object> lastPostImage = lastPostImages.get(rowId);
+        Map<String, Object> ttlDeletePreImage = ttlDeletePreImages.get(rowId);
+
+        assertEquals(
+          "Pre-image in TTL_DELETE should match last post-image from UPSERT for row " + rowId,
+          lastPostImage, ttlDeletePreImage);
+      }
+
+    }
+  }
+
   private void deleteRow(Connection conn, String tableName, String id) throws SQLException {
     String dml = "DELETE from " + tableName + " WHERE id = '" + id + "'";
     conn.createStatement().executeUpdate(dml);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
index c16accfbe7..3cf200e46e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
@@ -39,6 +39,8 @@ import static org.junit.Assert.fail;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -57,14 +59,17 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.phoenix.coprocessor.CDCCompactionUtil;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.end2end.IndexToolIT;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -101,6 +106,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
@@ -158,6 +164,7 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
       Integer.toString(0));
     props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
     props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+    props.put(QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS, Integer.toString(1));
     setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
   }
 
@@ -543,6 +550,9 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
 
       // Trigger TTL expiration again
       injectEdge.incrementValue(ttl);
+
+      Thread.sleep(700);
+      cleanUpSharedTtlImageCache();
       doMajorCompaction(tableName);
 
       // Verify all rows are expired from data table
@@ -1470,6 +1480,16 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
     }
   }
 
+  private void cleanUpSharedTtlImageCache()
+    throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    Method getSharedCacheMethod =
+      CDCCompactionUtil.class.getDeclaredMethod("getSharedRowImageCache", Configuration.class);
+    getSharedCacheMethod.setAccessible(true);
+    Cache<ImmutableBytesPtr, Map<String, Object>> cache = (Cache<ImmutableBytesPtr,
+      Map<String, Object>>) getSharedCacheMethod.invoke(null, (Configuration) null);
+    cache.cleanUp();
+  }
+
   private void doMajorCompaction(String tableName) throws IOException, InterruptedException {
     TestUtil.flush(getUtility(), TableName.valueOf(tableName));
     TestUtil.majorCompact(getUtility(), TableName.valueOf(tableName));
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index e08a406af6..6541dd7338 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1653,13 +1653,12 @@ public class TestUtil {
     TableOperation operation) throws Exception {
     ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
     Configuration configuration = services.getConfiguration();
-    org.apache.hadoop.hbase.client.Connection hbaseConn =
-      ConnectionFactory.createConnection(configuration);
-    Admin admin = services.getAdmin();
-    RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
-    int nRegions = regionLocator.getAllRegionLocations().size();
-    operation.execute(admin, regionLocator, nRegions);
-
+    try (org.apache.hadoop.hbase.client.Connection hbaseConn =
+      ConnectionFactory.createConnection(configuration); Admin admin = services.getAdmin()) {
+      RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+      int nRegions = regionLocator.getAllRegionLocations().size();
+      operation.execute(admin, regionLocator, nRegions);
+    }
   }
 
   private static void waitForRegionChange(RegionLocator regionLocator, int initialRegionCount)

