This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new af0f1da1b2 PHOENIX-7671 Sync maxLookback from data table to indexes
(#2230)
af0f1da1b2 is described below
commit af0f1da1b2bad86b233bea21c4ccc58176c7394a
Author: Viraj Jasani <[email protected]>
AuthorDate: Thu Jul 17 21:50:44 2025 -0700
PHOENIX-7671 Sync maxLookback from data table to indexes (#2230)
---
.../apache/phoenix/exception/SQLExceptionCode.java | 3 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 51 ++++-
.../org/apache/phoenix/schema/MetaDataClient.java | 10 +-
.../org/apache/phoenix/end2end/CDCStreamIT.java | 229 ++++++++++++++++++++-
.../apache/phoenix/end2end/PropertiesInSyncIT.java | 38 ++++
.../org/apache/phoenix/end2end/TableTTLIT.java | 5 +-
.../phoenix/schema/ConditionalTTLExpressionIT.java | 19 +-
7 files changed, 328 insertions(+), 27 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index c85395ad2f..1eaa48d972 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -55,6 +55,7 @@ import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
@@ -398,6 +399,8 @@ public enum SQLExceptionCode {
CDC_ALREADY_ENABLED(10963, "44A45",
"CDC on this table is either enabled or is in the process of being
enabled."),
+ CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX(10964, "44A46",
+ "Cannot set or alter " + PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY + " on
an index"),
/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new
Factory() {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 9e73cde0eb..d74f0e7579 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -23,6 +23,7 @@ import static
org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.REPLI
import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.TTL;
import static
org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static
org.apache.hadoop.hbase.ipc.RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY;
+import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
import static
org.apache.phoenix.coprocessorclient.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
import static
org.apache.phoenix.coprocessorclient.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0;
import static
org.apache.phoenix.coprocessorclient.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
@@ -68,7 +69,6 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TABLE_TTL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME;
@@ -1290,6 +1290,16 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
}
}
+ private void modifyTableDescriptor(TableDescriptorBuilder td, Map<String,
Object> props) {
+ if (props != null) {
+ for (Entry<String, Object> entry : props.entrySet()) {
+ String propName = entry.getKey();
+ Object value = entry.getValue();
+ td.setValue(propName, value == null ? null : value.toString());
+ }
+ }
+ }
+
private TableDescriptorBuilder generateTableDescriptor(byte[]
physicalTableName, byte[] parentPhysicalTableName, TableDescriptor
existingDesc,
PTableType tableType, Map<String, Object> tableProps,
List<Pair<byte[], Map<String, Object>>> families,
byte[][] splits, boolean isNamespaceMapped) throws SQLException {
@@ -1318,6 +1328,11 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
// PHYSICAL_DATA_TABLE_NAME points to the name of the view
instead of the physical base table
baseTableDesc = existingDesc;
}
+ String baseTableMaxLookbackVal =
+ baseTableDesc.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+ if (baseTableMaxLookbackVal != null) {
+ tableProps.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
baseTableMaxLookbackVal);
+ }
dataTableColDescForIndexTablePropSyncing =
baseTableDesc.getColumnFamily(defaultFamilyBytes);
// It's possible that the table has specific column families and
none of them are declared
// to be the DEFAULT_COLUMN_FAMILY, so we choose the first column
family for syncing properties
@@ -3165,6 +3180,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
Integer newReplicationScope = null;
KeepDeletedCells newKeepDeletedCells = null;
TransactionFactory.Provider txProvider = null;
+ Integer newMaxLookback = null;
for (String family : properties.keySet()) {
List<Pair<String, Object>> propsList = properties.get(family);
if (propsList != null && propsList.size() > 0) {
@@ -3191,6 +3207,15 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
.build()
.buildException();
}
+ if
(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY.equals(propName)) {
+ if (table.getType() == PTableType.INDEX) {
+ throw new SQLExceptionInfo.Builder(
+
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX)
+ .setMessage("Property: " + propName)
+ .build().buildException();
+ }
+ newMaxLookback = (Integer) propValue;
+ }
tableProps.put(propName, propValue);
} else {
if (TableProperty.isPhoenixTableProperty(propName)) {
@@ -3477,7 +3502,9 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
// Copy properties that need to be synced from the default column
family of the base table to
// the column families of each of its indexes (including indexes on
this base table's views)
// and store those table descriptor mappings as well
- setSyncedPropertiesForTableIndexes(table,
tableAndIndexDescriptorMappings, applyPropsToAllIndexColFams);
+ setSyncedPropertiesForTableIndexes(table,
tableAndIndexDescriptorMappings,
+ applyPropsToAllIndexColFams,
+ getNewSyncedPropsMapForTableDescriptor(newMaxLookback));
return tableAndIndexDescriptorMappings;
}
@@ -3575,6 +3602,15 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
return newSyncedProps;
}
+ private Map<String, Object> getNewSyncedPropsMapForTableDescriptor(Integer
newMaxLookback) {
+ if (newMaxLookback == null) {
+ return null;
+ }
+ Map<String, Object> newSyncedProps = new HashMap<>(1);
+ setPropIfNotNull(newSyncedProps, PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
newMaxLookback);
+ return newSyncedProps;
+ }
+
/**
* Set the new values for properties that are to be kept in sync amongst
those column families of the table which are
* not referenced in the context of our alter table command, including the
local index column family if it exists
@@ -3603,12 +3639,16 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
* @param table base table
* @param tableAndIndexDescriptorMappings old to new table descriptor
mappings
* @param applyPropsToAllIndexesDefaultCF new properties to apply to all
index column families
+ * @param applyPropsToAllIndexesTd new properties to apply to all index
table descriptors
* @throws SQLException
*/
private void setSyncedPropertiesForTableIndexes(PTable table,
- Map<TableDescriptor, TableDescriptor>
tableAndIndexDescriptorMappings,
- Map<String, Object> applyPropsToAllIndexesDefaultCF) throws
SQLException {
- if (applyPropsToAllIndexesDefaultCF == null ||
applyPropsToAllIndexesDefaultCF.isEmpty()) {
+ Map<TableDescriptor,
TableDescriptor> tableAndIndexDescriptorMappings,
+ Map<String, Object>
applyPropsToAllIndexesDefaultCF,
+ Map<String, Object>
applyPropsToAllIndexesTd)
+ throws SQLException {
+ if ((applyPropsToAllIndexesDefaultCF == null ||
applyPropsToAllIndexesDefaultCF.isEmpty())
+ && (applyPropsToAllIndexesTd == null ||
applyPropsToAllIndexesTd.isEmpty())) {
return;
}
@@ -3619,6 +3659,7 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
}
TableDescriptor origIndexDescriptor =
this.getTableDescriptor(indexTable.getPhysicalName().getBytes());
TableDescriptorBuilder newIndexDescriptorBuilder =
TableDescriptorBuilder.newBuilder(origIndexDescriptor);
+ modifyTableDescriptor(newIndexDescriptorBuilder,
applyPropsToAllIndexesTd);
byte[] defaultIndexColFam =
SchemaUtil.getEmptyColumnFamily(indexTable);
ColumnFamilyDescriptorBuilder indexDefaultColDescriptorBuilder =
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 5a33d161f2..bd6c6fafc6 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.schema;
+import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
import static
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_CONDITIONAL_TTL_ON_TABLE_WITH_MULTIPLE_COLUMN_FAMILIES;
import static
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
import static
org.apache.phoenix.exception.SQLExceptionCode.CDC_ALREADY_ENABLED;
@@ -27,7 +28,6 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
-import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
import static org.apache.phoenix.query.QueryConstants.SPLITS_FILE;
@@ -136,7 +136,6 @@ import static
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_P
import static
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
import static
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
-import static org.apache.phoenix.schema.PTable.ViewType.UPDATABLE;
import static org.apache.phoenix.schema.PTableType.INDEX;
import static org.apache.phoenix.schema.PTableType.TABLE;
import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -1251,6 +1250,13 @@ public class MetaDataClient {
.setMessage("Property: " + prop.getFirst()).build()
.buildException();
}
+ if (tableType == PTableType.INDEX
+ &&
PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY.equals(prop.getFirst())) {
+ throw new SQLExceptionInfo.Builder(
+
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX)
+ .setMessage("Property: " + prop.getFirst())
+ .build().buildException();
+ }
// Handle when TTL property is set
if (prop.getFirst().equalsIgnoreCase(TTL)
&& (tableType != PTableType.SYSTEM
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index c515425ecd..233e733a3a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -18,14 +18,16 @@
package org.apache.phoenix.end2end;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
-import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -46,7 +48,6 @@ import org.junit.experimental.categories.Category;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
import
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
-import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -57,15 +58,21 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static
org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
@@ -78,7 +85,7 @@ public class CDCStreamIT extends CDCBaseIT {
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
-
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
Integer.toString(60*60)); // An hour
+ props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60 *
60)); // An hour
props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION,
Boolean.toString(false));
props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
@@ -95,6 +102,222 @@ public class CDCStreamIT extends CDCBaseIT {
.findCoprocessorEnvironment(TaskRegionObserver.class.getName());
}
+ /**
+ * Test to verify CDC events with index/table level maxLookback and TTL.
+ */
+ @Test
+ public void testCdcWithMaxLookbackAndCompaction() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ String cdcName = generateUniqueName();
+ String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
+ // maxLookback 27 hr
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k VARCHAR NOT NULL,"
+ + " v1 VARCHAR, v2 BIGINT CONSTRAINT PK PRIMARY
KEY(k))"
+ + " \"phoenix.max.lookback.age.seconds\"=97200,"
+ + "TTL='TO_NUMBER(CURRENT_TIME()) > v2',
IS_STRICT_TTL=false");
+ // cdc index should get 27 hr maxLookback (for cdc index, maxLookback
= TTL)
+ createCDC(conn, cdcDdl, null);
+
+ long expiry = (System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3))
/ 1000;
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES('a', 'aa', " +
expiry + ")");
+ conn.commit();
+
+ String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName;
+ // verify only post-image exists (pre-image is empty), new row insert
+ verifyOnlyPostImage(conn, sql, expiry);
+
+ ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+ long t = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3);
+ t = ((t / 1000) + 100) * 1000;
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ try {
+ injectEdge.setValue(t);
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES('a', 'bb',
" + expiry + ")");
+ conn.commit();
+
+ // verify insert + update event of the same row
+ verifyPreAndPostImages(conn, sql, expiry);
+
+ injectEdge.incrementValue(TimeUnit.DAYS.toMillis(1));
+ TestUtil.doMajorCompaction(conn, tableName);
+ TestUtil.doMajorCompaction(conn, CDCUtil.getCDCIndexName(cdcName));
+
+ // first compaction will expire initial insert event as we are
already past
+ // (3 days + 100 sec + 24 hr), however this should not generate
ttl expired event yet,
+ // because row is still in maxLookback window (27 hr) after the
latest update
+ verifyImagesAfterFirstCompaction(conn, sql, expiry);
+
+ injectEdge.incrementValue(TimeUnit.HOURS.toMillis(3) +
TimeUnit.SECONDS.toMillis(1));
+ TestUtil.doMajorCompaction(conn, tableName);
+ TestUtil.doMajorCompaction(conn, CDCUtil.getCDCIndexName(cdcName));
+
+ // The compaction after (3 days + 100 sec + 24 hr + 3 hr + 1 sec)
should expire the
+ // row, which was last updated (24 hr + 3 hr + 1 sec) in the past.
+ // The compaction should also generate ttl expired event
+ verifyTtlExpiredPreImage(conn, sql, expiry);
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ /**
+ * Test to verify that altering maxLookback on the data table is synced to the CDC index.
+ */
+ @Test
+ public void testCdcWithMaxLookbackAlterTable() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ String cdcName = generateUniqueName();
+ String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
+ // maxLookback 27 hr
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k VARCHAR NOT NULL,"
+ + " v1 VARCHAR, v2 BIGINT CONSTRAINT PK PRIMARY
KEY(k))"
+ + " \"phoenix.max.lookback.age.seconds\"=97200,"
+ + "TTL='TO_NUMBER(CURRENT_TIME()) > v2',
IS_STRICT_TTL=false");
+ // cdc index should get 27 hr maxLookback (for cdc index, maxLookback
= TTL)
+ createCDC(conn, cdcDdl, null);
+
+ long expiry = (System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3))
/ 1000;
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES('a', 'aa', " +
expiry + ")");
+ conn.commit();
+
+ Admin admin =
conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+
+ TableDescriptor tableDescriptor =
+
admin.getDescriptor(TableName.valueOf(CDCUtil.getCDCIndexName(cdcName)));
+ String maxLookbackVal =
tableDescriptor.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+ assertEquals(97200, Integer.parseInt(maxLookbackVal));
+
+ tableDescriptor = admin.getDescriptor(TableName.valueOf(tableName));
+ maxLookbackVal =
tableDescriptor.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+ assertEquals(97200, Integer.parseInt(maxLookbackVal));
+
+ String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName;
+ // verify only post-image exists (pre-image is empty), new row insert
+ verifyOnlyPostImage(conn, sql, expiry);
+
+ // update maxLookback of data table and cdc index both to 50 sec
+ conn.createStatement().execute("ALTER TABLE " + tableName +
+ " SET \"phoenix.max.lookback.age.seconds\"=50");
+
+ ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+ long t = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(3);
+ t = ((t / 1000) + 100) * 1000;
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ try {
+ injectEdge.setValue(t);
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES('a', 'bb',
" + expiry + ")");
+ conn.commit();
+
+ // verify insert + update event of the same row
+ verifyPreAndPostImages(conn, sql, expiry);
+
+ injectEdge.incrementValue(TimeUnit.DAYS.toMillis(1));
+ TestUtil.doMajorCompaction(conn, CDCUtil.getCDCIndexName(cdcName));
+
+ // compaction will expire all rows from CDC index because the
maxLookback of
+ // CDC index is no longer 27 hr, but it has been altered to 50 sec
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+ assertFalse(rs.next());
+
+ tableDescriptor =
+
admin.getDescriptor(TableName.valueOf(CDCUtil.getCDCIndexName(cdcName)));
+ maxLookbackVal =
tableDescriptor.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+ assertEquals(50, Integer.parseInt(maxLookbackVal));
+
+ tableDescriptor =
admin.getDescriptor(TableName.valueOf(tableName));
+ maxLookbackVal =
tableDescriptor.getValue(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY);
+ assertEquals(50, Integer.parseInt(maxLookbackVal));
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ private static void verifyPreAndPostImages(Connection conn, String sql,
long expiry)
+ throws SQLException, JsonProcessingException {
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+ assertTrue(rs.next());
+ String cdcJson = rs.getString(3);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+ assertTrue(((Map<String, Object>)map.get(CDC_PRE_IMAGE)).isEmpty());
+ assertEquals(CDC_UPSERT_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+ Map<String, Object> postImage = (Map<String, Object>)
map.get(CDC_POST_IMAGE);
+ assertEquals("aa", postImage.get("V1"));
+ assertEquals(new Long(expiry).intValue(), postImage.get("V2"));
+
+ assertTrue(rs.next());
+ cdcJson = rs.getString(3);
+ map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+ assertEquals(CDC_UPSERT_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+ Map<String, Object> preImage = (Map<String, Object>)
map.get(CDC_PRE_IMAGE);
+ assertEquals("aa", preImage.get("V1"));
+ assertEquals(new Long(expiry).intValue(), preImage.get("V2"));
+
+ postImage = (Map<String, Object>) map.get(CDC_POST_IMAGE);
+ assertEquals("bb", postImage.get("V1"));
+ assertEquals(new Long(expiry).intValue(), postImage.get("V2"));
+
+ assertFalse(rs.next());
+ }
+
+ private static void verifyImagesAfterFirstCompaction(Connection conn,
String sql, long expiry)
+ throws SQLException, JsonProcessingException {
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+ assertTrue(rs.next());
+ String cdcJson = rs.getString(3);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+ assertEquals(CDC_UPSERT_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+ Map<String, Object> preImage = (Map<String, Object>)
map.get(CDC_PRE_IMAGE);
+ assertEquals("aa", preImage.get("V1"));
+ assertEquals(new Long(expiry).intValue(), preImage.get("V2"));
+
+ Map<String, Object> postImage = (Map<String, Object>)
map.get(CDC_POST_IMAGE);
+ assertEquals("bb", postImage.get("V1"));
+ assertEquals(new Long(expiry).intValue(), postImage.get("V2"));
+
+ assertFalse(rs.next());
+ }
+
+ private static void verifyOnlyPostImage(Connection conn, String sql, long
expiry)
+ throws SQLException, JsonProcessingException {
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+ assertTrue(rs.next());
+ String cdcJson = rs.getString(3);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+ assertTrue(((Map<String, Object>)map.get(CDC_PRE_IMAGE)).isEmpty());
+ assertEquals(CDC_UPSERT_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+
+ Map<String, Object> postImage = (Map<String, Object>)
map.get(CDC_POST_IMAGE);
+ assertEquals("aa", postImage.get("V1"));
+ assertEquals(new Long(expiry).intValue(), postImage.get("V2"));
+
+ assertFalse(rs.next());
+ }
+
+ private static void verifyTtlExpiredPreImage(Connection conn, String sql,
long expiry)
+ throws SQLException, JsonProcessingException {
+ ResultSet rs = conn.createStatement().executeQuery(sql);
+ assertTrue(rs.next());
+ String cdcJson = rs.getString(3);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson, Map.class);
+ assertEquals(CDC_TTL_DELETE_EVENT_TYPE, map.get(CDC_EVENT_TYPE));
+ Map<String, Object> preImage = (Map<String, Object>)
map.get(CDC_PRE_IMAGE);
+ assertEquals("bb", preImage.get("V1"));
+ assertEquals(new Long(expiry).intValue(), preImage.get("V2"));
+
+ Map<String, Object> postImage = (Map<String, Object>)
map.get(CDC_POST_IMAGE);
+ assertNull(postImage);
+
+ assertFalse(rs.next());
+ }
+
@Test
public void testStreamPartitionMetadataBootstrap() throws Exception {
Connection conn = newConnection();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
index fde6e3515f..aa94a4ca3e 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PropertiesInSyncIT.java
@@ -162,6 +162,35 @@ public class PropertiesInSyncIT extends
ParallelStatsDisabledIT {
SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(),
sqlE.getErrorCode());
}
}
+ try {
+ conn.createStatement().execute("create local index " +
localIndexName
+ + " on " + tableName + "(name) "
+ + "\"phoenix.max.lookback.age.seconds\"=12345");
+ fail("Should fail with SQLException when setting synced property
for a local index");
+ } catch (SQLException sqlE) {
+ assertEquals("Should fail to set synced property for a local
index",
+
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX.getErrorCode(),
+ sqlE.getErrorCode());
+ }
+ try {
+ conn.createStatement().execute("create index " + globalIndexName
+ + " on " + tableName + "(flag) "
+ + "\"phoenix.max.lookback.age.seconds\"=12345");
+ fail("Should fail with SQLException when setting synced property
for a global index");
+ } catch (SQLException sqlE) {
+ assertEquals("Should fail to set synced property for a global
index",
+
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX.getErrorCode(),
+ sqlE.getErrorCode());
+ }
+ try {
+ conn.createStatement().execute("create index view_index"
+ + " on " + viewName + " (flag)" +
"\"phoenix.max.lookback.age.seconds\"=12345");
+ fail("Should fail with SQLException when setting synced property
for a view index");
+ } catch (SQLException sqlE) {
+ assertEquals("Should fail to set synced property for a view index",
+
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX.getErrorCode(),
+ sqlE.getErrorCode());
+ }
conn.close();
}
@@ -327,6 +356,15 @@ public class PropertiesInSyncIT extends
ParallelStatsDisabledIT {
SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(),
sqlE.getErrorCode());
}
}
+ try {
+ conn.createStatement().execute("alter table " + globalIndexName +
" set "
+ + "\"phoenix.max.lookback.age.seconds\"=12345");
+ fail("Should fail with SQLException when altering synced property
for a global index");
+ } catch (SQLException sqlE) {
+ assertEquals("Should fail to alter synced property for a global
index",
+
SQLExceptionCode.CANNOT_SET_OR_ALTER_MAX_LOOKBACK_FOR_INDEX.getErrorCode(),
+ sqlE.getErrorCode());
+ }
conn.close();
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 7fdabc648b..91130c695f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -583,9 +583,8 @@ public class TableTTLIT extends BaseTest {
conn.createStatement().execute("Alter Table " + tableName
+ " set \"phoenix.max.lookback.age.seconds\" = " +
tableLevelMaxLookback);
String indexName = "I_" + generateUniqueName();
- String indexDDL = String.format("create index %s on %s (val1)
include (val2, val3) " +
- "\"phoenix.max.lookback.age.seconds\" = %d",
- indexName, tableName, tableLevelMaxLookback);
+ String indexDDL = String.format("create index %s on %s (val1)
include (val2, val3) ",
+ indexName, tableName);
conn.createStatement().execute(indexDDL);
updateRow(conn, tableName, "a1");
String indexColumnValue;
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
index b10ec93ebd..060b11b7b9 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
@@ -838,8 +838,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
// now create the index async
String indexName = generateUniqueName();
String fullIndexName = SchemaUtil.getTableName(schemaName,
indexName);
- String indexDDL = String.format("create index %s on %s (%s)
include (%s) async "
- + "\"phoenix.max.lookback.age.seconds\" = %d",
+ String indexDDL = String.format("create index %s on %s (%s)
include (%s) async ",
indexName, fullDataTableName, "VAL1", ttlCol,
tableLevelMaxLookback);
conn.createStatement().execute(indexDDL);
IndexTool it = IndexToolIT.runIndexTool(false, schemaName,
tableName, indexName,
@@ -949,8 +948,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
}
String ddl = String.format(ddlTemplate, tableName,
String.format(tableDDLOptions,
retainSingleQuotes(ttlExpression)));
- String indexDDL = String.format("create index %s ON %s (col1)
INCLUDE(col2) " +
- "\"phoenix.max.lookback.age.seconds\" = %d",
+ String indexDDL = String.format("create index %s ON %s (col1)
INCLUDE(col2) ",
indexName, tableName, tableLevelMaxLookback, isStrictTTL);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute(ddl);
@@ -1303,8 +1301,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
String ddl = String.format("CREATE TABLE %s (ID BIGINT NOT NULL
PRIMARY KEY, " +
"EVENT_TYPE CHAR(15), CREATED_TS TIMESTAMP) %s",
tableName,
String.format(tableDDLOptions, ttlExpression));
- String indexDDL = String.format("CREATE INDEX %s ON %s (EVENT_TYPE)
INCLUDE(CREATED_TS) "
- + "\"phoenix.max.lookback.age.seconds\" = %d",
+ String indexDDL = String.format("CREATE INDEX %s ON %s (EVENT_TYPE)
INCLUDE(CREATED_TS) ",
indexName, tableName, tableLevelMaxLookback);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute(ddl);
@@ -1468,11 +1465,6 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
String cdcIndexName = CDCUtil.getCDCIndexName(cdcName);
String fullCdcIndexName = SchemaUtil.getTableName(schemaName,
CDCUtil.getCDCIndexName(cdcName));
- // Explicitly set table level max lookback on CDC index
- String cdcIndexSetMaxLookbackDdl = String.format("ALTER INDEX %s
ON %s ACTIVE SET "
- + "\"phoenix.max.lookback.age.seconds\" = %d",
- cdcIndexName, tableName, tableLevelMaxLookback);
- conn.createStatement().execute(cdcIndexSetMaxLookbackDdl);
PTable cdcIndex = ((PhoenixConnection)
conn).getTableNoCache(fullCdcIndexName);
assertEquals(cdcIndex.getTTLExpression(), TTL_EXPRESSION_FOREVER);
@@ -1553,11 +1545,10 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
String indexName = "I_" + generateUniqueName();
String tableName = schemaBuilder.getEntityTableName();
String schema = SchemaUtil.getSchemaNameFromFullName(tableName);
- String indexDDL = String.format("create index %s on %s (%s) include
(%s) "
- + "\"phoenix.max.lookback.age.seconds\" = %d",
+ String indexDDL = String.format("create index %s on %s (%s) include
(%s) ",
indexName, tableName,
Joiner.on(",").join(indexedColumns),
- Joiner.on(",").join(includedColumns), tableLevelMaxLookback);
+ Joiner.on(",").join(includedColumns));
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute(indexDDL);
}