This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7170
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7170 by this push:
     new aafc8b863e PHOENIX-7383 Unify in-memory representation of Conditional TTL expressions and Literal TTL values (#1953)
aafc8b863e is described below

commit aafc8b863e0d516e631d09ef2732db69dabdadf7
Author: tkhurana <[email protected]>
AuthorDate: Thu Oct 3 13:57:24 2024 -0700

    PHOENIX-7383 Unify in-memory representation of Conditional TTL expressions and Literal TTL values (#1953)
    
    * PHOENIX-7383 Unify in-memory representation of Conditional TTL expressions and Literal TTL values
    
    * Fix banned import
    
    * Add ASF license header
    
    * Fix tests
    
    * Fix checkstyle issues
    
    * TTLExpression unit tests
    
    * Compile the TTL expression to do validation
    
    * Renamed classes
    
    * Add Conditional TTL DDL tests
    
    * Add missing ASF license header
    
    * More tests
    
    * Added check for aggregate functions and view tests
    
    ---------
    
    Co-authored-by: Tanuj Khurana <[email protected]>
---
 .../phoenix/coprocessorclient/TableTTLInfo.java    |  33 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   3 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   2 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  11 +-
 .../phoenix/schema/ConditionTTLExpression.java     | 224 ++++++++++++
 .../org/apache/phoenix/schema/DelegateTable.java   |   2 +-
 .../phoenix/schema/LiteralTTLExpression.java       |  87 +++++
 .../org/apache/phoenix/schema/MetaDataClient.java  |  90 ++---
 .../java/org/apache/phoenix/schema/PTable.java     |   2 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |  28 +-
 .../org/apache/phoenix/schema/TTLExpression.java   |  74 ++++
 .../org/apache/phoenix/schema/TableProperty.java   |  10 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  15 +-
 .../phoenix/coprocessor/CompactionScanner.java     | 146 +++-----
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  33 +-
 .../mapreduce/index/IndexScrutinyMapper.java       |   7 +-
 .../util/DefaultPhoenixMultiViewListProvider.java  |   2 +-
 .../org/apache/phoenix/end2end/CreateTableIT.java  |  13 +-
 .../org/apache/phoenix/end2end/SetPropertyIT.java  |  18 +-
 .../apache/phoenix/end2end/TTLAsPhoenixTTLIT.java  | 378 +++++++++++++++------
 .../it/java/org/apache/phoenix/end2end/TTLIT.java  |  10 +-
 .../java/org/apache/phoenix/end2end/ViewTTLIT.java |   2 +-
 .../schema/ConditionalTTLExpressionDDLTest.java    | 221 ++++++++++++
 .../apache/phoenix/schema/TTLExpressionTest.java   |  68 ++++
 .../java/org/apache/phoenix/util/ScanUtilTest.java |   2 +-
 .../java/org/apache/phoenix/util/TestUtil.java     |   8 +-
 26 files changed, 1164 insertions(+), 325 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/TableTTLInfo.java b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/TableTTLInfo.java
index fa9e50b3fb..3a57eae3b4 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/TableTTLInfo.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/coprocessorclient/TableTTLInfo.java
@@ -18,11 +18,14 @@
 
 package org.apache.phoenix.coprocessorclient;
 
-import org.apache.hadoop.hbase.util.Bytes;
-
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.TTLExpression;
+
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 /**
  * Simple POJO class to hold TTL info
  */
@@ -31,18 +34,38 @@ public class TableTTLInfo implements Comparable {
     private final byte[] tenantId;
     private final byte[] entityName;
     private final byte[] matchPattern;
-    private final int ttl;
+    private final TTLExpression ttl;
 
+    @VisibleForTesting
     public TableTTLInfo(String physicalTableName, String tenantId, String 
entityName, String matchPattern, int ttl) {
         super();
         this.physicalTableName = 
physicalTableName.getBytes(StandardCharsets.UTF_8);
         this.tenantId = tenantId.getBytes(StandardCharsets.UTF_8);
         this.entityName = entityName.getBytes(StandardCharsets.UTF_8);
         this.matchPattern = matchPattern.getBytes(StandardCharsets.UTF_8);
-        this.ttl = ttl;
+        this.ttl = TTLExpression.create(ttl);
     }
 
+    @VisibleForTesting
     public TableTTLInfo(byte[] physicalTableName, byte[] tenantId, byte[] 
entityName, byte[] matchPattern, int ttl) {
+        super();
+        this.physicalTableName = physicalTableName;
+        this.tenantId = tenantId;
+        this.matchPattern = matchPattern;
+        this.entityName = entityName;
+        this.ttl = TTLExpression.create(ttl);
+    }
+
+    public TableTTLInfo(String physicalTableName, String tenantId, String 
entityName, String matchPattern, TTLExpression ttl) {
+        super();
+        this.physicalTableName = 
physicalTableName.getBytes(StandardCharsets.UTF_8);
+        this.tenantId = tenantId.getBytes(StandardCharsets.UTF_8);
+        this.entityName = entityName.getBytes(StandardCharsets.UTF_8);
+        this.matchPattern = matchPattern.getBytes(StandardCharsets.UTF_8);
+        this.ttl = ttl;
+    }
+
+    public TableTTLInfo(byte[] physicalTableName, byte[] tenantId, byte[] 
entityName, byte[] matchPattern, TTLExpression ttl) {
         super();
         this.physicalTableName = physicalTableName;
         this.tenantId = tenantId;
@@ -51,7 +74,7 @@ public class TableTTLInfo implements Comparable {
         this.ttl = ttl;
     }
 
-    public int getTTL() {
+    public TTLExpression getTTL() {
         return ttl;
     }
     public byte[] getTenantId() {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 24abfc4bb4..c403154500 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -206,7 +206,8 @@ public enum SQLExceptionCode {
     INVALID_JSON_DATA(540, "42916", "Invalid json data."),
     JSON_FRAGMENT_NOT_ALLOWED_IN_INDEX_EXPRESSION(541, "42917",
             "Functions returning JSON fragments are not allowed in Index 
Expression."),
-
+    AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_TTL_EXPRESSION(542, "42918",
+            "Aggregate expression not allowed in an TTL Expression."),
     /**
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index ad7f83a8a0..642a94b4e6 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -397,6 +397,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final byte[] TTL_BYTES = Bytes.toBytes(TTL);
     public static final int TTL_NOT_DEFINED = 0;
     public static final int DEFAULT_TTL = HConstants.FOREVER;
+    public static final String FOREVER_TTL = "FOREVER";
+    public static final String NONE_TTL = "NONE";
     public static final String PHOENIX_TTL = "PHOENIX_TTL";
     public static final byte[] PHOENIX_TTL_BYTES = Bytes.toBytes(PHOENIX_TTL);
     public static final String PHOENIX_TTL_HWM = "PHOENIX_TTL_HWM";
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 26254451b8..5edcb299aa 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -265,6 +265,8 @@ import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TTLExpression;
+import org.apache.phoenix.schema.LiteralTTLExpression;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableProperty;
@@ -2985,7 +2987,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         boolean willBeTransactional = false;
         boolean isOrWillBeTransactional = isTransactional;
         Integer newTTL = null;
-        Integer newPhoenixTTL = null;
+        TTLExpression newPhoenixTTL = null;
         Integer newReplicationScope = null;
         KeepDeletedCells newKeepDeletedCells = null;
         TransactionFactory.Provider txProvider = null;
@@ -3031,14 +3033,17 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 //If Phoenix level TTL is enabled we are using 
TTL as phoenix
                                 //Table level property.
                                 if (!isPhoenixTTLEnabled()) {
-                                    newTTL = ((Number) propValue).intValue();
+                                    // only literal TTL expression
+                                    LiteralTTLExpression ttlExpr =
+                                            (LiteralTTLExpression) 
TableProperty.TTL.getValue(propValue);
+                                    newTTL = ttlExpr != null ? 
ttlExpr.getTTLValue() : null;
                                     //Even though TTL is really a 
HColumnProperty we treat it
                                     //specially. We enforce that all CFs have 
the same TTL.
                                     commonFamilyProps.put(propName, propValue);
                                 } else {
                                     //Setting this here just to check if we 
need to throw Exception
                                     //for Transaction's SET_TTL Feature.
-                                    newPhoenixTTL = ((Number) 
propValue).intValue();
+                                    newPhoenixTTL = (TTLExpression) 
TableProperty.TTL.getValue(propValue);
                                 }
                             } else if 
(propName.equals(PhoenixDatabaseMetaData.TRANSACTIONAL) && 
Boolean.TRUE.equals(propValue)) {
                                 willBeTransactional = isOrWillBeTransactional 
= true;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ConditionTTLExpression.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ConditionTTLExpression.java
new file mode 100644
index 0000000000..828010a034
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/ConditionTTLExpression.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_TTL;
+import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
+import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.ExpressionCompiler;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.ColumnDef;
+import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.ColumnParseNode;
+import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class ConditionTTLExpression extends TTLExpression {
+    private final String ttlExpr;
+
+    public ConditionTTLExpression(String ttlExpr) {
+        this.ttlExpr = ttlExpr;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConditionTTLExpression that = (ConditionTTLExpression) o;
+        return ttlExpr.equals(that.ttlExpr);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(ttlExpr);
+    }
+
+    @Override
+    public String getTTLExpression() {
+        return ttlExpr;
+    }
+
+    @Override
+    public String toString() {
+        return getTTLExpression();
+    }
+
+    @Override
+    /**
+     * @param result row to be evaluated against the conditional ttl expression
+     * @return DEFAULT_TTL (FOREVER) if the expression evaluates to False else 0
+     * if the expression evaluates to true i.e. row is expired
+     */
+    public long getTTLForRow(List<Cell> result) {
+        // TODO
+        return DEFAULT_TTL;
+    }
+
+    @Override
+    public void validateTTLOnCreation(PhoenixConnection conn, 
CreateTableStatement create) throws SQLException {
+        ParseNode ttlCondition = SQLParser.parseCondition(this.ttlExpr);
+        StatementContext ttlValidationContext = new StatementContext(new 
PhoenixStatement(conn));
+        // Construct a PTable with just enough information to be able to 
compile the TTL expression
+        PTable newTable = createTempPTable(conn, create);
+        ttlValidationContext.setCurrentTable(new TableRef(newTable));
+        VerifyCreateConditionalTTLExpression condTTLVisitor =
+                new VerifyCreateConditionalTTLExpression(conn, 
ttlValidationContext, create);
+        Expression ttlExpression = ttlCondition.accept(condTTLVisitor);
+        validateTTLExpression(ttlExpression, condTTLVisitor);
+    }
+
+    @Override
+    public void validateTTLOnAlter(PhoenixConnection conn, PTable table) 
throws SQLException {
+        ParseNode ttlCondition = SQLParser.parseCondition(this.ttlExpr);
+        ColumnResolver resolver = FromCompiler.getResolver(new 
TableRef(table));
+        StatementContext context = new StatementContext(new 
PhoenixStatement(conn), resolver);
+        ExpressionCompiler expressionCompiler = new 
ExpressionCompiler(context);
+        Expression ttlExpression = ttlCondition.accept(expressionCompiler);
+        validateTTLExpression(ttlExpression, expressionCompiler);
+    }
+
+    @Override
+    public String getTTLForScanAttribute() {
+        // Conditional TTL is not sent as a scan attribute
+        // Masking is implemented using query re-write
+        return null;
+    }
+
+    /**
+     * Validates that all the columns used in the conditional TTL expression 
are present in the table
+     * or its parent table in case of view
+     */
+    private static class VerifyCreateConditionalTTLExpression extends 
ExpressionCompiler {
+        private final CreateTableStatement create;
+        private final ColumnResolver baseTableResolver;
+
+        private VerifyCreateConditionalTTLExpression(PhoenixConnection conn,
+                                                     StatementContext 
ttlExprValidationContext,
+                                                     CreateTableStatement 
create) throws SQLException {
+            super(ttlExprValidationContext);
+            this.create = create;
+            // Returns the resolver for base table if base table is not null 
(in case of views)
+            // Else, returns FromCompiler#EMPTY_TABLE_RESOLVER which is a 
no-op resolver
+            this.baseTableResolver = 
FromCompiler.getResolverForCreation(create, conn);
+        }
+
+        @Override
+        public Expression visit(ColumnParseNode node) throws SQLException {
+            // First check current table
+            for (ColumnDef columnDef : create.getColumnDefs()) {
+                ColumnName columnName = columnDef.getColumnDefName();
+                // Takes family name into account
+                if (columnName.toString().equals(node.getFullName())) {
+                    String cf = columnName.getFamilyName();
+                    String cq = columnName.getColumnName();
+                    return new KeyValueColumnExpression( new PDatum() {
+                        @Override
+                        public boolean isNullable() {
+                            return columnDef.isNull();
+                        }
+                        @Override
+                        public PDataType getDataType() {
+                            return columnDef.getDataType();
+                        }
+                        @Override
+                        public Integer getMaxLength() {
+                            return columnDef.getMaxLength();
+                        }
+                        @Override
+                        public Integer getScale() {
+                            return columnDef.getScale();
+                        }
+                        @Override
+                        public SortOrder getSortOrder() {
+                            return columnDef.getSortOrder();
+                        }
+                    }, cf != null ? Bytes.toBytes(cf) : null, 
Bytes.toBytes(cq));
+                }
+            }
+            // Column used in TTL expression not found in current, check the 
parent
+            ColumnRef columnRef = baseTableResolver.resolveColumn(
+                    node.getSchemaName(), node.getTableName(), node.getName());
+            return 
columnRef.newColumnExpression(node.isTableNameCaseSensitive(), 
node.isCaseSensitive());
+        }
+    }
+
+    /**
+     * We are still in the middle of executing the CreateTable statement, so 
we don't have
+     * the PTable yet, but we need one for compiling the conditional TTL 
expression so let's
+     * build the PTable object with just enough information to be able to 
compile the Conditional
+     * TTL expression statement.
+     * @param statement
+     * @return
+     * @throws SQLException
+     */
+    private PTable createTempPTable(PhoenixConnection conn, 
CreateTableStatement statement) throws SQLException {
+        final TableName tableNameNode = statement.getTableName();
+        final PName schemaName = 
PNameFactory.newName(tableNameNode.getSchemaName());
+        final PName tableName = 
PNameFactory.newName(tableNameNode.getTableName());
+        PName fullName = SchemaUtil.getTableName(schemaName, tableName);
+        final PName tenantId = conn.getTenantId();
+        return new PTableImpl.Builder()
+                .setName(fullName)
+                .setKey(new PTableKey(tenantId, fullName.getString()))
+                .setTenantId(tenantId)
+                .setSchemaName(schemaName)
+                .setTableName(tableName)
+                .setType(statement.getTableType())
+                .setImmutableStorageScheme(ONE_CELL_PER_COLUMN)
+                .setQualifierEncodingScheme(NON_ENCODED_QUALIFIERS)
+                .setFamilies(Collections.EMPTY_LIST)
+                .setIndexes(Collections.EMPTY_LIST)
+                .build();
+    }
+
+    private void validateTTLExpression(Expression ttlExpression,
+                                       ExpressionCompiler expressionCompiler) 
throws SQLException {
+
+        if (expressionCompiler.isAggregate()) {
+            throw new SQLExceptionInfo.Builder(
+                    
SQLExceptionCode.AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_TTL_EXPRESSION).build().buildException();
+        }
+
+        if (ttlExpression.getDataType() != PBoolean.INSTANCE) {
+            throw TypeMismatchException.newException(PBoolean.INSTANCE,
+                    ttlExpression.getDataType(), ttlExpression.toString());
+        }
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index e67e876dcd..9e1f2e81a1 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -381,7 +381,7 @@ public class DelegateTable implements PTable {
         return delegate.hasViewModifiedUseStatsForParallelization();
     }
 
-    @Override public int getTTL() {
+    @Override public TTLExpression getTTL() {
         return delegate.getTTL();
     }
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/LiteralTTLExpression.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/LiteralTTLExpression.java
new file mode 100644
index 0000000000..bf4c934611
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/LiteralTTLExpression.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+
+public class LiteralTTLExpression extends TTLExpression {
+    private final int ttlValue;
+
+    public LiteralTTLExpression(int ttl) {
+        Preconditions.checkArgument(ttl >= 0);
+        this.ttlValue = ttl;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        LiteralTTLExpression that = (LiteralTTLExpression) o;
+        return ttlValue == that.ttlValue;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(ttlValue);
+    }
+
+    @Override
+    public String getTTLExpression() {
+        return String.valueOf(ttlValue);
+    }
+
+    @Override
+    public String toString() {
+        return getTTLExpression();
+    }
+
+    @Override
+    public long getTTLForRow(List<Cell> result) {
+        return ttlValue;
+    }
+
+    @Override
+    public void validateTTLOnCreation(PhoenixConnection conn,
+                                      CreateTableStatement create) throws 
SQLException {
+
+    }
+
+    @Override
+    public void validateTTLOnAlter(PhoenixConnection connection, PTable table) 
throws SQLException {
+
+    }
+
+    @Override
+    public String getTTLForScanAttribute() {
+        if (this.equals(TTLExpression.TTL_EXPRESSION_NOT_DEFINED)) {
+            return null;
+        }
+        return getTTLExpression();
+    }
+
+    public int getTTLValue() {
+        return ttlValue;
+    }
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index fb7b3a9595..3ad8ade84c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -21,7 +21,6 @@ import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRA
 import static 
org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
@@ -29,6 +28,7 @@ import static org.apache.phoenix.query.QueryConstants.SPLITS_FILE;
 import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
 import static 
org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE;
 import static org.apache.phoenix.schema.PTableType.CDC;
+import static 
org.apache.phoenix.schema.TTLExpression.TTL_EXPRESSION_NOT_DEFINED;
 import static 
org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSet;
 import static 
org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
@@ -2315,10 +2315,10 @@ public class MetaDataClient {
      * @return TTL from hierarchy if defined otherwise TTL_NOT_DEFINED.
      * @throws TableNotFoundException if not able ot find any table in 
hierarchy
      */
-    private Integer checkAndGetTTLFromHierarchy(PTable parent) throws 
SQLException {
+    private TTLExpression checkAndGetTTLFromHierarchy(PTable parent) throws 
SQLException {
         return parent != null ? (parent.getType() == TABLE ? parent.getTTL()
-                : (parent.getType() == VIEW && parent.getViewType() != MAPPED 
? getTTLFromViewHierarchy(parent) : TTL_NOT_DEFINED))
-                : TTL_NOT_DEFINED;
+                : (parent.getType() == VIEW && parent.getViewType() != MAPPED 
? getTTLFromViewHierarchy(parent) : TTL_EXPRESSION_NOT_DEFINED))
+                : TTL_EXPRESSION_NOT_DEFINED;
     }
 
     /**
@@ -2327,9 +2327,9 @@ public class MetaDataClient {
      * @return appropriate TTL from Views defined above for the entity calling.
      * @throws TableNotFoundException if not able to find any table in 
hierarchy
      */
-    private Integer getTTLFromViewHierarchy(PTable view) throws SQLException {
-            return view.getTTL() != TTL_NOT_DEFINED
-                    ? Integer.valueOf(view.getTTL()) : 
(checkIfParentIsTable(view)
+    private TTLExpression getTTLFromViewHierarchy(PTable view) throws 
SQLException {
+            return view.getTTL() != TTL_EXPRESSION_NOT_DEFINED
+                    ? view.getTTL() : (checkIfParentIsTable(view)
                     ? PhoenixRuntime.getTable(connection, 
view.getPhysicalNames().get(0).toString()).getTTL()
                     : 
getTTLFromViewHierarchy(PhoenixRuntime.getTable(connection, 
view.getParentName().toString())));
     }
@@ -2407,46 +2407,48 @@ public class MetaDataClient {
                     tableType == PTableType.VIEW ? parent.getColumns().size()
                             : QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 
-            Integer ttl = TTL_NOT_DEFINED;
-            Integer ttlFromHierarchy = TTL_NOT_DEFINED;
-            Integer ttlProp = (Integer) TableProperty.TTL.getValue(tableProps);
+            TTLExpression ttl = TTL_EXPRESSION_NOT_DEFINED;
+            TTLExpression ttlFromHierarchy = TTL_EXPRESSION_NOT_DEFINED;
+            TTLExpression ttlProp = (TTLExpression) 
TableProperty.TTL.getValue(tableProps);
 
             // Validate TTL prop value if set
             if (ttlProp != null) {
-                if (ttlProp < 0) {
-                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.ILLEGAL_DATA)
-                            .setMessage(String.format("entity = %s, TTL value 
should be > 0",
-                                    tableName))
-                            .build()
-                            .buildException();
-                }
                 if (!isViewTTLEnabled() && tableType == VIEW) {
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.
-                            VIEW_TTL_NOT_ENABLED)
-                            .setSchemaName(schemaName)
-                            .setTableName(tableName)
-                            .build()
-                            .buildException();
+                        VIEW_TTL_NOT_ENABLED)
+                        .setSchemaName(schemaName)
+                        .setTableName(tableName)
+                        .build()
+                        .buildException();
                 }
 
                 if (tableType != TABLE && (tableType != VIEW || viewType != 
UPDATABLE)) {
                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.
-                            TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY)
+                        TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY)
+                        .setSchemaName(schemaName)
+                        .setTableName(tableName)
+                        .build()
+                        .buildException();
+                }
+                ttlFromHierarchy = checkAndGetTTLFromHierarchy(parent);
+                if (ttlFromHierarchy != TTL_EXPRESSION_NOT_DEFINED) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.
+                            TTL_ALREADY_DEFINED_IN_HIERARCHY)
                             .setSchemaName(schemaName)
                             .setTableName(tableName)
                             .build()
                             .buildException();
                 }
-                ttlFromHierarchy = checkAndGetTTLFromHierarchy(parent);
-                if (ttlFromHierarchy != TTL_NOT_DEFINED) {
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.
-                            TTL_ALREADY_DEFINED_IN_HIERARCHY)
+                try {
+                    ttlProp.validateTTLOnCreation(connection, statement);
+                } catch (IllegalArgumentException e) {
+                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.ILLEGAL_DATA)
+                            .setMessage(e.getMessage())
                             .setSchemaName(schemaName)
                             .setTableName(tableName)
                             .build()
                             .buildException();
                 }
-
                 ttl = ttlProp;
             } else {
                 ttlFromHierarchy = checkAndGetTTLFromHierarchy(parent);
@@ -3272,7 +3274,7 @@ public class MetaDataClient {
                         .setIndexWhere(statement.getWhereClause() == null ? 
null
                                 : statement.getWhereClause().toString())
                         .setRowKeyMatcher(rowKeyMatcher)
-                        .setTTL(TTL_NOT_DEFINED)
+                        .setTTL(TTL_EXPRESSION_NOT_DEFINED)
                         .build();
                 connection.addTable(table, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             }
@@ -3539,10 +3541,10 @@ public class MetaDataClient {
                 tableUpsert.setString(36, cdcIncludeScopesStr);
             }
 
-            if (ttl == null || ttl == TTL_NOT_DEFINED) {
+            if (ttl == null || ttl == TTL_EXPRESSION_NOT_DEFINED) {
                 tableUpsert.setNull(37, Types.VARCHAR);
             } else {
-                tableUpsert.setString(37, String.valueOf(ttl));
+                tableUpsert.setString(37, ttl.getTTLExpression());
             }
 
             if ((rowKeyMatcher == null) ||
@@ -3700,7 +3702,7 @@ public class MetaDataClient {
                                 : statement.getWhereClause().toString())
                         .setMaxLookbackAge(maxLookbackAge)
                         .setCDCIncludeScopes(cdcIncludeScopes)
-                        .setTTL(ttl == null || ttl == TTL_NOT_DEFINED ? 
ttlFromHierarchy : ttl)
+                        .setTTL(ttl == null || ttl == 
TTL_EXPRESSION_NOT_DEFINED ? ttlFromHierarchy : ttl)
                         .setRowKeyMatcher(rowKeyMatcher)
                         .build();
                 result = new MetaDataMutationResult(code, 
result.getMutationTime(), table, true);
@@ -4196,7 +4198,7 @@ public class MetaDataClient {
             Long updateCacheFrequency, Boolean isImmutableRows, Boolean 
disableWAL,
             Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, 
Boolean appendOnlySchema,
             ImmutableStorageScheme immutableStorageScheme, Boolean 
useStatsForParallelization,
-            Integer ttl, Boolean isChangeDetectionEnabled, String 
physicalTableName, String schemaVersion,
+            TTLExpression ttl, Boolean isChangeDetectionEnabled, String 
physicalTableName, String schemaVersion,
                                       QualifierEncodingScheme 
columnEncodedBytes, String streamingTopicName, Long maxLookbackAge)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
@@ -4255,7 +4257,7 @@ public class MetaDataClient {
         }
         if (ttl != null) {
             mutateStringProperty(connection, tenantId, schemaName, tableName, 
TTL,
-                    ttl == TTL_NOT_DEFINED ? null : String.valueOf(ttl));
+                    ttl == TTL_EXPRESSION_NOT_DEFINED ? null : 
ttl.getTTLExpression());
         }
         if (isChangeDetectionEnabled != null) {
             mutateBooleanProperty(connection, tenantId, schemaName, tableName, 
CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
@@ -4511,12 +4513,12 @@ public class MetaDataClient {
                 if (areWeIntroducingTTLAtThisLevel.booleanValue()) {
                     //As we are introducing TTL for the first time at this 
level, we need to check
                     //if TTL is already defined up or down in the hierarchy.
-                    Integer ttlAlreadyDefined = TTL_NOT_DEFINED;
+                    TTLExpression ttlAlreadyDefined = 
TTL_EXPRESSION_NOT_DEFINED;
                     //Check up the hierarchy
                     if (table.getType() != PTableType.TABLE) {
                         ttlAlreadyDefined = 
checkAndGetTTLFromHierarchy(PhoenixRuntime.getTableNoCache(connection, 
table.getParentName().toString()));
                     }
-                    if (ttlAlreadyDefined != TTL_NOT_DEFINED) {
+                    if (ttlAlreadyDefined != TTL_EXPRESSION_NOT_DEFINED) {
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.
                                 TTL_ALREADY_DEFINED_IN_HIERARCHY)
                                 .setSchemaName(schemaName)
@@ -5840,7 +5842,7 @@ public class MetaDataClient {
                     } else if 
(propName.equalsIgnoreCase(USE_STATS_FOR_PARALLELIZATION)) {
                         
metaProperties.setUseStatsForParallelizationProp((Boolean)value);
                     } else if (propName.equalsIgnoreCase(TTL)) {
-                        metaProperties.setTTL((Integer) value);
+                        metaProperties.setTTL((TTLExpression) value);
                     } else if 
(propName.equalsIgnoreCase(CHANGE_DETECTION_ENABLED)) {
                         metaProperties.setChangeDetectionEnabled((Boolean) 
value);
                     } else if (propName.equalsIgnoreCase(PHYSICAL_TABLE_NAME)) 
{
@@ -6041,6 +6043,8 @@ public class MetaDataClient {
                         .buildException();
             }
             if (metaProperties.getTTL() != table.getTTL()) {
+                TTLExpression newTTL = metaProperties.getTTL();
+                newTTL.validateTTLOnAlter(connection, table);
                 metaPropertiesEvaluated.setTTL(metaProperties.getTTL());
                 changingPhoenixTableProperty = true;
             }
@@ -6113,7 +6117,7 @@ public class MetaDataClient {
         private ImmutableStorageScheme immutableStorageSchemeProp = null;
         private Boolean useStatsForParallelizationProp = null;
         private boolean nonTxToTx = false;
-        private Integer ttl = null;
+        private TTLExpression ttl = null;
         private Boolean isChangeDetectionEnabled = null;
         private String physicalTableName = null;
         private String schemaVersion = null;
@@ -6235,11 +6239,11 @@ public class MetaDataClient {
             this.nonTxToTx = nonTxToTx;
         }
 
-        public Integer getTTL() {
+        public TTLExpression getTTL() {
             return ttl;
         }
 
-        public void setTTL(Integer ttl) {
+        public void setTTL(TTLExpression ttl) {
             this.ttl = ttl;
         }
 
@@ -6295,7 +6299,7 @@ public class MetaDataClient {
         private Boolean useStatsForParallelization = null;
         private Boolean isTransactional = null;
         private TransactionFactory.Provider transactionProvider = null;
-        private Integer ttl = null;
+        private TTLExpression ttl = null;
         private Boolean isChangeDetectionEnabled = null;
         private String physicalTableName = null;
         private String schemaVersion = null;
@@ -6398,9 +6402,9 @@ public class MetaDataClient {
             this.transactionProvider = transactionProvider;
         }
 
-        public Integer getTTL() { return ttl; }
+        public TTLExpression getTTL() { return ttl; }
 
-        public void setTTL(Integer ttl) { this.ttl = ttl; }
+        public void setTTL(TTLExpression ttl) { this.ttl = ttl; }
 
         public Boolean isChangeDetectionEnabled() {
             return isChangeDetectionEnabled;
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
index 9c487f5bd0..a61b50778f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -934,7 +934,7 @@ public interface PTable extends PMetaDataEntity {
     /**
      * @return The TTL duration associated with the entity when Phoenix level 
TTL is enabled.
      */
-    int getTTL();
+    TTLExpression getTTL();
 
     /**
      * @return the last timestamp at which this entity had its data shape 
created or modified (e
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index a185811740..a2aa4fd02d 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -49,6 +49,7 @@ import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_TRANSACTION_
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_UPDATE_CACHE_FREQUENCY;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_USE_STATS_FOR_PARALLELIZATION;
 import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN;
+import static 
org.apache.phoenix.schema.TTLExpression.TTL_EXPRESSION_NOT_DEFINED;
 import static org.apache.phoenix.schema.TableProperty.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
@@ -196,6 +197,8 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
+import com.google.protobuf.ByteString;
+
 /**
  *
  * Base class for PTable implementors.  Provides abstraction for
@@ -271,7 +274,7 @@ public class PTableImpl implements PTable {
     private final QualifierEncodingScheme qualifierEncodingScheme;
     private final EncodedCQCounter encodedCQCounter;
     private final Boolean useStatsForParallelization;
-    private final int ttl;
+    private final TTLExpression ttl;
     private final BitSet viewModifiedPropSet;
     private final Long lastDDLTimestamp;
     private final boolean isChangeDetectionEnabled;
@@ -353,7 +356,7 @@ public class PTableImpl implements PTable {
         private String indexWhere;
         private Long maxLookbackAge;
         private Map<PTableKey, Long> ancestorLastDDLTimestampMap = new 
HashMap<>();
-        private int ttl;
+        private TTLExpression ttl = TTL_EXPRESSION_NOT_DEFINED;
         private byte[] rowKeyMatcher;
 
         // Used to denote which properties a view has explicitly modified
@@ -689,8 +692,10 @@ public class PTableImpl implements PTable {
             return this;
         }
 
-        public Builder setTTL(int ttl) {
-            propertyValues.put(TTL, String.valueOf(ttl));
+        public Builder setTTL(TTLExpression ttl) {
+            if (ttl != null) {
+                propertyValues.put(TTL, ttl.getTTLExpression());
+            }
             this.ttl = ttl;
             return this;
         }
@@ -2125,10 +2130,10 @@ public class PTableImpl implements PTable {
             cdcIncludeScopesStr = table.getCDCIncludeScopes();
         }
 
-        Integer ttl = TTL_NOT_DEFINED;
+        TTLExpression ttl = TTL_EXPRESSION_NOT_DEFINED;
         if (table.hasTtl()) {
-            String ttlStr = (String) 
PVarchar.INSTANCE.toObject(table.getTtl().toByteArray());
-            ttl = Integer.parseInt(ttlStr);
+            String ttlExpr = (String) 
PVarchar.INSTANCE.toObject(table.getTtl().toByteArray());
+            ttl = TTLExpression.create(ttlExpr);
         }
 
         byte[] rowKeyMatcher = null;
@@ -2342,9 +2347,10 @@ public class PTableImpl implements PTable {
         builder.setCDCIncludeScopes(CDCUtil.makeChangeScopeStringFromEnums(
                 table.getCDCIncludeScopes() != null ? 
table.getCDCIncludeScopes()
                 : Collections.EMPTY_SET));
-
-        
builder.setTtl(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(String.valueOf(table.getTTL()))));
-
+        if (table.getTTL() != null) {
+            builder.setTtl(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(
+                    table.getTTL().getTTLExpression())));
+        }
         if (table.getRowKeyMatcher() != null) {
             
builder.setRowKeyMatcher(ByteStringer.wrap(table.getRowKeyMatcher()));
         }
@@ -2444,7 +2450,7 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public int getTTL() {
+    public TTLExpression getTTL() {
         return ttl;
     }
 
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TTLExpression.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TTLExpression.java
new file mode 100644
index 0000000000..126389a46c
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TTLExpression.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.parse.CreateTableStatement;
+
+/**
+ * In-memory representation of the TTL of a table or view. The TTL is either a literal
+ * value ({@link LiteralTTLExpression}) or a conditional expression
+ * ({@link ConditionTTLExpression}); use the {@code create} factories to obtain one.
+ */
+public abstract class TTLExpression {
+
+    /** Shared instance for a TTL of {@link HConstants#FOREVER}. */
+    public static final TTLExpression TTL_EXPRESSION_FOREVER =
+            new LiteralTTLExpression(HConstants.FOREVER);
+    // Misspelled alias of TTL_EXPRESSION_FOREVER, kept because existing callers import it.
+    public static final TTLExpression TTL_EXPRESSION_FORVER = TTL_EXPRESSION_FOREVER;
+    /** Shared instance for "no TTL defined"; callers compare against it by reference. */
+    public static final TTLExpression TTL_EXPRESSION_NOT_DEFINED =
+            new LiteralTTLExpression(PhoenixDatabaseMetaData.TTL_NOT_DEFINED);
+
+    /**
+     * Parses the string form of a TTL.
+     *
+     * @param ttlExpr the TTL text; {@code null} or empty means that no TTL is defined
+     * @return the shared NOT_DEFINED or FOREVER instance when the text equals
+     *         {@link PhoenixDatabaseMetaData#NONE_TTL} or
+     *         {@link PhoenixDatabaseMetaData#FOREVER_TTL} ignoring case, a literal TTL when
+     *         the text parses as an integer, and a {@link ConditionTTLExpression} otherwise
+     */
+    public static TTLExpression create(String ttlExpr) {
+        if (ttlExpr == null || ttlExpr.isEmpty()) {
+            // Integer.parseInt rejects null and "", so without this guard a missing TTL
+            // would be wrapped in a ConditionTTLExpression instead of NOT_DEFINED.
+            return TTL_EXPRESSION_NOT_DEFINED;
+        }
+        if (PhoenixDatabaseMetaData.NONE_TTL.equalsIgnoreCase(ttlExpr)) {
+            return TTL_EXPRESSION_NOT_DEFINED;
+        }
+        if (PhoenixDatabaseMetaData.FOREVER_TTL.equalsIgnoreCase(ttlExpr)) {
+            return TTL_EXPRESSION_FOREVER;
+        }
+        try {
+            return create(Integer.parseInt(ttlExpr));
+        } catch (NumberFormatException e) {
+            // Not an integer, so the text is taken to be a conditional TTL expression.
+            return new ConditionTTLExpression(ttlExpr);
+        }
+    }
+
+    /**
+     * Wraps a numeric TTL, reusing the shared instances for the two special values.
+     *
+     * @param ttlValue the TTL value
+     * @return the shared NOT_DEFINED instance for
+     *         {@link PhoenixDatabaseMetaData#TTL_NOT_DEFINED}, the shared FOREVER instance
+     *         for {@link HConstants#FOREVER}, and a new {@link LiteralTTLExpression} otherwise
+     */
+    public static TTLExpression create(int ttlValue) {
+        if (ttlValue == PhoenixDatabaseMetaData.TTL_NOT_DEFINED) {
+            return TTL_EXPRESSION_NOT_DEFINED;
+        } else if (ttlValue == HConstants.FOREVER) {
+            return TTL_EXPRESSION_FOREVER;
+        } else {
+            return new LiteralTTLExpression(ttlValue);
+        }
+    }
+
+    /**
+     * @return the string form of this TTL; it is what gets persisted as the TTL property
+     *         and what {@link #create(String)} parses back
+     */
+    public abstract String getTTLExpression();
+
+    /**
+     * @param result the cells of the row being examined
+     * @return the TTL that applies to that row
+     */
+    public abstract long getTTLForRow(List<Cell> result);
+
+    @Override
+    public abstract String toString();
+
+    /**
+     * Validates this TTL when a table or view is created with it.
+     *
+     * @param conn   connection issuing the CREATE statement
+     * @param create the CREATE statement that carries this TTL
+     * @throws SQLException if validation fails
+     */
+    public abstract void validateTTLOnCreation(PhoenixConnection conn,
+                                               CreateTableStatement create) throws SQLException;
+
+    /**
+     * Validates this TTL when it is set on an existing table or view.
+     *
+     * @param connection connection issuing the ALTER statement
+     * @param table      the table or view whose TTL is being changed
+     * @throws SQLException if validation fails
+     */
+    public abstract void validateTTLOnAlter(PhoenixConnection connection, PTable table)
+            throws SQLException;
+
+    /**
+     * @return the value to send as the TTL scan attribute, or {@code null} when the TTL
+     *         related attributes should not be set on the scan
+     */
+    public abstract String getTTLForScanAttribute();
+
+}
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 7f42c42124..6a287dabd9 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -261,16 +261,12 @@ public enum TableProperty {
         @Override
         public Object getValue(Object value) {
             if (value instanceof String) {
-                String strValue = (String) value;
-                if ("FOREVER".equalsIgnoreCase(strValue)) {
-                    return HConstants.FOREVER;
-                } else if ("NONE".equalsIgnoreCase(strValue)) {
-                    return TTL_NOT_DEFINED;
-                }
+                return TTLExpression.create((String)value);
             } else if (value != null) {
                 //Not converting to milli-seconds for better understanding at 
compaction and masking
                 //stage. As HBase Descriptor level gives this value in seconds.
-                return ((Number) value).intValue();
+                int ttlValue = ((Number) value).intValue();
+                return TTLExpression.create(ttlValue);
             }
             return value;
         }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 77e45541f3..caa21fb337 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -101,6 +101,8 @@ import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TTLExpression;
+import org.apache.phoenix.schema.LiteralTTLExpression;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.transform.SystemTransformRecord;
@@ -109,8 +111,8 @@ import org.apache.phoenix.schema.transform.TransformClient;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1086,7 +1088,9 @@ public class ScanUtil {
         if (phoenixTTL == null) {
             return DEFAULT_TTL;
         }
-        return Bytes.readAsInt(phoenixTTL, 0, phoenixTTL.length);
+        String ttlStr = (String) PVarchar.INSTANCE.toObject(phoenixTTL);
+        LiteralTTLExpression literal = (LiteralTTLExpression) 
TTLExpression.create(ttlStr);
+        return literal.getTTLValue();
     }
 
     public static boolean isPhoenixTableTTLEnabled(Configuration conf) {
@@ -1358,7 +1362,9 @@ public class ScanUtil {
                 return;
             }
         }
-        if (dataTable.getTTL() != 0) {
+        TTLExpression ttlExpr = dataTable.getTTL();
+        String ttlForScan = ttlExpr.getTTLForScanAttribute();
+        if (ttlForScan != null) {
             byte[] emptyColumnFamilyName = 
SchemaUtil.getEmptyColumnFamily(table);
             byte[] emptyColumnName =
                     table.getEncodingScheme() == 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
@@ -1368,8 +1374,7 @@ public class ScanUtil {
                     Bytes.toBytes(tableName));
             
scan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_FAMILY_NAME, 
emptyColumnFamilyName);
             
scan.setAttribute(BaseScannerRegionObserverConstants.EMPTY_COLUMN_QUALIFIER_NAME,
 emptyColumnName);
-            scan.setAttribute(BaseScannerRegionObserverConstants.TTL,
-                    Bytes.toBytes(Integer.valueOf(dataTable.getTTL())));
+            scan.setAttribute(BaseScannerRegionObserverConstants.TTL, 
Bytes.toBytes(ttlForScan));
             if (!ScanUtil.isDeleteTTLExpiredRows(scan)) {
                 
scan.setAttribute(BaseScannerRegionObserverConstants.MASK_PHOENIX_TTL_EXPIRED, 
PDataType.TRUE_BYTES);
             }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 70b57fb213..e19b6d45b3 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -64,6 +64,7 @@ import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TTLExpression;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PSmallint;
@@ -88,13 +89,13 @@ import static 
org.apache.phoenix.coprocessor.CompactionScanner.MatcherType.GLOBA
 import static 
org.apache.phoenix.coprocessor.CompactionScanner.MatcherType.GLOBAL_VIEWS;
 import static 
org.apache.phoenix.coprocessor.CompactionScanner.MatcherType.TENANT_INDEXES;
 import static 
org.apache.phoenix.coprocessor.CompactionScanner.MatcherType.TENANT_VIEWS;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_TTL;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAMESPACE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static 
org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
 import static 
org.apache.phoenix.query.QueryServices.PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT;
 import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT;
+import static org.apache.phoenix.schema.TTLExpression.TTL_EXPRESSION_FORVER;
+import static 
org.apache.phoenix.schema.TTLExpression.TTL_EXPRESSION_NOT_DEFINED;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 import java.util.ArrayList;
@@ -207,7 +208,7 @@ public class CompactionScanner implements InternalScanner {
 
         LOGGER.info("Starting CompactionScanner for table " + tableName + " 
store "
                 + columnFamilyName + (major ? " major " : " not major ") + 
"compaction ttl "
-                + ttlTracker.getRowContext().getTTL() + "ms " + "max lookback 
" + this.maxLookbackInMillis + "ms");
+                + ttlTracker.getDefaultTTL() + " " + "max lookback " + 
this.maxLookbackInMillis + "ms");
         LOGGER.info(String.format("CompactionScanner params:- (" +
                         "physical-data-tablename = %s, compaction-tablename = 
%s, region = %s, " +
                         "start-key = %s, end-key = %s, " +
@@ -563,7 +564,7 @@ public class CompactionScanner implements InternalScanner {
 
             if (tableList != null && !tableList.isEmpty()) {
                 tableList.forEach(m -> {
-                    if (m.getTTL() != TTL_NOT_DEFINED) {
+                    if (m.getTTL() != TTL_EXPRESSION_NOT_DEFINED) {
                         // add the ttlInfo to the cache.
                         // each new/unique ttlInfo object added returns a 
unique tableId.
                         int tableId = -1;
@@ -643,7 +644,7 @@ public class CompactionScanner implements InternalScanner {
 
             if (tableList != null && !tableList.isEmpty()) {
                 tableList.forEach(m -> {
-                    if (m.getTTL() != TTL_NOT_DEFINED) {
+                    if (m.getTTL() != TTL_EXPRESSION_NOT_DEFINED) {
                         // add the ttlInfo to the cache.
                         // each new/unique ttlInfo object added returns a 
unique tableId.
                         int tableId = -1;
@@ -992,8 +993,8 @@ public class CompactionScanner implements InternalScanner {
                             String schem = viewTTLRS.getString("TABLE_SCHEM");
                             String tName = viewTTLRS.getString("TABLE_NAME");
                             String viewTTLStr = viewTTLRS.getString("TTL");
-                            int viewTTL = viewTTLStr == null || 
viewTTLStr.isEmpty() ?
-                                    TTL_NOT_DEFINED : 
Integer.valueOf(viewTTLStr);
+                            TTLExpression viewTTL = viewTTLStr == null || 
viewTTLStr.isEmpty() ?
+                                    TTL_EXPRESSION_NOT_DEFINED : 
TTLExpression.create(viewTTLStr);
                             byte[] rowKeyMatcher = 
viewTTLRS.getBytes("ROW_KEY_MATCHER");
                             byte[]
                                     tenantIdBytes =
@@ -1155,12 +1156,11 @@ public class CompactionScanner implements 
InternalScanner {
      *          For Flushes and Minor compaction we do not need to track the 
TTL.
      */
     private interface TTLTracker {
-        // Set the TTL for the given row in the row-context being tracked.
-        void setTTL(Cell firstCell) throws IOException;
-        // get the row context for the current row.
-        RowContext getRowContext();
-        // set the row context for the current row.
-        void setRowContext(RowContext rowContext);
+        // get TTL for the row
+        long getTTL(List<Cell> result) throws IOException;
+
+        // get the default TTL
+        TTLExpression getDefaultTTL();
     }
 
     /**
@@ -1169,47 +1169,31 @@ public class CompactionScanner implements 
InternalScanner {
      */
     private class TableTTLTrackerForFlushesAndMinor implements TTLTracker {
 
-        private long ttl;
-        private RowContext rowContext;
+        private TTLExpression ttl;
 
         public TableTTLTrackerForFlushesAndMinor(String tableName) {
 
-            ttl = DEFAULT_TTL;
+            ttl = TTL_EXPRESSION_FORVER;
             LOGGER.info(String.format(
                     "TableTTLTrackerForFlushesAndMinor params:- " +
-                            "(table-name=%s, ttl=%d)",
-                    tableName, ttl*1000));
+                            "(table-name=%s, ttl=%s)",
+                    tableName, ttl));
         }
 
         @Override
-        public void setTTL(Cell firstCell) {
-            if (this.rowContext == null) {
-                this.rowContext = new RowContext();
-            }
-            this.rowContext.setTTL(ttl);
-
-        }
-
-        @Override
-        public RowContext getRowContext() {
-            if (this.rowContext == null) {
-                this.rowContext = new RowContext();
-                this.rowContext.setTTL(ttl);
-            }
-            return rowContext;
+        public long getTTL(List<Cell> result) throws IOException {
+            return ttl.getTTLForRow(result);
         }
 
         @Override
-        public void setRowContext(RowContext rowContext) {
-            this.rowContext = rowContext;
-            this.rowContext.setTTL(ttl);
+        public TTLExpression getDefaultTTL() {
+            return ttl;
         }
     }
 
     private class NonPartitionedTableTTLTracker implements TTLTracker {
 
-        private long ttl;
-        private RowContext rowContext;
+        private TTLExpression ttl;
 
         public NonPartitionedTableTTLTracker(
                 PTable pTable,
@@ -1218,38 +1202,25 @@ public class CompactionScanner implements 
InternalScanner {
             boolean isSystemTable = pTable.getType() == PTableType.SYSTEM;
             if (isSystemTable) {
                 ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-                ttl = cfd.getTimeToLive();
+                ttl = TTLExpression.create(cfd.getTimeToLive());
             } else {
-                ttl = pTable.getTTL() != TTL_NOT_DEFINED ? pTable.getTTL() : 
DEFAULT_TTL;
+                ttl = pTable.getTTL() != TTL_EXPRESSION_NOT_DEFINED ? 
pTable.getTTL() :
+                        TTL_EXPRESSION_FORVER;
             }
             LOGGER.info(String.format(
                     "NonPartitionedTableTTLTracker params:- " +
-                            "(physical-name=%s, ttl=%d, isSystemTable=%s)",
-                    pTable.getName().toString(), ttl*1000, isSystemTable));
-        }
-
-        @Override
-        public void setTTL(Cell firstCell) {
-            if (this.rowContext == null) {
-                this.rowContext = new RowContext();
-            }
-            this.rowContext.setTTL(ttl);
-
+                            "(physical-name=%s, ttl=%s, isSystemTable=%s)",
+                    pTable.getName().toString(), ttl.getTTLExpression(), 
isSystemTable));
         }
 
         @Override
-        public RowContext getRowContext() {
-            if (this.rowContext == null) {
-                this.rowContext = new RowContext();
-                this.rowContext.setTTL(ttl);
-            }
-            return rowContext;
+        public long getTTL(List<Cell> result) throws IOException {
+            return ttl.getTTLForRow(result);
         }
 
         @Override
-        public void setRowContext(RowContext rowContext) {
-            this.rowContext = rowContext;
-            this.rowContext.setTTL(ttl);
+        public TTLExpression getDefaultTTL() {
+            return ttl;
         }
     }
 
@@ -1258,9 +1229,7 @@ public class CompactionScanner implements InternalScanner 
{
                 PartitionedTableTTLTracker.class);
 
         // Default or Table-Level TTL
-        private long ttl;
-        private RowContext rowContext;
-
+        private TTLExpression ttl;
         private boolean isSharedIndex = false;
         private boolean isMultiTenant = false;
         private boolean isSalted = false;
@@ -1281,7 +1250,8 @@ public class CompactionScanner implements InternalScanner 
{
                 this.tableRowKeyMatcher =
                         new PartitionedTableRowKeyMatcher(table, isSalted, 
isSharedIndex,
                                 isLongViewIndexEnabled, 
viewTTLTenantViewsPerScanLimit);
-                this.ttl = table.getTTL() != TTL_NOT_DEFINED ? table.getTTL() 
: DEFAULT_TTL;
+                this.ttl = table.getTTL() != TTL_EXPRESSION_NOT_DEFINED ? 
table.getTTL() :
+                        TTL_EXPRESSION_FORVER;
                 this.isSharedIndex = isSharedIndex || localIndex;
                 this.isLongViewIndexEnabled = isLongViewIndexEnabled;
                 this.isSalted = isSalted;
@@ -1292,7 +1262,7 @@ public class CompactionScanner implements InternalScanner 
{
                         "PartitionedTableTTLTracker params:- " + 
                                 "region-name = %s, table-name = %s,  " +
                                 "multi-tenant = %s, shared-index = %s, salted 
= %s, " +
-                                "default-ttl = %d, startingPKPosition = %d",
+                                "default-ttl = %s, startingPKPosition = %d",
                         region.getRegionInfo().getEncodedName(),
                         region.getRegionInfo().getTable().getNameAsString(), 
this.isMultiTenant,
                         this.isSharedIndex, this.isSalted, this.ttl, 
this.startingPKPosition));
@@ -1370,15 +1340,16 @@ public class CompactionScanner implements 
InternalScanner {
         }
 
         @Override
-        public void setTTL(Cell firstCell) throws IOException {
-
+        public long getTTL(List<Cell> result) throws IOException {
             boolean matched = false;
             TableTTLInfo tableTTLInfo = null;
             List<Integer> pkPositions = null;
-            long rowTTLInSecs = ttl;
+            long defaultTTLInSecs = ttl.getTTLForRow(result);
+            long rowTTLInSecs = defaultTTLInSecs;
             long matchedOffset = -1;
             int pkPosition = startingPKPosition;
             MatcherType matchedType = null;
+            Cell firstCell = result.get(0);
             try {
                 // pkPositions holds the byte offsets for the PKs of the base 
table
                 // for the current row
@@ -1430,11 +1401,12 @@ public class CompactionScanner implements 
InternalScanner {
                 }
                 matched = tableTTLInfo != null;
                 matchedOffset = matched ? offset : -1;
-                rowTTLInSecs = matched ? tableTTLInfo.getTTL() : ttl; /* in 
secs */
-                if (this.rowContext == null) {
-                    this.rowContext = new RowContext();
+                if (matched) {
+                    rowTTLInSecs = tableTTLInfo.getTTL().getTTLForRow(result);
+                } else {
+                    rowTTLInSecs = defaultTTLInSecs; /* in secs */
                 }
-                this.rowContext.setTTL(rowTTLInSecs);
+                return rowTTLInSecs;
             } catch (SQLException e) {
                 LOGGER.error(String.format("Exception when visiting table: " + 
e.getMessage()));
                 throw new IOException(e);
@@ -1459,22 +1431,11 @@ public class CompactionScanner implements 
InternalScanner {
                                     .collect(Collectors.joining(",")) : ""));
                 }
             }
-
         }
 
         @Override
-        public RowContext getRowContext() {
-            if (this.rowContext == null) {
-                this.rowContext = new RowContext();
-                this.rowContext.setTTL(ttl);
-            }
-            return rowContext;
-        }
-
-        @Override
-        public void setRowContext(RowContext rowContext) {
-            this.rowContext = rowContext;
-            this.rowContext.setTTL(ttl);
+        public TTLExpression getDefaultTTL() {
+            return ttl;
         }
     }
 
@@ -2082,9 +2043,9 @@ public class CompactionScanner implements InternalScanner 
{
         }
 
         private void formCompactionRowVersions(LinkedList<LinkedList<Cell>> 
columns,
-                List<Cell> result) {
+                List<Cell> result) throws IOException {
             rowContext.init();
-            rowTracker.setRowContext(rowContext);
+            rowContext.setTTL(rowTracker.getTTL(result));
             while (!columns.isEmpty()) {
                 formNextCompactionRowVersion(columns, rowContext, result);
                 // Remove the columns that are empty
@@ -2129,7 +2090,7 @@ public class CompactionScanner implements InternalScanner 
{
          * Compacts a single row at the HBase level. The result parameter is 
the input row and
          * modified to be the output of the compaction.
          */
-        private void compact(List<Cell> result) {
+        private void compact(List<Cell> result) throws IOException {
             if (result.isEmpty()) {
                 return;
             }
@@ -2181,7 +2142,7 @@ public class CompactionScanner implements InternalScanner 
{
             boolean isEmptyColumn = false;
             Cell cellAtMaxLookbackWindowStart = null;
             for (Cell cell : result) {
-                if (cell.getTimestamp() > 
rowTracker.getRowContext().getMaxLookbackWindowStart()) {
+                if (cell.getTimestamp() > 
rowContext.getMaxLookbackWindowStart()) {
                     retainedCells.add(cell);
                     if (cell.getTimestamp() == maxLookbackWindowStart) {
                         cellAtMaxLookbackWindowStart = cell;
@@ -2257,8 +2218,9 @@ public class CompactionScanner implements InternalScanner 
{
             if (lastRow.isEmpty()) {
                 return;
             }
+            // init doesn't change ttl
             rowContext.init();
-            rowTracker.setRowContext(rowContext);
+            // ttl has already been evaluated
             long ttl = rowContext.getTTL();
             rowContext.getNextRowVersionTimestamps(lastRow, storeColumnFamily);
             Cell firstCell = lastRow.get(0);
@@ -2322,7 +2284,7 @@ public class CompactionScanner implements InternalScanner 
{
                 retainCellsOfLastRowVersion(lastRowVersion, emptyColumn, 
retainedCells);
                 return true;
             }
-            long ttl = rowTracker.getRowContext().getTTL();
+            long ttl = rowContext.getTTL();
             long maxTimestamp = 0;
             long minTimestamp = Long.MAX_VALUE;
             long ts;
@@ -2424,7 +2386,7 @@ public class CompactionScanner implements InternalScanner 
{
                 return;
             }
             phoenixResult.clear();
-            rowTracker.setTTL(result.get(0));
+            rowContext.setTTL(rowTracker.getTTL(result));
             if (!retainCellsForMaxLookback(result, regionLevel, 
phoenixResult)) {
                 if (familyCount == 1 || regionLevel) {
                     throw new RuntimeException("UNEXPECTED");
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index b54fd8c1e9..8595254281 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -58,7 +58,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
@@ -96,6 +95,7 @@ import static 
org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.PTableType.CDC;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.VIEW;
+import static 
org.apache.phoenix.schema.TTLExpression.TTL_EXPRESSION_NOT_DEFINED;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.SchemaUtil.*;
 import static org.apache.phoenix.util.ViewUtil.findAllDescendantViews;
@@ -241,6 +241,7 @@ import 
org.apache.phoenix.schema.SequenceAlreadyExistsException;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SequenceNotFoundException;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TTLExpression;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.export.SchemaRegistryRepository;
 import org.apache.phoenix.schema.export.SchemaRegistryRepositoryFactory;
@@ -1476,17 +1477,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             maxLookbackAge = scanMaxLookbackAgeFromParent(viewKey, 
clientTimeStamp);
         }
         Cell ttlKv = tableKeyValues[TTL_INDEX];
-        int ttl = TTL_NOT_DEFINED;
+        TTLExpression ttl = TTL_EXPRESSION_NOT_DEFINED;
         if (ttlKv != null) {
             String ttlStr = (String) PVarchar.INSTANCE.toObject(
                     ttlKv.getValueArray(),
                     ttlKv.getValueOffset(),
                     ttlKv.getValueLength());
-            ttl = Integer.parseInt(ttlStr);
+            ttl = TTLExpression.create(ttlStr);
         }
         ttl = ttlKv != null ? ttl : oldTable != null
-                ? oldTable.getTTL() : TTL_NOT_DEFINED;
-        if (tableType == VIEW && viewType != MAPPED && ttl == TTL_NOT_DEFINED) 
{
+                ? oldTable.getTTL() : TTL_EXPRESSION_NOT_DEFINED;
+        if (tableType == VIEW && viewType != MAPPED && ttl == 
TTL_EXPRESSION_NOT_DEFINED) {
             //Scan SysCat to get TTL from Parent View/Table
             byte[] viewKey = SchemaUtil.getTableKey(tenantId == null ? null : 
tenantId.getBytes(),
                     schemaName == null ? null : schemaName.getBytes(), 
tableNameBytes);
@@ -1658,7 +1659,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         builder.setMaxLookbackAge(maxLookbackAge != null ? maxLookbackAge :
                 (oldTable != null ? oldTable.getMaxLookbackAge() : null));
 
-        if(tableType == INDEX && !isThisAViewIndex && ttl == TTL_NOT_DEFINED) {
+        if (tableType == INDEX && !isThisAViewIndex && ttl == 
TTL_EXPRESSION_NOT_DEFINED) {
             //If this is an index on Table get TTL from Table
             byte[] tableKey = getTableKey(tenantId == null ? null : 
tenantId.getBytes(),
                     parentSchemaName == null ? null : 
parentSchemaName.getBytes(),
@@ -1764,7 +1765,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
      * @throws SQLException
      */
 
-    private int getTTLFromHierarchy(byte[] viewKey, long clientTimeStamp, 
boolean checkForMappedView) throws IOException, SQLException {
+    private TTLExpression getTTLFromHierarchy(byte[] viewKey, long 
clientTimeStamp, boolean checkForMappedView) throws IOException, SQLException {
         Scan scan = MetaDataUtil.newTableRowsScan(viewKey, 
MIN_TABLE_TIMESTAMP, clientTimeStamp);
         Table sysCat = ServerUtil.getHTableForCoprocessorScan(this.env,
                 SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES,
@@ -1776,12 +1777,12 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         do {
 
             if (result == null) {
-                return TTL_NOT_DEFINED;
+                return TTL_EXPRESSION_NOT_DEFINED;
             }
 
             //return TTL_NOT_DEFINED for Index on a Mapped View.
             if (checkForMappedView && checkIfViewIsMappedView(result)) {
-                return TTL_NOT_DEFINED;
+                return TTL_EXPRESSION_NOT_DEFINED;
             }
 
             byte[] linkTypeBytes = result.getValue(TABLE_FAMILY_BYTES, 
LINK_TYPE_BYTES);
@@ -1791,7 +1792,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             if (result.getValue(TABLE_FAMILY_BYTES, TTL_BYTES) != null) {
                 String ttlStr = (String) PVarchar.INSTANCE.toObject(
                         result.getValue(DEFAULT_COLUMN_FAMILY_BYTES, 
TTL_BYTES));
-                return Integer.parseInt(ttlStr);
+                return TTLExpression.create(ttlStr);
             } else if (linkTypeBytes != null ) {
                 String parentSchema =SchemaUtil.getSchemaNameFromFullName(
                         
rowKeyMetaData[PhoenixDatabaseMetaData.FAMILY_NAME_INDEX]);
@@ -1840,7 +1841,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
      * @return TTL defined for a given table if it is null then return 
TTL_NOT_DEFINED(0)
      * @throws IOException
      */
-    private int getTTLForTable(byte[] tableKey, long clientTimeStamp) throws 
IOException {
+    private TTLExpression getTTLForTable(byte[] tableKey, long 
clientTimeStamp) throws IOException {
         Scan scan = MetaDataUtil.newTableRowsScan(tableKey, 
MIN_TABLE_TIMESTAMP, clientTimeStamp);
         Table sysCat = ServerUtil.getHTableForCoprocessorScan(this.env,
                 SchemaUtil.getPhysicalTableName(SYSTEM_CATALOG_NAME_BYTES,
@@ -1849,16 +1850,16 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         Result result = scanner.next();
         do {
             if (result == null) {
-                return TTL_NOT_DEFINED;
+                return TTL_EXPRESSION_NOT_DEFINED;
             }
             if (result.getValue(TABLE_FAMILY_BYTES, TTL_BYTES) != null) {
                 String ttlStr = (String) PVarchar.INSTANCE.toObject(
                         result.getValue(DEFAULT_COLUMN_FAMILY_BYTES, 
TTL_BYTES));
-                return Integer.parseInt(ttlStr);
+                return TTLExpression.create(ttlStr);
             }
             result = scanner.next();
         } while (result != null);
-        return TTL_NOT_DEFINED;
+        return TTL_EXPRESSION_NOT_DEFINED;
     }
 
     private Long getViewIndexId(Cell[] tableKeyValues, PDataType 
viewIndexIdType) {
@@ -3826,8 +3827,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     Cell cell = cells.get(0);
                     String newTTLStr = (String) 
PVarchar.INSTANCE.toObject(cell.getValueArray(),
                             cell.getValueOffset(), cell.getValueLength());
-                    int newTTL = Integer.parseInt(newTTLStr);
-                    return newTTL != TTL_NOT_DEFINED;
+                    TTLExpression newTTL = TTLExpression.create(newTTLStr);
+                    return newTTL != TTL_EXPRESSION_NOT_DEFINED;
                 }
             }
         }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 78161a9c64..925edc9ac5 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -35,6 +35,7 @@ import java.util.Set;
 
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.LiteralTTLExpression;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
@@ -69,7 +70,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
 
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_TTL;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
+import static 
org.apache.phoenix.schema.TTLExpression.TTL_EXPRESSION_NOT_DEFINED;
 
 /**
  * Mapper that reads from the data table and checks the rows against the index 
table
@@ -344,8 +345,8 @@ public class IndexScrutinyMapper extends 
Mapper<NullWritable, PhoenixIndexDBWrit
                 SchemaUtil.isNamespaceMappingEnabled(null, cqsi.getProps()));
         if (configuration.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
                 QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED)) {
-            return pSourceTable.getTTL() == TTL_NOT_DEFINED ? DEFAULT_TTL
-                    : pSourceTable.getTTL();
+            return pSourceTable.getTTL() == TTL_EXPRESSION_NOT_DEFINED ? 
DEFAULT_TTL
+                    : ((LiteralTTLExpression) 
pSourceTable.getTTL()).getTTLValue(); // TODO
         } else {
             TableDescriptor tableDesc;
             try (Admin admin = cqsi.getAdmin()) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
index 31ba3f19f4..9c4f6093a8 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
@@ -139,7 +139,7 @@ public class DefaultPhoenixMultiViewListProvider implements 
PhoenixMultiViewList
             PTable parentTable = connection.getTable(null, 
pTable.getParentName().toString());
             System.out.println("Parent Table");
             if (parentTable.getType() == PTableType.VIEW &&
-                    parentTable.getTTL() > 0) {
+                    parentTable.getTTL() != null) {
                             /* if the current view parent already has a TTL 
value, we want to
                             skip the current view cleanup job because we want 
to run the cleanup
                              job for at the GlobalView level instead of 
running multi-jobs at
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 879539473d..09c3b979e0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -81,13 +80,13 @@ import 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SchemaNotFoundException;
+import org.apache.phoenix.schema.LiteralTTLExpression;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -479,7 +478,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals(1, columnFamilies.length);
         assertEquals(86400, columnFamilies[0].getTimeToLive());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
-        assertEquals(86400, conn.unwrap(PhoenixConnection.class).getTable(
+        assertEquals(new LiteralTTLExpression(86400), 
conn.unwrap(PhoenixConnection.class).getTable(
                 new PTableKey(null, tableName)).getTTL());
     }
 
@@ -538,7 +537,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals(86400, columnFamilies[1].getTimeToLive());
         assertEquals("C", columnFamilies[1].getNameAsString());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
-        assertEquals(86400, conn.unwrap(PhoenixConnection.class).getTable(
+        assertEquals(new LiteralTTLExpression(86400), 
conn.unwrap(PhoenixConnection.class).getTable(
                 new PTableKey(null, tableName)).getTTL());
     }
 
@@ -567,7 +566,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals("B", columnFamilies[1].getNameAsString());
         assertEquals(86400, columnFamilies[1].getTimeToLive());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
-        assertEquals(86400, conn.unwrap(PhoenixConnection.class).getTable(
+        assertEquals(new LiteralTTLExpression(86400), 
conn.unwrap(PhoenixConnection.class).getTable(
                 new PTableKey(null, tableName)).getTTL());
     }
 
@@ -648,7 +647,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals("a", columnFamilies[0].getNameAsString());
         assertEquals(10000, columnFamilies[0].getTimeToLive());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
-        assertEquals(10000, conn.unwrap(PhoenixConnection.class).getTable(
+        assertEquals(new LiteralTTLExpression(10000), 
conn.unwrap(PhoenixConnection.class).getTable(
                 new PTableKey(null, tableName)).getTTL());
     }
 
@@ -673,7 +672,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals("a", columnFamilies[0].getNameAsString());
         assertEquals(10000, columnFamilies[0].getTimeToLive());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
-        assertEquals(10000, conn.unwrap(PhoenixConnection.class).getTable(
+        assertEquals(new LiteralTTLExpression(10000), 
conn.unwrap(PhoenixConnection.class).getTable(
                 new PTableKey(null, tableName)).getTTL());
     }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java
index 2a79254b07..0a487a5df5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java
@@ -38,13 +38,13 @@ import 
org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.TTLExpression;
+import org.apache.phoenix.schema.LiteralTTLExpression;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -430,7 +430,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
             assertEquals(ColumnFamilyDescriptorBuilder.DEFAULT_BLOCKSIZE, 
columnFamilies[1].getBlocksize());
             assertEquals(Boolean.toString(false), 
tableDesc.getValue(TableDescriptorBuilder.COMPACTION_ENABLED));
             //Check if Alter Table TTL also changes TTL stored in SYSCAT with 
phoenix.table.ttl disabled
-            assertEquals(1000, conn.unwrap(PhoenixConnection.class).getTable(
+            assertEquals(new LiteralTTLExpression(1000), 
conn.unwrap(PhoenixConnection.class).getTable(
                     new PTableKey(null, dataTableFullName)).getTTL());
         }
     }
@@ -793,7 +793,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals("XYZ", columnFamilies[0].getNameAsString());
                 assertEquals(86400, columnFamilies[0].getTimeToLive());
                 //Check if TTL is stored in SYSCAT as well and we are getting 
ttl from get api in PTable
-                assertEquals(86400, 
conn.unwrap(PhoenixConnection.class).getTable(
+                assertEquals(new LiteralTTLExpression(86400), 
conn.unwrap(PhoenixConnection.class).getTable(
                         new PTableKey(null, dataTableFullName)).getTTL());
             }
             ddl = "ALTER TABLE " + dataTableFullName + " SET TTL=30";
@@ -806,7 +806,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals(30, columnFamilies[0].getTimeToLive());
                 assertEquals("XYZ", columnFamilies[0].getNameAsString());
                 //Check if Alter Table TTL also changes TTL stored in SYSCAT 
with phoenix.table.ttl disabled
-                assertEquals(30, conn.unwrap(PhoenixConnection.class).getTable(
+                assertEquals(new LiteralTTLExpression(30), 
conn.unwrap(PhoenixConnection.class).getTable(
                         new PTableKey(null, dataTableFullName)).getTTL());
             }
         } finally {
@@ -834,7 +834,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals("XYZ", columnFamilies[0].getNameAsString());
                 assertEquals(HConstants.FOREVER, 
columnFamilies[0].getTimeToLive());
                 //Check if TTL is stored in SYSCAT as well and we are getting 
ttl from get api in PTable
-                assertEquals(PhoenixDatabaseMetaData.TTL_NOT_DEFINED, 
conn.unwrap(PhoenixConnection.class).getTable(
+                assertEquals(TTLExpression.TTL_EXPRESSION_NOT_DEFINED, 
conn.unwrap(PhoenixConnection.class).getTable(
                         new PTableKey(null, dataTableFullName)).getTTL());
             }
             ddl = "ALTER TABLE " + dataTableFullName + " SET TTL=FOREVER";
@@ -847,7 +847,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals(HConstants.FOREVER, 
columnFamilies[0].getTimeToLive());
                 assertEquals("XYZ", columnFamilies[0].getNameAsString());
                 //Check if Alter Table TTL also changes TTL stored in SYSCAT 
with phoenix.table.ttl disabled
-                assertEquals(HConstants.FOREVER, 
conn.unwrap(PhoenixConnection.class).getTable(
+                assertEquals(TTLExpression.TTL_EXPRESSION_FORVER, 
conn.unwrap(PhoenixConnection.class).getTable(
                         new PTableKey(null, dataTableFullName)).getTTL());
             }
         } finally {
@@ -996,7 +996,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals(false, columnFamilies[1].isInMemory());
                 assertEquals(86400, columnFamilies[1].getTimeToLive());
                 //Check if TTL is stored in SYSCAT as well and we are getting 
ttl from get api in PTable
-                assertEquals(86400, 
conn.unwrap(PhoenixConnection.class).getTable(
+                assertEquals(new LiteralTTLExpression(86400), 
conn.unwrap(PhoenixConnection.class).getTable(
                         new PTableKey(null, dataTableFullName)).getTTL());
             }
 
@@ -1014,7 +1014,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals(false, columnFamilies[1].isInMemory());
                 assertEquals(1000, columnFamilies[1].getTimeToLive());
                 //Check if Alter Table TTL also changes TTL stored in SYSCAT 
with phoenix.table.ttl disabled
-                assertEquals(1000, 
conn.unwrap(PhoenixConnection.class).getTable(
+                assertEquals(new LiteralTTLExpression(1000), 
conn.unwrap(PhoenixConnection.class).getTable(
                         new PTableKey(null, dataTableFullName)).getTTL());
             }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
index ac835947e0..fdc42a6236 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
@@ -24,26 +24,36 @@ import 
org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.TTLExpression;
+import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
 import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.TTL_ALREADY_DEFINED_IN_HIERARCHY;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.TTL_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
+import static 
org.apache.phoenix.schema.TTLExpression.TTL_EXPRESSION_NOT_DEFINED;
 import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -51,11 +61,56 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @Category(ParallelStatsDisabledTest.class)
+@RunWith(Parameterized.class)
 public class TTLAsPhoenixTTLIT extends ParallelStatsDisabledIT{
 
+    private static final String DDL_TEMPLATE = "CREATE TABLE IF NOT EXISTS %s "
+            + "("
+            + " ID INTEGER NOT NULL,"
+            + " COL1 INTEGER NOT NULL,"
+            + " COL2 bigint NOT NULL,"
+            + " CREATED_DATE DATE,"
+            + " CREATION_TIME TIME,"
+            + " CONSTRAINT NAME_PK PRIMARY KEY (ID, COL1, COL2)"
+            + ")";
+
+    private static final String DEFAULT_DDL_OPTIONS = "MULTI_TENANT=true";
+
     private static final int DEFAULT_TTL_FOR_TEST = 86400;
     private static final int DEFAULT_TTL_FOR_CHILD = 10000;
     private static final int DEFAULT_TTL_FOR_ALTER = 7000;
+    private static final String DEFAULT_TTL_EXPRESSION = "CURRENT_TIME() - 
cREATION_TIME > 500"; // case-insensitive comparison
+    private static final String DEFAULT_TTL_EXPRESSION_FOR_ALTER =
+            "CURRENT_TIME() - PHOENIX_ROW_TIMESTAMP() > 100";
+
+    private boolean useExpression;
+    private TTLExpression defaultTTL;
+    private String defaultTTLDDLOption;
+    private TTLExpression alterTTL;
+    private String alterTTLDDLOption;
+
+    public TTLAsPhoenixTTLIT(boolean useExpression) {
+        this.useExpression = useExpression;
+        this.defaultTTL = useExpression ?
+                TTLExpression.create(DEFAULT_TTL_EXPRESSION) :
+                TTLExpression.create(DEFAULT_TTL_FOR_TEST);
+        this.defaultTTLDDLOption = useExpression ?
+                String.format("'%s'", DEFAULT_TTL_EXPRESSION) :
+                String.valueOf(DEFAULT_TTL_FOR_TEST);
+        this.alterTTL = useExpression ?
+                TTLExpression.create(DEFAULT_TTL_EXPRESSION_FOR_ALTER) :
+                TTLExpression.create(DEFAULT_TTL_FOR_ALTER);
+        this.alterTTLDDLOption = useExpression ?
+                String.format("'%s'", DEFAULT_TTL_EXPRESSION_FOR_ALTER) :
+                String.valueOf(DEFAULT_TTL_FOR_ALTER);
+    }
+
+    @Parameterized.Parameters(name = "useExpression={0}")
+    public static synchronized Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][]{
+                {false}, {true}
+        });
+    }
 
     /**
      * test TTL is being set as PhoenixTTL when PhoenixTTL is enabled.
@@ -65,13 +120,40 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
         try (Connection conn = DriverManager.getConnection(getUrl());) {
             PTable table = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null,
                     createTableWithOrWithOutTTLAsItsProperty(conn, true)));
-            assertEquals("TTL is not set correctly at Phoenix level", 
DEFAULT_TTL_FOR_TEST,
-                    table.getTTL());
+            assertTTLValue(table, defaultTTL);
             assertTrue("RowKeyMatcher should be Null",
                     (Bytes.compareTo(HConstants.EMPTY_BYTE_ARRAY, 
table.getRowKeyMatcher()) == 0));
         }
     }
 
+    @Test
+    public void testCreateTableWithNoTTL() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl());) {
+            PTable table = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null,
+                    createTableWithOrWithOutTTLAsItsProperty(conn, false)));
+            assertTTLValue(table, TTL_EXPRESSION_NOT_DEFINED);
+        }
+    }
+
+    @Test
+    public void testSwitchingTTLFromCondToValue() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, 
true);
+            PTable table = conn.unwrap(PhoenixConnection.class).
+                    getTable(new PTableKey(null, tableName));
+            assertTTLValue(table, defaultTTL);
+            // Switch from cond ttl to value or vice versa
+            String alterTTL = useExpression ? 
String.valueOf(DEFAULT_TTL_FOR_ALTER) :
+                    String.format("'%s'", DEFAULT_TTL_EXPRESSION_FOR_ALTER);
+            String alterDDL = "ALTER TABLE " + tableName + " SET TTL = " + 
alterTTL;
+            conn.createStatement().execute(alterDDL);
+            TTLExpression expected = useExpression ?
+                    TTLExpression.create(DEFAULT_TTL_FOR_ALTER) :
+                    TTLExpression.create(DEFAULT_TTL_EXPRESSION_FOR_ALTER);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), expected, 
tableName);
+        }
+    }
+
     /**
      * Tests that when: 1) DDL has both pk as well as key value columns 2) Key 
value columns have
      *      * both default and explicit column family names 3) TTL specifier 
doesn't have column family
@@ -80,69 +162,78 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
     @Test
     public void  testCreateTableWithTTLWithDifferentColumnFamilies() throws  
Exception {
         String tableName = generateUniqueName();
+        String ttlExpr = "id = 'x' AND b.col2 = 7";
+        String quotedTTLExpr = "id = ''x'' AND b.col2 = 7"; // to retain 
single quotes in DDL
+        int ttlValue = 86400;
+        String ttl = (useExpression ?
+                String.format("'%s'", quotedTTLExpr) :
+                String.valueOf(ttlValue));
         String ddl =
                 "create table IF NOT EXISTS  " + tableName + "  (" + " id 
char(1) NOT NULL,"
                         + " col1 integer NOT NULL," + " b.col2 bigint," + " 
col3 bigint, "
                         + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1)"
-                        + " ) TTL=86400";
-        Connection conn = DriverManager.getConnection(getUrl());
-        conn.createStatement().execute(ddl);
-        assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
DEFAULT_TTL_FOR_TEST, tableName);
+                        + " ) TTL=" + ttl;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(ddl);
+            TTLExpression expected = useExpression ?
+                    TTLExpression.create(ttlExpr) : 
TTLExpression.create(ttlValue);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), expected, 
tableName);
+        }
         //Setting TTL should not be stored as CF Descriptor properties when
         //phoenix.table.ttl.enabled is true
         Admin admin = driver.getConnectionQueryServices(getUrl(), new 
Properties()).getAdmin();
         ColumnFamilyDescriptor[] columnFamilies =
                 
admin.getDescriptor(TableName.valueOf(tableName)).getColumnFamilies();
         assertEquals(ColumnFamilyDescriptorBuilder.DEFAULT_TTL, 
columnFamilies[0].getTimeToLive());
-
     }
 
     @Test
     public void testCreateAndAlterTableDDLWithForeverAndNoneTTLValues() throws 
Exception {
+        if (useExpression) {
+            return;
+        }
         String tableName = generateUniqueName();
         String ddl =
                 "create table IF NOT EXISTS  " + tableName + "  (" + " id 
char(1) NOT NULL,"
                         + " col1 integer NOT NULL," + " b.col2 bigint," + " 
col3 bigint, "
                         + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1)"
                         + " ) TTL=FOREVER";
-        Connection conn = DriverManager.getConnection(getUrl());
-        conn.createStatement().execute(ddl);
-        assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                HConstants.FOREVER, tableName);
-
-        ddl = "ALTER TABLE  " + tableName
-                + " SET TTL=NONE";
-        conn.createStatement().execute(ddl);
-        assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                PhoenixDatabaseMetaData.TTL_NOT_DEFINED, tableName);
-        //Setting TTL should not be stored as CF Descriptor properties when
-        //phoenix.table.ttl.enabled is true
-        Admin admin = driver.getConnectionQueryServices(getUrl(), new 
Properties()).getAdmin();
-        ColumnFamilyDescriptor[] columnFamilies =
-                
admin.getDescriptor(TableName.valueOf(tableName)).getColumnFamilies();
-        assertEquals(ColumnFamilyDescriptorBuilder.DEFAULT_TTL, 
columnFamilies[0].getTimeToLive());
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(ddl);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class),
+                    TTLExpression.TTL_EXPRESSION_FORVER, tableName);
 
-        tableName = generateUniqueName();
-        ddl =
-                "create table IF NOT EXISTS  " + tableName + "  (" + " id 
char(1) NOT NULL,"
-                        + " col1 integer NOT NULL," + " b.col2 bigint," + " 
col3 bigint, "
-                        + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1)"
-                        + " ) TTL=NONE";
-        conn.createStatement().execute(ddl);
-        assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                PhoenixDatabaseMetaData.TTL_NOT_DEFINED, tableName);
-
-        ddl = "ALTER TABLE  " + tableName
-                + " SET TTL=FOREVER";
-        conn.createStatement().execute(ddl);
-        assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                HConstants.FOREVER, tableName);
-        //Setting TTL should not be stored as CF Descriptor properties when
-        //phoenix.table.ttl.enabled is true
-        columnFamilies =
-                
admin.getDescriptor(TableName.valueOf(tableName)).getColumnFamilies();
-        assertEquals(ColumnFamilyDescriptorBuilder.DEFAULT_TTL, 
columnFamilies[0].getTimeToLive());
+            ddl = "ALTER TABLE  " + tableName + " SET TTL=NONE";
+            conn.createStatement().execute(ddl);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class),
+                    TTL_EXPRESSION_NOT_DEFINED, tableName);
+            //Setting TTL should not be stored as CF Descriptor properties when
+            //phoenix.table.ttl.enabled is true
+            Admin admin = driver.getConnectionQueryServices(getUrl(), new 
Properties()).getAdmin();
+            ColumnFamilyDescriptor[] columnFamilies =
+                    
admin.getDescriptor(TableName.valueOf(tableName)).getColumnFamilies();
+            assertEquals(ColumnFamilyDescriptorBuilder.DEFAULT_TTL, 
columnFamilies[0].getTimeToLive());
+
+            tableName = generateUniqueName();
+            ddl =
+                    "create table IF NOT EXISTS  " + tableName + "  (" + " id 
char(1) NOT NULL,"
+                            + " col1 integer NOT NULL," + " b.col2 bigint," + 
" col3 bigint, "
+                            + " CONSTRAINT NAME_PK PRIMARY KEY (id, col1)"
+                            + " ) TTL=NONE";
+            conn.createStatement().execute(ddl);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class),
+                    TTL_EXPRESSION_NOT_DEFINED, tableName);
 
+            ddl = "ALTER TABLE  " + tableName + " SET TTL=FOREVER";
+            conn.createStatement().execute(ddl);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class),
+                    TTLExpression.TTL_EXPRESSION_FORVER, tableName);
+            //Setting TTL should not be stored as CF Descriptor properties when
+            //phoenix.table.ttl.enabled is true
+            columnFamilies =
+                    
admin.getDescriptor(TableName.valueOf(tableName)).getColumnFamilies();
+            assertEquals(ColumnFamilyDescriptorBuilder.DEFAULT_TTL, 
columnFamilies[0].getTimeToLive());
+        }
     }
 
     @Test
@@ -151,12 +242,11 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
              PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);){
             String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, 
false);
             //Checking Default TTL in case of PhoenixTTLEnabled
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
PhoenixDatabaseMetaData.TTL_NOT_DEFINED, tableName);
-            String ddl = "ALTER TABLE  " + tableName
-                    + " SET TTL = " + DEFAULT_TTL_FOR_ALTER;
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), 
TTL_EXPRESSION_NOT_DEFINED, tableName);
+
+            String ddl = "ALTER TABLE  " + tableName + " SET TTL = " + 
this.alterTTLDDLOption;
             conn.createStatement().execute(ddl);
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_ALTER, tableName);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), 
this.alterTTL, tableName);
             //Asserting TTL should not be stored as CF Descriptor properties 
when
             //phoenix.table.ttl.enabled is true
             Admin admin = driver.getConnectionQueryServices(getUrl(), new 
Properties()).getAdmin();
@@ -168,7 +258,7 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
 
     @Test
     public void testSettingTTLForIndexes() throws Exception {
-        try (Connection conn = DriverManager.getConnection(getUrl())){
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
             String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, 
true);
 
             //By default, Indexes should set TTL what Base Table has
@@ -176,7 +266,7 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             createIndexOnTableOrViewProvidedWithTTL(conn, tableName, 
PTable.IndexType.GLOBAL, false);
             List<PTable> indexes = PhoenixRuntime.getTable(conn, 
tableName).getIndexes();
             for (PTable index : indexes) {
-                assertTTLValueOfIndex(DEFAULT_TTL_FOR_TEST, index);;
+                assertTTLValue(index, defaultTTL);
             }
 
             tableName = createTableWithOrWithOutTTLAsItsProperty(conn, false);
@@ -185,15 +275,18 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             String globalIndexName = 
createIndexOnTableOrViewProvidedWithTTL(conn, tableName, 
PTable.IndexType.GLOBAL, false);
             indexes = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null, tableName)).getIndexes();
             for (PTable index : indexes) {
-                assertTTLValueOfIndex(PhoenixDatabaseMetaData.TTL_NOT_DEFINED, 
index);
+                assertTTLValue(index, TTL_EXPRESSION_NOT_DEFINED);
                 assertTrue(Bytes.compareTo(
                         index.getRowKeyMatcher(), HConstants.EMPTY_BYTE_ARRAY) 
== 0
                 );
             }
 
             //Test setting TTL as index property not allowed while creating 
them or setting them explicitly.
+            String ttl = (useExpression ?
+                    String.format("'%s'",DEFAULT_TTL_EXPRESSION) :
+                    String.valueOf(1000));
             try {
-                conn.createStatement().execute("ALTER TABLE " + localIndexName 
+ " SET TTL = 1000");
+                conn.createStatement().execute("ALTER TABLE " + localIndexName 
+ " SET TTL = " + ttl);
                 fail();
             } catch (SQLException sqe) {
                 assertEquals("Should fail with cannot set or alter property 
for index",
@@ -201,7 +294,7 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             }
 
             try {
-                conn.createStatement().execute("ALTER TABLE " + 
globalIndexName + " SET TTL = 1000");
+                conn.createStatement().execute("ALTER TABLE " + 
globalIndexName + " SET TTL = " + ttl);
                 fail();
             } catch (SQLException sqe) {
                 assertEquals("Should fail with cannot set or alter property 
for index",
@@ -223,7 +316,58 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
                 assertEquals("Should fail with cannot set or alter property 
for index",
                         CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX.getErrorCode(), 
sqe.getErrorCode());
             }
+        }
+    }
+
+    @Test
+    public void testConditionalTTLDDL() throws Exception {
+        if (!useExpression) {
+            return;
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            String ddl = "CREATE TABLE %s (ID1 VARCHAR NOT NULL, ID2 INTEGER 
NOT NULL, COL1 VARCHAR, G.COL2 DATE " +
+                    "CONSTRAINT PK PRIMARY KEY(ID1, ID2)) TTL = '%s'";
+            try {
+                conn.createStatement().execute(String.format(ddl, tableName, 
"ID2 = 12 OR UNKNOWN_COLUMN = 67"));
+                fail("Should have thrown ColumnNotFoundException");
+            } catch (ColumnNotFoundException e) {
+            }
+            String ttl = "ID2 = 34 AND G.COL2 > CURRENT_DATE() + 1000";
+            conn.createStatement().execute(String.format(ddl, tableName, ttl));
+            TTLExpression expected = TTLExpression.create(ttl);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), expected, 
tableName);
+
+            conn.createStatement().execute(String.format("ALTER TABLE %s SET 
TTL=NONE", tableName));
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), 
TTL_EXPRESSION_NOT_DEFINED, tableName);
+
+            try {
+                conn.createStatement().execute(String.format("ALTER TABLE %s 
SET TTL='%s'",
+                        tableName, "UNKNOWN_COLUMN=67"));
+                fail("Alter table should have thrown ColumnNotFoundException");
+            } catch (ColumnNotFoundException e) {
+
+            }
 
+            String viewName = generateUniqueName();
+            ddl = "CREATE VIEW %s ( VINT SMALLINT) AS SELECT * FROM %s 
TTL='%s'";
+            ttl = "F.ID2 = 124";
+            try {
+                conn.createStatement().execute(String.format(ddl, viewName, 
tableName, ttl));
+                fail("Should have thrown ColumnFamilyNotFoundException");
+            } catch (ColumnFamilyNotFoundException e) {
+
+            }
+            ttl = "G.COL2 > CURRENT_DATE() + 200 AND VINT > 123";
+            conn.createStatement().execute(String.format(ddl, viewName, 
tableName, ttl));
+            expected = TTLExpression.create(ttl);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), expected, 
viewName);
+
+            ttl = "G.COL2 > CURRENT_DATE() + 500 AND VINT > 123";
+            conn.createStatement().execute(String.format("ALTER VIEW %s SET 
TTL='%s'",
+                    viewName, ttl));
+            expected = TTLExpression.create(ttl);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), expected, 
viewName);
         }
     }
 
@@ -242,8 +386,7 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             Connection tenantConn1 = DriverManager.getConnection(getUrl(), 
props1);
 
             String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, 
true);
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
DEFAULT_TTL_FOR_TEST,
-                    tableName);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), defaultTTL, 
tableName);
 
             //Setting TTL on views is not allowed if Table already has TTL
             try {
@@ -265,17 +408,14 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
 
             //View should have gotten TTL from parent table.
             String viewName = createUpdatableViewOnTableWithTTL(conn, 
tableName, false);
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_TEST, viewName);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), defaultTTL, 
viewName);
 
             //Child View's PTable gets TTL from parent View's PTable which 
gets from Table.
             String childView = createViewOnViewWithTTL(tenantConn, viewName, 
false);
-            
assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_TEST, childView);
+            assertTTLValue(tenantConn.unwrap(PhoenixConnection.class), 
defaultTTL, childView);
 
             String childView1 = createViewOnViewWithTTL(tenantConn1, viewName, 
false);
-            
assertTTLValueOfTableOrView(tenantConn1.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_TEST, childView1);
+            assertTTLValue(tenantConn1.unwrap(PhoenixConnection.class), 
defaultTTL, childView1);
 
             createIndexOnTableOrViewProvidedWithTTL(conn, viewName, 
PTable.IndexType.GLOBAL,
                     false);
@@ -286,7 +426,7 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
                     conn.unwrap(PhoenixConnection.class), 
viewName).getIndexes();
 
             for (PTable index : indexes) {
-                assertTTLValueOfIndex(DEFAULT_TTL_FOR_TEST, index);
+                assertTTLValue(index, defaultTTL);
             }
 
             createIndexOnTableOrViewProvidedWithTTL(conn, tableName, 
PTable.IndexType.GLOBAL, false);
@@ -294,9 +434,8 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
                     conn.unwrap(PhoenixConnection.class), 
tableName).getIndexes();
 
             for (PTable index : tIndexes) {
-                assertTTLValueOfIndex(DEFAULT_TTL_FOR_TEST, index);
+                assertTTLValue(index, defaultTTL);
             }
-
         }
     }
 
@@ -310,8 +449,7 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             Connection tenantConn = DriverManager.getConnection(getUrl(), 
props);
 
             String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, 
true);
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
DEFAULT_TTL_FOR_TEST,
-                    tableName);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), defaultTTL, 
tableName);
 
             //Setting TTL on views is not allowed if Table already has TTL
             try {
@@ -325,11 +463,14 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             String ddl = "ALTER TABLE " + tableName + " SET TTL=NONE";
             conn.createStatement().execute(ddl);
 
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
TTL_NOT_DEFINED,
-                    tableName);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class),
+                    TTL_EXPRESSION_NOT_DEFINED, tableName);
 
             String viewName = createUpdatableViewOnTableWithTTL(conn, 
tableName, true);
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
DEFAULT_TTL_FOR_CHILD, viewName);
+            TTLExpression expectedChildTTl = useExpression ?
+                    TTLExpression.create(DEFAULT_TTL_EXPRESSION) :
+                    TTLExpression.create(DEFAULT_TTL_FOR_CHILD);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), 
expectedChildTTl, viewName);
 
             try {
                 createViewOnViewWithTTL(tenantConn, viewName, true);
@@ -339,8 +480,11 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
                         TTL_ALREADY_DEFINED_IN_HIERARCHY.getErrorCode(), 
sqe.getErrorCode());
             }
 
+            String ttlAlter = (useExpression ?
+                    String.format("'%s'", DEFAULT_TTL_EXPRESSION_FOR_ALTER) :
+                    String.valueOf(DEFAULT_TTL_FOR_ALTER));
             try {
-                ddl = "ALTER TABLE " + tableName + " SET TTL=" + 
DEFAULT_TTL_FOR_ALTER;
+                ddl = "ALTER TABLE " + tableName + " SET TTL=" + ttlAlter;
                 conn.createStatement().execute(ddl);
             } catch (SQLException sqe) {
                 assertEquals("Should fail with TTL already defined in 
hierarchy",
@@ -351,19 +495,20 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             conn.createStatement().execute(ddl);
 
             String childView = createViewOnViewWithTTL(tenantConn, viewName, 
true);
-            
assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_CHILD, childView);
+            assertTTLValue(tenantConn.unwrap(PhoenixConnection.class), 
expectedChildTTl, childView);
 
             ddl = "ALTER VIEW " + childView + " SET TTL=NONE";
             tenantConn.createStatement().execute(ddl);
 
-            
assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
-                    TTL_NOT_DEFINED, childView);
+            assertTTLValue(tenantConn.unwrap(PhoenixConnection.class),
+                    TTL_EXPRESSION_NOT_DEFINED, childView);
 
-            ddl = "ALTER VIEW " + viewName + " SET TTL=" + 
DEFAULT_TTL_FOR_ALTER;
+            ddl = "ALTER VIEW " + viewName + " SET TTL=" + ttlAlter;
             conn.createStatement().execute(ddl);
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
DEFAULT_TTL_FOR_ALTER, viewName);
-
+            TTLExpression expectedAlterTTl = useExpression ?
+                    TTLExpression.create(DEFAULT_TTL_EXPRESSION_FOR_ALTER) :
+                    TTLExpression.create(DEFAULT_TTL_FOR_ALTER);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), 
expectedAlterTTl, viewName);
         }
     }
 
@@ -382,24 +527,23 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             Connection tenantConn1 = DriverManager.getConnection(getUrl(), 
props1);
 
             String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, 
true);
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
DEFAULT_TTL_FOR_TEST,
-                    tableName);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), defaultTTL, 
tableName);
 
             //View should have gotten TTL from parent table.
             String viewName = createUpdatableViewOnTableWithTTL(conn, 
tableName, false);
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_TEST, viewName);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), defaultTTL, 
viewName);
 
             //Child View's PTable gets TTL from parent View's PTable which 
gets from Table.
             String childView = createViewOnViewWithTTL(tenantConn, viewName, 
false);
-            
assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_TEST, childView);
+            assertTTLValue(tenantConn.unwrap(PhoenixConnection.class), 
defaultTTL, childView);
 
             String childView1 = createViewOnViewWithTTL(tenantConn1, viewName, 
false);
-            
assertTTLValueOfTableOrView(tenantConn1.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_TEST, childView1);
+            assertTTLValue(tenantConn1.unwrap(PhoenixConnection.class), 
defaultTTL, childView1);
 
-            String alter = "ALTER TABLE " + tableName + " SET TTL = " + 
DEFAULT_TTL_FOR_ALTER;
+            String ttlAlter = (useExpression ?
+                    String.format("'%s'", DEFAULT_TTL_EXPRESSION_FOR_ALTER) :
+                    String.valueOf(DEFAULT_TTL_FOR_ALTER));
+            String alter = "ALTER TABLE " + tableName + " SET TTL = " + 
ttlAlter;
             conn.createStatement().execute(alter);
 
             //Clear Cache for all Tables to reflect Alter TTL commands in 
hierarchy
@@ -408,52 +552,53 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             clearCache(tenantConn, null, childView);
             clearCache(tenantConn1, null, childView1);
 
+            TTLExpression expectedAlterTTl = useExpression ?
+                    TTLExpression.create(DEFAULT_TTL_EXPRESSION_FOR_ALTER) :
+                    TTLExpression.create(DEFAULT_TTL_FOR_ALTER);
             //Assert TTL for each entity again with altered value
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_ALTER, viewName);
-            
assertTTLValueOfTableOrView(tenantConn.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_ALTER, childView);
-            
assertTTLValueOfTableOrView(tenantConn1.unwrap(PhoenixConnection.class),
-                    DEFAULT_TTL_FOR_ALTER, childView1);
+            assertTTLValue(conn.unwrap(PhoenixConnection.class), 
expectedAlterTTl, viewName);
+            assertTTLValue(tenantConn.unwrap(PhoenixConnection.class), 
expectedAlterTTl, childView);
+            assertTTLValue(tenantConn1.unwrap(PhoenixConnection.class), 
expectedAlterTTl, childView1);
         }
     }
 
-    private void assertTTLValueOfTableOrView(PhoenixConnection conn, long 
expected, String name) throws SQLException {
+    private void assertTTLValue(PhoenixConnection conn, TTLExpression 
expected, String name) throws SQLException {
         assertEquals("TTL value did not match :-", expected,
                 PhoenixRuntime.getTableNoCache(conn, name).getTTL());
     }
 
-    private void assertTTLValueOfIndex(long expected, PTable index) {
-        assertEquals("TTL value is not what expected :-", expected, 
index.getTTL());
+    private void assertTTLValue(PTable table, TTLExpression expected) {
+        assertEquals("TTL value did not match :-", expected, table.getTTL());
     }
 
-
     private String createTableWithOrWithOutTTLAsItsProperty(Connection conn, 
boolean withTTL) throws SQLException {
         String tableName = generateUniqueName();
-        conn.createStatement().execute("CREATE TABLE IF NOT EXISTS  " + 
tableName + "  ("
-                + " ID INTEGER NOT NULL,"
-                + " COL1 INTEGER NOT NULL,"
-                + " COL2 bigint NOT NULL,"
-                + " CREATED_DATE DATE,"
-                + " CREATION_TIME BIGINT,"
-                + " CONSTRAINT NAME_PK PRIMARY KEY (ID, COL1, COL2)) 
MULTI_TENANT=true "
-                + ( withTTL ? ", TTL = " + DEFAULT_TTL_FOR_TEST : ""));
+        StringBuilder ddl = new StringBuilder();
+        ddl.append(String.format(DDL_TEMPLATE, tableName));
+        ddl.append(DEFAULT_DDL_OPTIONS);
+        if (withTTL) {
+            ddl.append(", TTL = " + this.defaultTTLDDLOption);
+        }
+        conn.createStatement().execute(ddl.toString());
         return tableName;
     }
 
     private String createIndexOnTableOrViewProvidedWithTTL(Connection conn, 
String baseTableOrViewName, PTable.IndexType indexType,
                                                            boolean withTTL) 
throws SQLException {
+        String ttl = (useExpression ?
+                String.format("'%s'", DEFAULT_TTL_EXPRESSION) :
+                String.valueOf(DEFAULT_TTL_FOR_CHILD));
         switch (indexType) {
             case LOCAL:
                 String localIndexName = baseTableOrViewName + "_Local_" + 
generateUniqueName();
                 conn.createStatement().execute("CREATE LOCAL INDEX " + 
localIndexName + " ON " +
-                        baseTableOrViewName + " (COL2) " + (withTTL ? "TTL = " 
+ DEFAULT_TTL_FOR_CHILD : ""));
+                        baseTableOrViewName + " (COL2) " + (withTTL ? "TTL = " 
+ ttl : ""));
                 return localIndexName;
 
             case GLOBAL:
                 String globalIndexName = baseTableOrViewName + "_Global_" + 
generateUniqueName();
                 conn.createStatement().execute("CREATE INDEX " + 
globalIndexName + " ON " +
-                        baseTableOrViewName + " (COL2) " + (withTTL ? "TTL = " 
+ DEFAULT_TTL_FOR_CHILD : ""));
+                        baseTableOrViewName + " (COL2) " + (withTTL ? "TTL = " 
+ ttl : ""));
                 return globalIndexName;
 
             default:
@@ -463,30 +608,39 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
 
     private String createReadOnlyViewOnTableWithTTL(Connection conn, String 
baseTableName,
                                             boolean withTTL) throws 
SQLException {
+        String ttl = (useExpression ?
+                String.format("'%s'", DEFAULT_TTL_EXPRESSION) :
+                String.valueOf(DEFAULT_TTL_FOR_CHILD));
         String viewName = "VIEW_" + baseTableName + "_" + generateUniqueName();
         conn.createStatement().execute("CREATE VIEW " + viewName
                 + " (" + generateUniqueName() + " SMALLINT) as select * from "
                 + baseTableName + " where COL1 > 1 "
-                + (withTTL ? "TTL = " + DEFAULT_TTL_FOR_CHILD  : "") );
+                + (withTTL ? "TTL = " + ttl  : "") );
         return viewName;
     }
 
     private String createUpdatableViewOnTableWithTTL(Connection conn, String 
baseTableName,
                                             boolean withTTL) throws 
SQLException {
+        String ttl = (useExpression ?
+                String.format("'%s'", DEFAULT_TTL_EXPRESSION) :
+                String.valueOf(DEFAULT_TTL_FOR_CHILD));
         String viewName = "VIEW_" + baseTableName + "_" + generateUniqueName();
         conn.createStatement().execute("CREATE VIEW " + viewName
                 + " (" + generateUniqueName() + " SMALLINT) as select * from "
                 + baseTableName + " where COL1 = 1 "
-                + (withTTL ? "TTL = " + DEFAULT_TTL_FOR_CHILD : "") );
+                + (withTTL ? "TTL = " + ttl : "") );
         return viewName;
     }
 
     private String createViewOnViewWithTTL(Connection conn, String 
parentViewName,
                                            boolean withTTL) throws 
SQLException {
+        String ttl = (useExpression ?
+                String.format("'%s'", DEFAULT_TTL_EXPRESSION) :
+                String.valueOf(DEFAULT_TTL_FOR_CHILD));
         String childView = parentViewName + "_" + generateUniqueName();
         conn.createStatement().execute("CREATE VIEW " + childView +
                 " (E BIGINT, F BIGINT) AS SELECT * FROM " + parentViewName +
-                (withTTL ? " TTL = " + DEFAULT_TTL_FOR_CHILD : ""));
+                (withTTL ? " TTL = " + ttl : ""));
         return childView;
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLIT.java
index 63bf22f594..b901082d71 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLIT.java
@@ -23,6 +23,7 @@ import org.apache.phoenix.query.PhoenixTestBuilder;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
 import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.LiteralTTLExpression;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Assert;
@@ -136,19 +137,22 @@ public class TTLIT extends ParallelStatsDisabledIT {
      */
 
     private void assertTTLForGivenPTable(PTable table, int ttl) {
-        Assert.assertEquals(ttl, table.getTTL());
+        LiteralTTLExpression expected = new LiteralTTLExpression(ttl);
+        Assert.assertEquals(expected, table.getTTL());
     }
 
 
     private void assertTTLForGivenEntity(Connection connection, String 
entityName, int ttl) throws SQLException {
         PTable pTable = PhoenixRuntime.getTable(connection, entityName);
-        Assert.assertEquals(ttl,pTable.getTTL());
+        LiteralTTLExpression expected = new LiteralTTLExpression(ttl);
+        Assert.assertEquals(expected, pTable.getTTL());
     }
 
     private void assertTTLForIndexName(Connection connection, String 
indexName, int ttl) throws SQLException {
         if (!indexName.equals(SKIP_ASSERT)) {
             PTable index = PhoenixRuntime.getTable(connection, indexName);
-            Assert.assertEquals(ttl,index.getTTL());
+            LiteralTTLExpression expected = new LiteralTTLExpression(ttl);
+            Assert.assertEquals(expected, index.getTTL());
         }
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index d3d04c63c6..d4c480a571 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -1638,7 +1638,7 @@ public class ViewTTLIT extends BaseViewTTLIT {
             
scan.setAttribute(BaseScannerRegionObserverConstants.DELETE_PHOENIX_TTL_EXPIRED,
                     PDataType.TRUE_BYTES);
             scan.setAttribute(BaseScannerRegionObserverConstants.TTL,
-                    Bytes.toBytes(Long.valueOf(table.getTTL())));
+                    Bytes.toBytes(table.getTTL().getTTLExpression()));
 
             PhoenixResultSet
                     rs =
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/schema/ConditionalTTLExpressionDDLTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/ConditionalTTLExpressionDDLTest.java
new file mode 100644
index 0000000000..4a473464d3
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/ConditionalTTLExpressionDDLTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.phoenix.exception.PhoenixParserException;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+
+public class ConditionalTTLExpressionDDLTest extends 
BaseConnectionlessQueryTest { // DDL tests for tables and views whose TTL property is a condition expression string
+
+    private static void assertConditonTTL(Connection conn, String tableName, 
String ttlExpr) throws SQLException { // overload: wraps ttlExpr in a ConditionTTLExpression; NOTE(review): "Conditon" looks like a typo for "Condition"
+        TTLExpression expected = new ConditionTTLExpression(ttlExpr);
+        assertConditonTTL(conn, tableName, expected);
+    }
+
+    private static void assertConditonTTL(Connection conn, String tableName, 
TTLExpression expected) throws SQLException { // asserts that the PTable looked up by tableName reports a TTL equal to expected
+        PTable table = 
conn.unwrap(PhoenixConnection.class).getTable(tableName);
+        assertEquals(expected, table.getTTL());
+    }
+
+    @Test // a condition over a PK column and a varchar column is accepted as the table TTL
+    public void testBasicExpression() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, col1 varchar, col2 date " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String quotedValue = "k1 > 5 AND col1 < ''zzzzzz''"; // single quotes are doubled because the template embeds the expression in a '...' literal
+        String ttl = "k1 > 5 AND col1 < 'zzzzzz'"; // same expression with the quotes un-doubled; this is what the table is expected to report
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, quotedValue);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+            assertConditonTTL(conn, tableName, ttl);
+        }
+    }
+
+    @Test(expected = TypeMismatchException.class) // "k1 + 100" is an arithmetic expression; the DDL is expected to fail with TypeMismatchException
+    public void testNotBooleanExpr() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, col1 varchar, col2 date " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String ttl = "k1 + 100";
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, ttl);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+        }
+    }
+
+    @Test(expected = TypeMismatchException.class) // compares the bigint column k1 with a string literal; the DDL is expected to fail with TypeMismatchException
+    public void testWrongArgumentValue() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, col1 varchar, col2 date " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String ttl = "k1 = ''abc''";
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, ttl);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+        }
+    }
+
+    @Test(expected = PhoenixParserException.class) // the expression "k2 == 23" is expected to be rejected with PhoenixParserException
+    public void testParsingError() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, col1 varchar, col2 date " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String ttl = "k2 == 23";
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, ttl);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+        }
+    }
+
+    @Test // an aggregate function in the TTL expression must fail with AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_TTL_EXPRESSION
+    public void testAggregateExpressionNotAllowed() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, col1 varchar, col2 date " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String ttl = "SUM(k2) > 23";
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, ttl);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+            fail(); // reaching this line means the DDL was accepted, which is a test failure
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_TTL_EXPRESSION.getErrorCode(),
+                    e.getErrorCode());
+        } catch (Exception e) { // the AssertionError raised by fail() above is an Error, so it is not caught here
+            fail("Unknown exception " + e);
+        }
+    }
+
+    @Test // IS NULL combined with date arithmetic on CURRENT_DATE() is accepted; the table reports an equal ConditionTTLExpression
+    public void testNullExpression() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, col1 varchar, col2 date " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String ttl = "col1 is NULL AND col2 < CURRENT_DATE() + 30000";
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, ttl);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+            assertConditonTTL(conn, tableName, ttl);
+        }
+    }
+
+    @Test // a bare BOOLEAN column is accepted as the TTL condition
+    public void testBooleanColumn() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, expired BOOLEAN " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String ttl = "expired";
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, ttl);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+            assertConditonTTL(conn, tableName, ttl);
+        }
+    }
+
+    @Test // NOT applied to a BOOLEAN column is accepted as the TTL condition
+    public void testNot() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, expired BOOLEAN " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String ttl = "NOT expired";
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, ttl);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+            assertConditonTTL(conn, tableName, ttl);
+        }
+    }
+
+    @Test // PHOENIX_ROW_TIMESTAMP() compared against CURRENT_DATE() arithmetic is accepted as the TTL condition
+    public void testPhoenixRowTimestamp() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, col1 varchar " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String ttl = "PHOENIX_ROW_TIMESTAMP() < CURRENT_DATE() - 100";
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, ttl);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+            assertConditonTTL(conn, tableName, ttl);
+        }
+    }
+
+    @Test // a CASE expression yielding TRUE/FALSE is accepted as the TTL condition
+    public void testBooleanCaseExpression() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null, k2 bigint 
not null, col1 varchar, status char(1) " +
+                "constraint pk primary key (k1,k2 desc)) TTL = '%s'";
+        String ttl = "CASE WHEN status = ''E'' THEN TRUE ELSE FALSE END"; // quotes doubled for embedding inside the '...' literal of the DDL
+        String expectedTTLExpr = "CASE WHEN status = 'E' THEN TRUE ELSE FALSE 
END";
+        String tableName = generateUniqueName();
+        String ddl = String.format(ddlTemplate, tableName, ttl);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+            assertConditonTTL(conn, tableName, expectedTTLExpr);
+        }
+    }
+
+    @Test // conditional TTL declared on a view created directly over the table
+    public void testCondTTLOnTopLevelView() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null primary 
key," +
+                "k2 bigint, col1 varchar, status char(1))";
+        String tableName = generateUniqueName();
+        String viewName = generateUniqueName();
+        String viewTemplate = "create view %s (k3 smallint) as select * from 
%s WHERE k1=7 TTL = '%s'";
+        String ttl = "k2 = 34 and k3 = -1"; // references k2 from the base table and k3 added by the view
+        String ddl = String.format(ddlTemplate, tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+            ddl = String.format(viewTemplate, viewName, tableName, ttl);
+            conn.createStatement().execute(ddl);
+            assertConditonTTL(conn, tableName, 
TTLExpression.TTL_EXPRESSION_NOT_DEFINED); // the base table was created without a TTL clause
+            assertConditonTTL(conn, viewName, ttl);
+        }
+    }
+
+    @Test // conditional TTL declared only on the child view of a two-level view hierarchy
+    public void testCondTTLOnMultiLevelView() throws SQLException {
+        String ddlTemplate = "create table %s (k1 bigint not null primary 
key," +
+                "k2 bigint, col1 varchar, status char(1))";
+        String tableName = generateUniqueName();
+        String parentView = generateUniqueName();
+        String childView = generateUniqueName();
+        String parentViewTemplate = "create view %s (k3 smallint) as select * 
from %s WHERE k1=7";
+        String childViewTemplate = "create view %s as select * from %s TTL = 
'%s'";
+        String ttl = "k2 = 34 and k3 = -1"; // k3 is declared by the parent view, k2 by the base table
+        String ddl = String.format(ddlTemplate, tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES))) {
+            conn.createStatement().execute(ddl);
+            ddl = String.format(parentViewTemplate, parentView, tableName);
+            conn.createStatement().execute(ddl);
+            ddl = String.format(childViewTemplate, childView, parentView, ttl);
+            conn.createStatement().execute(ddl);
+            assertConditonTTL(conn, tableName, 
TTLExpression.TTL_EXPRESSION_NOT_DEFINED); // neither the base table ...
+            assertConditonTTL(conn, parentView, 
TTLExpression.TTL_EXPRESSION_NOT_DEFINED); // ... nor the parent view was given a TTL clause
+            assertConditonTTL(conn, childView, ttl);
+        }
+    }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/schema/TTLExpressionTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/TTLExpressionTest.java
new file mode 100644
index 0000000000..0d1d3dc46c
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/schema/TTLExpressionTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.junit.Test;
+
+public class TTLExpressionTest { // unit tests for the TTLExpression.create(...) factory and the predefined TTLExpression constants
+
+    @Test // an int and its decimal string form both yield an expression equal to LiteralTTLExpression(ttl)
+    public void testLiteralExpression() {
+        int ttl = 100;
+        LiteralTTLExpression literal = new LiteralTTLExpression(ttl);
+        assertEquals(literal, TTLExpression.create(ttl));
+        assertEquals(literal, TTLExpression.create(String.valueOf(ttl)));
+    }
+
+    @Test // FOREVER_TTL and HConstants.FOREVER both map to the "forever" constant; NOTE(review): "FORVER" in the constant name looks like a typo
+    public void testForever() {
+        assertEquals(TTLExpression.TTL_EXPRESSION_FORVER,
+                TTLExpression.create(PhoenixDatabaseMetaData.FOREVER_TTL));
+        assertEquals(TTLExpression.TTL_EXPRESSION_FORVER,
+                TTLExpression.create(HConstants.FOREVER));
+    }
+
+    @Test // NONE_TTL and TTL_NOT_DEFINED both map to TTL_EXPRESSION_NOT_DEFINED
+    public void testNone() {
+        assertEquals(TTLExpression.TTL_EXPRESSION_NOT_DEFINED,
+                TTLExpression.create(PhoenixDatabaseMetaData.NONE_TTL));
+        assertEquals(TTLExpression.TTL_EXPRESSION_NOT_DEFINED,
+                TTLExpression.create(PhoenixDatabaseMetaData.TTL_NOT_DEFINED));
+        
assertNull(TTLExpression.TTL_EXPRESSION_NOT_DEFINED.getTTLForScanAttribute()); // the "not defined" expression yields a null scan-attribute value
+    }
+
+    @Test(expected = IllegalArgumentException.class) // create(-1) is expected to throw IllegalArgumentException
+    public void testInvalidLiteral() {
+        TTLExpression.create(-1);
+    }
+
+    @Test // a non-numeric string yields an expression equal to ConditionTTLExpression built from the same text
+    public void testConditionalExpression() {
+        String ttl = "PK1 = 5 AND COL1 > 'abc'";
+        ConditionTTLExpression expected = new ConditionTTLExpression(ttl);
+        TTLExpression actual = TTLExpression.create(ttl);
+        assertEquals(expected, actual);
+        assertEquals(ttl, expected.getTTLExpression()); // the original expression text is returned unchanged
+        assertNull(actual.getTTLForScanAttribute()); // the conditional expression yields a null scan-attribute value
+    }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
index e088bbfab5..abdba86f1d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
@@ -536,7 +536,7 @@ public class ScanUtilTest {
 
                 long timestamp44 = 44L;
                 Scan testScan = new Scan();
-                testScan.setAttribute(BaseScannerRegionObserverConstants.TTL, 
Bytes.toBytes(1L));
+                testScan.setAttribute(BaseScannerRegionObserverConstants.TTL, 
Bytes.toBytes("1"));
                 // Test isTTLExpired
                 Assert.assertTrue(ScanUtil.isTTLExpired(cell42, testScan, 
timestamp44));
                 Assert.assertFalse(ScanUtil.isTTLExpired(cell43, testScan, 
timestamp44));
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c071129dcb..8204c535b9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -146,6 +146,7 @@ import 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TTLExpression;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.stats.GuidePostsKey;
@@ -1339,14 +1340,15 @@ public class TestUtil {
 
     public static void assertTableHasTtl(Connection conn, TableName tableName, 
 int ttl, boolean phoenixTTLEnabled)
         throws SQLException, IOException {
-        long tableTTL = -1;
+        TTLExpression tableTTL; // assigned on both branches: from the Phoenix table's TTL or from the column descriptor's time-to-live
         if (phoenixTTLEnabled) {
             tableTTL = conn.unwrap(PhoenixConnection.class).getTable(new 
 PTableKey(null,
                     tableName.getNameAsString())).getTTL();
         } else {
-            tableTTL = getColumnDescriptor(conn, tableName).getTimeToLive();
+            tableTTL = TTLExpression.create(getColumnDescriptor(conn, 
 tableName).getTimeToLive()); // convert the descriptor's numeric TTL so both branches compare as TTLExpression
         }
-        Assert.assertEquals(ttl, tableTTL);
+        TTLExpression expectedTTL = TTLExpression.create(ttl); // build the expected value through the same factory used for the actual value
+        Assert.assertEquals(expectedTTL, tableTTL);
     }
 
     public static void assertTableHasVersions(Connection conn, TableName 
tableName, int versions)


Reply via email to