virajjasani commented on code in PR #2247:
URL: https://github.com/apache/phoenix/pull/2247#discussion_r2240870681
##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java:
##########
@@ -53,19 +54,206 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+
 /**
  * Utility class for CDC (Change Data Capture) operations during compaction. This class contains
  * utilities for handling TTL row expiration events and generating CDC events with pre-image data
- * that are written directly to CDC index tables.
+ * that are written directly to CDC index tables using batch mutations.
  */
 public final class CDCCompactionUtil {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(CDCCompactionUtil.class);
 
+  // Shared cache for row images across all CompactionScanner instances in the JVM.
+  // Entries expire after 1200 seconds (20 minutes) by default.
+  // The JVM level cache helps merge the pre-image for the row with multiple CFs.
+  // The key of the cache contains (regionId + data table rowkey).
+  // The value contains pre-image that needs to be directly inserted in the CDC index.
+  private static volatile Cache<ImmutableBytesPtr, Map<String, Object>> sharedTtlImageCache;
+
   private CDCCompactionUtil() {
     // empty
   }
 
+  /**
+   * Gets the shared row image cache, initializing it lazily with configuration.
+   * @param config The Hadoop configuration to read cache expiry from
+   * @return the shared cache instance
+   */
+  static Cache<ImmutableBytesPtr, Map<String, Object>>
+      getSharedRowImageCache(Configuration config) {
+    if (sharedTtlImageCache == null) {
+      synchronized (CDCCompactionUtil.class) {
+        if (sharedTtlImageCache == null) {
+          int expirySeconds = config.getInt(QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS,
+            QueryServicesOptions.DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS);
+          sharedTtlImageCache =
+            CacheBuilder.newBuilder().expireAfterWrite(expirySeconds, TimeUnit.SECONDS).build();
+          LOGGER.info("Initialized shared CDC row image cache with expiry of {} seconds",
+            expirySeconds);
+        }
+      }
+    }
+    return sharedTtlImageCache;
+  }
+
+  /**
+   * Batch processor for CDC mutations during compaction. This class manages accumulating mutations
+   * and maintaining in-memory image tracking for the duration of the compaction operation.
+   */
+  public static class CDCBatchProcessor {
+
+    private final Map<ImmutableBytesPtr, Put> pendingMutations;
+    private final PTable cdcIndex;
+    private final PTable dataTable;
+    private final RegionCoprocessorEnvironment env;
+    private final Region region;
+    private final byte[] compactionTimeBytes;
+    private final long eventTimestamp;
+    private final String tableName;
+    private final int cdcTtlMutationMaxRetries;
+    private final int batchSize;
+    private final Configuration config;
+
+    public CDCBatchProcessor(PTable cdcIndex, PTable dataTable, RegionCoprocessorEnvironment env,
+        Region region, byte[] compactionTimeBytes, long eventTimestamp, String tableName,
+        int cdcTtlMutationMaxRetries, int batchSize) {
+      this.pendingMutations = new HashMap<>();
+      this.cdcIndex = cdcIndex;
+      this.dataTable = dataTable;
+      this.env = env;
+      this.region = region;
+      this.compactionTimeBytes = compactionTimeBytes;
+      this.eventTimestamp = eventTimestamp;
+      this.tableName = tableName;
+      this.cdcTtlMutationMaxRetries = cdcTtlMutationMaxRetries;
+      this.batchSize = batchSize;
+      this.config = env.getConfiguration();
+    }
+
+    /**
+     * Adds a CDC event for the specified expired row. If the row already exists in memory, merges
+     * the image with the existing image. Accumulates mutations for batching.
+     * @param expiredRow The expired row.
+     * @throws Exception If something goes wrong.
+     */
+    public void addCDCEvent(List<Cell> expiredRow) throws Exception {
+      Cell firstCell = expiredRow.get(0);
+      byte[] dataRowKey = CellUtil.cloneRow(firstCell);
+
+      Put expiredRowPut = new Put(dataRowKey);
+      for (Cell cell : expiredRow) {
+        expiredRowPut.add(cell);
+      }
+
+      IndexMaintainer cdcIndexMaintainer;
+      // rowKey for the Index mutation
+      byte[] rowKey;
+      try (PhoenixConnection serverConnection =
+          QueryUtil.getConnectionOnServer(new Properties(), env.getConfiguration())
+            .unwrap(PhoenixConnection.class)) {
+        cdcIndexMaintainer = cdcIndex.getIndexMaintainer(dataTable, serverConnection);
+
+        ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut);
+        ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(expiredRowPut.getRow());
+
+        Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+          dataRowVG, rowKeyPtr, eventTimestamp, null, null, false,
+          region.getRegionInfo().getEncodedNameAsBytes());
+
+        rowKey = cdcIndexPut.getRow().clone();
+        System.arraycopy(compactionTimeBytes, 0, rowKey, PartitionIdFunction.PARTITION_ID_LENGTH,
+          PDate.INSTANCE.getByteSize());
+      }
+
+      byte[] rowKeyWithoutTimestamp = new byte[rowKey.length - PDate.INSTANCE.getByteSize()];
+      // copy PARTITION_ID() from offset 0 to 31
+      System.arraycopy(rowKey, 0, rowKeyWithoutTimestamp, 0,
+        PartitionIdFunction.PARTITION_ID_LENGTH);
+      // copy data table rowkey from offset (32 + 8) to end of rowkey
+      System.arraycopy(rowKey,
+        PartitionIdFunction.PARTITION_ID_LENGTH + PDate.INSTANCE.getByteSize(),
+        rowKeyWithoutTimestamp, PartitionIdFunction.PARTITION_ID_LENGTH,
+        rowKeyWithoutTimestamp.length - PartitionIdFunction.PARTITION_ID_LENGTH);
+      ImmutableBytesPtr cacheKeyPtr = new ImmutableBytesPtr(rowKeyWithoutTimestamp);
+
+      // Check if we already have an image for this row in the shared cache, from other store
+      // compaction of the same region
+      Cache<ImmutableBytesPtr, Map<String, Object>> cache = getSharedRowImageCache(config);
+      Map<String, Object> existingPreImage = cache.getIfPresent(cacheKeyPtr);
+      if (existingPreImage == null) {
+        existingPreImage = new HashMap<>();
+        cache.put(cacheKeyPtr, existingPreImage);
+      }
+
+      // Create CDC event with merged pre-image
+      Map<String, Object> cdcEvent =
+        createTTLDeleteCDCEvent(expiredRowPut, dataTable, existingPreImage);
+      byte[] cdcEventBytes = JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent);
+      Put cdcIndexPut = buildCDCIndexPut(eventTimestamp, cdcEventBytes, rowKey, cdcIndexMaintainer);
+
+      pendingMutations.put(cacheKeyPtr, cdcIndexPut);
+
+      if (pendingMutations.size() >= batchSize) {
+        flushBatch();
+      }
+    }
+
+    /**
+     * Flushes any pending mutations in the current batch.
+     */
+    public void flushBatch() throws Exception {
+      if (pendingMutations.isEmpty()) {
+        return;
+      }
+
+      Exception lastException = null;
+      for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; retryCount++) {
+        try (Table cdcIndexTable = env.getConnection()
+            .getTable(TableName.valueOf(cdcIndex.getPhysicalName().getBytes()))) {
+          cdcIndexTable.put(new ArrayList<>(pendingMutations.values()));

Review Comment:
   Since we are using batchMutate() on the table, where all the rowkeys are most likely expected to go to the same region, either all mutations will be successful or none will.
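To illustrate the point in the review comment, here is a minimal, hypothetical sketch (not part of this PR) of how a caller of Table#put(List<Put>) can tell an all-or-nothing failure apart from a partial one. HBase reports per-row batch failures through RetriesExhaustedWithDetailsException, so if the CDC index rows ever span more than one region, some Puts in a batch may succeed while others fail. The class and method names (BatchPutExample, tryFlush) are made up for illustration only.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public final class BatchPutExample {

  /**
   * Hypothetical helper: submits the batch once and reports whether every Put was applied.
   * Returns false if the batch (or part of it) failed and should be retried by the caller.
   */
  static boolean tryFlush(Connection connection, TableName cdcIndexName, List<Put> batch)
      throws IOException {
    try (Table table = connection.getTable(cdcIndexName)) {
      // All Puts are submitted as one batch; when every row key maps to the same region,
      // the batch behaves as all-or-nothing from the caller's point of view.
      table.put(batch);
      return true;
    } catch (RetriesExhaustedWithDetailsException e) {
      // Partial failures are still reported per row, so log which rows did not go through.
      for (int i = 0; i < e.getNumExceptions(); i++) {
        System.err.println("CDC index put failed for row "
            + Bytes.toStringBinary(e.getRow(i).getRow()) + ": " + e.getCause(i));
      }
      return false;
    }
  }
}

Since the CDC index Puts carry an explicit event timestamp, re-submitting the whole batch on retry should be idempotent, so a simple retry loop like the one in flushBatch() does not need to track which rows already went through.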