This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
     new 8b506916d1  PHOENIX-7653 New CDC Event for TTL expired rows (#2209)
8b506916d1 is described below

commit 8b506916d1fa942607945a55b73e8987ef6045bd
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Mon Jul 7 17:22:01 2025 -0700

    PHOENIX-7653 New CDC Event for TTL expired rows (#2209)
---
 .../org/apache/phoenix/query/QueryConstants.java   |   3 +
 .../org/apache/phoenix/query/QueryServices.java    |   3 +
 .../apache/phoenix/query/QueryServicesOptions.java |   6 +-
 .../org/apache/phoenix/util/CDCChangeBuilder.java  |   3 +-
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  34 +-
 .../phoenix/coprocessor/CDCCompactionUtil.java     | 395 +++++++++++++++++++++
 .../coprocessor/CDCGlobalIndexRegionScanner.java   | 101 +++++-
 .../phoenix/coprocessor/CompactionScanner.java     |  33 +-
 .../java/org/apache/phoenix/end2end/Bson3IT.java   | 291 ++++++++++++++-
 .../org/apache/phoenix/end2end/TableTTLIT.java     | 255 +++++++++++++
 .../phoenix/schema/ConditionalTTLExpressionIT.java | 193 +++++++++-
 11 files changed, 1298 insertions(+), 19 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 9911f5f0a3..57da865493 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -363,6 +363,9 @@ public interface QueryConstants {
   String CDC_UPSERT_EVENT_TYPE = "upsert";
   String CDC_DELETE_EVENT_TYPE = "delete";
   String SPLITS_FILE = "SPLITS_FILE";
+  String CDC_TTL_DELETE_EVENT_TYPE = "ttl_delete";
+  String CDC_IMAGE_CQ = "_CDC_IMG_";
+  byte[] CDC_IMAGE_CQ_BYTES = Bytes.toBytes(CDC_IMAGE_CQ);
 
   /**
    * We mark counter values 0 to 10 as reserved. Value 0 is used by

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index e16e716958..e9d0e8d86e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -440,6 +440,9 @@ public interface QueryServices extends SQLCloseable {
   String CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES =
       "phoenix.conn.query.service.histogram.size.ranges";
 
+  // CDC TTL mutation retry configuration
+  String CDC_TTL_MUTATION_MAX_RETRIES = "phoenix.cdc.ttl.mutation.max.retries";
+
   // This config is used to move (copy and delete) the child links from the SYSTEM.CATALOG to
   // SYSTEM.CHILD_LINK table. As opposed to a copy and async (out of band) delete.
   public static final String MOVE_CHILD_LINKS_DURING_UPGRADE_ENABLED =
       "phoenix.move.child_link.during.upgrade";
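Taken together, the new QueryConstants entries define the payload of the new event: a TTL-expired row is reported with event type "ttl_delete", and the serialized event JSON is stashed under the hidden _CDC_IMG_ column qualifier on the CDC index. Going by the event keys documented in the CDCGlobalIndexRegionScanner javadoc further down, an emitted event would look roughly like the following sketch (the column names and values here are made up for illustration; the post-image of a TTL delete is always an empty map):

{
  "event_type": "ttl_delete",
  "pre_image": {"C1": "0003", "COL": "<encoded column value>"},
  "post_image": {}
}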
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index bb5ef67e49..b0ab07d901 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE
 import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.CDC_TTL_MUTATION_MAX_RETRIES;
 import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
 import static org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
@@ -469,7 +470,7 @@ public class QueryServicesOptions {
   public static final int DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE = 512;
   public static final Boolean DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
   public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED = false;
-
+  public static final int DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES = 5;
 
   private final Configuration config;
@@ -585,7 +586,8 @@
       .setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE, DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE)
       .setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
           DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)
-      .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED);
+      .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)
+      .setIfUnset(CDC_TTL_MUTATION_MAX_RETRIES, DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
 
   // HBase sets this to 1, so we reset it to something more appropriate.
   // Hopefully HBase will change this, because we can't know if a user set
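DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES caps how many times the compaction-time CDC write is attempted before the event is given up on (and logged as missed). Since CompactionScanner reads the value from the region server's configuration, a non-default value has to take effect server side. A minimal sketch of overriding it, assuming you are assembling the server Configuration yourself rather than relying on hbase-site.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.phoenix.query.QueryServices;

public class CdcTtlRetryConfig {
  public static Configuration withHigherCdcTtlRetries() {
    Configuration conf = HBaseConfiguration.create();
    // Key is "phoenix.cdc.ttl.mutation.max.retries"; the shipped default is 5.
    conf.setInt(QueryServices.CDC_TTL_MUTATION_MAX_RETRIES, 10);
    return conf;
  }
}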
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
index 4bd2567ddf..101d0f9335 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
@@ -31,6 +31,7 @@ import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
 import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
 import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
 import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
 import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
 
 public class CDCChangeBuilder {
@@ -69,7 +70,7 @@ public class CDCChangeBuilder {
   }
 
   public boolean isDeletionEvent() {
-    return changeType == CDC_DELETE_EVENT_TYPE;
+    return changeType == CDC_DELETE_EVENT_TYPE || changeType == CDC_TTL_DELETE_EVENT_TYPE;
   }
 
   public boolean isNonEmptyEvent() {

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 4cdd48cf97..44c09055f2 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.types.PDataType;
 import org.bson.RawBsonDocument;
@@ -112,6 +113,37 @@
     return isCDCIndex(indexTable.getTableName().getString());
   }
 
+  public static boolean isCDCIndexActive(PTable indexTable) {
+    return isCDCIndex(indexTable.getTableName().getString())
+        && indexTable.getIndexState() == PIndexState.ACTIVE;
+  }
+
+  /**
+   * Check if the given table has an active CDC index.
+   *
+   * @param table The PTable object.
+   * @return true if the table has an active CDC index, false otherwise.
+   */
+  public static boolean hasActiveCDCIndex(PTable table) {
+    if (table == null || table.getIndexes() == null) {
+      return false;
+    }
+    return table.getIndexes().stream().anyMatch(CDCUtil::isCDCIndexActive);
+  }
+
+  /**
+   * Return PTable of the active CDC index for the given data table.
+   *
+   * @param dataTable The data table.
+   * @return active CDC index.
+   */
+  public static PTable getActiveCDCIndex(PTable dataTable) {
+    return dataTable.getIndexes().stream()
+        .filter(CDCUtil::isCDCIndexActive)
+        .findFirst()
+        .orElse(null);
+  }
+
   /**
    * Check if the given table has any CDC indexes.
    *
@@ -152,7 +184,7 @@
   public static Object getColumnEncodedValue(Object value, PDataType dataType) {
     if (value != null) {
       if (dataType.getSqlType() == PDataType.BSON_TYPE) {
-        value = Bytes.toBytes(((RawBsonDocument) value).getByteBuffer().asNIO());
+        value = ByteUtil.toBytes(((RawBsonDocument) value).getByteBuffer().asNIO());
       } else if (isBinaryType(dataType)) {
         // Unfortunately, Base64.Encoder has no option to specify offset and length so can't
         // avoid copying bytes.
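The new CDCUtil helpers give the server-side code a cheap way to decide whether TTL events are needed at all: a table qualifies only if it has a CDC index in ACTIVE state. A minimal usage sketch, assuming dataTable is an already-resolved PTable (the wrapper class and method name are hypothetical):

import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.CDCUtil;

final class CdcIndexLookup {
  private CdcIndexLookup() { }

  /** Returns the active CDC index for the table, or null when CDC TTL events don't apply. */
  static PTable activeCdcIndexOrNull(PTable dataTable) {
    // hasActiveCDCIndex() also tolerates a null table or a null index list.
    if (!CDCUtil.hasActiveCDCIndex(dataTable)) {
      return null;
    }
    return CDCUtil.getActiveCDCIndex(dataTable);
  }
}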
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java new file mode 100644 index 0000000000..78fd936c29 --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.CheckAndMutateResult; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.function.PartitionIdFunction; +import org.apache.phoenix.hbase.index.ValueGetter; +import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.util.CDCUtil; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.JacksonUtil; +import org.apache.phoenix.util.QueryUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR; + +/** + * Utility class for CDC (Change Data Capture) operations during compaction. + * This class contains utilities for handling TTL row expiration events and generating + * CDC events with pre-image data that are written directly to CDC index tables. + */ +public final class CDCCompactionUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(CDCCompactionUtil.class); + + private CDCCompactionUtil() { + // empty + } + + /** + * Finds the column name for a given cell in the data table. 
+ * + * @param dataTable The data table + * @param cell The cell + * @return The column name or null if not found + */ + private static String findColumnName(PTable dataTable, Cell cell) { + try { + byte[] family = CellUtil.cloneFamily(cell); + byte[] qualifier = CellUtil.cloneQualifier(cell); + byte[] defaultCf = dataTable.getDefaultFamilyName() != null + ? dataTable.getDefaultFamilyName().getBytes() + : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; + for (PColumn column : dataTable.getColumns()) { + if (column.getFamilyName() != null + && Bytes.equals(family, column.getFamilyName().getBytes()) + && Bytes.equals(qualifier, column.getColumnQualifierBytes())) { + if (Bytes.equals(defaultCf, column.getFamilyName().getBytes())) { + return column.getName().getString(); + } else { + return column.getFamilyName().getString() + NAME_SEPARATOR + + column.getName().getString(); + } + } + } + } catch (Exception e) { + LOGGER.error("Error finding column name for cell: {}", CellUtil.toString(cell, true), + e); + } + return null; + } + + /** + * Creates a CDC event map for TTL delete with pre-image data. + * + * @param expiredRowPut The expired row data + * @param dataTable The data table + * @param preImage Pre-image map + * @return CDC event map + */ + private static Map<String, Object> createTTLDeleteCDCEvent(Put expiredRowPut, PTable dataTable, + Map<String, Object> preImage) + throws Exception { + Map<String, Object> cdcEvent = new HashMap<>(); + cdcEvent.put(QueryConstants.CDC_EVENT_TYPE, QueryConstants.CDC_TTL_DELETE_EVENT_TYPE); + for (List<Cell> familyCells : expiredRowPut.getFamilyCellMap().values()) { + for (Cell cell : familyCells) { + String columnName = findColumnName(dataTable, cell); + if (columnName != null) { + PColumn column = dataTable.getColumnForColumnQualifier( + CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)); + Object value = column.getDataType().toObject(cell.getValueArray(), + cell.getValueOffset(), + cell.getValueLength()); + Object encodedValue = + CDCUtil.getColumnEncodedValue(value, column.getDataType()); + preImage.put(columnName, encodedValue); + } + } + } + cdcEvent.put(QueryConstants.CDC_PRE_IMAGE, preImage); + cdcEvent.put(QueryConstants.CDC_POST_IMAGE, Collections.emptyMap()); + return cdcEvent; + } + + /** + * Builds CDC index Put mutation. 
+ * + * @param cdcIndex The CDC index table + * @param expiredRowPut The expired row data as a Put + * @param eventTimestamp The timestamp for the CDC event + * @param cdcEventBytes The CDC event data to store + * @param dataTable The data table + * @param env The region coprocessor environment + * @param region The HBase region + * @param compactionTimeBytes The compaction time as bytes + * @return The CDC index Put mutation + */ + private static Put buildCDCIndexPut(PTable cdcIndex, Put expiredRowPut, long eventTimestamp, + byte[] cdcEventBytes, PTable dataTable, + RegionCoprocessorEnvironment env, Region region, + byte[] compactionTimeBytes) throws Exception { + + try (PhoenixConnection serverConnection = QueryUtil.getConnectionOnServer(new Properties(), + env.getConfiguration()).unwrap(PhoenixConnection.class)) { + + IndexMaintainer cdcIndexMaintainer = + cdcIndex.getIndexMaintainer(dataTable, serverConnection); + + ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut); + ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(expiredRowPut.getRow()); + + Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation( + GenericKeyValueBuilder.INSTANCE, + dataRowVG, + rowKeyPtr, + eventTimestamp, + null, + null, + false, + region.getRegionInfo().getEncodedNameAsBytes()); + + byte[] rowKey = cdcIndexPut.getRow().clone(); + System.arraycopy(compactionTimeBytes, 0, rowKey, + PartitionIdFunction.PARTITION_ID_LENGTH, PDate.INSTANCE.getByteSize()); + Put newCdcIndexPut = new Put(rowKey, eventTimestamp); + + newCdcIndexPut.addColumn( + cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), + cdcIndexMaintainer.getEmptyKeyValueQualifier(), eventTimestamp, + QueryConstants.UNVERIFIED_BYTES); + + // Add CDC event data + newCdcIndexPut.addColumn( + QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + QueryConstants.CDC_IMAGE_CQ_BYTES, + eventTimestamp, + cdcEventBytes); + + return newCdcIndexPut; + } + } + + /** + * Generates and applies a CDC index mutation for TTL expired row with retries if required. 
+ * + * @param cdcIndex The CDC index table + * @param dataTable The data table + * @param expiredRowPut The expired row data as a Put + * @param eventTimestamp The timestamp for the CDC event + * @param tableName The table name for logging + * @param env The region coprocessor environment + * @param region The HBase region + * @param compactionTimeBytes The compaction time as bytes + * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations + */ + private static void generateCDCIndexMutation(PTable cdcIndex, PTable dataTable, + Put expiredRowPut, + long eventTimestamp, String tableName, + RegionCoprocessorEnvironment env, Region region, + byte[] compactionTimeBytes, + int cdcTtlMutationMaxRetries) + throws Exception { + Map<String, Object> cdcEvent = + createTTLDeleteCDCEvent(expiredRowPut, dataTable, new HashMap<>()); + byte[] cdcEventBytes = + JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent); + Put cdcIndexPut = + buildCDCIndexPut(cdcIndex, expiredRowPut, eventTimestamp, cdcEventBytes, + dataTable, env, region, compactionTimeBytes); + + Exception lastException = null; + for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; retryCount++) { + try (Table cdcIndexTable = env.getConnection().getTable(TableName.valueOf( + cdcIndex.getPhysicalName().getBytes()))) { + CheckAndMutate checkAndMutate = + CheckAndMutate.newBuilder(cdcIndexPut.getRow()) + .ifNotExists(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + QueryConstants.CDC_IMAGE_CQ_BYTES) + .build(cdcIndexPut); + CheckAndMutateResult result = cdcIndexTable.checkAndMutate(checkAndMutate); + + if (result.isSuccess()) { + // Successfully inserted new CDC event - Single CF case + lastException = null; + break; + } else { + // Row already exists, need to retrieve existing pre-image and merge + // Likely to happen for multi CF case + Get get = new Get(cdcIndexPut.getRow()); + get.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + QueryConstants.CDC_IMAGE_CQ_BYTES); + Result existingResult = cdcIndexTable.get(get); + + if (!existingResult.isEmpty()) { + Cell existingCell = existingResult.getColumnLatestCell( + QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + QueryConstants.CDC_IMAGE_CQ_BYTES); + + if (existingCell != null) { + byte[] existingCdcBytes = CellUtil.cloneValue(existingCell); + Map<String, Object> existingCdcEvent = + JacksonUtil.getObjectReader(HashMap.class) + .readValue(existingCdcBytes); + Map<String, Object> existingPreImage = + (Map<String, Object>) existingCdcEvent.getOrDefault( + QueryConstants.CDC_PRE_IMAGE, new HashMap<>()); + + // Create new TTL delete event with merged pre-image + Map<String, Object> mergedCdcEvent = + createTTLDeleteCDCEvent(expiredRowPut, dataTable, + existingPreImage); + byte[] mergedCdcEventBytes = + JacksonUtil.getObjectWriter(HashMap.class) + .writeValueAsBytes(mergedCdcEvent); + + Put mergedCdcIndexPut = buildCDCIndexPut(cdcIndex, expiredRowPut, + eventTimestamp, mergedCdcEventBytes, dataTable, env, region, + compactionTimeBytes); + + cdcIndexTable.put(mergedCdcIndexPut); + lastException = null; + break; + } else { + LOGGER.warn("Rare event: Skipping CDC TTL mutation because other type" + + " of CDC event is recorded at time {}", eventTimestamp); + break; + } + } else { + LOGGER.warn("Rare event.. 
Skipping CDC TTL mutation because other type" + + " of CDC event is recorded at time {}", eventTimestamp); + break; + } + } + } catch (Exception e) { + lastException = e; + long backoffMs = 100; + LOGGER.warn("CDC mutation attempt {}/{} failed, retrying in {}ms", + retryCount + 1, cdcTtlMutationMaxRetries + 1, backoffMs, e); + try { + Thread.sleep(backoffMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted during CDC mutation retry", ie); + } + } + } + if (lastException != null) { + LOGGER.error("Failed to generate CDC mutation after {} attempts for table {}, index " + + "{}. The event update is missed.", + cdcTtlMutationMaxRetries + 1, + tableName, + cdcIndex.getPhysicalName().getString(), + lastException); + } + } + + /** + * Generates CDC TTL delete event and writes it directly to CDC index tables. + * This bypasses the normal CDC update path since the row is being expired. + * + * @param expiredRow The cells of the expired row + * @param tableName The table name for logging + * @param compactionTime The compaction timestamp + * @param dataTable The data table + * @param env The region coprocessor environment + * @param region The HBase region + * @param compactionTimeBytes The compaction time as bytes + * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations + */ + private static void generateCDCTTLDeleteEvent(List<Cell> expiredRow, String tableName, + long compactionTime, PTable dataTable, + RegionCoprocessorEnvironment env, Region region, + byte[] compactionTimeBytes, + int cdcTtlMutationMaxRetries) { + try { + PTable cdcIndex = CDCUtil.getActiveCDCIndex(dataTable); + if (cdcIndex == null) { + LOGGER.warn("No active CDC index found for table {}", tableName); + return; + } + Cell firstCell = expiredRow.get(0); + byte[] dataRowKey = CellUtil.cloneRow(firstCell); + Put expiredRowPut = new Put(dataRowKey); + + for (Cell cell : expiredRow) { + expiredRowPut.add(cell); + } + + try { + generateCDCIndexMutation(cdcIndex, dataTable, expiredRowPut, compactionTime, + tableName, env, region, compactionTimeBytes, cdcTtlMutationMaxRetries); + } catch (Exception e) { + LOGGER.error("Failed to generate CDC mutation for index {}: {}", + cdcIndex.getName().getString(), e.getMessage(), e); + } + } catch (Exception e) { + LOGGER.error("Error generating CDC TTL delete event for table {}", + tableName, e); + } + } + + /** + * Handles TTL row expiration for CDC event generation. + * This method is called when a row is detected as expired during major compaction. 
+ * + * @param expiredRow The cells of the expired row + * @param expirationType The type of TTL expiration + * @param tableName The table name for logging purposes + * @param compactionTime The timestamp when compaction started + * @param table The Phoenix data table metadata + * @param env The region coprocessor environment for accessing HBase + * resources + * @param region The HBase region being compacted + * @param compactionTimeBytes The compaction timestamp as byte array for CDC index row key + * construction + * @param cdcTtlMutationMaxRetries Maximum number of retry attempts for CDC mutation operations + */ + static void handleTTLRowExpiration(List<Cell> expiredRow, String expirationType, + String tableName, long compactionTime, PTable table, + RegionCoprocessorEnvironment env, Region region, + byte[] compactionTimeBytes, + int cdcTtlMutationMaxRetries) { + try { + Cell firstCell = expiredRow.get(0); + byte[] rowKey = CellUtil.cloneRow(firstCell); + + LOGGER.info("TTL row expiration detected: table={}, rowKey={}, expirationType={}, " + + "cellCount={}, compactionTime={}", + tableName, + Bytes.toStringBinary(rowKey), + expirationType, + expiredRow.size(), + compactionTime); + + // Generate CDC TTL delete event with pre-image data + generateCDCTTLDeleteEvent(expiredRow, tableName, compactionTime, table, env, region, + compactionTimeBytes, cdcTtlMutationMaxRetries); + + } catch (Exception e) { + LOGGER.error("Error handling TTL row expiration for CDC: table {}", tableName, e); + } + } +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java index 4cbb4d6147..6cede61c45 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java @@ -22,9 +22,11 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilder; import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -35,10 +37,9 @@ import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.CDCTableInfo; import org.apache.phoenix.index.IndexMaintainer; -import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.util.CDCChangeBuilder; import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -50,12 +51,43 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.CDC_DATA_TABLE_DEF; import static 
org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; +/** + * CDC (Change Data Capture) enabled region scanner for global indexes that processes + * uncovered CDC index queries by reconstructing CDC events from index and data table rows. + * + * <h3>Purpose</h3> + * This scanner extends {@link UncoveredGlobalIndexRegionScanner} to handle CDC index queries + * where the CDC index doesn't contain all the columns needed to satisfy the query. It bridges + * the gap between CDC index rows and the original data table to reconstruct complete CDC events. + * + * <h3>CDC Event Processing</h3> + * The scanner processes two types of CDC events: + * <ul> + * <li><b>Regular CDC Events:</b> Requires data table scan to build CDC event JSON from + * current/historical row state</li> + * <li><b>Pre-Image CDC Events:</b> Contains embedded CDC data (e.g., TTL delete events) + * that can be returned directly without data table scan</li> + * </ul> + * + * <h3>CDC Event Structure</h3> + * The scanner produces CDC events in JSON format containing: + * <ul> + * <li><b>event_type:</b> "upsert", "delete", or "ttl_delete"</li> + * <li><b>pre_image:</b> Row state before the change (for updates/deletes)</li> + * <li><b>post_image:</b> Row state after the change (for inserts/updates)</li> + * </ul> + * + * @see UncoveredGlobalIndexRegionScanner + * @see CDCChangeBuilder + * @see CDCTableInfo + */ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScanner { private static final Logger LOGGER = LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class); @@ -101,6 +133,12 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann List<Cell> indexRow = indexRowIterator.next(); Cell indexCell = indexRow.get(0); byte[] indexRowKey = ImmutableBytesPtr.cloneCellRowIfNecessary(indexCell); + if (indexRow.size() > 1) { + boolean success = handlePreImageCDCEvent(indexRow, indexRowKey, indexCell, result); + if (success) { + return true; + } + } ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( indexToDataRowKeyMap.get(indexRowKey)); Result dataRow = dataRows.get(dataRowKey); @@ -233,16 +271,29 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann private Result getCDCImage(byte[] indexRowKey, Cell firstCell) throws JsonProcessingException { byte[] value = JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes( changeBuilder.buildCDCEvent()); + return createCDCResult(indexRowKey, firstCell, changeBuilder.getChangeTimestamp(), value); + } + + /** + * Generates the Result object for the CDC event. 
+ * + * @param indexRowKey The CDC index row key + * @param firstCell The first cell + * @param timestamp The timestamp for the CDC event + * @param value The CDC event JSON bytes + * @return Result containing the CDC data + */ + private Result createCDCResult(byte[] indexRowKey, Cell firstCell, long timestamp, + byte[] value) { CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); - Result cdcRow = Result.create(Arrays.asList(builder + return Result.create(Collections.singletonList(builder .setRow(indexRowKey) .setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(firstCell)) .setQualifier(cdcDataTableInfo.getCdcJsonColQualBytes()) - .setTimestamp(changeBuilder.getChangeTimestamp()) + .setTimestamp(timestamp) .setValue(value) .setType(Cell.Type.Put) .build())); - return cdcRow; } private Object getColumnValue(Cell cell, PDataType dataType) { @@ -259,4 +310,44 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann } return CDCUtil.getColumnEncodedValue(value, dataType); } + + /** + * Handles CDC events that already contain pre-image data, avoiding data table scan. + * Supports both the new CDC_IMAGE_CQ column and traditional CDC JSON column. + * + * @param indexRow The CDC index row cells + * @param indexRowKey The CDC index row key + * @param indexCell The primary index cell + * @param result The result list to populate + * @return true if event was processed successfully + */ + private boolean handlePreImageCDCEvent(List<Cell> indexRow, byte[] indexRowKey, + Cell indexCell, List<Cell> result) { + Cell cdcDataCell = null; + for (Cell cell : indexRow) { + if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), + QueryConstants.CDC_IMAGE_CQ_BYTES, 0, + QueryConstants.CDC_IMAGE_CQ_BYTES.length)) { + cdcDataCell = cell; + break; + } + } + if (cdcDataCell == null) { + return false; + } + byte[] cdcEventBytes = CellUtil.cloneValue(cdcDataCell); + Result cdcRow = createCDCResult(indexRowKey, indexCell, cdcDataCell.getTimestamp(), + cdcEventBytes); + + if (tupleProjector != null) { + result.add(indexCell); + IndexUtil.addTupleAsOneCell(result, new ResultTuple(cdcRow), tupleProjector, ptr); + } else { + result.clear(); + } + LOGGER.debug("Processed CDC event with embedded data, skipped data table scan for" + + " row key: {}", Bytes.toStringBinary(indexRowKey)); + return true; + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 2b9215b740..f3577611d0 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.Date; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -82,6 +83,7 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TTLExpression; import org.apache.phoenix.schema.TTLExpressionFactory; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PDate; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PSmallint; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -150,6 +152,7 @@ public class 
CompactionScanner implements InternalScanner { private final int familyCount; private KeepDeletedCells keepDeletedCells; private long compactionTime; + private byte[] compactionTimeBytes; private final byte[] emptyCF; private final byte[] emptyCQ; private final byte[] storeColumnFamily; @@ -163,6 +166,9 @@ public class CompactionScanner implements InternalScanner { private long outputCellCount = 0; private boolean phoenixLevelOnly = false; private boolean isCDCIndex; + private final boolean isCdcTtlEnabled; + private final PTable table; + private final int cdcTtlMutationMaxRetries; // Only for forcing minor compaction while testing private static boolean forceMinorCompaction = false; @@ -184,6 +190,7 @@ public class CompactionScanner implements InternalScanner { this.emptyCF = SchemaUtil.getEmptyColumnFamily(table); this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table); compactionTime = EnvironmentEdgeManager.currentTimeMillis(); + compactionTimeBytes = PDate.INSTANCE.toBytes(new Date(compactionTime)); columnFamilyName = store.getColumnFamilyName(); storeColumnFamily = columnFamilyName.getBytes(); tableName = region.getRegionInfo().getTable().getNameAsString(); @@ -205,7 +212,15 @@ public class CompactionScanner implements InternalScanner { emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) || localIndex; - isCDCIndex = table != null ? CDCUtil.isCDCIndex(table) : false; + this.table = table; + isCDCIndex = CDCUtil.isCDCIndex(table); + isCdcTtlEnabled = + CDCUtil.hasActiveCDCIndex(table) && major && !table.isMultiTenant() + && table.getType() == PTableType.TABLE; + cdcTtlMutationMaxRetries = env.getConfiguration().getInt( + QueryServices.CDC_TTL_MUTATION_MAX_RETRIES, + QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES); + // Initialize the tracker that computes the TTL for the compacting table. // The TTL tracker can be // simple (one single TTL for the table) when the compacting table is not Partitioned @@ -412,6 +427,11 @@ public class CompactionScanner implements InternalScanner { CompiledConditionalTTLExpression ttlExpr = (CompiledConditionalTTLExpression) rowContext.ttlExprForRow; if (ttlExpr.isExpired(result, true)) { + if (isCdcTtlEnabled && !result.isEmpty()) { + CDCCompactionUtil.handleTTLRowExpiration(result, "conditional_ttl", tableName, + compactionTime, table, env, region, compactionTimeBytes, + cdcTtlMutationMaxRetries); + } // If the row is expired, purge the row result.clear(); } @@ -2591,6 +2611,12 @@ public class CompactionScanner implements InternalScanner { if (major && compactionTime - rowContext.maxTimestamp > maxLookbackInMillis + ttl) { // Only do this check for major compaction as for minor compactions we don't expire cells. // The row version should not be visible via the max lookback window. Nothing to do + + if (isCdcTtlEnabled && !lastRow.isEmpty()) { + CDCCompactionUtil.handleTTLRowExpiration(lastRow, "time_based_ttl", tableName, + compactionTime, table, env, region, compactionTimeBytes, + cdcTtlMutationMaxRetries); + } return; } retainedCells.addAll(lastRow); @@ -2683,6 +2709,11 @@ public class CompactionScanner implements InternalScanner { // store is not the empty column family store. 
return false; } + if (isCdcTtlEnabled && !lastRowVersion.isEmpty()) { + CDCCompactionUtil.handleTTLRowExpiration(lastRowVersion, "max_lookback_ttl", + tableName, compactionTime, table, env, region, compactionTimeBytes, + cdcTtlMutationMaxRetries); + } return true; } // If the time gap between two back to back mutations is more than ttl then we know diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java index 1adad43dc3..8b55334c50 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java @@ -21,12 +21,17 @@ package org.apache.phoenix.end2end; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; import org.apache.phoenix.util.CDCUtil; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; import org.bson.BsonArray; import org.bson.BsonBinary; import org.bson.BsonBoolean; @@ -39,6 +44,8 @@ import org.bson.RawBsonDocument; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; @@ -52,12 +59,19 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.util.Arrays; import java.util.Base64; +import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.Properties; +import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE; +import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE; +import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE; +import static org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -65,8 +79,25 @@ import static org.junit.Assert.assertTrue; * Tests for BSON. 
*/ @Category(ParallelStatsDisabledTest.class) +@RunWith(Parameterized.class) public class Bson3IT extends ParallelStatsDisabledIT { + private final boolean columnEncoded; + + public Bson3IT(boolean columnEncoded) { + this.columnEncoded = columnEncoded; + } + + @Parameterized.Parameters(name = + "Bson3IT_columnEncoded={0}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList( + new Object[][]{ + {false}, + {true} + }); + } + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static String getJsonString(String jsonFilePath) throws IOException { @@ -86,7 +117,8 @@ public class Bson3IT extends ParallelStatsDisabledIT { try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON" - + " CONSTRAINT pk PRIMARY KEY(PK1))"; + + " CONSTRAINT pk PRIMARY KEY(PK1)) " + + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0"); String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName; conn.createStatement().execute(ddl); conn.createStatement().execute(cdcDdl); @@ -627,7 +659,8 @@ public class Bson3IT extends ParallelStatsDisabledIT { try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON" - + " CONSTRAINT pk PRIMARY KEY(PK1))"; + + " CONSTRAINT pk PRIMARY KEY(PK1)) " + + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0"); String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName; conn.createStatement().execute(ddl); conn.createStatement().execute(cdcDdl); @@ -1073,7 +1106,8 @@ public class Bson3IT extends ParallelStatsDisabledIT { try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON" - + " CONSTRAINT pk PRIMARY KEY(PK1))"; + + " CONSTRAINT pk PRIMARY KEY(PK1)) " + + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0"); String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName; conn.createStatement().execute(ddl); conn.createStatement().execute(cdcDdl); @@ -1443,7 +1477,8 @@ public class Bson3IT extends ParallelStatsDisabledIT { try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON" - + " CONSTRAINT pk PRIMARY KEY(PK1))"; + + " CONSTRAINT pk PRIMARY KEY(PK1)) " + + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0"); String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName; conn.createStatement().execute(ddl); conn.createStatement().execute(cdcDdl); @@ -1865,12 +1900,14 @@ public class Bson3IT extends ParallelStatsDisabledIT { @Test public void testCDCWithCaseSenstitiveTableAndPks() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - String tableName = "XYZ.\"test.table\""; - String cdcName = "XYZ.\"CDC_test.table\""; - String cdcNameWithoutSchema = "\"CDC_test.table\""; + String nameQuotes = "test.tableTESt-_123" + generateUniqueName(); + String tableName = "XYZ.\"" + nameQuotes + "\""; + String cdcName = "XYZ.\"CDC_" + nameQuotes + "\""; + String cdcNameWithoutSchema = "\"CDC_" + nameQuotes + "\""; try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String ddl = "CREATE TABLE " + tableName + - " (\"hk\" VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY KEY(\"hk\"))"; + " (\"hk\" VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY KEY(\"hk\")) " + + (this.columnEncoded ? 
"" : "COLUMN_ENCODED_BYTES=0"); conn.createStatement().execute(ddl); String cdcDdl = "CREATE CDC " + cdcNameWithoutSchema + " ON " + tableName; @@ -1930,6 +1967,244 @@ public class Bson3IT extends ParallelStatsDisabledIT { actualDoc); assertFalse("Should only have one CDC record", rs.next()); + + conn.createStatement().execute("DROP TABLE " + tableName + " CASCADE"); + } + } + + /** + * Test BSON operations with SQL conditions and TTL functionality. + */ + @Test + public void testBsonOpsWithSqlConditionsUpdateSuccessWithTTL() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String tableName = generateUniqueName(); + String cdcName = generateUniqueName(); + final int ttlSeconds = 10; + final int maxLookbackAge = 5; + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String ddl = "CREATE TABLE " + tableName + + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON" + + " CONSTRAINT pk PRIMARY KEY(PK1)) TTL=" + + ttlSeconds + ", \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge + + (this.columnEncoded ? "" : ", COLUMN_ENCODED_BYTES=0"); + String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName; + conn.createStatement().execute(ddl); + conn.createStatement().execute(cdcDdl); + + ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge(); + long startTime = System.currentTimeMillis() + 1000; + startTime = (startTime / 1000) * 1000; + EnvironmentEdgeManager.injectEdge(injectEdge); + injectEdge.setValue(startTime); + + String sample1 = getJsonString("json/sample_01.json"); + String sample2 = getJsonString("json/sample_02.json"); + String sample3 = getJsonString("json/sample_03.json"); + BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1); + BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2); + BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3); + + PreparedStatement stmt = + conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?,?,?)"); + stmt.setString(1, "pk0001"); + stmt.setString(2, "0002"); + stmt.setObject(3, bsonDocument1); + stmt.executeUpdate(); + + stmt.setString(1, "pk1010"); + stmt.setString(2, "1010"); + stmt.setObject(3, bsonDocument2); + stmt.executeUpdate(); + + stmt.setString(1, "pk1011"); + stmt.setString(2, "1011"); + stmt.setObject(3, bsonDocument3); + stmt.executeUpdate(); + + conn.commit(); + injectEdge.incrementValue(1000); + + String conditionExpression = + "press = $press AND track[0].shot[2][0].city.standard[5] = $softly"; + + BsonDocument conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument() + .append("$press", new BsonString("beat")) + .append("$softly", new BsonString("softly"))); + + BsonDocument updateExp = new BsonDocument() + .append("$SET", new BsonDocument() + .append("browserling", + new BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095))) + .append("track[0].shot[2][0].city.standard[5]", new BsonString("soft")) + .append("track[0].shot[2][0].city.problem[2]", + new BsonString("track[0].shot[2][0].city.problem[2] + 529.435"))) + .append("$UNSET", new BsonDocument() + .append("track[0].shot[2][0].city.flame", new BsonNull())); + + stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?) 
ON DUPLICATE KEY UPDATE COL = CASE WHEN" + + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END," + + " C1 = ?"); + stmt.setString(1, "pk0001"); + stmt.setString(2, "0003"); + stmt.executeUpdate(); + + String query = "SELECT * FROM " + tableName + " WHERE PK1 = 'pk0001'"; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + BsonDocument document1 = (BsonDocument) rs.getObject(3); + + updateExp = new BsonDocument() + .append("$ADD", new BsonDocument() + .append("new_samples", + new BsonDocument().append("$set", + new BsonArray(Arrays.asList( + new BsonBinary(Bytes.toBytes("Sample10")), + new BsonBinary(Bytes.toBytes("Sample12")), + new BsonBinary(Bytes.toBytes("Sample13")), + new BsonBinary(Bytes.toBytes("Sample14")) + ))))) + .append("$DELETE_FROM_SET", new BsonDocument() + .append("new_samples", + new BsonDocument().append("$set", + new BsonArray(Arrays.asList( + new BsonBinary(Bytes.toBytes("Sample02")), + new BsonBinary(Bytes.toBytes("Sample03")) + ))))) + .append("$SET", new BsonDocument() + .append("newrecord", ((BsonArray) (document1.get("track"))).get(0))) + .append("$UNSET", new BsonDocument() + .append("rather[3].outline.halfway.so[2][2]", new BsonNull())); + + conditionExpression = + "field_not_exists(newrecord) AND field_exists(rather[3].outline.halfway.so[2][2])"; + + conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument()); + + stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN" + + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + stmt.setString(1, "pk1010"); + stmt.executeUpdate(); + + updateExp = new BsonDocument() + .append("$SET", new BsonDocument() + .append("result[1].location.state", new BsonString("AK"))) + .append("$UNSET", new BsonDocument() + .append("result[4].emails[1]", new BsonNull())); + + conditionExpression = + "result[2].location.coordinates.latitude > $latitude OR " + + "(field_exists(result[1].location) AND result[1].location.state != $state" + + " AND field_exists(result[4].emails[1]))"; + + conditionDoc = new BsonDocument(); + conditionDoc.put("$EXPR", new BsonString(conditionExpression)); + conditionDoc.put("$VAL", new BsonDocument() + .append("$latitude", new BsonDouble(0)) + .append("$state", new BsonString("AK"))); + + stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN" + + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + stmt.setString(1, "pk1011"); + stmt.executeUpdate(); + + conn.commit(); + injectEdge.incrementValue(1000); + + // Capture timestamp before TTL expiration + Timestamp beforeTTLTimestamp = new Timestamp(injectEdge.currentTime()); + + // Capture last post-images for each row before TTL expiration + Map<String, Map<String, Object>> lastPostImages = new HashMap<>(); + + String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName + + " WHERE PHOENIX_ROW_TIMESTAMP() <= ? 
ORDER BY PHOENIX_ROW_TIMESTAMP() DESC"; + try (PreparedStatement pst = conn.prepareStatement(cdcQuery)) { + pst.setTimestamp(1, beforeTTLTimestamp); + try (ResultSet cdcRs = pst.executeQuery()) { + while (cdcRs.next()) { + String pk = cdcRs.getString(2); + if (!lastPostImages.containsKey(pk)) { + String cdcVal = cdcRs.getString(3); + Map<String, Object> cdcEvent = OBJECT_MAPPER.readValue(cdcVal, HashMap.class); + if (cdcEvent.containsKey(CDC_POST_IMAGE)) { + lastPostImages.put(pk, (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE)); + } + } + } + } + } + + // Verify all rows have post-images captured + assertEquals("Should have post-images for all 3 rows", 3, lastPostImages.size()); + assertNotNull("Should have post-image for pk0001", lastPostImages.get("pk0001")); + assertNotNull("Should have post-image for pk1010", lastPostImages.get("pk1010")); + assertNotNull("Should have post-image for pk1011", lastPostImages.get("pk1011")); + + // Advance time past TTL to expire rows + injectEdge.incrementValue((ttlSeconds + maxLookbackAge + 1) * 1000); + + // Flush and major compact to trigger TTL expiration + Admin admin = getUtility().getAdmin(); + admin.flush(TableName.valueOf(tableName)); + + TestUtil.majorCompact(getUtility(), TableName.valueOf(tableName)); + + // Verify all rows are expired from data table + String dataQuery = "SELECT * FROM " + tableName; + try (ResultSet dataRs = conn.createStatement().executeQuery(dataQuery)) { + assertFalse("All rows should be expired from data table", dataRs.next()); + } + + // Verify TTL_DELETE CDC events were generated for all rows + String ttlDeleteQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName + + " WHERE PHOENIX_ROW_TIMESTAMP() > ?"; + Map<String, Map<String, Object>> ttlDeleteEvents = new HashMap<>(); + + try (PreparedStatement pst = conn.prepareStatement(ttlDeleteQuery)) { + pst.setTimestamp(1, beforeTTLTimestamp); + try (ResultSet ttlRs = pst.executeQuery()) { + while (ttlRs.next()) { + String pk = ttlRs.getString(2); + String cdcVal = ttlRs.getString(3); + Map<String, Object> cdcEvent = OBJECT_MAPPER.readValue(cdcVal, HashMap.class); + + // Only process TTL delete events + if (CDC_TTL_DELETE_EVENT_TYPE.equals(cdcEvent.get(CDC_EVENT_TYPE))) { + ttlDeleteEvents.put(pk, (Map<String, Object>) cdcEvent.get(CDC_PRE_IMAGE)); + } + } + } + } + + // Verify TTL delete events for all rows + assertEquals("Should have TTL delete events for all 3 rows", 3, ttlDeleteEvents.size()); + + // Verify pre-image consistency for each row + for (String pk : Arrays.asList("pk0001", "pk1010", "pk1011")) { + Map<String, Object> ttlPreImage = ttlDeleteEvents.get(pk); + assertNotNull("Should have TTL delete event for " + pk, ttlPreImage); + Map<String, Object> lastPostImage = lastPostImages.get(pk); + assertNotNull("TTL pre-image should not be null for " + pk, ttlPreImage); + assertNotNull("Last post-image should not be null for " + pk, lastPostImage); + assertEquals("TTL delete pre-image should match last post-image for " + pk, + lastPostImage, ttlPreImage); + } + } finally { + EnvironmentEdgeManager.reset(); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java index 1d68ce776c..7fdabc648b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java @@ -28,6 +28,9 @@ import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import 
org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.util.*; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.PTable; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -46,10 +49,16 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Random; +import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE; +import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE; +import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE; +import static org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE; +import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE; import static org.junit.Assert.*; @Category(NeedsOwnMiniClusterTest.class) @@ -314,6 +323,102 @@ public class TableTTLIT extends BaseTest { } } + @Test + public void testRowSpansMultipleTTLWindowsWithCdc() throws Exception { + final int maxLookbackAge = tableLevelMaxLookback != null + ? tableLevelMaxLookback : MAX_LOOKBACK_AGE; + + try (Connection conn = DriverManager.getConnection(getUrl())) { + String schemaName = generateUniqueName(); + String tableName = schemaName + "." + generateUniqueName(); + String noCompactTableName = generateUniqueName(); + createTable(tableName); + createTable(noCompactTableName); + conn.createStatement().execute("ALTER TABLE " + tableName + + " SET \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge); + + // Create CDC index for TTL verification + String cdcName = generateUniqueName(); + String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName + + " INCLUDE (PRE, POST)"; + conn.createStatement().execute(cdcSql); + conn.commit(); + + String cdcFullName = SchemaUtil.getTableName(null, schemaName + "." 
+ cdcName); + + ObjectMapper mapper = new ObjectMapper(); + long startTime = System.currentTimeMillis() + 1000; + startTime = (startTime / 1000) * 1000; + EnvironmentEdgeManager.injectEdge(injectEdge); + injectEdge.setValue(startTime); + + // Track the last post-image from normal CDC events + Map<String, Object> lastPostImage = null; + + for (int columnIndex = 1; columnIndex <= MAX_COLUMN_INDEX; columnIndex++) { + String value = Integer.toString(RAND.nextInt(1000)); + updateColumn(conn, tableName, "a", columnIndex, value); + updateColumn(conn, noCompactTableName, "a", columnIndex, value); + conn.commit(); + + // Capture the last post-image from CDC events + String cdcQuery = + "SELECT PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" FROM " + cdcFullName + + " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC LIMIT 1"; + try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) { + if (rs.next()) { + Map<String, Object> cdcEvent = + mapper.readValue(rs.getString(2), HashMap.class); + if (cdcEvent.containsKey(CDC_POST_IMAGE)) { + lastPostImage = (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE); + } + } + } + + injectEdge.incrementValue(ttl * 1000 - 1000); + } + assertNotNull("Last post-image should not be null", lastPostImage); + + // Advance time past TTL to expire the row + injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000); + + flush(TableName.valueOf(tableName)); + majorCompact(TableName.valueOf(tableName)); + + // Verify row is expired from data table + String dataQuery = "SELECT * FROM " + tableName + " WHERE id = 'a'"; + try (ResultSet rs = conn.createStatement().executeQuery(dataQuery)) { + assertFalse("Row should be expired from data table", rs.next()); + } + + // Verify TTL_DELETE CDC event was generated and compare pre-image + String cdcQuery = "SELECT \"CDC JSON\" FROM " + cdcFullName + + " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC LIMIT 1"; + try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) { + assertTrue("Should find TTL delete event", rs.next()); + Map<String, Object> ttlDeleteEvent = + mapper.readValue(rs.getString(1), HashMap.class); + LOG.info("TTL delete event: {}", ttlDeleteEvent); + + assertEquals("Should be ttl_delete event", CDC_TTL_DELETE_EVENT_TYPE, + ttlDeleteEvent.get(CDC_EVENT_TYPE)); + + Map<String, Object> ttlPreImage = + (Map<String, Object>) ttlDeleteEvent.get(CDC_PRE_IMAGE); + assertNotNull("TTL pre-image should not be null", ttlPreImage); + + assertEquals( + "TTL delete pre-image should match last post-image from normal CDC events", + lastPostImage, ttlPreImage); + + assertFalse("No more event should be found", rs.next()); + } + + compareRow(conn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX); + injectEdge.incrementValue(1000); + } + } + @Test public void testMultipleRowsWithUpdatesMoreThanTTLApart() throws Exception { // for the purpose of this test only considering cases when maxlookback is 0 @@ -544,6 +649,156 @@ public class TableTTLIT extends BaseTest { } } + /** + * Test CDC events for TTL expired rows. This test creates a table with TTL and CDC index, + * verifies insert/update CDC events with pre/post images, then triggers major compaction + * to expire rows and verifies TTL_DELETE events with pre-image data. + */ + @Test + public void testCDCTTLExpiredRows() throws Exception { + final int maxLookbackAge = tableLevelMaxLookback != null + ? tableLevelMaxLookback : MAX_LOOKBACK_AGE; + + try (Connection conn = DriverManager.getConnection(getUrl())) { + String schemaName = generateUniqueName(); + String tableName = schemaName + "." 
@@ -544,6 +649,156 @@ public class TableTTLIT extends BaseTest {
     }
   }
 
+  /**
+   * Test CDC events for TTL expired rows. This test creates a table with TTL and CDC index,
+   * verifies insert/update CDC events with pre/post images, then triggers major compaction
+   * to expire rows and verifies TTL_DELETE events with pre-image data.
+   */
+  @Test
+  public void testCDCTTLExpiredRows() throws Exception {
+    final int maxLookbackAge = tableLevelMaxLookback != null
+        ? tableLevelMaxLookback : MAX_LOOKBACK_AGE;
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String schemaName = generateUniqueName();
+      String tableName = schemaName + "." + generateUniqueName();
+      String cdcName = generateUniqueName();
+      ObjectMapper mapper = new ObjectMapper();
+
+      createTable(tableName);
+      conn.createStatement().execute("ALTER TABLE " + tableName
+          + " SET \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
+
+      String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName
+          + " INCLUDE (PRE, POST)";
+      conn.createStatement().execute(cdcSql);
+      conn.commit();
+
+      String cdcIndexName =
+          schemaName + "." + CDCUtil.getCDCIndexName(schemaName + "." + cdcName);
+      String cdcFullName = SchemaUtil.getTableName(null, schemaName + "." + cdcName);
+
+      PTable cdcIndex = ((PhoenixConnection) conn).getTableNoCache(cdcIndexName);
+      assertNotNull("CDC index should be created", cdcIndex);
+      assertTrue("CDC index should be CDC type", CDCUtil.isCDCIndex(cdcIndex));
+
+      // Setup time injection
+      long startTime = System.currentTimeMillis() + 1000;
+      startTime = (startTime / 1000) * 1000;
+      EnvironmentEdgeManager.injectEdge(injectEdge);
+      injectEdge.setValue(startTime);
+
+      // Insert initial row
+      updateRow(conn, tableName, "row1");
+      long insertTime = injectEdge.currentTime();
+      injectEdge.incrementValue(1000);
+
+      // Update the row
+      updateColumn(conn, tableName, "row1", 1, "updated_val1");
+      updateColumn(conn, tableName, "row1", 2, "updated_val2");
+      conn.commit();
+      long updateTime = injectEdge.currentTime();
+      injectEdge.incrementValue(1000);
+
+      // Verify CDC events for insert and update
+      String cdcQuery = "SELECT PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" FROM " + cdcFullName;
+      Map<String, Object> postImage;
+      try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) {
+        // First event - insert
+        assertTrue("Should have insert CDC event", rs.next());
+        long eventTimestamp = rs.getTimestamp(1).getTime();
+        assertTrue("Insert event timestamp should be close to insert time",
+            Math.abs(eventTimestamp - insertTime) < 2000);
+
+        Map<String, Object> cdcEvent = mapper.readValue(rs.getString(2), HashMap.class);
+        assertEquals("Should be upsert event", CDC_UPSERT_EVENT_TYPE,
+            cdcEvent.get(CDC_EVENT_TYPE));
+        assertTrue("Should have post-image", cdcEvent.containsKey(CDC_POST_IMAGE));
+
+        postImage = (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE);
+        assertFalse("post image must contain something", postImage.isEmpty());
+
+        // Second event - update
+        assertTrue("Should have update CDC event", rs.next());
+        eventTimestamp = rs.getTimestamp(1).getTime();
+        assertTrue("Update event timestamp should be close to update time",
+            Math.abs(eventTimestamp - updateTime) < 2000);
+
+        cdcEvent = mapper.readValue(rs.getString(2), HashMap.class);
+        assertEquals("Should be upsert event", CDC_UPSERT_EVENT_TYPE,
+            cdcEvent.get(CDC_EVENT_TYPE));
+        assertTrue("Should have pre-image", cdcEvent.containsKey(CDC_PRE_IMAGE));
+        assertTrue("Should have post-image", cdcEvent.containsKey(CDC_POST_IMAGE));
+
+        Map<String, Object> preImage = (Map<String, Object>) cdcEvent.get(CDC_PRE_IMAGE);
+        assertEquals("Comparison of last post-image with new pre-image", postImage,
+            preImage);
+        postImage = (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE);
+        LOG.info("Post-image {}", postImage);
+      }
+
+      // Advance time past TTL to expire the row
+      injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
+
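+      // Note (illustrative summary of the assertions that follow, not extra test
+      // logic): the major compaction below purges the expired data row and the
+      // CDC index entries that aged out with it, while emitting a single new
+      // ttl_delete event whose pre-image carries the last known row state.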
+      TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+      TestUtil.dumpTable(conn, TableName.valueOf(cdcIndexName));
+      flush(TableName.valueOf(tableName));
+      majorCompact(TableName.valueOf(tableName));
+      TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+      TestUtil.dumpTable(conn, TableName.valueOf(cdcIndexName));
+
+      // Verify row is expired from data table
+      String dataQuery = "SELECT * FROM " + tableName + " WHERE id = 'row1'";
+      try (ResultSet rs = conn.createStatement().executeQuery(dataQuery)) {
+        assertFalse("Row should be expired from data table", rs.next());
+      }
+
+      // Verify TTL_DELETE CDC event was generated
+      try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) {
+        int eventCount = 0;
+        Map<String, Object> ttlDeleteEvent = null;
+
+        while (rs.next()) {
+          eventCount++;
+          Map<String, Object> cdcEvent = mapper.readValue(rs.getString(2), HashMap.class);
+          String eventType = (String) cdcEvent.get(CDC_EVENT_TYPE);
+          assertEquals("Event type must be " + CDC_TTL_DELETE_EVENT_TYPE + " but found "
+              + eventType,
+              CDC_TTL_DELETE_EVENT_TYPE, eventType);
+          if (CDC_TTL_DELETE_EVENT_TYPE.equals(eventType)) {
+            ttlDeleteEvent = cdcEvent;
+          }
+        }
+
+        assertEquals("Should have only 1 event for TTL_DELETE because other events are "
+            + "expired due to major compaction", 1, eventCount);
+        assertNotNull("Should have TTL delete event", ttlDeleteEvent);
+
+        // Verify TTL delete event structure
+        assertEquals("Should be ttl_delete event", CDC_TTL_DELETE_EVENT_TYPE,
+            ttlDeleteEvent.get(CDC_EVENT_TYPE));
+        assertTrue("TTL delete should have pre-image",
+            ttlDeleteEvent.containsKey(CDC_PRE_IMAGE));
+
+        Map<String, Object> preImage =
+            (Map<String, Object>) ttlDeleteEvent.get(CDC_PRE_IMAGE);
+        assertEquals("Comparison of last post-image with new pre-image", postImage,
+            preImage);
+        LOG.info("TTL delete event verified: {}", ttlDeleteEvent);
+      }
+
+      String cdcScanQuery = "SELECT \"CDC JSON\" FROM " + cdcFullName
+          + " WHERE \"CDC JSON\" LIKE '%ttl_delete%'";
+      try (ResultSet rs = conn.createStatement().executeQuery(cdcScanQuery)) {
+        assertTrue("Should find TTL delete event via scan", rs.next());
+        Map<String, Object> cdcEvent = mapper.readValue(rs.getString(1), HashMap.class);
+        assertEquals("Should be ttl_delete event", CDC_TTL_DELETE_EVENT_TYPE,
+            cdcEvent.get(CDC_EVENT_TYPE));
+      }
+
+      LOG.info("CDC TTL test completed successfully for table: {}", tableName);
+    }
+  }
+
   private void flush(TableName table) throws IOException {
     Admin admin = getUtility().getAdmin();
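
Both TableTTLIT tests above advance an injected clock instead of sleeping through
real TTL windows. A minimal sketch of the pattern, assuming HBase's
ManualEnvironmentEdge (the test base class wires up its own injectEdge field, so
the setup shown here is illustrative):

    // Deterministically control "now" for TTL and max-lookback arithmetic.
    ManualEnvironmentEdge clock = new ManualEnvironmentEdge();
    clock.setValue((System.currentTimeMillis() / 1000) * 1000);  // align to seconds
    EnvironmentEdgeManager.injectEdge(clock);
    clock.incrementValue((ttl + maxLookbackAge + 1) * 1000L);    // jump past TTL
    // ... flush, major compact, then assert on the CDC events ...
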
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
index 6fdc674a04..8e582373f4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
@@ -31,7 +31,6 @@ import static org.apache.phoenix.schema.LiteralTTLExpression.TTL_EXPRESSION_FORE
 import static org.apache.phoenix.util.TestUtil.retainSingleQuotes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -43,6 +42,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
@@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.CounterGroup;
@@ -66,6 +67,7 @@ import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.PhoenixTestBuilder;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.OtherOptions;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
@@ -130,6 +132,8 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
   private Map<Integer, String> dataRowPosToKey = Maps.newHashMap();
   private Map<Integer, String> indexRowPosToKey = Maps.newHashMap();
 
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   public ConditionalTTLExpressionIT(boolean columnEncoded,
       Integer tableLevelMaxLooback) {
     this.columnEncoded = columnEncoded;
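
The test added below drives conditional TTL from PHOENIX_ROW_TIMESTAMP(): with
ttl = 50 * 1000, a row last written at time t satisfies
TO_NUMBER(CURRENT_TIME()) - TO_NUMBER(PHOENIX_ROW_TIMESTAMP()) >= 50000 once 50
seconds of (injected) clock time have passed, making it eligible for removal at
the next major compaction. The test builds its table through SchemaBuilder, so no
literal DDL appears in the patch; a hypothetical statement with the same
condition might look like:

    String ddl = "CREATE TABLE T (ID VARCHAR PRIMARY KEY, VAL1 VARCHAR) TTL = "
        + "'TO_NUMBER(CURRENT_TIME()) - TO_NUMBER(PHOENIX_ROW_TIMESTAMP()) >= 50000'";
    conn.createStatement().execute(ddl);
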
@@ -421,6 +425,193 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
     }
   }
 
+  /**
+   * Tests CDC (Change Data Capture) functionality with TTL (Time To Live) expired rows.
+   * This test validates the complete CDC lifecycle including: insert events with
+   * post-images, TTL_DELETE events generated by major compaction, row resurrection via
+   * a later upsert, and a second round of TTL expiration.
+   */
+  @Test
+  public void testPhoenixRowTimestampWithCdc() throws Exception {
+    int ttl = 50 * 1000;
+    String ttlExpression = String.format(
+        "TO_NUMBER(CURRENT_TIME()) - TO_NUMBER(PHOENIX_ROW_TIMESTAMP()) >= %d", ttl);
+    createTable(ttlExpression);
+    String tableName = schemaBuilder.getEntityTableName();
+    String cdcName = "cdc_" + generateUniqueName();
+    injectEdge();
+    int rowCount = 5;
+    long actual;
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      // Initial Setup - Create CDC index on the table
+      conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);
+      populateTable(conn, rowCount);
+
+      // Verify initial row count
+      actual = TestUtil.getRowCount(conn, tableName, true);
+      assertEquals("Table should contain all inserted rows", 5, actual);
+
+      // Query initial CDC events (inserts)
+      String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM "
+          + PhoenixTestBuilder.DDLDefaults.DEFAULT_SCHEMA_NAME + "." + cdcName;
+
+      ResultSet resultSet = conn.createStatement().executeQuery(cdcQuery);
+      List<Map<String, Object>> postImageList = new ArrayList<>();
+      while (resultSet.next()) {
+        String cdcVal = resultSet.getString(4);
+        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+        // Validate insert events have no pre-image but have post-image
+        Map<String, Object> preImage =
+            (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE);
+        assertTrue("Insert events should have empty pre-image", preImage.isEmpty());
+
+        Map<String, Object> postImage =
+            (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
+        assertFalse("Insert events should have non-empty post-image", postImage.isEmpty());
+        postImageList.add(postImage);
+
+        assertEquals("Initial events should be UPSERT type",
+            QueryConstants.CDC_UPSERT_EVENT_TYPE,
+            map.get(QueryConstants.CDC_EVENT_TYPE));
+      }
+      assertEquals("Post image list size should be 5 but it is " + postImageList.size(), 5,
+          postImageList.size());
+
+      // TTL Expiration - Advance time to trigger TTL expiration
+      injectEdge.incrementValue(ttl);
+      doMajorCompaction(tableName);
+
+      // Verify all rows are expired from data table
+      actual = TestUtil.getRowCount(conn, tableName, true);
+      assertEquals("All rows should be expired after TTL", 0, actual);
+
+      // TTL CDC Events - Validate TTL_DELETE events are generated
+      resultSet = conn.createStatement().executeQuery(cdcQuery);
+      int i = 0;
+      while (resultSet.next()) {
+        String cdcVal = resultSet.getString(4);
+        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+        // Validate TTL delete events
+        assertEquals("TTL expired rows should generate TTL_DELETE events",
+            QueryConstants.CDC_TTL_DELETE_EVENT_TYPE,
+            map.get(QueryConstants.CDC_EVENT_TYPE));
+
+        Map<String, Object> preImage =
+            (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE);
+        assertFalse("TTL_DELETE events should have non-empty pre-image",
+            preImage.isEmpty());
+
+        Map<String, Object> postImage =
+            (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
+        assertTrue("TTL_DELETE events should have empty post-image", postImage.isEmpty());
+
+        // TTL delete pre-image should match previous upsert post-image
+        assertEquals("TTL_DELETE pre-image should match original insert post-image",
+            postImageList.get(i), preImage);
+        i++;
+      }
+      assertEquals("Num of TTL_DELETE events verified should be 5 but it is " + i, 5, i);
+
+      // Update an expired row to bring it back
+      injectEdge.incrementValue(1);
+      long currentTime = injectEdge.currentTime();
+      updateColumn(conn, 1, "VAL4", currentTime);
+
+      // Verify the row
+      actual = TestUtil.getRowCount(conn, tableName, true);
+      assertEquals("Only one row should be resurrected after update", 1, actual);
+
+      // Verify resurrected row has only updated column visible
+      try (ResultSet rs = readRow(conn, 1)) {
+        assertTrue("Resurrected row should exist", rs.next());
+        for (String col : COLUMNS) {
+          if (!col.equals("VAL4")) {
+            assertNull("Non-updated columns should be null in resurrected row",
+                rs.getObject(col));
+          } else {
+            assertEquals("Updated column should have new timestamp",
+                currentTime, rs.getTimestamp("VAL4").getTime());
+          }
+        }
+      }
+
+      // Advance time beyond max lookback window
+      injectEdge.incrementValue(tableLevelMaxLookback * 1000L + 2);
+      doMajorCompaction(tableName);
+      CellCount expectedCellCount = new CellCount();
+      expectedCellCount.insertRow(dataRowPosToKey.get(1), 2);
+      validateTable(conn, tableName, expectedCellCount, dataRowPosToKey.values());
+
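+      // Note: the PHOENIX_ROW_TIMESTAMP() >= ? bound used below scopes the CDC
+      // queries to events written at or after the resurrection, keeping the
+      // first round of ttl_delete events out of these result sets.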
+      // Query CDC events
+      cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM "
+          + PhoenixTestBuilder.DDLDefaults.DEFAULT_SCHEMA_NAME + "." + cdcName
+          + " WHERE PHOENIX_ROW_TIMESTAMP() >= ?";
+      PreparedStatement ps = conn.prepareStatement(cdcQuery);
+      ps.setTimestamp(1, new Timestamp(currentTime));
+      resultSet = ps.executeQuery();
+      postImageList = new ArrayList<>();
+      while (resultSet.next()) {
+        String cdcVal = resultSet.getString(4);
+        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+        assertEquals("Resurrection event should be UPSERT type",
+            QueryConstants.CDC_UPSERT_EVENT_TYPE,
+            map.get(QueryConstants.CDC_EVENT_TYPE));
+
+        Map<String, Object> preImage =
+            (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE);
+        assertTrue("Resurrection event should have empty pre-image", preImage.isEmpty());
+
+        Map<String, Object> postImage =
+            (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
+        assertFalse("Resurrection event should have non-empty post-image",
+            postImage.isEmpty());
+        postImageList.add(postImage);
+      }
+      assertEquals("Post image list size should be 1 but it is " + postImageList.size(), 1,
+          postImageList.size());
+
+      // Trigger TTL expiration again
+      injectEdge.incrementValue(ttl);
+      doMajorCompaction(tableName);
+
+      // Verify all rows are expired from data table
+      actual = TestUtil.getRowCount(conn, tableName, true);
+      assertEquals("All rows should be expired after TTL", 0, actual);
+
+      expectedCellCount = new CellCount();
+      validateTable(conn, tableName, expectedCellCount, dataRowPosToKey.values());
+
+      // Validate second round of TTL_DELETE events
+      ps = conn.prepareStatement(cdcQuery);
+      ps.setTimestamp(1, new Timestamp(currentTime));
+      resultSet = ps.executeQuery();
+      i = 0;
+      while (resultSet.next()) {
+        String cdcVal = resultSet.getString(4);
+        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+        assertEquals("Second TTL expiration should generate TTL_DELETE events",
+            QueryConstants.CDC_TTL_DELETE_EVENT_TYPE,
+            map.get(QueryConstants.CDC_EVENT_TYPE));
+
+        Map<String, Object> preImage =
+            (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE);
+        assertFalse("Second TTL_DELETE should have non-empty pre-image",
+            preImage.isEmpty());
+
+        Map<String, Object> postImage =
+            (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
+        assertTrue("Second TTL_DELETE should have empty post-image", postImage.isEmpty());
+
+        assertEquals("Second TTL_DELETE pre-image should match resurrection post-image",
+            postImageList.get(i), preImage);
+        i++;
+      }
+      assertEquals("Num of TTL_DELETE events verified should be 1 but it is " + i, 1, i);
+    }
+  }
+
   @Test
   public void testDeleteMarkers() throws Exception {
     String ttlCol = "VAL5";