This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 129fb671a0 PHOENIX-7667 Strict vs Relaxed TTL (#2225)
129fb671a0 is described below
commit 129fb671a03eb9acf8aa8ba6013cbce53dcb5a1a
Author: Viraj Jasani <[email protected]>
AuthorDate: Wed Jul 16 16:23:35 2025 -0700
PHOENIX-7667 Strict vs Relaxed TTL (#2225)
---
.../BaseScannerRegionObserverConstants.java | 1 +
.../coprocessorclient/MetaDataProtocol.java | 2 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 3 +
.../phoenix/query/ConnectionQueryServicesImpl.java | 27 +--
.../org/apache/phoenix/query/QueryConstants.java | 2 +
.../org/apache/phoenix/query/QueryServices.java | 1 -
.../org/apache/phoenix/schema/DelegateTable.java | 5 +
.../org/apache/phoenix/schema/MetaDataClient.java | 63 ++++++-
.../java/org/apache/phoenix/schema/PTable.java | 7 +
.../java/org/apache/phoenix/schema/PTableImpl.java | 24 +++
.../org/apache/phoenix/schema/TableProperty.java | 20 +++
.../phoenix/schema/transform/TransformClient.java | 1 +
.../java/org/apache/phoenix/util/ScanUtil.java | 32 ++++
phoenix-core-client/src/main/protobuf/PTable.proto | 1 +
.../coprocessor/GlobalIndexRegionScanner.java | 9 +-
.../coprocessor/IndexRepairRegionScanner.java | 4 +-
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 14 +-
.../phoenix/coprocessor/TTLRegionScanner.java | 3 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 38 ++++-
.../phoenix/end2end/StrictTTLPropertyIT.java | 187 +++++++++++++++++++++
.../phoenix/schema/ConditionalTTLExpressionIT.java | 180 +++++++++++---------
.../index/PrepareIndexMutationsForRebuildTest.java | 24 +--
.../java/org/apache/phoenix/util/TestUtil.java | 8 +
23 files changed, 539 insertions(+), 117 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
index 74f6c67026..f22e75f1a7 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/BaseScannerRegionObserverConstants.java
@@ -123,6 +123,7 @@ public class BaseScannerRegionObserverConstants {
public static final String MASK_PHOENIX_TTL_EXPIRED = "_MASK_TTL_EXPIRED";
public static final String DELETE_PHOENIX_TTL_EXPIRED =
"_DELETE_TTL_EXPIRED";
public static final String PHOENIX_TTL_SCAN_TABLE_NAME =
"_PhoenixTTLScanTableName";
+ public static final String IS_STRICT_TTL = "_IS_STRICT_TTL";
public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
public static final String REPLAY_WRITES = "_IGNORE_NEWER_MUTATIONS";
public final static String SCAN_OFFSET = "_RowOffset";
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
index 7d9085902b..9d5e6ecac6 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/MetaDataProtocol.java
@@ -100,7 +100,7 @@ public abstract class MetaDataProtocol extends
MetaDataService {
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 =
MIN_TABLE_TIMESTAMP + 33;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 =
MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 =
MIN_TABLE_TIMESTAMP + 38;
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 =
MIN_TABLE_TIMESTAMP + 41;
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 =
MIN_TABLE_TIMESTAMP + 42;
// MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the
MIN_SYSTEM_TABLE_TIMESTAMP_* constants
public static final long MIN_SYSTEM_TABLE_TIMESTAMP =
MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index c9cde56ca3..5a520d757a 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -412,6 +412,9 @@ public class PhoenixDatabaseMetaData implements
DatabaseMetaData {
public static final byte[] CHANGE_DETECTION_ENABLED_BYTES =
Bytes.toBytes(CHANGE_DETECTION_ENABLED);
+ public static final String IS_STRICT_TTL = "IS_STRICT_TTL";
+ public static final byte[] IS_STRICT_TTL_BYTES =
Bytes.toBytes(IS_STRICT_TTL);
+
public static final String SCHEMA_VERSION = "SCHEMA_VERSION";
public static final byte[] SCHEMA_VERSION_BYTES =
Bytes.toBytes(SCHEMA_VERSION);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 9d2f9c6975..9e73cde0eb 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4551,46 +4551,51 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
if (currentServerSideTableTimeStamp <
MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0) {
metaConnection =
addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 7,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 8,
PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection =
addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 6,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 7,
PhoenixDatabaseMetaData.SCHEMA_VERSION + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection =
addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 5,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 6,
PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection =
addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 4,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 5,
PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection =
addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 3,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 4,
PhoenixDatabaseMetaData.INDEX_WHERE + " "
+ PVarchar.INSTANCE.getSqlTypeName());
metaConnection =
- addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
- MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 2,
- PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE + " "
- + PVarchar.INSTANCE.getSqlTypeName());
+ addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 3,
+ PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE + " "
+ + PVarchar.INSTANCE.getSqlTypeName());
/**
* TODO: Provide a path to copy existing data from PHOENIX_TTL to
TTL column and then
* to DROP PHOENIX_TTL Column. See PHOENIX-7023
*/
metaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 1,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 2,
PhoenixDatabaseMetaData.TTL + " " +
PVarchar.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection,
- PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0 - 1,
PhoenixDatabaseMetaData.ROW_KEY_MATCHER + " "
+ PVarbinary.INSTANCE.getSqlTypeName());
+ metaConnection = addColumnsIfNotExists(metaConnection,
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG,
+ MIN_SYSTEM_TABLE_TIMESTAMP_5_3_0,
+ PhoenixDatabaseMetaData.IS_STRICT_TTL + " "
+ + PBoolean.INSTANCE.getSqlTypeName());
//Values in PHOENIX_TTL column will not be used for further
release as PHOENIX_TTL column is being deprecated
//and will be removed in later release. To copy
copyDataFromPhoenixTTLtoTTL(metaConnection) can be used but
//as that feature was not fully built we are not moving old value
to new column
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 57da865493..2030f37873 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -190,6 +190,7 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_STRICT_TTL;
/**
*
@@ -410,6 +411,7 @@ public interface QueryConstants {
CDC_INCLUDE_TABLE + " VARCHAR, \n" +
TTL + " VARCHAR, \n" +
ROW_KEY_MATCHER + " VARBINARY_ENCODED, \n" +
+ IS_STRICT_TTL + " BOOLEAN, \n" +
// Column metadata (will be null for table row)
DATA_TYPE + " INTEGER," +
COLUMN_SIZE + " INTEGER," +
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 00480c2dd4..eaefff6b6c 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -383,7 +383,6 @@ public interface QueryServices extends SQLCloseable {
public static final String WAL_EDIT_CODEC_ATTRIB =
"hbase.regionserver.wal.codec";
//Property to know whether TTL at View Level is enabled
public static final String PHOENIX_VIEW_TTL_ENABLED =
"phoenix.view.ttl.enabled";
-
public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT =
"phoenix.view.ttl.tenant_views_per_scan.limit";
// Block mutations based on cluster role record
public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED =
"phoenix.cluster.role.based.mutation.block.enabled";
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 062cd836f4..69cdbf870c 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -406,6 +406,11 @@ public class DelegateTable implements PTable {
return delegate.isChangeDetectionEnabled();
}
+ @Override
+ public boolean isStrictTTL() {
+ return delegate.isStrictTTL();
+ }
+
@Override
public String getSchemaVersion() {
return delegate.getSchemaVersion();
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 4cc466ad21..5a33d161f2 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -49,6 +49,7 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_REBUILD_TIME
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_STRICT_TTL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
@@ -380,9 +381,10 @@ public class MetaDataClient {
INDEX_WHERE + "," +
CDC_INCLUDE_TABLE + "," +
TTL + "," +
- ROW_KEY_MATCHER +
- ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, " +
- "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ ROW_KEY_MATCHER + ","
+ + IS_STRICT_TTL
+ + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, "
+ + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
private static final String CREATE_SCHEMA = "UPSERT INTO " +
SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
+ "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -2582,6 +2584,16 @@ public class MetaDataClient {
(Boolean)
TableProperty.CHANGE_DETECTION_ENABLED.getValue(tableProps);
verifyChangeDetectionTableType(tableType,
isChangeDetectionEnabledProp);
+ boolean isStrictTTL;
+ Boolean isStrictTTLProp =
+ (Boolean) TableProperty.IS_STRICT_TTL.getValue(tableProps);
+ if (tableType == PTableType.INDEX && parent != null) {
+ isStrictTTL = parent.isStrictTTL();
+ } else {
+ isStrictTTL =
+ isStrictTTLProp != null ? isStrictTTLProp :
PTable.DEFAULT_IS_STRICT_TTL;
+ }
+
String schemaVersion = (String)
TableProperty.SCHEMA_VERSION.getValue(tableProps);
String streamingTopicName = (String)
TableProperty.STREAMING_TOPIC_NAME.getValue(tableProps);
@@ -3669,6 +3681,8 @@ public class MetaDataClient {
tableUpsert.setBytes(37, rowKeyMatcher);
}
+ tableUpsert.setBoolean(38, isStrictTTL);
+
tableUpsert.execute();
if (asyncCreatedDate != null) {
@@ -3809,6 +3823,7 @@ public class MetaDataClient {
.setLastDDLTimestamp(result.getTable() != null ?
result.getTable().getLastDDLTimestamp() : null)
.setIsChangeDetectionEnabled(isChangeDetectionEnabledProp)
+ .setIsStrictTTL(isStrictTTL)
.setSchemaVersion(schemaVersion)
.setExternalSchemaId(result.getTable() != null ?
result.getTable().getExternalSchemaId() : null)
@@ -4230,6 +4245,7 @@ public class MetaDataClient {
.setImmutableStorageScheme(table.getImmutableStorageScheme())
.setQualifierEncodingScheme(table.getEncodingScheme())
.setUseStatsForParallelization(table.useStatsForParallelization())
+ .setIsStrictTTL(table.isStrictTTL())
.build();
tableRefs.add(new TableRef(null,
viewIndexTable, ts, false));
}
@@ -4327,6 +4343,7 @@ public class MetaDataClient {
metaPropertiesEvaluated.getUseStatsForParallelization(),
metaPropertiesEvaluated.getTTL(),
metaPropertiesEvaluated.isChangeDetectionEnabled(),
+ metaPropertiesEvaluated.isStrictTTL(),
metaPropertiesEvaluated.getPhysicalTableName(),
metaPropertiesEvaluated.getSchemaVersion(),
metaPropertiesEvaluated.getColumnEncodedBytes(),
@@ -4337,8 +4354,8 @@ public class MetaDataClient {
Long updateCacheFrequency, String
physicalTableName,
String schemaVersion,
QualifierEncodingScheme columnEncodedBytes) throws SQLException {
return incrementTableSeqNum(table, expectedType, columnCountDelta,
isTransactional, null,
- updateCacheFrequency, null, null, null, null, -1L, null, null,
null,null, false, physicalTableName,
- schemaVersion, columnEncodedBytes, null);
+ updateCacheFrequency, null, null, null, null, -1L, null, null,
null, null, false,
+ null, physicalTableName, schemaVersion, columnEncodedBytes,
null);
}
private long incrementTableSeqNum(PTable table, PTableType expectedType,
int columnCountDelta,
@@ -4346,7 +4363,8 @@ public class MetaDataClient {
Long updateCacheFrequency, Boolean isImmutableRows, Boolean
disableWAL,
Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth,
Boolean appendOnlySchema,
ImmutableStorageScheme immutableStorageScheme, Boolean
useStatsForParallelization,
- TTLExpression ttl, Boolean isChangeDetectionEnabled, String
physicalTableName,
+ TTLExpression ttl, Boolean isChangeDetectionEnabled, Boolean
isStrictTTL,
+ String physicalTableName,
String schemaVersion, QualifierEncodingScheme columnEncodedBytes,
String streamingTopicName)
throws SQLException {
@@ -4411,6 +4429,10 @@ public class MetaDataClient {
if (isChangeDetectionEnabled != null) {
mutateBooleanProperty(connection, tenantId, schemaName, tableName,
CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
}
+ if (isStrictTTL != null) {
+ mutateBooleanProperty(connection, tenantId, schemaName, tableName,
IS_STRICT_TTL,
+ isStrictTTL);
+ }
if (!Strings.isNullOrEmpty(physicalTableName)) {
mutateStringProperty(connection, tenantId, schemaName, tableName,
PHYSICAL_TABLE_NAME, physicalTableName);
}
@@ -5053,6 +5075,7 @@ public class MetaDataClient {
.setImmutableStorageScheme(table.getImmutableStorageScheme())
.setQualifierEncodingScheme(table.getEncodingScheme())
.setUseStatsForParallelization(table.useStatsForParallelization())
+ .setIsStrictTTL(table.isStrictTTL())
.build();
List<TableRef> tableRefs =
Collections.singletonList(new TableRef(null, viewIndexTable, ts, false));
MutationPlan plan = new
PostDDLCompiler(connection).compile(tableRefs, null, null,
@@ -5508,6 +5531,7 @@ public class MetaDataClient {
QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : qualifierEncodingScheme)
.setEncodedCQCounter(table.getEncodedCQCounter())
.setUseStatsForParallelization(table.useStatsForParallelization())
+ .setIsStrictTTL(table.isStrictTTL())
.setExcludedColumns(ImmutableList.<PColumn>of())
.setTenantId(sharedTableState.getTenantId())
.setSchemaName(sharedTableState.getSchemaName())
@@ -6034,6 +6058,8 @@ public class MetaDataClient {
metaProperties.setSchemaVersion((String) value);
} else if
(propName.equalsIgnoreCase(STREAMING_TOPIC_NAME)) {
metaProperties.setStreamingTopicName((String) value);
+ } else if (propName.equalsIgnoreCase(IS_STRICT_TTL)) {
+ metaProperties.setStrictTTL((Boolean) value);
}
}
// if removeTableProps is true only add the property if it is
not an HTable or Phoenix Table property
@@ -6267,6 +6293,13 @@ public class MetaDataClient {
}
}
+ if (metaProperties.isStrictTTL() != null) {
+ if (!metaProperties.isStrictTTL().equals(table.isStrictTTL())) {
+
metaPropertiesEvaluated.setStrictTTL(metaProperties.isStrictTTL());
+ changingPhoenixTableProperty = true;
+ }
+ }
+
return changingPhoenixTableProperty;
}
@@ -6287,6 +6320,7 @@ public class MetaDataClient {
private boolean nonTxToTx = false;
private TTLExpression ttl = null;
private Boolean isChangeDetectionEnabled = null;
+ private Boolean isStrictTTL = null;
private String physicalTableName = null;
private String schemaVersion = null;
private String streamingTopicName = null;
@@ -6442,6 +6476,14 @@ public class MetaDataClient {
public void setStreamingTopicName(String streamingTopicName) {
this.streamingTopicName = streamingTopicName;
}
+
+ public Boolean isStrictTTL() {
+ return isStrictTTL;
+ }
+
+ public void setStrictTTL(Boolean isStrictTTL) {
+ this.isStrictTTL = isStrictTTL;
+ }
}
private static class MetaPropertiesEvaluated {
@@ -6459,6 +6501,7 @@ public class MetaDataClient {
private TransactionFactory.Provider transactionProvider = null;
private TTLExpression ttl = null;
private Boolean isChangeDetectionEnabled = null;
+ private Boolean isStrictTTL = null;
private String physicalTableName = null;
private String schemaVersion = null;
private String streamingTopicName = null;
@@ -6591,6 +6634,14 @@ public class MetaDataClient {
public void setStreamingTopicName(String streamingTopicName) {
this.streamingTopicName = streamingTopicName;
}
+
+ public Boolean isStrictTTL() {
+ return isStrictTTL;
+ }
+
+ public void setStrictTTL(Boolean isStrictTTL) {
+ this.isStrictTTL = isStrictTTL;
+ }
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index 2e82286466..42f47829d8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -64,6 +64,7 @@ public interface PTable extends PMetaDataEntity {
public static final boolean DEFAULT_DISABLE_WAL = false;
public static final boolean DEFAULT_IMMUTABLE_ROWS = false;
static final Integer NO_SALTING = -1;
+ boolean DEFAULT_IS_STRICT_TTL = true;
public enum ViewType {
MAPPED((byte)1),
@@ -965,6 +966,12 @@ public interface PTable extends PMetaDataEntity {
*/
boolean isChangeDetectionEnabled();
+ /**
+ * @return Whether strict TTL mode is enabled on a given table. True if
the TTL is strict,
+ * false if relaxed.
+ */
+ boolean isStrictTTL();
+
/**
* @return User-provided string identifying the application version that
last created or modified this schema
* object. Used only on tables, views, and indexes.
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 6d13c7c1a2..523c15c31d 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -29,6 +29,7 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODING_SCHEME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_STRICT_TTL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
@@ -212,6 +213,7 @@ public class PTableImpl implements PTable {
private final BitSet viewModifiedPropSet;
private final Long lastDDLTimestamp;
private final boolean isChangeDetectionEnabled;
+ private final boolean isStrictTTL;
private Map<String, String> propertyValues;
private String schemaVersion;
private String externalSchemaId;
@@ -281,6 +283,7 @@ public class PTableImpl implements PTable {
private Boolean useStatsForParallelization;
private Long lastDDLTimestamp;
private boolean isChangeDetectionEnabled = false;
+ private boolean isStrictTTL = DEFAULT_IS_STRICT_TTL;
private Map<String, String> propertyValues = new HashMap<>();
private String schemaVersion;
private String externalSchemaId;
@@ -728,6 +731,13 @@ public class PTableImpl implements PTable {
return this;
}
+ public Builder setIsStrictTTL(Boolean isStrictTTL) {
+ if (isStrictTTL != null) {
+ this.isStrictTTL = isStrictTTL;
+ }
+ return this;
+ }
+
/**
* Populate derivable attributes of the PTable
* @return PTableImpl.Builder object
@@ -1014,6 +1024,7 @@ public class PTableImpl implements PTable {
this.propertyValues = builder.propertyValues;
this.lastDDLTimestamp = builder.lastDDLTimestamp;
this.isChangeDetectionEnabled = builder.isChangeDetectionEnabled;
+ this.isStrictTTL = builder.isStrictTTL;
this.schemaVersion = builder.schemaVersion;
this.externalSchemaId = builder.externalSchemaId;
this.streamingTopicName = builder.streamingTopicName;
@@ -1095,6 +1106,7 @@ public class PTableImpl implements PTable {
.setViewModifiedUpdateCacheFrequency(table.hasViewModifiedUpdateCacheFrequency())
.setLastDDLTimestamp(table.getLastDDLTimestamp())
.setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+ .setIsStrictTTL(table.isStrictTTL())
.setSchemaVersion(table.getSchemaVersion())
.setExternalSchemaId(table.getExternalSchemaId())
.setStreamingTopicName(table.getStreamingTopicName())
@@ -2039,6 +2051,10 @@ public class PTableImpl implements PTable {
if (table.hasChangeDetectionEnabled()) {
isChangeDetectionEnabled = table.getChangeDetectionEnabled();
}
+ boolean isStrictTTL = DEFAULT_IS_STRICT_TTL;
+ if (table.hasIsStrictTTL()) {
+ isStrictTTL = table.getIsStrictTTL();
+ }
String schemaVersion = null;
if (table.hasSchemaVersion()) {
schemaVersion = (String)
PVarchar.INSTANCE.toObject(table.getSchemaVersion().toByteArray());
@@ -2125,6 +2141,7 @@ public class PTableImpl implements PTable {
.setViewModifiedUseStatsForParallelization(viewModifiedUseStatsForParallelization)
.setLastDDLTimestamp(lastDDLTimestamp)
.setIsChangeDetectionEnabled(isChangeDetectionEnabled)
+ .setIsStrictTTL(isStrictTTL)
.setSchemaVersion(schemaVersion)
.setExternalSchemaId(externalSchemaId)
.setStreamingTopicName(streamingTopicName)
@@ -2279,6 +2296,7 @@ public class PTableImpl implements PTable {
builder.setLastDDLTimestamp(table.getLastDDLTimestamp());
}
builder.setChangeDetectionEnabled(table.isChangeDetectionEnabled());
+ builder.setIsStrictTTL(table.isStrictTTL());
if (table.getSchemaVersion() != null) {
builder.setSchemaVersion(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getSchemaVersion())));
}
@@ -2456,6 +2474,11 @@ public class PTableImpl implements PTable {
return indexWhere;
}
+ @Override
+ public boolean isStrictTTL() {
+ return isStrictTTL;
+ }
+
@Override
public Map<PTableKey, Long> getAncestorLastDDLTimestampMap() {
return ancestorLastDDLTimestampMap;
@@ -2586,6 +2609,7 @@ public class PTableImpl implements PTable {
Map<String, String> map = new HashMap<>();
map.put(DISABLE_WAL, String.valueOf(DEFAULT_DISABLE_WAL));
map.put(IMMUTABLE_ROWS, String.valueOf(DEFAULT_IMMUTABLE_ROWS));
+ map.put(IS_STRICT_TTL, String.valueOf(DEFAULT_IS_STRICT_TTL));
map.put(TRANSACTION_PROVIDER, DEFAULT_TRANSACTION_PROVIDER);
map.put(IMMUTABLE_STORAGE_SCHEME, DEFAULT_IMMUTABLE_STORAGE_SCHEME);
map.put(COLUMN_ENCODED_BYTES,
String.valueOf(DEFAULT_COLUMN_ENCODED_BYTES));
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 2b42a3fd13..6856a7991c 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -330,6 +330,26 @@ public enum TableProperty {
public Object getPTableValue(PTable table) {
return table.getCDCIncludeScopes();
}
+ },
+
+ IS_STRICT_TTL(PhoenixDatabaseMetaData.IS_STRICT_TTL, true, true, true) {
+
+ @Override
+ public Object getValue(Object value) {
+ if (value == null) {
+ return null;
+ } else if (value instanceof Boolean) {
+ return value;
+ } else {
+ throw new IllegalArgumentException("IS_STRICT_TTL property can
only be"
+ + " of type Boolean");
+ }
+ }
+
+ @Override
+ public Object getPTableValue(PTable table) {
+ return table.isStrictTTL();
+ }
};
private final String propertyName;
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
index 60875afde0..198d5f1837 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/transform/TransformClient.java
@@ -269,6 +269,7 @@ public class TransformClient {
.setUseStatsForParallelization(table.useStatsForParallelization())
.setSchemaVersion(table.getSchemaVersion())
.setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
+ .setIsStrictTTL(table.isStrictTTL())
.setStreamingTopicName(table.getStreamingTopicName())
// Transformables
.setImmutableStorageScheme(
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 33724affec..bd33e4f45d 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -112,6 +112,7 @@ import
org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.transform.TransformClient;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PVarbinary;
import org.slf4j.Logger;
@@ -1230,6 +1231,22 @@ public class ScanUtil {
&& scan.getAttribute(BaseScannerRegionObserverConstants.TTL)
!= null;
}
+ public static boolean isStrictTTL(Scan scan) {
+ byte[] isStrictTTLBytes =
+
scan.getAttribute(BaseScannerRegionObserverConstants.IS_STRICT_TTL);
+ if (isStrictTTLBytes != null) {
+ try {
+ return (Boolean) PBoolean.INSTANCE.toObject(isStrictTTLBytes);
+ } catch (Exception e) {
+ LOGGER.error(
+ "Unable to parse isStrictTTL bytes, use default value
for strict TTL {}",
+ PTable.DEFAULT_IS_STRICT_TTL, e);
+ return PTable.DEFAULT_IS_STRICT_TTL;
+ }
+ }
+ return PTable.DEFAULT_IS_STRICT_TTL;
+ }
+
public static boolean isEmptyColumn(Cell cell, byte[] emptyCF, byte[]
emptyCQ) {
return CellUtil.matchingFamily(cell, emptyCF, 0, emptyCF.length) &&
CellUtil.matchingQualifier(cell, emptyCQ, 0, emptyCQ.length);
@@ -1436,6 +1453,11 @@ public class ScanUtil {
public static void setScanAttributesForPhoenixTTL(Scan scan, PTable table,
PhoenixConnection phoenixConnection) throws SQLException {
+ if (!table.isStrictTTL()) {
+ scan.setAttribute(BaseScannerRegionObserverConstants.IS_STRICT_TTL,
+ PBoolean.INSTANCE.toBytes(table.isStrictTTL()));
+ }
+
//If entity is a view and phoenix.view.ttl.enabled is false then don't
set TTL scan attribute.
if ((table.getType() == PTableType.VIEW) &&
!phoenixConnection.getQueryServices().getConfiguration().getBoolean(
QueryServices.PHOENIX_VIEW_TTL_ENABLED,
@@ -1481,6 +1503,12 @@ public class ScanUtil {
return;
}
}
+
+ if (!dataTable.isStrictTTL()) {
+ scan.setAttribute(BaseScannerRegionObserverConstants.IS_STRICT_TTL,
+ PBoolean.INSTANCE.toBytes(dataTable.isStrictTTL()));
+ }
+
// we want to compile the expression every time we pass it as a scan
attribute. This is
// needed so that any stateless expressions like CURRENT_TIME() are
always evaluated.
// Otherwise, we can cache stale values and keep reusing the stale
values which can give
@@ -1788,6 +1816,10 @@ public class ScanUtil {
byte[] ttl = ttlExpr.serialize();
for (Mutation mutation : mutations) {
mutation.setAttribute(BaseScannerRegionObserverConstants.TTL, ttl);
+ if (!table.isStrictTTL()) {
+
mutation.setAttribute(BaseScannerRegionObserverConstants.IS_STRICT_TTL,
+ PBoolean.INSTANCE.toBytes(table.isStrictTTL()));
+ }
}
}
diff --git a/phoenix-core-client/src/main/protobuf/PTable.proto
b/phoenix-core-client/src/main/protobuf/PTable.proto
index f4cafbb303..43f5315114 100644
--- a/phoenix-core-client/src/main/protobuf/PTable.proto
+++ b/phoenix-core-client/src/main/protobuf/PTable.proto
@@ -123,6 +123,7 @@ message PTable {
optional string CDCIncludeScopes=54;
optional bytes ttl = 55;
optional bytes rowKeyMatcher = 56;
+ optional bool isStrictTTL = 57;
}
message EncodedCQCounter {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index eb8f1eac20..b2f65e2d07 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -171,6 +171,7 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
protected byte[] tableType;
protected byte[] lastDdlTimestamp;
private final CompiledTTLExpression ttlExpression;
+ private final boolean isTTLStrict;
// This relies on Hadoop Configuration to handle warning about deprecated
configs and
// to set the correct non-deprecated configs when an old one shows up.
@@ -219,6 +220,7 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
lastDdlTimestamp = scan.getAttribute(
MutationState.MutationMetadataType.TIMESTAMP.toString());
ttlExpression = ScanUtil.getTTLExpression(scan);
+ isTTLStrict = ScanUtil.isStrictTTL(scan);
byte[] transforming =
scan.getAttribute(BaseScannerRegionObserverConstants.DO_TRANSFORMING);
List<IndexMaintainer> maintainers = null;
if (transforming == null) {
@@ -1257,7 +1259,8 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
Put dataPut,
Delete dataDel,
byte[] encodedRegionName,
- CompiledTTLExpression ttlExpr) throws IOException {
+ CompiledTTLExpression ttlExpr,
+ boolean isTTLStrict) throws IOException {
boolean isCondTTL = ttlExpr instanceof
CompiledConditionalTTLExpression;
List<Mutation> dataMutations = getMutationsWithSameTS(dataPut,
dataDel);
List<Mutation> indexMutations =
Lists.newArrayListWithExpectedSize(dataMutations.size());
@@ -1270,7 +1273,7 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
byte[] indexRowKeyForCurrentDataRow = null;
int dataMutationListSize = dataMutations.size();
for (int i = 0; i < dataMutationListSize; i++) {
- if (isCondTTL && currentDataRowState != null) {
+ if (isCondTTL && currentDataRowState != null && isTTLStrict) {
CompiledConditionalTTLExpression condExpr =
(CompiledConditionalTTLExpression) ttlExpr;
List<Cell> currentRow = flattenCells(currentDataRowState);
@@ -1432,7 +1435,7 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
List<Mutation> indexMutations;
indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put,
del,
- region.getRegionInfo().getEncodedNameAsBytes(), ttlExpression);
+ region.getRegionInfo().getEncodedNameAsBytes(), ttlExpression,
isTTLStrict);
Collections.reverse(indexMutations);
boolean mostRecentDone = false;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index b270fccd07..c3c07d9067 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -88,6 +88,7 @@ public class IndexRepairRegionScanner extends
GlobalIndexRegionScanner {
private static final Logger LOGGER =
LoggerFactory.getLogger(IndexRepairRegionScanner.class);
private CompiledTTLExpression dataTableTTLExpr;
+ private boolean isTTLStrict;
public IndexRepairRegionScanner(final RegionScanner innerScanner,
final Region region,
@@ -110,6 +111,7 @@ public class IndexRepairRegionScanner extends
GlobalIndexRegionScanner {
QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) {
PTable dataTable = conn.getTableNoCache(tenant, tableName);
dataTableTTLExpr = dataTable.getCompiledTTLExpression(conn);
+ isTTLStrict = ScanUtil.isStrictTTL(scan);
} catch (SQLException e) {
LOGGER.error(
"Unable to get PTable for the data table {}:{}", tenant,
tableName, e);
@@ -139,7 +141,7 @@ public class IndexRepairRegionScanner extends
GlobalIndexRegionScanner {
}
}
List<Mutation> indexMutations =
prepareIndexMutationsForRebuild(indexMaintainer, put, del,
- null, dataTableTTLExpr);
+ null, dataTableTTLExpr, isTTLStrict);
Collections.reverse(indexMutations);
for (Mutation mutation : indexMutations) {
byte[] indexRowKey = mutation.getRow();
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5d3fd6e5ba..4f47189f23 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -23,6 +23,7 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE_BYTES;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ_BYTES;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED_BYTES;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_STRICT_TTL_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES;
@@ -399,6 +400,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
TABLE_FAMILY_BYTES, TTL_BYTES);
private static final Cell ROW_KEY_MATCHER_KV =
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
TABLE_FAMILY_BYTES, ROW_KEY_MATCHER_BYTES);
+ private static final Cell IS_STRICT_TTL_KV =
+ createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES,
IS_STRICT_TTL_BYTES);
private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
EMPTY_KEYVALUE_KV,
@@ -440,7 +443,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
INDEX_WHERE_KV,
CDC_INCLUDE_KV,
TTL_KV,
- ROW_KEY_MATCHER_KV
+ ROW_KEY_MATCHER_KV,
+ IS_STRICT_TTL_KV
);
static {
@@ -491,6 +495,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
TABLE_KV_COLUMNS.indexOf(INDEX_WHERE_KV);
private static final int TTL_INDEX = TABLE_KV_COLUMNS.indexOf(TTL_KV);
private static final int ROW_KEY_MATCHER_INDEX =
TABLE_KV_COLUMNS.indexOf(ROW_KEY_MATCHER_KV);
+ private static final int IS_STRICT_TTL_INDEX =
TABLE_KV_COLUMNS.indexOf(IS_STRICT_TTL_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV =
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES,
DECIMAL_DIGITS_BYTES);
private static final KeyValue COLUMN_SIZE_KV =
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES,
COLUMN_SIZE_BYTES);
@@ -1500,6 +1505,13 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
builder.setRowKeyMatcher(rowKeyMatcher != null ? rowKeyMatcher
: oldTable != null ? oldTable.getRowKeyMatcher() :
HConstants.EMPTY_BYTE_ARRAY);
+ Cell isStrictTTLKv = tableKeyValues[IS_STRICT_TTL_INDEX];
+ boolean isStrictTTL = isStrictTTLKv != null
+ &&
Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(isStrictTTLKv.getValueArray(),
+ isStrictTTLKv.getValueOffset(),
+ isStrictTTLKv.getValueLength()));
+ builder.setIsStrictTTL(isStrictTTLKv != null ? isStrictTTL
+ : oldTable == null || oldTable.isStrictTTL());
// Check the cell tag to see whether the view has modified this
property
final byte[] tagUseStatsForParallelization =
(useStatsForParallelizationKv == null) ?
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index f516b1ad41..a56ba24787 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -104,7 +104,8 @@ public class TTLRegionScanner extends BaseRegionScanner {
// be done here. We also disable masking when TTL is
HConstants.FOREVER.
isMaskingEnabled = emptyCF != null && emptyCQ != null
&& !ttlExpression.equals(TTL_EXPRESSION_FOREVER)
- && (isPhoenixCompactionEnabled(env.getConfiguration()));
+ && (isPhoenixCompactionEnabled(env.getConfiguration()))
+ && ScanUtil.isStrictTTL(scan);
}
private void init() throws IOException {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index e1a0f1d54a..9f2d3b1cba 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -98,6 +98,7 @@ import org.apache.phoenix.schema.TTLExpressionFactory;
import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import
org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
@@ -563,7 +564,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
if (this.builder.isAtomicOp(m) || context.returnResult
- || this.builder.isEnabled(m) ||
this.builder.hasConditionalTTL(m)) {
+ || this.builder.isEnabled(m)
+ || (this.builder.hasConditionalTTL(m) &&
isStrictTTLEnabled(miniBatchOp))) {
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
context.rowsToLock.add(row);
}
@@ -659,6 +661,10 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private void updateMutationsForConditionalTTL(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
BatchMutateContext context) throws IOException {
+ // If TTL is not strict, skip conditional TTL processing
+ if (!isStrictTTLEnabled(miniBatchOp)) {
+ return;
+ }
// mapping from row key to indices in mini batch
Map<ImmutableBytesPtr, List<Integer>> expiredVersions =
Maps.newHashMap();
Set<ImmutableBytesPtr> notExpiredVersions = Sets.newHashSet();
@@ -792,7 +798,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
public static void setTimestamps(MiniBatchOperationInProgress<Mutation>
miniBatchOp,
- IndexBuildManager builder, long ts)
throws IOException {
+ IndexBuildManager builder, long ts,
boolean isTTLStrict)
+ throws IOException {
for (Integer i = 0; i < miniBatchOp.size(); i++) {
if (isAtomicOperationComplete(miniBatchOp.getOperationStatus(i))) {
continue;
@@ -801,7 +808,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// skip this mutation if we aren't enabling indexing or
Conditional TTL
// or not an atomic op or if it is an atomic op
// and its timestamp is already set(not LATEST)
- if (!builder.isEnabled(m) && !builder.hasConditionalTTL(m)
+ // Also, skip conditional TTL if TTL is not strict
+ if (!builder.isEnabled(m) && (!builder.hasConditionalTTL(m) ||
!isTTLStrict)
&& !((builder.isAtomicOp(m) || builder.returnResult(m))
&& IndexUtil.getMaxTimestamp(m) ==
HConstants.LATEST_TIMESTAMP)) {
continue;
@@ -849,6 +857,26 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
}
+ /**
+  * Reports whether strict TTL mode applies to the given mini batch.
+  * <p>
+  * The mutations are inspected in batch order and the first one that carries the
+  * {@code IS_STRICT_TTL} attribute decides the outcome for the whole batch. If no mutation
+  * carries the attribute, or the first attribute found cannot be decoded into a boolean,
+  * {@link PTable#DEFAULT_IS_STRICT_TTL} is returned.
+  *
+  * @param miniBatchOp the batch whose mutations are searched for the attribute
+  * @return the decoded attribute value, or the default when it is absent or undecodable
+  */
+ private boolean isStrictTTLEnabled(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+     for (int i = 0; i < miniBatchOp.size(); i++) {
+         byte[] isStrictTTLBytes = miniBatchOp.getOperation(i)
+             .getAttribute(BaseScannerRegionObserverConstants.IS_STRICT_TTL);
+         if (isStrictTTLBytes == null) {
+             continue;
+         }
+         try {
+             Object decoded = PBoolean.INSTANCE.toObject(isStrictTTLBytes);
+             // instanceof also rejects null, so a value that decodes to "no boolean" never
+             // reaches auto-unboxing and cannot raise a NullPointerException.
+             if (decoded instanceof Boolean) {
+                 return (Boolean) decoded;
+             }
+         } catch (RuntimeException ignored) {
+             // Deliberately ignored: an undecodable attribute falls back to the default
+             // below, as the method contract promises.
+         }
+         // Only the first attribute found is consulted, whether or not it was usable.
+         break;
+     }
+     return PTable.DEFAULT_IS_STRICT_TTL;
+ }
+
/**
* This method returns true if the pending delete mutation needs to be
applied
 * and false if the delete mutation can be ignored for example in the case
of
@@ -1328,7 +1356,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
context.returnOldRow = true;
}
}
- if (this.builder.hasConditionalTTL(m)) {
+ if (this.builder.hasConditionalTTL(m) &&
isStrictTTLEnabled(miniBatchOp)) {
context.hasConditionalTTL = true;
}
if (this.builder.isAtomicOp(m) || this.builder.returnResult(m)) {
@@ -1530,7 +1558,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
long batchTimestamp = getBatchTimestamp(context, table);
// Update the timestamps of the data table mutations to prevent
overlapping timestamps
// (which prevents index inconsistencies as this case is not handled).
- setTimestamps(miniBatchOp, builder, batchTimestamp);
+ setTimestamps(miniBatchOp, builder, batchTimestamp,
isStrictTTLEnabled(miniBatchOp));
if (context.hasGlobalIndex || context.hasUncoveredIndex ||
context.hasTransform) {
// Prepare next data rows states for pending mutations (for global
indexes)
prepareDataRowStates(c, miniBatchOp, context, batchTimestamp);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StrictTTLPropertyIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StrictTTLPropertyIT.java
new file mode 100644
index 0000000000..83a13c5e86
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StrictTTLPropertyIT.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for the Strict and Relaxed TTL table property ({@code IS_STRICT_TTL}).
+ * <p>
+ * Covers the value reported by {@link PTable#isStrictTTL()} after CREATE TABLE (with and
+ * without the property) and after ALTER TABLE, plus the value stored in SYSTEM.CATALOG.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class StrictTTLPropertyIT extends ParallelStatsDisabledIT {
+
+    /** Column list and primary key constraint shared by every table created in this class. */
+    private static final String TABLE_COLUMNS =
+        " (id char(1) NOT NULL,"
+            + " col1 integer NOT NULL,"
+            + " col2 bigint NOT NULL,"
+            + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1, col2))";
+
+    @Test
+    public void testCreateTableWithStrictTTLDefault() throws Exception {
+        String fullTableName = newFullTableName();
+
+        try (PhoenixConnection conn =
+            (PhoenixConnection) DriverManager.getConnection(getUrl())) {
+            // Create table without specifying IS_STRICT_TTL - should default to true
+            createTable(conn, fullTableName, "");
+
+            assertTrue("IS_STRICT_TTL should default to true",
+                isStrictTTL(conn, fullTableName));
+        }
+    }
+
+    @Test
+    public void testCreateTableWithStrictTTLTrue() throws Exception {
+        String fullTableName = newFullTableName();
+
+        try (PhoenixConnection conn =
+            (PhoenixConnection) DriverManager.getConnection(getUrl())) {
+            createTable(conn, fullTableName, " IS_STRICT_TTL=true");
+
+            assertTrue("IS_STRICT_TTL should be true", isStrictTTL(conn, fullTableName));
+        }
+    }
+
+    @Test
+    public void testCreateTableWithStrictTTLFalse() throws Exception {
+        String fullTableName = newFullTableName();
+
+        try (PhoenixConnection conn =
+            (PhoenixConnection) DriverManager.getConnection(getUrl())) {
+            createTable(conn, fullTableName, " IS_STRICT_TTL=false");
+
+            assertFalse("IS_STRICT_TTL should be false", isStrictTTL(conn, fullTableName));
+        }
+    }
+
+    @Test
+    public void testAlterTableSetStrictTTLTrue() throws Exception {
+        String fullTableName = newFullTableName();
+
+        try (PhoenixConnection conn =
+            (PhoenixConnection) DriverManager.getConnection(getUrl())) {
+            createTable(conn, fullTableName, " IS_STRICT_TTL=false");
+            assertFalse("IS_STRICT_TTL should be false initially",
+                isStrictTTL(conn, fullTableName));
+
+            // Alter table to set IS_STRICT_TTL=true
+            execute(conn, "ALTER TABLE " + fullTableName + " SET IS_STRICT_TTL=true");
+
+            assertTrue("IS_STRICT_TTL should be true after ALTER",
+                isStrictTTL(conn, fullTableName));
+        }
+    }
+
+    @Test
+    public void testAlterTableSetStrictTTLFalse() throws Exception {
+        String fullTableName = newFullTableName();
+
+        try (PhoenixConnection conn =
+            (PhoenixConnection) DriverManager.getConnection(getUrl())) {
+            // Create table with default IS_STRICT_TTL (should be true)
+            createTable(conn, fullTableName, "");
+            assertTrue("IS_STRICT_TTL should be true initially",
+                isStrictTTL(conn, fullTableName));
+
+            // Alter table to set IS_STRICT_TTL=false
+            execute(conn, "ALTER TABLE " + fullTableName + " SET IS_STRICT_TTL=false");
+
+            assertFalse("IS_STRICT_TTL should be false after ALTER",
+                isStrictTTL(conn, fullTableName));
+        }
+    }
+
+    @Test
+    public void testStrictTTLPropertyPersistsInSystemCatalog() throws Exception {
+        // Schema and table names are kept separately because the catalog query binds both.
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createTable(conn, fullTableName, " IS_STRICT_TTL=false");
+
+            // Verify the property is persisted in SYSTEM.CATALOG
+            String query = "SELECT IS_STRICT_TTL FROM SYSTEM.CATALOG "
+                + "WHERE TABLE_SCHEM = ? AND TABLE_NAME = ? AND COLUMN_NAME IS NULL";
+            try (PreparedStatement stmt = conn.prepareStatement(query)) {
+                stmt.setString(1, schemaName);
+                stmt.setString(2, tableName);
+                try (ResultSet rs = stmt.executeQuery()) {
+                    assertTrue("Should find table row", rs.next());
+                    assertFalse("IS_STRICT_TTL should be false in SYSTEM.CATALOG",
+                        rs.getBoolean("IS_STRICT_TTL"));
+                }
+            }
+        }
+    }
+
+    /** Builds a schema-qualified table name from two freshly generated unique names. */
+    private static String newFullTableName() {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        return SchemaUtil.getTableName(schemaName, tableName);
+    }
+
+    /**
+     * Creates a table with the shared column layout.
+     *
+     * @param conn          connection used to run the DDL
+     * @param fullTableName schema-qualified name of the table to create
+     * @param tableOptions  text appended after the column list, including its leading space;
+     *                      empty when no table property is specified
+     */
+    private static void createTable(Connection conn, String fullTableName,
+        String tableOptions) throws SQLException {
+        execute(conn, "CREATE TABLE " + fullTableName + TABLE_COLUMNS + tableOptions);
+    }
+
+    /** Runs one SQL statement and closes the JDBC statement afterwards. */
+    private static void execute(Connection conn, String sql) throws SQLException {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(sql);
+        }
+    }
+
+    /** Returns the IS_STRICT_TTL value of the table as resolved without the client cache. */
+    private static boolean isStrictTTL(PhoenixConnection conn, String fullTableName)
+        throws SQLException {
+        PTable table = conn.getTableNoCache(fullTableName);
+        return table.isStrictTTL();
+    }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
index 74c02f58d6..b10ec93ebd 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
@@ -31,6 +31,7 @@ import static
org.apache.phoenix.schema.LiteralTTLExpression.TTL_EXPRESSION_FORE
import static org.apache.phoenix.util.TestUtil.retainSingleQuotes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -92,6 +93,7 @@ import org.bson.BsonInt32;
import org.bson.BsonString;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -127,6 +129,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
private String tableDDLOptions;
private final boolean columnEncoded;
private final Integer tableLevelMaxLookback;
+ private final boolean isStrictTTL;
// column names -> fully qualified column names
private SchemaBuilder schemaBuilder;
// map of row-pos -> HBase row-key, used for verification
@@ -136,20 +139,25 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public ConditionalTTLExpressionIT(boolean columnEncoded,
- Integer tableLevelMaxLooback) {
+ Integer tableLevelMaxLooback,
+ boolean isStrictTTL) {
this.columnEncoded = columnEncoded;
this.tableLevelMaxLookback = tableLevelMaxLooback; // in ms
+ this.isStrictTTL = isStrictTTL;
schemaBuilder = new SchemaBuilder(getUrl());
}
- @Parameterized.Parameters(name = "columnEncoded={0},
tableLevelMaxLookback={1}")
+ @Parameterized.Parameters(
+ name = "columnEncoded={0}, tableLevelMaxLookback={1},
isStrictTTL={2}")
public static synchronized Collection<Object[]> data() {
// maxlookback value is in sec
return Arrays.asList(new Object[][]{
- {false, 0},
- {true, 0},
- {false, 15},
- {true, 15}
+ {false, 0, false},
+ {true, 15, false},
+ {false, 0, true},
+ {true, 0, true},
+ {false, 15, true},
+ {true, 15, true}
});
}
@@ -175,6 +183,11 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
} else {
optionBuilder.append(", COLUMN_ENCODED_BYTES=0");
}
+ if (!isStrictTTL) {
+ optionBuilder.append(", IS_STRICT_TTL = false");
+ } else {
+ optionBuilder.append(", IS_STRICT_TTL = true");
+ }
this.tableDDLOptions = optionBuilder.toString();
EnvironmentEdgeManager.reset();
injectEdge = new ManualEnvironmentEdge();
@@ -208,20 +221,20 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
// expire 1 row by setting to true
updateColumn(conn, 3, ttlCol, true);
actual = TestUtil.getRowCount(conn, tableName, true);
- Assert.assertEquals(rowCount - 1, actual);
+ Assert.assertEquals(isStrictTTL ? rowCount - 1 : rowCount, actual);
actual = TestUtil.getRowCountFromIndex(conn, tableName, indexName);
- Assert.assertEquals(rowCount - 1, actual);
+ Assert.assertEquals(isStrictTTL ? rowCount - 1 : rowCount, actual);
// read the row again, this time it should be masked
rs = readRow(conn, 3);
- assertFalse(rs.next());
+ assertNotEquals(isStrictTTL, rs.next());
// expire 1 more row
updateColumn(conn, 2, ttlCol, true);
actual = TestUtil.getRowCount(conn, tableName, true);
- Assert.assertEquals(rowCount - 2, actual);
+ Assert.assertEquals(isStrictTTL ? rowCount - 2 : rowCount, actual);
actual = TestUtil.getRowCountFromIndex(conn, tableName, indexName);
- Assert.assertEquals(rowCount - 2, actual);
+ Assert.assertEquals(isStrictTTL ? rowCount - 2 : rowCount, actual);
// refresh the row again, this update should behave like a new row
updateColumn(conn, 3, ttlCol, false);
@@ -229,15 +242,17 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
assertTrue(rs.next());
for (String col : COLUMNS) {
if (!col.equals(ttlCol)) {
- assertNull(rs.getObject(col));
+ if (isStrictTTL) {
+ assertNull(rs.getObject(col));
+ }
} else {
assertFalse(rs.getBoolean(ttlCol));
}
}
actual = TestUtil.getRowCount(conn, tableName, true);
- Assert.assertEquals(rowCount - 1, actual);
+ Assert.assertEquals(isStrictTTL ? rowCount - 1 : rowCount, actual);
actual = TestUtil.getRowCountFromIndex(conn, tableName, indexName);
- Assert.assertEquals(rowCount - 1, actual);
+ Assert.assertEquals(isStrictTTL ? rowCount - 1 : rowCount, actual);
// expire the row again
updateColumn(conn, 3, ttlCol, true);
@@ -270,9 +285,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
@Test
public void testEverythingRetainedWithinMaxLookBack() throws Exception {
- if (tableLevelMaxLookback == 0) {
- return;
- }
+ Assume.assumeTrue(tableLevelMaxLookback > 0 && isStrictTTL);
String ttlCol = "VAL5";
String ttlExpression = String.format("%s=TRUE", ttlCol);
createTable(ttlExpression);
@@ -331,9 +344,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
@Test
public void testPartialRowRetainedInMaxLookBack() throws Exception {
- if (tableLevelMaxLookback == 0) {
- return;
- }
+ Assume.assumeTrue(tableLevelMaxLookback > 0 && isStrictTTL);
String ttlCol = "VAL5";
String ttlExpression = String.format("%s=TRUE", ttlCol, ttlCol);
createTable(ttlExpression);
@@ -398,19 +409,21 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
// bump the time so that the ttl expression evaluates to true
injectEdge.incrementValue(ttl);
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(0, actual);
+ assertEquals(isStrictTTL ? 0 : 5, actual);
// update VAL4 column of row 1
// This is an update on an expired row so only 2 columns should be
visible
long currentTime = injectEdge.currentTime();
updateColumn(conn, 1, "VAL4", currentTime);
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(1, actual);
+ assertEquals(isStrictTTL ? 1 : 5, actual);
try (ResultSet rs = readRow(conn, 1)) {
assertTrue(rs.next());
for (String col : COLUMNS) {
if (!col.equals("VAL4")) {
- assertNull(rs.getObject(col));
+ if (isStrictTTL) {
+ assertNull(rs.getObject(col));
+ }
} else {
assertEquals(currentTime,
rs.getTimestamp("VAL4").getTime());
}
@@ -421,7 +434,14 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
injectEdge.incrementValue(tableLevelMaxLookback * 1000L + 2);
doMajorCompaction(tableName);
CellCount expectedCellCount = new CellCount();
- expectedCellCount.insertRow(dataRowPosToKey.get(1), 2);
+ expectedCellCount.insertRow(dataRowPosToKey.get(1),
+ isStrictTTL ? 2 : COLUMNS.length + 1);
+ validateTable(conn, tableName, expectedCellCount,
dataRowPosToKey.values());
+
+ // advance the time by more than TTL
+ injectEdge.incrementValue(ttl + 2);
+ doMajorCompaction(tableName);
+ expectedCellCount = new CellCount();
validateTable(conn, tableName, expectedCellCount,
dataRowPosToKey.values());
}
}
@@ -661,7 +681,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
updateColumn(conn, 1, ttlCol, true);
actual = TestUtil.getRowCount(conn, tableName, true);
// 1 row expired, 2 deleted
- assertEquals(2, actual);
+ assertEquals(isStrictTTL ? 2 : 3, actual);
if (tableLevelMaxLookback == 0) {
// increment so that all updates are outside of max lookback
injectEdge.incrementValue(2);
@@ -714,7 +734,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
// bump the time so that the ttl expression evaluates to true
injectEdge.incrementValue(QueryConstants.MILLIS_IN_DAY + 1200);
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(0, actual);
+ assertEquals(isStrictTTL ? 0 : 5, actual);
// update column of row 2
// This is an update on an expired row so only the updated columns
should be visible
@@ -724,7 +744,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
List<Object> newVals = Lists.newArrayList(newVal, d);
updateColumns(conn, 2, updatedCols, newVals);
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(1, actual);
+ assertEquals(isStrictTTL ? 1 : 5, actual);
try (ResultSet rs = readRow(conn, 2)) {
assertTrue(rs.next());
for (String col : COLUMNS) {
@@ -733,7 +753,9 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
} else if (col.equals(ttlCol)) {
assertEquals(d, rs.getDate(ttlCol));
} else {
- assertNull(rs.getObject(col));
+ if (isStrictTTL) {
+ assertNull(rs.getObject(col));
+ }
}
}
}
@@ -742,7 +764,14 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
injectEdge.incrementValue(tableLevelMaxLookback * 1000L + 2);
doMajorCompaction(tableName);
CellCount expectedCellCount = new CellCount();
- expectedCellCount.insertRow(dataRowPosToKey.get(2),
updatedCols.size() + 1);
+ expectedCellCount.insertRow(dataRowPosToKey.get(2),
+ isStrictTTL ? updatedCols.size() + 1 : COLUMNS.length + 1);
+ validateTable(conn, tableName, expectedCellCount,
dataRowPosToKey.values());
+
+ // advance the time by more than TTL
+ injectEdge.incrementValue(QueryConstants.MILLIS_IN_DAY + 1400);
+ doMajorCompaction(tableName);
+ expectedCellCount = new CellCount();
validateTable(conn, tableName, expectedCellCount,
dataRowPosToKey.values());
}
}
@@ -817,9 +846,9 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
null, 0, IndexTool.IndexVerifyType.BEFORE);
CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(it);
try {
- assertEquals(rowCount - 2, // only the expired rows are masked
but not deleted rows
+ assertEquals(isStrictTTL ? rowCount - 2 : rowCount, // only
the expired rows are masked but not deleted rows
mrJobCounters.findCounter(SCANNED_DATA_ROW_COUNT.name()).getValue());
- assertEquals(rowCount - 2,
+ assertEquals(isStrictTTL ? rowCount - 2 : rowCount,
mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
assertEquals(0,
mrJobCounters.findCounter(
@@ -833,7 +862,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
String missingIndexRowCounter = tableLevelMaxLookback != 0 ?
BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT.name() :
BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT.name();
- assertEquals(rowCount - 2,
+ assertEquals(isStrictTTL ? rowCount - 2 : rowCount,
mrJobCounters.findCounter(missingIndexRowCounter).getValue());
} catch (AssertionError e) {
IndexToolIT.dumpMRJobCounters(mrJobCounters);
@@ -843,9 +872,10 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
// Both the tables should have the same row count from Phoenix
actual = TestUtil.getRowCount(conn, fullDataTableName, true);
- assertEquals(rowCount -(2+1), actual); // 2 expired, 1 deleted
+ // 2 expired, 1 deleted
+ assertEquals(isStrictTTL ? rowCount - (2 + 1) : rowCount - 1,
actual);
actual = TestUtil.getRowCountFromIndex(conn, fullDataTableName,
fullIndexName);
- assertEquals(rowCount -(2+1), actual);
+ assertEquals(isStrictTTL ? rowCount - (2 + 1) : rowCount - 1,
actual);
injectEdge.incrementValue(2 * tableLevelMaxLookback * 1000L + 5);
doMajorCompaction(fullDataTableName);
@@ -905,9 +935,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
@Test
public void testNullPKColumn() throws Exception {
- if (tableLevelMaxLookback != 0) {
- return;
- }
+ Assume.assumeTrue(tableLevelMaxLookback == 0);
String tableName = "T_" + generateUniqueName();
String indexName = "I_" + generateUniqueName();
String ddlTemplate = "create table %s (id1 varchar, id2 varchar, col1
varchar, " +
@@ -922,8 +950,8 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
String ddl = String.format(ddlTemplate, tableName,
String.format(tableDDLOptions,
retainSingleQuotes(ttlExpression)));
String indexDDL = String.format("create index %s ON %s (col1)
INCLUDE(col2) " +
- "\"phoenix.max.lookback.age.seconds\" = %d", indexName,
tableName,
- tableLevelMaxLookback);
+ "\"phoenix.max.lookback.age.seconds\" = %d",
+ indexName, tableName, tableLevelMaxLookback, isStrictTTL);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute(ddl);
conn.createStatement().execute(indexDDL);
@@ -940,9 +968,9 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
" values(null, '0', 'col1', 'col2')");
conn.commit();
long actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(3, actual);
+ assertEquals(isStrictTTL ? 3 : 5, actual);
actual = TestUtil.getRowCountFromIndex(conn, tableName, indexName);
- assertEquals(3, actual);
+ assertEquals(isStrictTTL ? 3 : 5, actual);
// alter the ttl
ttlExpression = "id1 is null";
@@ -950,13 +978,13 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
conn.createStatement().execute(String.format(ddl, tableName,
ttlExpression));
conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(4, actual);
+ assertEquals(isStrictTTL ? 4 : 5, actual);
PTable table = PhoenixRuntime.getTableNoCache(conn, tableName);
assertEquals(TTLExpressionFactory.create(ttlExpression),
table.getTTLExpression());
PTable index = PhoenixRuntime.getTableNoCache(conn, indexName);
assertEquals(TTLExpressionFactory.create(ttlExpression),
index.getTTLExpression());
actual = TestUtil.getRowCountFromIndex(conn, tableName, indexName);
- assertEquals(4, actual);
+ assertEquals(isStrictTTL ? 4 : 5, actual);
// alter the ttl
ttlExpression = "col1='col1'";
@@ -965,21 +993,19 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
retainSingleQuotes(ttlExpression)));
conn.unwrap(PhoenixConnection.class).getQueryServices().clearCache();
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(2, actual);
+ assertEquals(isStrictTTL ? 2 : 5, actual);
table = PhoenixRuntime.getTableNoCache(conn, tableName);
assertEquals(TTLExpressionFactory.create(ttlExpression),
table.getTTLExpression());
index = PhoenixRuntime.getTableNoCache(conn, indexName);
assertEquals(TTLExpressionFactory.create(ttlExpression),
index.getTTLExpression());
actual = TestUtil.getRowCountFromIndex(conn, tableName, indexName);
- assertEquals(2, actual);
+ assertEquals(isStrictTTL ? 2 : 5, actual);
}
}
@Test
public void testUnverifiedRows() throws Exception {
- if (tableLevelMaxLookback != 0) {
- return;
- }
+ Assume.assumeTrue(tableLevelMaxLookback == 0);
String ttlCol = "VAL5";
String ttlExpression = String.format("%s=TRUE", ttlCol);
createTable(ttlExpression);
@@ -1041,7 +1067,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
// Read the unverified rows to trigger read repair
actual = TestUtil.getRowCountFromIndex(conn, fullDataTableName,
fullIndexName);
TestUtil.dumpTable(conn, TableName.valueOf(fullIndexName));
- assertEquals(rowCount - 1, actual);
+ assertEquals(isStrictTTL ? rowCount - 1 : rowCount, actual);
// First read row 0 which is not expired
String dql = String.format("select VAL2, VAL5 from %s where
VAL1='%s' AND ID2=0",
fullDataTableName, val1_0);
@@ -1060,18 +1086,19 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
PhoenixResultSet prs = rs1.unwrap(PhoenixResultSet.class);
String explainPlan =
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
assertTrue(explainPlan.contains(fullIndexName));
- assertFalse(rs1.next());
+ assertNotEquals(isStrictTTL, rs1.next());
}
// run the reverse index verification tool
IndexTool it = IndexToolIT.runIndexTool(false, schemaName,
tableName, indexName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-fi");
CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(it);
try {
- assertEquals(1, // 1 row is expired
+ // 1 row is expired for strict TTL
+ assertEquals(isStrictTTL ? 1 : 2,
mrJobCounters.findCounter(SCANNED_DATA_ROW_COUNT.name()).getValue());
assertEquals(0,
mrJobCounters.findCounter(REBUILT_INDEX_ROW_COUNT.name()).getValue());
- assertEquals(1,
+ assertEquals(isStrictTTL ? 1 : 2,
mrJobCounters.findCounter(
BEFORE_REBUILD_VALID_INDEX_ROW_COUNT.name()).getValue());
assertEquals(0,
@@ -1099,6 +1126,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
@Test
public void testLocalIndex() throws Exception {
+ Assume.assumeTrue(isStrictTTL);
String ttlCol = "VAL5";
String ttlExpression = String.format("%s=TRUE", ttlCol);
createTable(ttlExpression);
@@ -1158,9 +1186,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
@Test
public void testNulls() throws Exception {
- if (tableLevelMaxLookback != 0) {
- return;
- }
+ Assume.assumeTrue(tableLevelMaxLookback == 0);
String ttlExpression = "VAL2 = -1 AND VAL6 IS NULL";
createTable(ttlExpression);
List<String> indexedColumns = Lists.newArrayList("VAL1");
@@ -1190,35 +1216,35 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
}
// odd rows should be expired
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(rowCount/2, actual);
+ assertEquals(isStrictTTL ? rowCount / 2 : rowCount, actual);
actual = TestUtil.getRowCountFromIndex(conn, tableName, indexName);
- assertEquals(rowCount/2, actual);
+ assertEquals(isStrictTTL ? rowCount / 2 : rowCount, actual);
// partial update on a row which is expired is treated like a new
row
updateColumn(conn, 3, "VAL4", null);
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(rowCount/2 + 1, actual);
+ assertEquals(isStrictTTL ? rowCount / 2 + 1 : rowCount, actual);
// Delete an expired row
deleteRow(conn, 5);
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(rowCount/2 + 1, actual);
+ assertEquals(isStrictTTL ? rowCount / 2 + 1 : rowCount - 1,
actual);
injectEdge.incrementValue(2);
doMajorCompaction(tableName);
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(rowCount/2 + 1, actual);
+ // Strict TTL : 5 rows masked and expired, 5 visible, 1 new row
+ // Relaxed TTL: 5 rows expired, no new row
+ assertEquals(isStrictTTL ? rowCount / 2 + 1 : rowCount / 2,
actual);
doMajorCompaction(indexName);
actual = TestUtil.getRowCountFromIndex(conn, tableName, indexName);
- assertEquals(rowCount/2 + 1, actual);
+ assertEquals(isStrictTTL ? rowCount / 2 + 1 : rowCount / 2,
actual);
}
}
@Test
public void testNulls2() throws Exception {
- if (tableLevelMaxLookback != 0) {
- return;
- }
+ Assume.assumeTrue(tableLevelMaxLookback == 0);
String ttlExpression = "VAL2 IS NULL AND VAL4 IS NULL";
createTable(ttlExpression);
List<String> indexedColumns = Lists.newArrayList("VAL2"); // indexed
column is null
@@ -1236,9 +1262,9 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
updateColumns(conn, 0,
Lists.newArrayList("VAL2", "VAL4"),
Lists.newArrayList(null, null));
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(0, actual);
+ assertEquals(isStrictTTL ? 0 : 1, actual);
actual = TestUtil.getRowCountFromIndex(conn, tableName, indexName);
- assertEquals(0, actual);
+ assertEquals(isStrictTTL ? 0 : 1, actual);
// now do a partial update over the expired row,
int newVal =123;
updateColumn(conn, 0, "VAL2", newVal);
@@ -1246,7 +1272,9 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
assertTrue(rs.next());
for (String col : COLUMNS) {
if (!col.equals("VAL2")) {
- assertNull(rs.getObject(col));
+ if (isStrictTTL) {
+ assertNull(rs.getObject(col));
+ }
} else {
assertEquals(newVal, rs.getInt("VAL2"));
}
@@ -1298,15 +1326,15 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
injectEdge.incrementValue(QueryConstants.MILLIS_IN_DAY);
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(rowCount/2, actual);
+ assertEquals(isStrictTTL ? rowCount/2 : rowCount, actual);
actual = TestUtil.getRowCountFromIndex(conn, tableName,
indexName);
- assertEquals(rowCount/2, actual);
+ assertEquals(isStrictTTL ? rowCount / 2 : rowCount, actual);
injectEdge.incrementValue(QueryConstants.MILLIS_IN_DAY*6);
actual = TestUtil.getRowCount(conn, tableName, true);
- assertEquals(0, actual);
+ assertEquals(isStrictTTL ? 0 : rowCount, actual);
actual = TestUtil.getRowCountFromIndex(conn, tableName,
indexName);
- assertEquals(0, actual);
+ assertEquals(isStrictTTL ? 0 : rowCount, actual);
injectEdge.incrementValue(1);
doMajorCompaction(tableName);
@@ -1322,9 +1350,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
@Test
public void testConcurrentUpserts() throws Exception {
- if (tableLevelMaxLookback != 0) {
- return;
- }
+ Assume.assumeTrue(tableLevelMaxLookback == 0);
final int nThreads = 10;
final int batchSize = 100;
final int nRows = 499;
@@ -1405,7 +1431,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
populateTable(conn, rowCount);
actual = TestUtil.getRowCount(conn, tableName, true);
// only odd rows (1,3) have non null attribute value
- assertEquals(2, actual);
+ assertEquals(isStrictTTL ? 2 : 5, actual);
// increment by at least 2*maxlookback so that there are no
updates within the
// maxlookback window and no updates visible through the
maxlookback window
@@ -1485,7 +1511,9 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
CellCount actualCellCount = TestUtil.getRawCellCount(conn,
TableName.valueOf(tableName));
try {
- assertEquals(expectedCellCount, actualCellCount);
+ assertEquals(
+ "Expected cellCount: " + expectedCellCount + " , actual: "
+ actualCellCount,
+ expectedCellCount, actualCellCount);
} catch (AssertionError e) {
try {
TestUtil.dumpTable(conn, TableName.valueOf(tableName));
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
index 99aded5f08..4ca93b698f 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/index/PrepareIndexMutationsForRebuildTest.java
@@ -135,7 +135,8 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
- .prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, null, null, null);
+ .prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, null, null, null,
+ true);
// Expect one row of index with row key "v1_k1"
Put idxPut1 = new Put(generateIndexRowKey("v1"));
@@ -166,7 +167,8 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
addEmptyColumnToDataPutMutation(dataPut, info.pDataTable, 1);
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
- .prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, null, null, null);
+ .prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, null, null, null,
+ true);
// Expect one row of index with row key "_k1", as indexed column C1 is
nullable.
Put idxPut1 = new Put(generateIndexRowKey(null));
@@ -212,7 +214,7 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
.prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, dataDel,
- null, null);
+ null, null, true);
List<Mutation> expectedIndexMutation = new ArrayList<>();
@@ -278,7 +280,7 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
.prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, dataDel,
- null, null);
+ null, null, true);
List<Mutation> expectedIndexMutations = new ArrayList<>();
@@ -338,7 +340,7 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
.prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, dataDel,
- null, null);
+ null, null, true);
List<Mutation> expectedIndexMutations = new ArrayList<>();
@@ -406,7 +408,7 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
.prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, dataDel,
- null, null);
+ null, null, true);
List<Mutation> expectedIndexMutations = new ArrayList<>();
@@ -458,7 +460,7 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
.prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, dataDel,
- null, null);
+ null, null, true);
List<Mutation> expectedIndexMutations = new ArrayList<>();
byte[] idxKeyBytes = generateIndexRowKey("v2");
@@ -521,7 +523,7 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
.prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, dataDel,
- null, null);
+ null, null, true);
List<Mutation> expectedIndexMutations = new ArrayList<>();
byte[] idxKeyBytes = generateIndexRowKey("v2");
@@ -577,7 +579,7 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
.prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, dataDel,
- null, null);
+ null, null, true);
List<Mutation> expectedIndexMutations = new ArrayList<>();
byte[] idxKeyBytes = generateIndexRowKey("v1");
@@ -653,7 +655,7 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
.prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, dataDel,
- null, null);
+ null, null, true);
List<Mutation> expectedIndexMutation = new ArrayList<>();
@@ -718,7 +720,7 @@ public class PrepareIndexMutationsForRebuildTest extends
BaseConnectionlessQuery
List<Mutation> actualIndexMutations = IndexRebuildRegionScanner
.prepareIndexMutationsForRebuild(info.indexMaintainer,
dataPut, null,
- null, null);
+ null, null, true);
byte[] idxKeyBytes = generateIndexRowKey(null);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 233723f001..5e82e81ade 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1059,6 +1059,14 @@ public class TestUtil {
CellCount other = (CellCount) o;
return rowCountMap.equals(other.rowCountMap);
}
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("CellCount{");
+ sb.append("rowCountMap=").append(rowCountMap);
+ sb.append('}');
+ return sb.toString();
+ }
}