This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 26444e0ccc PHOENIX-7677 TTL_DELETE CDC event to use batch mutation (#2247) 26444e0ccc is described below commit 26444e0cccb4bf13fa95ec130ecf07b7573fe635 Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Tue Jul 29 21:51:39 2025 -0700 PHOENIX-7677 TTL_DELETE CDC event to use batch mutation (#2247) --- .../org/apache/phoenix/query/QueryServices.java | 6 + .../apache/phoenix/query/QueryServicesOptions.java | 8 +- .../phoenix/coprocessor/CDCCompactionUtil.java | 452 ++++++++++++--------- .../phoenix/coprocessor/CompactionScanner.java | 36 +- .../org/apache/phoenix/end2end/TableTTLIT.java | 126 ++++++ .../phoenix/schema/ConditionalTTLExpressionIT.java | 20 + .../java/org/apache/phoenix/util/TestUtil.java | 13 +- 7 files changed, 454 insertions(+), 207 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 94f8574361..e3f494897d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -503,6 +503,12 @@ public interface QueryServices extends SQLCloseable { // CDC TTL mutation retry configuration String CDC_TTL_MUTATION_MAX_RETRIES = "phoenix.cdc.ttl.mutation.max.retries"; + // CDC TTL mutation batch size configuration + String CDC_TTL_MUTATION_BATCH_SIZE = "phoenix.cdc.ttl.mutation.batch.size"; + + // CDC TTL shared cache expiration time in seconds + String CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS = "phoenix.cdc.ttl.shared.cache.expiry.seconds"; + // This config is used to move (copy and delete) the child links from the SYSTEM.CATALOG to // SYSTEM.CHILD_LINK table. // As opposed to a copy and async (out of band) delete. 
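The two keys added above, phoenix.cdc.ttl.mutation.batch.size and phoenix.cdc.ttl.shared.cache.expiry.seconds, are plain Hadoop Configuration properties read on the region server; their defaults (50 and 1200 seconds) are registered in QueryServicesOptions below. A minimal sketch of overriding them programmatically follows. The values shown are arbitrary examples rather than recommendations, and in practice they would normally be set in hbase-site.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CdcTtlConfigSketch {
  public static void main(String[] args) {
    // Server-side configuration, e.g. backed by hbase-site.xml on the region server.
    Configuration conf = HBaseConfiguration.create();
    // Number of CDC index Puts written per round trip once compaction completes (default 50).
    conf.setInt("phoenix.cdc.ttl.mutation.batch.size", 100);
    // Expiry, in seconds, of the JVM-wide pre-image cache shared across store
    // compactions of the same region (default 1200, i.e. 20 minutes).
    conf.setInt("phoenix.cdc.ttl.shared.cache.expiry.seconds", 600);
    System.out.println(conf.getInt("phoenix.cdc.ttl.mutation.batch.size", 50));
  }
}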
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 16c1c28709..f6f44c23c1 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -24,7 +24,9 @@ import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME; import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; +import static org.apache.phoenix.query.QueryServices.CDC_TTL_MUTATION_BATCH_SIZE; import static org.apache.phoenix.query.QueryServices.CDC_TTL_MUTATION_MAX_RETRIES; +import static org.apache.phoenix.query.QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS; import static org.apache.phoenix.query.QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD; import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG; import static org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB; @@ -493,6 +495,8 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true; public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED = false; public static final int DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES = 5; + public static final int DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE = 50; + public static final int DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS = 1200; public static final long DEFAULT_PHOENIX_CDC_STREAM_PARTITION_EXPIRY_MIN_AGE_MS = 30 * 60 * 60 * 1000; // 30 hours @@ -613,7 +617,9 @@ public class QueryServicesOptions { .setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT) .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED) - .setIfUnset(CDC_TTL_MUTATION_MAX_RETRIES, DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES); + .setIfUnset(CDC_TTL_MUTATION_MAX_RETRIES, DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES) + .setIfUnset(CDC_TTL_MUTATION_BATCH_SIZE, DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE) + .setIfUnset(CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS, DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS); // HBase sets this to 1, so we reset it to something more appropriate. 
// Hopefully HBase will change this, because we can't know if a user set diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java index d078a3fae6..9921284591 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java @@ -20,18 +20,17 @@ package org.apache.phoenix.coprocessor; import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.CheckAndMutate; -import org.apache.hadoop.hbase.client.CheckAndMutateResult; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.Region; @@ -43,6 +42,8 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.types.PDate; @@ -53,19 +54,229 @@ import org.apache.phoenix.util.QueryUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.phoenix.thirdparty.com.google.common.cache.Cache; +import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder; + /** * Utility class for CDC (Change Data Capture) operations during compaction. This class contains - * utilities for handling TTL row expiration events and generating CDC events with pre-image data - * that are written directly to CDC index tables. + * utilities for handling TTL row expiration events and generating CDC events with pre-image data. + * CDC mutations are accumulated during compaction and written to CDC index tables in batches only + * when compaction completes. */ public final class CDCCompactionUtil { private static final Logger LOGGER = LoggerFactory.getLogger(CDCCompactionUtil.class); + // Shared cache for row images across all CompactionScanner instances in the JVM. + // Entries expire after 1200 seconds (20 minutes) by default. + // The JVM level cache helps merge the pre-image for the row with multiple CFs. + // The key of the cache contains (regionId + data table rowkey). + // The value contains pre-image that needs to be directly inserted in the CDC index. + private static volatile Cache<ImmutableBytesPtr, Map<String, Object>> sharedTtlImageCache; + private CDCCompactionUtil() { // empty } + /** + * Gets the shared row image cache, initializing it lazily with configuration. 
+ * @param config The Hadoop configuration to read cache expiry from + * @return the shared cache instance + */ + static Cache<ImmutableBytesPtr, Map<String, Object>> + getSharedRowImageCache(Configuration config) { + if (sharedTtlImageCache == null) { + synchronized (CDCCompactionUtil.class) { + if (sharedTtlImageCache == null) { + int expirySeconds = config.getInt(QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS, + QueryServicesOptions.DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS); + sharedTtlImageCache = + CacheBuilder.newBuilder().expireAfterWrite(expirySeconds, TimeUnit.SECONDS).build(); + LOGGER.info("Initialized shared CDC row image cache with expiry of {} seconds", + expirySeconds); + } + } + } + return sharedTtlImageCache; + } + + /** + * Batch processor for CDC mutations during compaction. This class accumulates all mutations + * during the compaction operation and writes them to the CDC index in batches only when the + * compaction is complete. + */ + public static class CDCBatchProcessor { + + private final Map<ImmutableBytesPtr, Put> pendingMutations; + private final PTable cdcIndex; + private final PTable dataTable; + private final RegionCoprocessorEnvironment env; + private final Region region; + private final byte[] compactionTimeBytes; + private final long eventTimestamp; + private final String tableName; + private final int cdcTtlMutationMaxRetries; + private final int batchSize; + private final Configuration config; + + public CDCBatchProcessor(PTable cdcIndex, PTable dataTable, RegionCoprocessorEnvironment env, + Region region, byte[] compactionTimeBytes, long eventTimestamp, String tableName, + int cdcTtlMutationMaxRetries, int batchSize) { + this.pendingMutations = new HashMap<>(); + this.cdcIndex = cdcIndex; + this.dataTable = dataTable; + this.env = env; + this.region = region; + this.compactionTimeBytes = compactionTimeBytes; + this.eventTimestamp = eventTimestamp; + this.tableName = tableName; + this.cdcTtlMutationMaxRetries = cdcTtlMutationMaxRetries; + this.batchSize = batchSize; + this.config = env.getConfiguration(); + } + + /** + * Adds a CDC event for the specified expired row. If the row already exists in memory, merges + * the image with the existing image. Accumulates mutations in memory for batch processing + * during close() instead of immediately writing to the CDC index. + * @param expiredRow The expired row. + * @throws Exception If something goes wrong. 
+ */ + public void addCDCEvent(List<Cell> expiredRow) throws Exception { + Cell firstCell = expiredRow.get(0); + byte[] dataRowKey = CellUtil.cloneRow(firstCell); + + Put expiredRowPut = new Put(dataRowKey); + for (Cell cell : expiredRow) { + expiredRowPut.add(cell); + } + + IndexMaintainer cdcIndexMaintainer; + // rowKey for the Index mutation + byte[] rowKey; + try (PhoenixConnection serverConnection = + QueryUtil.getConnectionOnServer(new Properties(), env.getConfiguration()) + .unwrap(PhoenixConnection.class)) { + cdcIndexMaintainer = cdcIndex.getIndexMaintainer(dataTable, serverConnection); + + ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut); + ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(expiredRowPut.getRow()); + + Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, + dataRowVG, rowKeyPtr, eventTimestamp, null, null, false, + region.getRegionInfo().getEncodedNameAsBytes()); + + rowKey = cdcIndexPut.getRow().clone(); + System.arraycopy(compactionTimeBytes, 0, rowKey, PartitionIdFunction.PARTITION_ID_LENGTH, + PDate.INSTANCE.getByteSize()); + } + + byte[] rowKeyWithoutTimestamp = new byte[rowKey.length - PDate.INSTANCE.getByteSize()]; + // copy PARTITION_ID() from offset 0 to 31 + System.arraycopy(rowKey, 0, rowKeyWithoutTimestamp, 0, + PartitionIdFunction.PARTITION_ID_LENGTH); + // copy data table rowkey from offset (32 + 8) to end of rowkey + System.arraycopy(rowKey, + PartitionIdFunction.PARTITION_ID_LENGTH + PDate.INSTANCE.getByteSize(), + rowKeyWithoutTimestamp, PartitionIdFunction.PARTITION_ID_LENGTH, + rowKeyWithoutTimestamp.length - PartitionIdFunction.PARTITION_ID_LENGTH); + ImmutableBytesPtr cacheKeyPtr = new ImmutableBytesPtr(rowKeyWithoutTimestamp); + + // Check if we already have an image for this row in the shared cache, from other store + // compaction of the same region + Cache<ImmutableBytesPtr, Map<String, Object>> cache = getSharedRowImageCache(config); + Map<String, Object> existingPreImage = cache.getIfPresent(cacheKeyPtr); + if (existingPreImage == null) { + existingPreImage = new HashMap<>(); + cache.put(cacheKeyPtr, existingPreImage); + } + + // Create CDC event with merged pre-image + Map<String, Object> cdcEvent = + createTTLDeleteCDCEvent(expiredRowPut, dataTable, existingPreImage); + byte[] cdcEventBytes = JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent); + Put cdcIndexPut = buildCDCIndexPut(eventTimestamp, cdcEventBytes, rowKey, cdcIndexMaintainer); + + pendingMutations.put(cacheKeyPtr, cdcIndexPut); + } + + /** + * Flushes a specific list of mutations to the CDC index table. + * @param mutations List of mutations to flush + */ + private void flushMutations(List<Put> mutations) throws Exception { + if (mutations.isEmpty()) { + return; + } + + Exception lastException = null; + for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; retryCount++) { + try (Table cdcIndexTable = + env.getConnection().getTable(TableName.valueOf(cdcIndex.getPhysicalName().getBytes()))) { + cdcIndexTable.put(mutations); + lastException = null; + LOGGER.debug("Successfully flushed batch of {} CDC mutations for table {}", + mutations.size(), tableName); + break; + } catch (Exception e) { + lastException = e; + long backoffMs = 100; + LOGGER.warn("CDC batch mutation attempt {}/{} failed, retrying in {}ms. 
Batch size: {}", + retryCount + 1, cdcTtlMutationMaxRetries, backoffMs, mutations.size(), e); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted during CDC batch mutation retry", ie); + } + } + } + + if (lastException != null) { + LOGGER.error( + "Failed to flush CDC batch after {} attempts for table {}, index {}. {} " + + "events are missed.", + cdcTtlMutationMaxRetries, tableName, cdcIndex.getPhysicalName().getString(), + mutations.size(), lastException); + } + } + + /** + * Finalizes the batch processor by flushing all accumulated mutations in batches. This method + * processes all accumulated mutations and writes them to the CDC index in batches of the + * configured batch size. + */ + public void close() throws Exception { + if (pendingMutations.isEmpty()) { + LOGGER.trace("No CDC mutations to flush for table {}", tableName); + return; + } + + int totalMutations = pendingMutations.size(); + LOGGER.info("Flushing {} accumulated CDC mutations for table {} in batches of {}", + totalMutations, tableName, batchSize); + + List<Put> allMutations = new ArrayList<>(pendingMutations.values()); + + for (int i = 0; i < allMutations.size(); i += batchSize) { + int endIndex = Math.min(i + batchSize, allMutations.size()); + List<Put> batch = allMutations.subList(i, endIndex); + flushMutations(batch); + LOGGER.debug("Flushed CDC batch {}/{} for table {} (mutations {}-{} of {})", + (i / batchSize) + 1, (allMutations.size() + batchSize - 1) / batchSize, tableName, i + 1, + endIndex, totalMutations); + } + + pendingMutations.clear(); + + Cache<ImmutableBytesPtr, Map<String, Object>> cache = getSharedRowImageCache(config); + LOGGER.info( + "CDC batch processor closed for table {}. Processed {} mutations in {} batches." + + " Shared cache size: {}", + tableName, totalMutations, (totalMutations + batchSize - 1) / batchSize, cache.size()); + } + } + /** * Finds the column name for a given cell in the data table. * @param dataTable The data table @@ -128,223 +339,80 @@ public final class CDCCompactionUtil { /** * Builds CDC index Put mutation. 
- * @param cdcIndex The CDC index table - * @param expiredRowPut The expired row data as a Put - * @param eventTimestamp The timestamp for the CDC event - * @param cdcEventBytes The CDC event data to store - * @param dataTable The data table - * @param env The region coprocessor environment - * @param region The HBase region - * @param compactionTimeBytes The compaction time as bytes + * @param eventTimestamp The timestamp for the CDC event + * @param cdcEventBytes The CDC event data to store + * @param rowKey The rowKey of the CDC index mutation + * @param cdcIndexMaintainer The index maintainer object for the CDC index * @return The CDC index Put mutation */ - private static Put buildCDCIndexPut(PTable cdcIndex, Put expiredRowPut, long eventTimestamp, - byte[] cdcEventBytes, PTable dataTable, RegionCoprocessorEnvironment env, Region region, - byte[] compactionTimeBytes) throws Exception { + private static Put buildCDCIndexPut(long eventTimestamp, byte[] cdcEventBytes, byte[] rowKey, + IndexMaintainer cdcIndexMaintainer) { - try (PhoenixConnection serverConnection = - QueryUtil.getConnectionOnServer(new Properties(), env.getConfiguration()) - .unwrap(PhoenixConnection.class)) { + Put newCdcIndexPut = new Put(rowKey, eventTimestamp); - IndexMaintainer cdcIndexMaintainer = cdcIndex.getIndexMaintainer(dataTable, serverConnection); + newCdcIndexPut.addColumn(cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), + cdcIndexMaintainer.getEmptyKeyValueQualifier(), eventTimestamp, + QueryConstants.UNVERIFIED_BYTES); - ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut); - ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(expiredRowPut.getRow()); + // Add CDC event data + newCdcIndexPut.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + QueryConstants.CDC_IMAGE_CQ_BYTES, eventTimestamp, cdcEventBytes); - Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, - dataRowVG, rowKeyPtr, eventTimestamp, null, null, false, - region.getRegionInfo().getEncodedNameAsBytes()); - - byte[] rowKey = cdcIndexPut.getRow().clone(); - System.arraycopy(compactionTimeBytes, 0, rowKey, PartitionIdFunction.PARTITION_ID_LENGTH, - PDate.INSTANCE.getByteSize()); - Put newCdcIndexPut = new Put(rowKey, eventTimestamp); - - newCdcIndexPut.addColumn(cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), - cdcIndexMaintainer.getEmptyKeyValueQualifier(), eventTimestamp, - QueryConstants.UNVERIFIED_BYTES); - - // Add CDC event data - newCdcIndexPut.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - QueryConstants.CDC_IMAGE_CQ_BYTES, eventTimestamp, cdcEventBytes); - - return newCdcIndexPut; - } + return newCdcIndexPut; } /** - * Generates and applies a CDC index mutation for TTL expired row with retries if required. - * @param cdcIndex The CDC index table + * Creates a CDC batch processor for the given data table and configuration. 
* @param dataTable The data table - * @param expiredRowPut The expired row data as a Put - * @param eventTimestamp The timestamp for the CDC event - * @param tableName The table name for logging * @param env The region coprocessor environment * @param region The HBase region * @param compactionTimeBytes The compaction time as bytes - * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations - */ - private static void generateCDCIndexMutation(PTable cdcIndex, PTable dataTable, Put expiredRowPut, - long eventTimestamp, String tableName, RegionCoprocessorEnvironment env, Region region, - byte[] compactionTimeBytes, int cdcTtlMutationMaxRetries) throws Exception { - Map<String, Object> cdcEvent = - createTTLDeleteCDCEvent(expiredRowPut, dataTable, new HashMap<>()); - byte[] cdcEventBytes = JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent); - Put cdcIndexPut = buildCDCIndexPut(cdcIndex, expiredRowPut, eventTimestamp, cdcEventBytes, - dataTable, env, region, compactionTimeBytes); - - Exception lastException = null; - for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; retryCount++) { - try (Table cdcIndexTable = - env.getConnection().getTable(TableName.valueOf(cdcIndex.getPhysicalName().getBytes()))) { - CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(cdcIndexPut.getRow()) - .ifNotExists(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - QueryConstants.CDC_IMAGE_CQ_BYTES) - .build(cdcIndexPut); - CheckAndMutateResult result = cdcIndexTable.checkAndMutate(checkAndMutate); - - if (result.isSuccess()) { - // Successfully inserted new CDC event - Single CF case - lastException = null; - break; - } else { - // Row already exists, need to retrieve existing pre-image and merge - // Likely to happen for multi CF case - Get get = new Get(cdcIndexPut.getRow()); - get.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - QueryConstants.CDC_IMAGE_CQ_BYTES); - Result existingResult = cdcIndexTable.get(get); - - if (!existingResult.isEmpty()) { - Cell existingCell = existingResult.getColumnLatestCell( - QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.CDC_IMAGE_CQ_BYTES); - - if (existingCell != null) { - byte[] existingCdcBytes = CellUtil.cloneValue(existingCell); - Map<String, Object> existingCdcEvent = - JacksonUtil.getObjectReader(HashMap.class).readValue(existingCdcBytes); - Map<String, Object> existingPreImage = (Map<String, Object>) existingCdcEvent - .getOrDefault(QueryConstants.CDC_PRE_IMAGE, new HashMap<>()); - - // Create new TTL delete event with merged pre-image - Map<String, Object> mergedCdcEvent = - createTTLDeleteCDCEvent(expiredRowPut, dataTable, existingPreImage); - byte[] mergedCdcEventBytes = - JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(mergedCdcEvent); - - Put mergedCdcIndexPut = buildCDCIndexPut(cdcIndex, expiredRowPut, eventTimestamp, - mergedCdcEventBytes, dataTable, env, region, compactionTimeBytes); - - cdcIndexTable.put(mergedCdcIndexPut); - lastException = null; - break; - } else { - LOGGER.warn("Rare event: Skipping CDC TTL mutation because other type" - + " of CDC event is recorded at time {}", eventTimestamp); - break; - } - } else { - LOGGER.warn("Rare event.. 
Skipping CDC TTL mutation because other type" - + " of CDC event is recorded at time {}", eventTimestamp); - break; - } - } - } catch (Exception e) { - lastException = e; - long backoffMs = 100; - LOGGER.warn("CDC mutation attempt {}/{} failed, retrying in {}ms", retryCount + 1, - cdcTtlMutationMaxRetries + 1, backoffMs, e); - try { - Thread.sleep(backoffMs); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted during CDC mutation retry", ie); - } - } - } - if (lastException != null) { - LOGGER.error( - "Failed to generate CDC mutation after {} attempts for table {}, index " - + "{}. The event update is missed.", - cdcTtlMutationMaxRetries + 1, tableName, cdcIndex.getPhysicalName().getString(), - lastException); - } - } - - /** - * Generates CDC TTL delete event and writes it directly to CDC index tables. This bypasses the - * normal CDC update path since the row is being expired. - * @param expiredRow The cells of the expired row - * @param tableName The table name for logging * @param compactionTime The compaction timestamp - * @param dataTable The data table - * @param env The region coprocessor environment - * @param region The HBase region - * @param compactionTimeBytes The compaction time as bytes + * @param tableName The table name for logging * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations + * @param batchSize The batch size for CDC mutations + * @return CDCBatchProcessor instance or null if no active CDC index */ - private static void generateCDCTTLDeleteEvent(List<Cell> expiredRow, String tableName, - long compactionTime, PTable dataTable, RegionCoprocessorEnvironment env, Region region, - byte[] compactionTimeBytes, int cdcTtlMutationMaxRetries) { - try { - PTable cdcIndex = CDCUtil.getActiveCDCIndex(dataTable); - if (cdcIndex == null) { - LOGGER.warn("No active CDC index found for table {}", tableName); - return; - } - Cell firstCell = expiredRow.get(0); - byte[] dataRowKey = CellUtil.cloneRow(firstCell); - Put expiredRowPut = new Put(dataRowKey); - - for (Cell cell : expiredRow) { - expiredRowPut.add(cell); - } - - try { - generateCDCIndexMutation(cdcIndex, dataTable, expiredRowPut, compactionTime, tableName, env, - region, compactionTimeBytes, cdcTtlMutationMaxRetries); - } catch (Exception e) { - LOGGER.error("Failed to generate CDC mutation for index {}: {}", - cdcIndex.getName().getString(), e.getMessage(), e); - } - } catch (Exception e) { - LOGGER.error("Error generating CDC TTL delete event for table {}", tableName, e); + public static CDCBatchProcessor createBatchProcessor(PTable dataTable, + RegionCoprocessorEnvironment env, Region region, byte[] compactionTimeBytes, + long compactionTime, String tableName, int cdcTtlMutationMaxRetries, int batchSize) { + PTable cdcIndex = CDCUtil.getActiveCDCIndex(dataTable); + if (cdcIndex == null) { + LOGGER.warn("No active CDC index found for table {}", tableName); + return null; } + return new CDCBatchProcessor(cdcIndex, dataTable, env, region, compactionTimeBytes, + compactionTime, tableName, cdcTtlMutationMaxRetries, batchSize); } /** - * Handles TTL row expiration for CDC event generation. This method is called when a row is - * detected as expired during major compaction. 
- * @param expiredRow The cells of the expired row - * @param expirationType The type of TTL expiration - * @param tableName The table name for logging purposes - * @param compactionTime The timestamp when compaction started - * @param table The Phoenix data table metadata - * @param env The region coprocessor environment for accessing HBase - * resources - * @param region The HBase region being compacted - * @param compactionTimeBytes The compaction timestamp as byte array for CDC index row key - * construction - * @param cdcTtlMutationMaxRetries Maximum number of retry attempts for CDC mutation operations + * Handles TTL row expiration for CDC event generation using batch processing. This method is + * called when a row is detected as expired during major compaction. + * @param expiredRow The cells of the expired row + * @param expirationType The type of TTL expiration + * @param tableName The table name for logging purposes + * @param batchProcessor The CDC batch processor instance */ static void handleTTLRowExpiration(List<Cell> expiredRow, String expirationType, String tableName, - long compactionTime, PTable table, RegionCoprocessorEnvironment env, Region region, - byte[] compactionTimeBytes, int cdcTtlMutationMaxRetries) { + CDCBatchProcessor batchProcessor) { + if (batchProcessor == null) { + return; + } + try { Cell firstCell = expiredRow.get(0); byte[] rowKey = CellUtil.cloneRow(firstCell); - LOGGER.info( + LOGGER.debug( "TTL row expiration detected: table={}, rowKey={}, expirationType={}, " + "cellCount={}, compactionTime={}", - tableName, Bytes.toStringBinary(rowKey), expirationType, expiredRow.size(), compactionTime); - - // Generate CDC TTL delete event with pre-image data - generateCDCTTLDeleteEvent(expiredRow, tableName, compactionTime, table, env, region, - compactionTimeBytes, cdcTtlMutationMaxRetries); + tableName, Bytes.toStringBinary(rowKey), expirationType, expiredRow.size(), + batchProcessor.eventTimestamp); + batchProcessor.addCDCEvent(expiredRow); } catch (Exception e) { LOGGER.error("Error handling TTL row expiration for CDC: table {}", tableName, e); } } + } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index c82a421e9b..f12dc77f72 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -164,6 +164,7 @@ public class CompactionScanner implements InternalScanner { private final boolean isCdcTtlEnabled; private final PTable table; private final int cdcTtlMutationMaxRetries; + private CDCCompactionUtil.CDCBatchProcessor cdcBatchProcessor; // Only for forcing minor compaction while testing private static boolean forceMinorCompaction = false; @@ -216,6 +217,15 @@ public class CompactionScanner implements InternalScanner { env.getConfiguration().getInt(QueryServices.CDC_TTL_MUTATION_MAX_RETRIES, QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES); + if (isCdcTtlEnabled) { + int cdcTtlMutationBatchSize = + env.getConfiguration().getInt(QueryServices.CDC_TTL_MUTATION_BATCH_SIZE, + QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE); + cdcBatchProcessor = + CDCCompactionUtil.createBatchProcessor(table, env, region, compactionTimeBytes, + compactionTime, tableName, cdcTtlMutationMaxRetries, cdcTtlMutationBatchSize); + } + // Initialize the tracker that computes the TTL for the 
compacting table. // The TTL tracker can be // simple (one single TTL for the table) when the compacting table is not Partitioned @@ -414,9 +424,9 @@ public class CompactionScanner implements InternalScanner { CompiledConditionalTTLExpression ttlExpr = (CompiledConditionalTTLExpression) rowContext.ttlExprForRow; if (ttlExpr.isExpired(result, true)) { - if (isCdcTtlEnabled && !result.isEmpty()) { + if (isCdcTtlEnabled && cdcBatchProcessor != null && !result.isEmpty()) { CDCCompactionUtil.handleTTLRowExpiration(result, "conditional_ttl", tableName, - compactionTime, table, env, region, compactionTimeBytes, cdcTtlMutationMaxRetries); + cdcBatchProcessor); } // If the row is expired, purge the row result.clear(); @@ -452,10 +462,23 @@ public class CompactionScanner implements InternalScanner { LOGGER.info("Closing CompactionScanner for table " + tableName + " store " + columnFamilyName + (major ? " major " : " not major ") + "compaction retained " + outputCellCount + " of " + inputCellCount + " cells" + (phoenixLevelOnly ? " phoenix level only" : "")); + if (forceMinorCompaction) { forceMinorCompaction = false; } storeScanner.close(); + + // Flush any remaining CDC mutations in the batch + if (cdcBatchProcessor != null) { + try { + cdcBatchProcessor.close(); + } catch (Exception e) { + LOGGER.error("Error closing CDC batch processor for table {}", tableName, e); + throw new IOException("Failed to close CDC batch processor", e); + } finally { + CDCCompactionUtil.getSharedRowImageCache(env.getConfiguration()).cleanUp(); + } + } } enum MatcherType { @@ -2427,9 +2450,9 @@ public class CompactionScanner implements InternalScanner { // Only do this check for major compaction as for minor compactions we don't expire cells. // The row version should not be visible via the max lookback window. Nothing to do - if (isCdcTtlEnabled && !lastRow.isEmpty()) { + if (isCdcTtlEnabled && cdcBatchProcessor != null && !lastRow.isEmpty()) { CDCCompactionUtil.handleTTLRowExpiration(lastRow, "time_based_ttl", tableName, - compactionTime, table, env, region, compactionTimeBytes, cdcTtlMutationMaxRetries); + cdcBatchProcessor); } return; } @@ -2521,9 +2544,9 @@ public class CompactionScanner implements InternalScanner { // store is not the empty column family store. 
return false; } - if (isCdcTtlEnabled && !lastRowVersion.isEmpty()) { + if (isCdcTtlEnabled && cdcBatchProcessor != null && !lastRowVersion.isEmpty()) { CDCCompactionUtil.handleTTLRowExpiration(lastRowVersion, "max_lookback_ttl", tableName, - compactionTime, table, env, region, compactionTimeBytes, cdcTtlMutationMaxRetries); + cdcBatchProcessor); } return true; } @@ -2575,7 +2598,6 @@ public class CompactionScanner implements InternalScanner { lastRowVersion.clear(); lastRowVersion.addAll(trimmedRow); trimmedEmptyColumn.clear(); - ; for (Cell cell : emptyColumn) { if (cell.getTimestamp() >= minTimestamp) { trimmedEmptyColumn.add(cell); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java index 97eecf2c42..b7f481d118 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java @@ -28,8 +28,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -782,6 +784,130 @@ public class TableTTLIT extends BaseTest { TestUtil.majorCompact(getUtility(), table); } + /** + * Test CDC batch mutations for TTL expired rows. This test creates a table with TTL and CDC + * index, inserts 82 rows (to test batching: 25+25+25+7), lets them expire via TTL, and verifies + * that all 82 rows have CDC TTL_DELETE events recorded with correct pre-image data. + */ + @Test + public void testCDCBatchMutationsForTTLExpiredRows() throws Exception { + final int maxLookbackAge = + tableLevelMaxLookback != null ? 
tableLevelMaxLookback : MAX_LOOKBACK_AGE; + final int numRows = 182; + + try (Connection conn = DriverManager.getConnection(getUrl())) { + String tableName = generateUniqueName(); + String cdcName = generateUniqueName(); + ObjectMapper mapper = new ObjectMapper(); + + createTable(tableName); + conn.createStatement().execute("ALTER TABLE " + tableName + + " SET \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge); + + String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (PRE, POST)"; + conn.createStatement().execute(cdcSql); + conn.commit(); + + String cdcFullName = SchemaUtil.getTableName(null, cdcName); + + long startTime = System.currentTimeMillis() + 1000; + startTime = (startTime / 1000) * 1000; + EnvironmentEdgeManager.injectEdge(injectEdge); + injectEdge.setValue(startTime); + + // Track post-images for each row to verify against pre-images later + Map<String, Map<String, Object>> lastPostImages = new HashMap<>(); + + for (int i = 1; i <= numRows; i++) { + String rowId = "row" + i; + updateRow(conn, tableName, rowId); + injectEdge.incrementValue(100); + } + + // Get the post-images from the UPSERT events + String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName; + try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) { + while (rs.next()) { + Map<String, Object> cdcEvent = mapper.readValue(rs.getString(3), HashMap.class); + assertEquals("Should be upsert event", CDC_UPSERT_EVENT_TYPE, + cdcEvent.get(CDC_EVENT_TYPE)); + + Map<String, Object> postImage = (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE); + String rowId = rs.getString(2); + lastPostImages.put(rowId, postImage); + } + } + + assertEquals("Should have captured post-images for all " + numRows + " rows", numRows, + lastPostImages.size()); + + // Advance time past TTL to expire all rows + injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000); + + EnvironmentEdgeManager.reset(); + flush(TableName.valueOf(tableName)); + EnvironmentEdgeManager.injectEdge(injectEdge); + + Timestamp ts = new Timestamp(injectEdge.currentTime()); + majorCompact(TableName.valueOf(tableName)); + + // Verify all rows are expired from data table + String dataQuery = "SELECT COUNT(*) FROM " + tableName; + try (ResultSet rs = conn.createStatement().executeQuery(dataQuery)) { + assertTrue("Should have count result", rs.next()); + assertEquals("All rows should be expired from data table", 0, rs.getInt(1)); + } + + // Verify all TTL_DELETE CDC events were generated + String ttlDeleteQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName + + " WHERE PHOENIX_ROW_TIMESTAMP() >= ?"; + + Map<String, Map<String, Object>> ttlDeletePreImages = new HashMap<>(); + int ttlDeleteEventCount = 0; + + try (PreparedStatement pst = conn.prepareStatement(ttlDeleteQuery)) { + pst.setTimestamp(1, ts); + + try (ResultSet rs = pst.executeQuery(ttlDeleteQuery)) { + while (rs.next()) { + ttlDeleteEventCount++; + Map<String, Object> cdcEvent = mapper.readValue(rs.getString(3), HashMap.class); + + assertEquals("Should be ttl_delete event", CDC_TTL_DELETE_EVENT_TYPE, + cdcEvent.get(CDC_EVENT_TYPE)); + + assertTrue("TTL delete should have pre-image", cdcEvent.containsKey(CDC_PRE_IMAGE)); + Map<String, Object> preImage = (Map<String, Object>) cdcEvent.get(CDC_PRE_IMAGE); + assertNotNull("Pre-image should not be null", preImage); + assertFalse("Pre-image should not be empty", preImage.isEmpty()); + + String rowId = rs.getString(2); + ttlDeletePreImages.put(rowId, preImage); + } + } + } + + 
assertEquals("Should have exactly " + numRows + " TTL_DELETE events", numRows, + ttlDeleteEventCount); + assertEquals("Should have pre-images for all " + numRows + " rows", numRows, + ttlDeletePreImages.size()); + + // Verify that pre-images in TTL_DELETE events match the last post-images from UPSERT events + for (String rowId : lastPostImages.keySet()) { + assertTrue("Should have TTL_DELETE pre-image for row " + rowId, + ttlDeletePreImages.containsKey(rowId)); + + Map<String, Object> lastPostImage = lastPostImages.get(rowId); + Map<String, Object> ttlDeletePreImage = ttlDeletePreImages.get(rowId); + + assertEquals( + "Pre-image in TTL_DELETE should match last post-image from UPSERT for row " + rowId, + lastPostImage, ttlDeletePreImage); + } + + } + } + private void deleteRow(Connection conn, String tableName, String id) throws SQLException { String dml = "DELETE from " + tableName + " WHERE id = '" + id + "'"; conn.createStatement().executeUpdate(dml); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java index c16accfbe7..3cf200e46e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java @@ -39,6 +39,8 @@ import static org.junit.Assert.fail; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -57,14 +59,17 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.phoenix.coprocessor.CDCCompactionUtil; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.end2end.IndexToolIT; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.mapreduce.index.IndexTool; @@ -101,6 +106,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.phoenix.thirdparty.com.google.common.base.Joiner; +import org.apache.phoenix.thirdparty.com.google.common.cache.Cache; import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; @@ -158,6 +164,7 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT { Integer.toString(0)); props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + props.put(QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS, Integer.toString(1)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @@ -543,6 +550,9 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT { // Trigger TTL 
expiration again injectEdge.incrementValue(ttl); + + Thread.sleep(700); + cleanUpSharedTtlImageCache(); doMajorCompaction(tableName); // Verify all rows are expired from data table @@ -1470,6 +1480,16 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT { } } + private void cleanUpSharedTtlImageCache() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Method getSharedCacheMethod = + CDCCompactionUtil.class.getDeclaredMethod("getSharedRowImageCache", Configuration.class); + getSharedCacheMethod.setAccessible(true); + Cache<ImmutableBytesPtr, Map<String, Object>> cache = (Cache<ImmutableBytesPtr, + Map<String, Object>>) getSharedCacheMethod.invoke(null, (Configuration) null); + cache.cleanUp(); + } + private void doMajorCompaction(String tableName) throws IOException, InterruptedException { TestUtil.flush(getUtility(), TableName.valueOf(tableName)); TestUtil.majorCompact(getUtility(), TableName.valueOf(tableName)); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index e08a406af6..6541dd7338 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -1653,13 +1653,12 @@ public class TestUtil { TableOperation operation) throws Exception { ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); Configuration configuration = services.getConfiguration(); - org.apache.hadoop.hbase.client.Connection hbaseConn = - ConnectionFactory.createConnection(configuration); - Admin admin = services.getAdmin(); - RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName)); - int nRegions = regionLocator.getAllRegionLocations().size(); - operation.execute(admin, regionLocator, nRegions); - + try (org.apache.hadoop.hbase.client.Connection hbaseConn = + ConnectionFactory.createConnection(configuration); Admin admin = services.getAdmin()) { + RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName)); + int nRegions = regionLocator.getAllRegionLocations().size(); + operation.execute(admin, regionLocator, nRegions); + } } private static void waitForRegionChange(RegionLocator regionLocator, int initialRegionCount)
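Taken together, the new write path works like this: CompactionScanner creates one CDCBatchProcessor per compaction when CDC TTL is enabled, handleTTLRowExpiration()/addCDCEvent() accumulate a single merged Put per expired data row key, and close() flushes the accumulated Puts to the CDC index in fixed-size batches with bounded retries. Below is a self-contained sketch of that accumulate-then-flush pattern using only the HBase client API; the class and member names are illustrative and are not the Phoenix classes themselves:

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

class BatchedIndexWriter {
  private final Map<String, Put> pending = new LinkedHashMap<>();
  private final Connection connection;
  private final TableName indexTable;
  private final int batchSize;
  private final int maxRetries;

  BatchedIndexWriter(Connection connection, TableName indexTable, int batchSize, int maxRetries) {
    this.connection = connection;
    this.indexTable = indexTable;
    this.batchSize = batchSize;
    this.maxRetries = maxRetries;
  }

  // A later event for the same row replaces the earlier one, mirroring how the
  // processor keeps a single merged entry per data row key.
  void add(Put put) {
    pending.put(Bytes.toStringBinary(put.getRow()), put);
  }

  // Flush everything in sublists of batchSize; each sublist is retried up to
  // maxRetries times with a short fixed backoff, as in the commit.
  void close() throws IOException, InterruptedException {
    List<Put> all = new ArrayList<>(pending.values());
    try (Table table = connection.getTable(indexTable)) {
      for (int i = 0; i < all.size(); i += batchSize) {
        List<Put> batch = new ArrayList<>(all.subList(i, Math.min(i + batchSize, all.size())));
        IOException last = null;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
          try {
            table.put(batch);
            last = null;
            break;
          } catch (IOException e) {
            last = e;
            Thread.sleep(100);
          }
        }
        if (last != null) {
          throw last; // the real processor logs the failure and moves on to the next batch
        }
      }
    }
    pending.clear();
  }
}

The actual processor additionally routes every pre-image through a JVM-wide Guava cache keyed by region identity plus data row key, so that compactions of different column family stores in the same region contribute to one merged TTL_DELETE event, and a batch that exhausts its retries is logged as missed events rather than failing the compaction.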