virajjasani commented on code in PR #2247:
URL: https://github.com/apache/phoenix/pull/2247#discussion_r2240870681


##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java:
##########
@@ -53,19 +54,206 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+
 /**
  * Utility class for CDC (Change Data Capture) operations during compaction. This class contains
  * utilities for handling TTL row expiration events and generating CDC events with pre-image data
- * that are written directly to CDC index tables.
+ * that are written directly to CDC index tables using batch mutations.
  */
 public final class CDCCompactionUtil {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(CDCCompactionUtil.class);
 
+  // Shared cache for row images across all CompactionScanner instances in the JVM.
+  // Entries expire after 1200 seconds (20 minutes) by default.
+  // The JVM-level cache helps merge the pre-images for rows with multiple column families.
+  // The key of the cache contains (regionId + data table rowkey).
+  // The value contains the pre-image that needs to be directly inserted into the CDC index.
+  private static volatile Cache<ImmutableBytesPtr, Map<String, Object>> sharedTtlImageCache;
+
   private CDCCompactionUtil() {
     // empty
   }
 
+  /**
+   * Gets the shared row image cache, initializing it lazily with configuration.
+   * @param config The Hadoop configuration to read cache expiry from
+   * @return the shared cache instance
+   */
+  static Cache<ImmutableBytesPtr, Map<String, Object>>
+    getSharedRowImageCache(Configuration config) {
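+    // Double-checked locking: the volatile field keeps reads lock-free once initialized.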
+    if (sharedTtlImageCache == null) {
+      synchronized (CDCCompactionUtil.class) {
+        if (sharedTtlImageCache == null) {
+          int expirySeconds = config.getInt(QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS,
+            QueryServicesOptions.DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS);
+          sharedTtlImageCache =
+            CacheBuilder.newBuilder().expireAfterWrite(expirySeconds, TimeUnit.SECONDS).build();
+          LOGGER.info("Initialized shared CDC row image cache with expiry of {} seconds",
+            expirySeconds);
+        }
+      }
+    }
+    return sharedTtlImageCache;
+  }
+
+  /**
+   * Batch processor for CDC mutations during compaction. This class manages accumulating
+   * mutations and maintaining in-memory image tracking for the duration of the compaction
+   * operation.
+   */
+  public static class CDCBatchProcessor {
+
+    private final Map<ImmutableBytesPtr, Put> pendingMutations;
+    private final PTable cdcIndex;
+    private final PTable dataTable;
+    private final RegionCoprocessorEnvironment env;
+    private final Region region;
+    private final byte[] compactionTimeBytes;
+    private final long eventTimestamp;
+    private final String tableName;
+    private final int cdcTtlMutationMaxRetries;
+    private final int batchSize;
+    private final Configuration config;
+
+    public CDCBatchProcessor(PTable cdcIndex, PTable dataTable, RegionCoprocessorEnvironment env,
+      Region region, byte[] compactionTimeBytes, long eventTimestamp, String tableName,
+      int cdcTtlMutationMaxRetries, int batchSize) {
+      this.pendingMutations = new HashMap<>();
+      this.cdcIndex = cdcIndex;
+      this.dataTable = dataTable;
+      this.env = env;
+      this.region = region;
+      this.compactionTimeBytes = compactionTimeBytes;
+      this.eventTimestamp = eventTimestamp;
+      this.tableName = tableName;
+      this.cdcTtlMutationMaxRetries = cdcTtlMutationMaxRetries;
+      this.batchSize = batchSize;
+      this.config = env.getConfiguration();
+    }
+
+    /**
+     * Adds a CDC event for the specified expired row. If the row already exists in memory, merges
+     * the image with the existing image. Accumulates mutations for batching.
+     * @param expiredRow The expired row.
+     * @throws Exception If something goes wrong.
+     */
+    public void addCDCEvent(List<Cell> expiredRow) throws Exception {
+      Cell firstCell = expiredRow.get(0);
+      byte[] dataRowKey = CellUtil.cloneRow(firstCell);
+
+      Put expiredRowPut = new Put(dataRowKey);
+      for (Cell cell : expiredRow) {
+        expiredRowPut.add(cell);
+      }
+
+      IndexMaintainer cdcIndexMaintainer;
+      // rowKey for the Index mutation
+      byte[] rowKey;
+      try (PhoenixConnection serverConnection =
+        QueryUtil.getConnectionOnServer(new Properties(), env.getConfiguration())
+          .unwrap(PhoenixConnection.class)) {
+        cdcIndexMaintainer = cdcIndex.getIndexMaintainer(dataTable, serverConnection);
+
+        ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut);
+        ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(expiredRowPut.getRow());
+
+        Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+          dataRowVG, rowKeyPtr, eventTimestamp, null, null, false,
+          region.getRegionInfo().getEncodedNameAsBytes());
+
+        rowKey = cdcIndexPut.getRow().clone();
+        System.arraycopy(compactionTimeBytes, 0, rowKey, PartitionIdFunction.PARTITION_ID_LENGTH,
+          PDate.INSTANCE.getByteSize());
+      }
+
+      byte[] rowKeyWithoutTimestamp = new byte[rowKey.length - PDate.INSTANCE.getByteSize()];
+      // copy PARTITION_ID() from offset 0 to 31
+      System.arraycopy(rowKey, 0, rowKeyWithoutTimestamp, 0,
+        PartitionIdFunction.PARTITION_ID_LENGTH);
+      // copy data table rowkey from offset (32 + 8) to end of rowkey
+      System.arraycopy(rowKey,
+        PartitionIdFunction.PARTITION_ID_LENGTH + PDate.INSTANCE.getByteSize(),
+        rowKeyWithoutTimestamp, PartitionIdFunction.PARTITION_ID_LENGTH,
+        rowKeyWithoutTimestamp.length - PartitionIdFunction.PARTITION_ID_LENGTH);
+      ImmutableBytesPtr cacheKeyPtr = new ImmutableBytesPtr(rowKeyWithoutTimestamp);
+
+      // Check if we already have an image for this row in the shared cache, from another
+      // store's compaction of the same region
+      Cache<ImmutableBytesPtr, Map<String, Object>> cache = getSharedRowImageCache(config);
+      Map<String, Object> existingPreImage = cache.getIfPresent(cacheKeyPtr);
+      if (existingPreImage == null) {
+        existingPreImage = new HashMap<>();
+        cache.put(cacheKeyPtr, existingPreImage);
+      }
+
+      // Create CDC event with merged pre-image
+      Map<String, Object> cdcEvent =
+        createTTLDeleteCDCEvent(expiredRowPut, dataTable, existingPreImage);
+      byte[] cdcEventBytes =
+        JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent);
+      Put cdcIndexPut =
+        buildCDCIndexPut(eventTimestamp, cdcEventBytes, rowKey, cdcIndexMaintainer);
+
+      pendingMutations.put(cacheKeyPtr, cdcIndexPut);
+
+      if (pendingMutations.size() >= batchSize) {
+        flushBatch();
+      }
+    }
+
+    /**
+     * Flushes any pending mutations in the current batch.
+     */
+    public void flushBatch() throws Exception {
+      if (pendingMutations.isEmpty()) {
+        return;
+      }
+
+      Exception lastException = null;
+      for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; retryCount++) {
+        try (Table cdcIndexTable = env.getConnection()
+          .getTable(TableName.valueOf(cdcIndex.getPhysicalName().getBytes()))) {
+          cdcIndexTable.put(new ArrayList<>(pendingMutations.values()));

Review Comment:
   Since we are using batchMutate() on the table where all the rowkeys are
   most likely expected to go to the same region, either all mutations will
   succeed or none will.
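   
   For illustration, a minimal sketch of the whole-batch retry that this
   property enables (not code from this PR; the names BatchRetryExample,
   putWithRetries, table, mutations, and maxRetries are hypothetical):
   
       import java.util.List;
       import org.apache.hadoop.hbase.client.Put;
       import org.apache.hadoop.hbase.client.Table;
   
       // Hypothetical helper illustrating the point above.
       public final class BatchRetryExample {
         // When every Put in the batch targets the same region, the batch is
         // expected to apply fully or fail as a whole, so the retry loop can
         // resubmit the entire batch without tracking partial successes.
         static void putWithRetries(Table table, List<Put> mutations, int maxRetries)
             throws Exception {
           Exception lastException = null;
           for (int attempt = 0; attempt < maxRetries; attempt++) {
             try {
               table.put(mutations); // all-or-nothing for a single-region batch
               return;
             } catch (Exception e) {
               lastException = e; // safe to retry the whole batch from scratch
             }
           }
           if (lastException != null) {
             throw lastException;
           }
         }
       }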


