This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new af0f1da1b2 PHOENIX-7671 Sync maxLookback from data table to indexes 
(#2230)
af0f1da1b2 is described below

commit af0f1da1b2bad86b233bea21c4ccc58176c7394a
Author: Viraj Jasani <[email protected]>
AuthorDate: Thu Jul 17 21:50:44 2025 -0700

    PHOENIX-7671 Sync maxLookback from data table to indexes (#2230)
---
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  51 ++++-
 .../org/apache/phoenix/schema/MetaDataClient.java  |  10 +-
 .../org/apache/phoenix/end2end/CDCStreamIT.java    | 229 ++++++++++++++++++++-
 .../apache/phoenix/end2end/PropertiesInSyncIT.java |  38 ++++
 .../org/apache/phoenix/end2end/TableTTLIT.java     |   5 +-
 .../phoenix/schema/ConditionalTTLExpressionIT.java |  19 +-
 7 files changed, 328 insertions(+), 27 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index c85395ad2f..1eaa48d972 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -55,6 +55,7 @@ import org.apache.phoenix.util.MetaDataUtil;
 
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
@@ -398,6 +399,8 @@ public enum SQLExceptionCode {
 
     CDC_ALREADY_ENABLED(10963, "44A45",
             "CDC on this table is either enabled or is in the process of being 
enabled."),
+    CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX(10964, "44A46",
+            "Cannot set or alter " + PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY + " on 
an index"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new 
Factory() {
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 9e73cde0eb..d74f0e7579 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -23,6 +23,7 @@ import static 
org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.REPLI
 import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.TTL;
 import static 
org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 import static 
org.apache.hadoop.hbase.ipc.RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY;
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
 import static 
org.apache.phoenix.coprocessorclient.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
 import static 
org.apache.phoenix.coprocessorclient.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0;
 import static 
org.apache.phoenix.coprocessorclient.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
@@ -68,7 +69,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TABLE_TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME;
@@ -1290,6 +1290,16 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
     }
 
+    private void modifyTableDescriptor(TableDescriptorBuilder td, Map<String, 
Object> props) { // Copies every entry of props onto the given table descriptor builder.
+        if (props != null) { // a null map means there is nothing to apply
+            for (Entry<String, Object> entry : props.entrySet()) {
+                String propName = entry.getKey();
+                Object value = entry.getValue();
+                td.setValue(propName, value == null ? null : value.toString()); // non-null values are passed as their toString(); null is passed through as null
+            }
+        }
+    }
+
     private TableDescriptorBuilder generateTableDescriptor(byte[] 
physicalTableName,  byte[] parentPhysicalTableName, TableDescriptor 
existingDesc,
             PTableType tableType, Map<String, Object> tableProps, 
List<Pair<byte[], Map<String, Object>>> families,
             byte[][] splits, boolean isNamespaceMapped) throws SQLException {
@@ -1318,6 +1328,11 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 // PHYSICAL_DATA_TABLE_NAME points to the name of the view 
instead of the physical base table
                 baseTableDesc = existingDesc;
             }
+            String baseTableMaxLookbackVal =
+                    baseTableDesc.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+            if (baseTableMaxLookbackVal != null) {
+                tableProps.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 
baseTableMaxLookbackVal);
+            }
             dataTableColDescForIndexTablePropSyncing = 
baseTableDesc.getColumnFamily(defaultFamilyBytes);
             // It's possible that the table has specific column families and 
none of them are declared
             // to be the DEFAULT_COLUMN_FAMILY, so we choose the first column 
family for syncing properties
@@ -3165,6 +3180,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         Integer newReplicationScope = null;
         KeepDeletedCells newKeepDeletedCells = null;
         TransactionFactory.Provider txProvider = null;
+        Integer newMaxLookback = null;
         for (String family : properties.keySet()) {
             List<Pair<String, Object>> propsList = properties.get(family);
             if (propsList != null && propsList.size() > 0) {
@@ -3191,6 +3207,15 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                             .build()
                             .buildException();
                         }
+                        if 
(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY.equals(propName)) {
+                            if (table.getType() == PTableType.INDEX) {
+                                throw new SQLExceptionInfo.Builder(
+                                        
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX)
+                                        .setMessage("Property: " + propName)
+                                        .build().buildException();
+                            }
+                            newMaxLookback = (Integer) propValue;
+                        }
                         tableProps.put(propName, propValue);
                     } else {
                         if (TableProperty.isPhoenixTableProperty(propName)) {
@@ -3477,7 +3502,9 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         // Copy properties that need to be synced from the default column 
family of the base table to
         // the column families of each of its indexes (including indexes on 
this base table's views)
         // and store those table descriptor mappings as well
-        setSyncedPropertiesForTableIndexes(table, 
tableAndIndexDescriptorMappings, applyPropsToAllIndexColFams);
+        setSyncedPropertiesForTableIndexes(table, 
tableAndIndexDescriptorMappings,
+                applyPropsToAllIndexColFams,
+                getNewSyncedPropsMapForTableDescriptor(newMaxLookback));
         return tableAndIndexDescriptorMappings;
     }
 
@@ -3575,6 +3602,15 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         return newSyncedProps;
     }
 
+    private Map<String, Object> getNewSyncedPropsMapForTableDescriptor(Integer 
newMaxLookback) { // Builds the map of table-descriptor-level properties (currently only max lookback) to sync.
+        if (newMaxLookback == null) {
+            return null; // no new max lookback was supplied; callers must handle a null map
+        }
+        Map<String, Object> newSyncedProps = new HashMap<>(1); // sized for the single max lookback entry
+        setPropIfNotNull(newSyncedProps, PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 
newMaxLookback);
+        return newSyncedProps;
+    }
+
     /**
      * Set the new values for properties that are to be kept in sync amongst 
those column families of the table which are
      * not referenced in the context of our alter table command, including the 
local index column family if it exists
@@ -3603,12 +3639,16 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
      * @param table base table
      * @param tableAndIndexDescriptorMappings old to new table descriptor 
mappings
      * @param applyPropsToAllIndexesDefaultCF new properties to apply to all 
index column families
+     * @param applyPropsToAllIndexesTd new properties to apply to all index 
table descriptors
      * @throws SQLException
      */
     private void setSyncedPropertiesForTableIndexes(PTable table,
-            Map<TableDescriptor, TableDescriptor> 
tableAndIndexDescriptorMappings,
-            Map<String, Object> applyPropsToAllIndexesDefaultCF) throws 
SQLException {
-        if (applyPropsToAllIndexesDefaultCF == null || 
applyPropsToAllIndexesDefaultCF.isEmpty()) {
+                                                    Map<TableDescriptor, 
TableDescriptor> tableAndIndexDescriptorMappings,
+                                                    Map<String, Object> 
applyPropsToAllIndexesDefaultCF,
+                                                    Map<String, Object> 
applyPropsToAllIndexesTd)
+            throws SQLException {
+        if ((applyPropsToAllIndexesDefaultCF == null || 
applyPropsToAllIndexesDefaultCF.isEmpty())
+                && (applyPropsToAllIndexesTd == null || 
applyPropsToAllIndexesTd.isEmpty())) {
             return;
         }
 
@@ -3619,6 +3659,7 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
             }
             TableDescriptor origIndexDescriptor = 
this.getTableDescriptor(indexTable.getPhysicalName().getBytes());
             TableDescriptorBuilder newIndexDescriptorBuilder = 
TableDescriptorBuilder.newBuilder(origIndexDescriptor);
+            modifyTableDescriptor(newIndexDescriptorBuilder, 
applyPropsToAllIndexesTd);
 
             byte[] defaultIndexColFam = 
SchemaUtil.getEmptyColumnFamily(indexTable);
             ColumnFamilyDescriptorBuilder indexDefaultColDescriptorBuilder =
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 5a33d161f2..bd6c6fafc6 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.schema;
 
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_CONDITIONAL_TTL_ON_TABLE_WITH_MULTIPLE_COLUMN_FAMILIES;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.CDC_ALREADY_ENABLED;
@@ -27,7 +28,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 import static org.apache.phoenix.query.QueryConstants.SPLITS_FILE;
@@ -136,7 +136,6 @@ import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_P
 import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
-import static org.apache.phoenix.schema.PTable.ViewType.UPDATABLE;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -1251,6 +1250,13 @@ public class MetaDataClient {
                             .setMessage("Property: " + prop.getFirst()).build()
                             .buildException();
                 }
+                if (tableType == PTableType.INDEX
+                        && 
PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY.equals(prop.getFirst())) {
+                    throw new SQLExceptionInfo.Builder(
+                            
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX)
+                            .setMessage("Property: " + prop.getFirst())
+                            .build().buildException();
+                }
                 // Handle when TTL property is set
                 if (prop.getFirst().equalsIgnoreCase(TTL)
                         && (tableType != PTableType.SYSTEM
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index c515425ecd..233e733a3a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -18,14 +18,16 @@
 
 package org.apache.phoenix.end2end;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
-import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -46,7 +48,6 @@ import org.junit.experimental.categories.Category;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
 import 
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -57,15 +58,21 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
 import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
 import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static 
org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
 import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @Category(NeedsOwnMiniClusterTest.class)
@@ -78,7 +85,7 @@ public class CDCStreamIT extends CDCBaseIT {
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
-        
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 
Integer.toString(60*60)); // An hour
+        props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60 * 
60)); // An hour
         props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(false));
         props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
                 Long.toString(Long.MAX_VALUE));
@@ -95,6 +102,222 @@ public class CDCStreamIT extends CDCBaseIT {
                         
.findCoprocessorEnvironment(TaskRegionObserver.class.getName());
     }
 
+    /**
+     * Test to verify CDC events with index/table level maxLookback and TTL.
+     * Creates a table with a 27 hr maxLookback and a conditional TTL, upserts and then updates
+     * one row, runs major compactions on the data table and the CDC index at injected clock
+     * times, and checks which CDC events are returned after each step.
+     */
+    @Test
+    public void testCdcWithMaxLookbackAndCompaction() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        String cdcName = generateUniqueName();
+        String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
+        // maxLookback 27 hr
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k VARCHAR NOT NULL,"
+                        + " v1 VARCHAR, v2 BIGINT CONSTRAINT PK PRIMARY 
KEY(k))"
+                        + " \"phoenix.max.lookback.age.seconds\"=97200,"
+                        + "TTL='TO_NUMBER(CURRENT_TIME()) > v2', 
IS_STRICT_TTL=false");
+        // cdc index should get 27 hr maxLookback (for cdc index, maxLookback 
= TTL)
+        createCDC(conn, cdcDdl, null);
+
+        long expiry = (System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3)) 
/ 1000;
+        conn.createStatement()
+                .execute("UPSERT INTO " + tableName + " VALUES('a', 'aa', " + 
expiry + ")");
+        conn.commit();
+
+        String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName;
+        // verify only post-image exists (pre-image is empty), new row insert
+        verifyOnlyPostImage(conn, sql, expiry);
+
+        ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+        long t = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3);
+        t = ((t / 1000) + 100) * 1000;
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+        try {
+            injectEdge.setValue(t);
+            conn.createStatement()
+                    .execute("UPSERT INTO " + tableName + " VALUES('a', 'bb', 
" + expiry + ")");
+            conn.commit();
+
+            // verify insert + update event of the same row
+            verifyPreAndPostImages(conn, sql, expiry);
+
+            injectEdge.incrementValue(TimeUnit.DAYS.toMillis(1));
+            TestUtil.doMajorCompaction(conn, tableName);
+            TestUtil.doMajorCompaction(conn, CDCUtil.getCDCIndexName(cdcName));
+
+            // first compaction will expire initial insert event as we are 
already past
+            // (3 days + 100 sec + 24 hr), however this should not generate 
ttl expired event yet,
+            // because row is still in maxLookback window (27 hr) after the 
latest update
+            verifyImagesAfterFirstCompaction(conn, sql, expiry);
+
+            injectEdge.incrementValue(TimeUnit.HOURS.toMillis(3) + 
TimeUnit.SECONDS.toMillis(1));
+            TestUtil.doMajorCompaction(conn, tableName);
+            TestUtil.doMajorCompaction(conn, CDCUtil.getCDCIndexName(cdcName));
+
+            // The compaction after (3 days + 100 sec + 24 hr + 3 hr + 1 sec) 
should expire the
+            // row, which was last updated (24 hr + 3 hr + 1 sec) in the past.
+            // The compaction should also generate ttl expired event
+            verifyTtlExpiredPreImage(conn, sql, expiry);
+        } finally {
+            EnvironmentEdgeManager.reset();
+        }
+    }
+
+    /**
+     * Test to verify that altering maxLookback on the data table is also applied to the CDC
+     * index: the HBase table descriptors of both are checked before (97200) and after (50) the
+     * ALTER TABLE, and a major compaction of the CDC index is then expected to leave no CDC rows.
+     */
+    @Test
+    public void testCdcWithMaxLookbackAlterTable() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        String cdcName = generateUniqueName();
+        String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
+        // maxLookback 27 hr
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k VARCHAR NOT NULL,"
+                        + " v1 VARCHAR, v2 BIGINT CONSTRAINT PK PRIMARY 
KEY(k))"
+                        + " \"phoenix.max.lookback.age.seconds\"=97200,"
+                        + "TTL='TO_NUMBER(CURRENT_TIME()) > v2', 
IS_STRICT_TTL=false");
+        // cdc index should get 27 hr maxLookback (for cdc index, maxLookback 
= TTL)
+        createCDC(conn, cdcDdl, null);
+
+        long expiry = (System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3)) 
/ 1000;
+        conn.createStatement()
+                .execute("UPSERT INTO " + tableName + " VALUES('a', 'aa', " + 
expiry + ")");
+        conn.commit();
+
+        Admin admin = 
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); // NOTE(review): admin is never closed in this test — consider try-with-resources
+
+        TableDescriptor tableDescriptor =
+                
admin.getDescriptor(TableName.valueOf(CDCUtil.getCDCIndexName(cdcName)));
+        String maxLookbackVal = 
tableDescriptor.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+        assertEquals(97200, Integer.parseInt(maxLookbackVal));
+
+        tableDescriptor = admin.getDescriptor(TableName.valueOf(tableName));
+        maxLookbackVal = 
tableDescriptor.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+        assertEquals(97200, Integer.parseInt(maxLookbackVal));
+
+        String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName;
+        // verify only post-image exists (pre-image is empty), new row insert
+        verifyOnlyPostImage(conn, sql, expiry);
+
+        // update maxLookback of data table and cdc index both to 50 sec
+        conn.createStatement().execute("ALTER TABLE " + tableName +
+                " SET \"phoenix.max.lookback.age.seconds\"=50");
+
+        ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+        long t = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3);
+        t = ((t / 1000) + 100) * 1000;
+        EnvironmentEdgeManager.injectEdge(injectEdge);
+        try {
+            injectEdge.setValue(t);
+            conn.createStatement()
+                    .execute("UPSERT INTO " + tableName + " VALUES('a', 'bb', 
" + expiry + ")");
+            conn.commit();
+
+            // verify insert + update event of the same row
+            verifyPreAndPostImages(conn, sql, expiry);
+
+            injectEdge.incrementValue(TimeUnit.DAYS.toMillis(1));
+            TestUtil.doMajorCompaction(conn, CDCUtil.getCDCIndexName(cdcName));
+
+            // compaction will expire all rows from CDC index because the 
maxLookback of
+            // CDC index is no longer 27 hr, but it has been altered to 50 sec
+            ResultSet rs = conn.createStatement().executeQuery(sql);
+            assertFalse(rs.next());
+
+            tableDescriptor =
+                    
admin.getDescriptor(TableName.valueOf(CDCUtil.getCDCIndexName(cdcName)));
+            maxLookbackVal = 
tableDescriptor.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+            assertEquals(50, Integer.parseInt(maxLookbackVal));
+
+            tableDescriptor = 
admin.getDescriptor(TableName.valueOf(tableName));
+            maxLookbackVal = 
tableDescriptor.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+            assertEquals(50, Integer.parseInt(maxLookbackVal));
+        } finally {
+            EnvironmentEdgeManager.reset();
+        }
+    }
+
+    private static void verifyPreAndPostImages(Connection conn, String sql, 
long expiry)
+            throws SQLException, JsonProcessingException { // Expects exactly two CDC rows: the initial insert, then the update of the same row.
+        ResultSet rs = conn.createStatement().executeQuery(sql);
+        assertTrue(rs.next()); // first row: insert event with empty pre-image and post-image V1 = "aa"
+        String cdcJson = rs.getString(3); // column 3 is parsed below as the CDC JSON payload
+        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+        assertTrue(((Map<String, Object>)map.get(CDC_PRE_IMAGE)).isEmpty());
+        assertEquals(CDC_UPSERT_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+        Map<String, Object> postImage = (Map<String, Object>) 
map.get(CDC_POST_IMAGE);
+        assertEquals("aa", postImage.get("V1"));
+        assertEquals(new Long(expiry).intValue(), postImage.get("V2")); // NOTE(review): compared as an int — presumably the JSON mapper yields Integer for V2; confirm
+
+        assertTrue(rs.next()); // second row: update event with pre-image V1 = "aa" and post-image V1 = "bb"
+        cdcJson = rs.getString(3);
+        map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+        assertEquals(CDC_UPSERT_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+        Map<String, Object> preImage = (Map<String, Object>) 
map.get(CDC_PRE_IMAGE);
+        assertEquals("aa", preImage.get("V1"));
+        assertEquals(new Long(expiry).intValue(), preImage.get("V2"));
+
+        postImage = (Map<String, Object>) map.get(CDC_POST_IMAGE);
+        assertEquals("bb", postImage.get("V1"));
+        assertEquals(new Long(expiry).intValue(), postImage.get("V2"));
+
+        assertFalse(rs.next()); // no further CDC rows are expected
+    }
+
+    private static void verifyImagesAfterFirstCompaction(Connection conn, 
String sql, long expiry)
+            throws SQLException, JsonProcessingException { // Expects exactly one CDC row: the update event ("aa" -> "bb").
+        ResultSet rs = conn.createStatement().executeQuery(sql);
+        assertTrue(rs.next());
+        String cdcJson = rs.getString(3); // column 3 is parsed below as the CDC JSON payload
+        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+        assertEquals(CDC_UPSERT_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+        Map<String, Object> preImage = (Map<String, Object>) 
map.get(CDC_PRE_IMAGE);
+        assertEquals("aa", preImage.get("V1"));
+        assertEquals(new Long(expiry).intValue(), preImage.get("V2"));
+
+        Map<String, Object> postImage = (Map<String, Object>) 
map.get(CDC_POST_IMAGE);
+        assertEquals("bb", postImage.get("V1"));
+        assertEquals(new Long(expiry).intValue(), postImage.get("V2"));
+
+        assertFalse(rs.next()); // no further CDC rows are expected
+    }
+
+    private static void verifyOnlyPostImage(Connection conn, String sql, long 
expiry)
+            throws SQLException, JsonProcessingException { // Expects exactly one CDC row: an upsert with an empty pre-image and post-image V1 = "aa".
+        ResultSet rs = conn.createStatement().executeQuery(sql);
+        assertTrue(rs.next());
+        String cdcJson = rs.getString(3); // column 3 is parsed below as the CDC JSON payload
+        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+        assertTrue(((Map<String, Object>)map.get(CDC_PRE_IMAGE)).isEmpty());
+        assertEquals(CDC_UPSERT_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+
+        Map<String, Object> postImage = (Map<String, Object>) 
map.get(CDC_POST_IMAGE);
+        assertEquals("aa", postImage.get("V1"));
+        assertEquals(new Long(expiry).intValue(), postImage.get("V2"));
+
+        assertFalse(rs.next()); // no further CDC rows are expected
+    }
+
+    private static void verifyTtlExpiredPreImage(Connection conn, String sql, 
long expiry)
+            throws SQLException, JsonProcessingException { // Expects exactly one CDC row: a TTL delete event whose pre-image has V1 = "bb".
+        ResultSet rs = conn.createStatement().executeQuery(sql);
+        assertTrue(rs.next());
+        String cdcJson = rs.getString(3); // column 3 is parsed below as the CDC JSON payload
+        Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+        assertEquals(CDC_TTL_DELETE_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+        Map<String, Object> preImage = (Map<String, Object>) 
map.get(CDC_PRE_IMAGE);
+        assertEquals("bb", preImage.get("V1"));
+        assertEquals(new Long(expiry).intValue(), preImage.get("V2"));
+
+        Map<String, Object> postImage = (Map<String, Object>) 
map.get(CDC_POST_IMAGE);
+        assertNull(postImage); // the TTL delete event carries no post-image entry
+
+        assertFalse(rs.next()); // no further CDC rows are expected
+    }
+
     @Test
     public void testStreamPartitionMetadataBootstrap() throws Exception {
         Connection conn = newConnection();
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
index fde6e3515f..aa94a4ca3e 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
@@ -162,6 +162,35 @@ public class PropertiesInSyncIT extends 
ParallelStatsDisabledIT {
                         
SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(), 
sqlE.getErrorCode());
             }
         }
+        try {
+            conn.createStatement().execute("create local index " + 
localIndexName
+                    + " on " + tableName + "(name) "
+                    + "\"phoenix.max.lookback.age.seconds\"=12345");
+            fail("Should fail with SQLException when setting synced property 
for a local index");
+        } catch (SQLException sqlE) {
+            assertEquals("Should fail to set synced property for a local 
index",
+                    
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX.getErrorCode(),
+                    sqlE.getErrorCode());
+        }
+        try {
+            conn.createStatement().execute("create index " + globalIndexName
+                    + " on " + tableName + "(flag) "
+                    + "\"phoenix.max.lookback.age.seconds\"=12345");
+            fail("Should fail with SQLException when setting synced property 
for a global index");
+        } catch (SQLException sqlE) {
+            assertEquals("Should fail to set synced property for a global 
index",
+                    
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX.getErrorCode(),
+                    sqlE.getErrorCode());
+        }
+        try {
+            conn.createStatement().execute("create index view_index"
+                    + " on " + viewName + " (flag)" + 
"\"phoenix.max.lookback.age.seconds\"=12345");
+            fail("Should fail with SQLException when setting synced property 
for a view index");
+        } catch (SQLException sqlE) {
+            assertEquals("Should fail to set synced property for a view index",
+                    
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX.getErrorCode(),
+                    sqlE.getErrorCode());
+        }
         conn.close();
     }
 
@@ -327,6 +356,15 @@ public class PropertiesInSyncIT extends 
ParallelStatsDisabledIT {
                         
SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(), 
sqlE.getErrorCode());
             }
         }
+        try {
+            conn.createStatement().execute("alter table " + globalIndexName + 
" set "
+                    + "\"phoenix.max.lookback.age.seconds\"=12345");
+            fail("Should fail with SQLException when altering synced property 
for a global index");
+        } catch (SQLException sqlE) {
+            assertEquals("Should fail to alter synced property for a global 
index",
+                    
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX.getErrorCode(),
+                    sqlE.getErrorCode());
+        }
         conn.close();
     }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 7fdabc648b..91130c695f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -583,9 +583,8 @@ public class TableTTLIT extends BaseTest {
             conn.createStatement().execute("Alter Table " + tableName
                     + " set \"phoenix.max.lookback.age.seconds\" = " + tableLevelMaxLookback);
             String indexName = "I_" + generateUniqueName();
-            String indexDDL = String.format("create index %s on %s (val1) include (val2, val3) " +
-                            "\"phoenix.max.lookback.age.seconds\" = %d",
-                    indexName, tableName, tableLevelMaxLookback);
+            String indexDDL = String.format("create index %s on %s (val1) include (val2, val3) ",
+                    indexName, tableName);
             conn.createStatement().execute(indexDDL);
             updateRow(conn, tableName, "a1");
             String indexColumnValue;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
index b10ec93ebd..060b11b7b9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
@@ -838,8 +838,7 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
             // now create the index async
             String indexName = generateUniqueName();
             String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
-            String indexDDL = String.format("create index %s on %s (%s) include (%s) async "
-                            + "\"phoenix.max.lookback.age.seconds\" = %d",
+            String indexDDL = String.format("create index %s on %s (%s) include (%s) async ",
                     indexName, fullDataTableName, "VAL1", ttlCol, tableLevelMaxLookback);
             conn.createStatement().execute(indexDDL);
             IndexTool it = IndexToolIT.runIndexTool(false, schemaName, tableName, indexName,
@@ -949,8 +948,7 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
         }
         String ddl = String.format(ddlTemplate, tableName,
                 String.format(tableDDLOptions, retainSingleQuotes(ttlExpression)));
-        String indexDDL = String.format("create index %s ON %s (col1) INCLUDE(col2) " +
-                "\"phoenix.max.lookback.age.seconds\" = %d",
+        String indexDDL = String.format("create index %s ON %s (col1) INCLUDE(col2) ",
                 indexName, tableName, tableLevelMaxLookback, isStrictTTL);
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.createStatement().execute(ddl);
@@ -1303,8 +1301,7 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
         String ddl = String.format("CREATE TABLE %s (ID BIGINT NOT NULL PRIMARY KEY, " +
                         "EVENT_TYPE CHAR(15), CREATED_TS TIMESTAMP) %s", tableName,
                 String.format(tableDDLOptions, ttlExpression));
-        String indexDDL = String.format("CREATE INDEX %s ON %s (EVENT_TYPE) INCLUDE(CREATED_TS) "
-                        + "\"phoenix.max.lookback.age.seconds\" = %d",
+        String indexDDL = String.format("CREATE INDEX %s ON %s (EVENT_TYPE) INCLUDE(CREATED_TS) ",
                 indexName, tableName, tableLevelMaxLookback);
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.createStatement().execute(ddl);
@@ -1468,11 +1465,6 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
             String cdcIndexName = CDCUtil.getCDCIndexName(cdcName);
             String fullCdcIndexName = SchemaUtil.getTableName(schemaName,
                     CDCUtil.getCDCIndexName(cdcName));
-            // Explicitly set table level max lookback on CDC index
-            String cdcIndexSetMaxLookbackDdl = String.format("ALTER INDEX %s ON %s ACTIVE SET "
-                    + "\"phoenix.max.lookback.age.seconds\" = %d",
-                    cdcIndexName, tableName, tableLevelMaxLookback);
-            conn.createStatement().execute(cdcIndexSetMaxLookbackDdl);
             PTable cdcIndex = ((PhoenixConnection) conn).getTableNoCache(fullCdcIndexName);
             assertEquals(cdcIndex.getTTLExpression(), TTL_EXPRESSION_FOREVER);
 
@@ -1553,11 +1545,10 @@ public class ConditionalTTLExpressionIT extends ParallelStatsDisabledIT {
         String indexName = "I_" + generateUniqueName();
         String tableName = schemaBuilder.getEntityTableName();
         String schema = SchemaUtil.getSchemaNameFromFullName(tableName);
-        String indexDDL = String.format("create index %s on %s (%s) include (%s) "
-                        + "\"phoenix.max.lookback.age.seconds\" = %d",
+        String indexDDL = String.format("create index %s on %s (%s) include (%s) ",
                 indexName, tableName,
                 Joiner.on(",").join(indexedColumns),
-                Joiner.on(",").join(includedColumns), tableLevelMaxLookback);
+                Joiner.on(",").join(includedColumns));
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.createStatement().execute(indexDDL);
         }


Reply via email to