This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this 
push:
     new 581e61365c PHOENIX-7008 Implementation for CREATE CDC  (#1681)
581e61365c is described below

commit 581e61365cac22ce85592179703029adcfd9af24
Author: Hari Krishna Dara <harid...@gmail.com>
AuthorDate: Sat Sep 30 09:24:52 2023 +0530

    PHOENIX-7008 Implementation for CREATE CDC  (#1681)
---
 .gitignore                                         |   3 +
 .../java/org/apache/phoenix/end2end/CDCMiscIT.java | 152 ++++++++++
 .../phoenix/compile/CreateIndexCompiler.java       |   2 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  17 +-
 .../phoenix/coprocessor/MetaDataProtocol.java      |   2 +-
 .../coprocessor/PhoenixAccessController.java       |   6 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   7 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |   7 +
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |  15 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |  17 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   5 +
 .../org/apache/phoenix/schema/DelegateTable.java   |   6 +
 .../org/apache/phoenix/schema/MetaDataClient.java  | 325 ++++++++++++++-------
 .../java/org/apache/phoenix/schema/PTable.java     |  18 +-
 .../java/org/apache/phoenix/schema/PTableImpl.java |  24 ++
 .../java/org/apache/phoenix/schema/PTableType.java |   3 +-
 .../org/apache/phoenix/schema/TableProperty.java   |  33 +++
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  85 ++++++
 .../java/org/apache/phoenix/util/SchemaUtil.java   |   3 +-
 phoenix-core/src/main/protobuf/PTable.proto        |   2 +
 .../org/apache/phoenix/parse/QueryParserTest.java  |  42 ++-
 .../java/org/apache/phoenix/util/CDCUtilTest.java  |  39 +++
 22 files changed, 671 insertions(+), 142 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1d33a1cbe0..bebe16f280 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,3 +32,6 @@ phoenix-hbase-compat-1.3.0/
 phoenix-hbase-compat-1.4.0/
 phoenix-hbase-compat-1.5.0/
 */hbase.log
+
+# Vim swap files
+.*.sw*
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
new file mode 100644
index 0000000000..380445e22a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableProperty;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(ParallelStatsDisabledTest.class)
+public class CDCMiscIT extends ParallelStatsDisabledIT {
+    private void assertCDCState(Connection conn, String cdcName, String 
expInclude,
+                                int idxType) throws SQLException {
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT 
cdc_include FROM " +
+                "system.catalog WHERE table_name = '" + cdcName +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+            assertEquals(true, rs.next());
+            assertEquals(expInclude, rs.getString(1));
+        }
+        try (ResultSet rs = conn.createStatement().executeQuery("SELECT 
index_type FROM " +
+                "system.catalog WHERE table_name = '" + 
CDCUtil.getCDCIndexName(cdcName) +
+                "' AND column_name IS NULL and column_family IS NULL")) {
+                assertEquals(true, rs.next());
+            assertEquals(idxType, rs.getInt(1));
+        }
+    }
+
+    private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> 
expIncludeScopes)
+            throws SQLException {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        PTable table = PhoenixRuntime.getTable(conn, cdcName);
+        assertEquals(expIncludeScopes, table.getCDCIncludeScopes());
+        assertEquals(expIncludeScopes, 
TableProperty.INCLUDE.getPTableValue(table));
+    }
+
+    @Test
+    public void testCreate() throws SQLException {
+        Properties props = new Properties();
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName = generateUniqueName();
+        conn.createStatement().execute(
+                "CREATE TABLE  " + tableName + " ( k INTEGER PRIMARY KEY," + " 
v1 INTEGER,"
+                        + " v2 DATE)");
+        String cdcName = generateUniqueName();
+
+        try {
+            conn.createStatement().execute("CREATE CDC " + cdcName
+                    + " ON NON_EXISTENT_TABLE (PHOENIX_ROW_TIMESTAMP())");
+            fail("Expected to fail due to non-existent table");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), 
e.getErrorCode());
+        }
+
+        try {
+            conn.createStatement().execute("CREATE CDC " + cdcName
+                    + " ON " + tableName +"(UNKNOWN_FUNCTION())");
+            fail("Expected to fail due to invalid function");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.FUNCTION_UNDEFINED.getErrorCode(), 
e.getErrorCode());
+        }
+
+        try {
+            conn.createStatement().execute("CREATE CDC " + cdcName
+                    + " ON " + tableName +"(NOW())");
+            fail("Expected to fail due to non-deterministic function");
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX.
+                    getErrorCode(), e.getErrorCode());
+        }
+
+        try {
+            conn.createStatement().execute("CREATE CDC " + cdcName
+                    + " ON " + tableName +"(ROUND(v1))");
+            fail("Expected to fail due to non-timestamp expression in the 
index PK");
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
+                    e.getErrorCode());
+        }
+
+        try {
+            conn.createStatement().execute("CREATE CDC " + cdcName
+                    + " ON " + tableName +"(v1)");
+            fail("Expected to fail due to non-timestamp column in the index 
PK");
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
+                    e.getErrorCode());
+        }
+
+        String cdc_sql = "CREATE CDC " + cdcName
+                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
+        conn.createStatement().execute(cdc_sql);
+        assertCDCState(conn, cdcName, null, 3);
+
+        try {
+            conn.createStatement().execute(cdc_sql);
+            fail("Expected to fail due to duplicate index");
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), 
e.getErrorCode());
+            assertTrue(e.getMessage().endsWith(cdcName));
+        }
+        conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + 
" ON " + tableName +
+                "(v2) INCLUDE (pre, post) INDEX_TYPE=g");
+
+        cdcName = generateUniqueName();
+        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + 
tableName +
+                "(v2) INCLUDE (pre, post) INDEX_TYPE=g");
+        assertCDCState(conn, cdcName, "PRE,POST", 3);
+        assertPTable(cdcName, new HashSet<>(
+                Arrays.asList(PTable.CDCChangeScope.PRE, 
PTable.CDCChangeScope.POST)));
+
+        cdcName = generateUniqueName();
+        conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + 
tableName +
+                "(v2) INDEX_TYPE=l");
+        assertCDCState(conn, cdcName, null, 2);
+        assertPTable(cdcName, null);
+
+        conn.close();
+    }
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
index 8a6e239897..1279f83b25 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateIndexCompiler.java
@@ -82,7 +82,7 @@ public class CreateIndexCompiler {
         return new BaseMutationPlan(context, operation) {
             @Override
             public MutationState execute() throws SQLException {
-                return client.createIndex(create, splits);
+                return client.createIndex(create, splits, null);
             }
 
             @Override
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 7db23f4678..553a925374 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -82,6 +82,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTE
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_BYTES;
 import static 
org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.PTableType.INDEX;
@@ -246,6 +247,7 @@ import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -365,6 +367,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         TABLE_FAMILY_BYTES, EXTERNAL_SCHEMA_ID_BYTES);
     private static final Cell STREAMING_TOPIC_NAME_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
         TABLE_FAMILY_BYTES, STREAMING_TOPIC_NAME_BYTES);
+    private static final Cell CDC_INCLUDE_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+            TABLE_FAMILY_BYTES, CDC_INCLUDE_BYTES);
 
     private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
             EMPTY_KEYVALUE_KV,
@@ -404,7 +408,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             CHANGE_DETECTION_ENABLED_KV,
             SCHEMA_VERSION_KV,
             EXTERNAL_SCHEMA_ID_KV,
-            STREAMING_TOPIC_NAME_KV
+            STREAMING_TOPIC_NAME_KV,
+            CDC_INCLUDE_KV
     );
 
     static {
@@ -452,6 +457,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         TABLE_KV_COLUMNS.indexOf(EXTERNAL_SCHEMA_ID_KV);
     private static final int STREAMING_TOPIC_NAME_INDEX =
         TABLE_KV_COLUMNS.indexOf(STREAMING_TOPIC_NAME_KV);
+    private static final int CDC_INCLUDE_INDEX = 
TABLE_KV_COLUMNS.indexOf(CDC_INCLUDE_KV);
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
DECIMAL_DIGITS_BYTES);
     private static final KeyValue COLUMN_SIZE_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
COLUMN_SIZE_BYTES);
@@ -1423,6 +1429,15 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         builder.setStreamingTopicName(streamingTopicName != null ? 
streamingTopicName :
             oldTable != null ? oldTable.getStreamingTopicName() : null);
 
+        Cell includeSpecKv = tableKeyValues[CDC_INCLUDE_INDEX];
+        String includeSpec = includeSpecKv != null ?
+                (String) 
PVarchar.INSTANCE.toObject(includeSpecKv.getValueArray(),
+                        includeSpecKv.getValueOffset(), 
includeSpecKv.getValueLength())
+                : null;
+        builder.setCDCIncludeScopes(includeSpec != null ?
+                CDCUtil.makeChangeScopeEnumsFromString(includeSpec) :
+                oldTable != null ? oldTable.getCDCIncludeScopes() : null);
+
         // Check the cell tag to see whether the view has modified this 
property
         final byte[] tagUseStatsForParallelization = 
(useStatsForParallelizationKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY :
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index af6c1cf312..a88a991a04 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -99,7 +99,7 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = 
MIN_TABLE_TIMESTAMP + 29;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = 
MIN_TABLE_TIMESTAMP + 33;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = 
MIN_TABLE_TIMESTAMP + 37;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = 
MIN_TABLE_TIMESTAMP + 38;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the 
MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0;
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
index 74d94c1333..1039d88947 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixAccessController.java
@@ -173,7 +173,7 @@ public class PhoenixAccessController extends 
BaseMetaDataEndpointObserver {
             Set<byte[]> familySet, Set<TableName> indexes) throws IOException {
         if (!accessCheckEnabled) { return; }
         
-        if (tableType != PTableType.VIEW) {
+        if (tableType != PTableType.VIEW && tableType != PTableType.CDC) {
             TableDescriptorBuilder tableDescBuilder = 
TableDescriptorBuilder.newBuilder(physicalTableName);
             for (byte[] familyName : familySet) {
                 
tableDescBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName).build());
@@ -184,9 +184,9 @@ public class PhoenixAccessController extends 
BaseMetaDataEndpointObserver {
             }
         }
 
-        // Index and view require read access on parent physical table.
+        // Index, view and CDC require read access on parent physical table.
         Set<TableName> physicalTablesChecked = new HashSet<TableName>();
-        if (tableType == PTableType.VIEW || tableType == PTableType.INDEX) {
+        if (tableType == PTableType.VIEW || tableType == PTableType.INDEX || 
tableType == PTableType.CDC) {
             physicalTablesChecked.add(parentPhysicalTableName);
             if (execPermissionsCheckEnabled) {
                 requireAccess("Create" + tableType, parentPhysicalTableName, 
Action.READ, Action.EXEC);
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 67dd2635a1..2fb221f165 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -168,8 +168,9 @@ public enum SQLExceptionCode {
      AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggregate 
expression not allowed in an index."),
      NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", 
"Non-deterministic expression not allowed in an index."),
      STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless 
expression not allowed in an index."),
-     
-     /**
+     INCORRECT_DATATYPE_FOR_EXPRESSION(539, "42102", "Expression datatype is 
incorrect for this index."),
+
+    /**
       *  Transaction exceptions.
       */
      TRANSACTION_CONFLICT_EXCEPTION(523, "42900", "Transaction aborted due to 
conflict with other mutations."),
@@ -351,6 +352,8 @@ public enum SQLExceptionCode {
             + PhoenixDatabaseMetaData.PHOENIX_TTL + " property on an view when 
parent/child view has PHOENIX_TTL set,"),
     CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY(10954, "44A36",
         CHANGE_DETECTION_ENABLED + " is only supported on tables and views"),
+    UNKNOWN_INDEX_TYPE(1098,"44A37", "Unknown INDEX type: "),
+    UNKNOWN_INCLUDE_CHANGE_SCOPE(1099,"44A38", "Unknown change scope for 
INCLUDE: "),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new 
Factory() {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 4283a59e00..5a95bf4a52 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -430,6 +430,13 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final String SYSTEM_TRANSFORM_TABLE = "TRANSFORM";
     public static final String SYSTEM_TRANSFORM_NAME = 
SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TRANSFORM_TABLE);
 
+    public static final String CDC_INCLUDE_NAME = "INCLUDE";
+    public static final String CDC_INCLUDE_TABLE = "CDC_INCLUDE";
+    public static final byte[] CDC_INCLUDE_BYTES = 
Bytes.toBytes(CDC_INCLUDE_TABLE);
+
+    // This is just a virtual property on CDC that translates to the type of 
index created.
+    public static final String CDC_INDEX_TYPE_NAME = "INDEX_TYPE";
+
     //SYSTEM:LOG
     public static final String SYSTEM_LOG_TABLE = "LOG";
     public static final String SYSTEM_LOG_NAME =
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 54f99fe437..7589ba5021 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1082,7 +1082,20 @@ public class PhoenixStatement implements 
PhoenixMonitoredStatement, SQLCloseable
         @Override
         public MutationPlan compilePlan(PhoenixStatement stmt,
                                         Sequence.ValueOp seqAction) throws 
SQLException {
-            return null;
+            final StatementContext context = new StatementContext(stmt);
+            return new BaseMutationPlan(context, this.getOperation()) {
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("CREATE 
CDC"));
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new 
MetaDataClient(getContext().getConnection());
+                    return client.createCDC(ExecutableCreateCDCStatement.this);
+                }
+            };
         }
     }
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 551be97e1c..5d1273551e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2106,8 +2106,10 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
                 break;
             }
         }
-        if ((tableType == PTableType.VIEW && physicalTableName != null) ||
-                (tableType != PTableType.VIEW && (physicalTableName == null || 
localIndexTable))) {
+        if ((tableType != PTableType.CDC) && (
+                (tableType == PTableType.VIEW && physicalTableName != null) ||
+                (tableType != PTableType.VIEW && (physicalTableName == null || 
localIndexTable))
+        )) {
             // For views this will ensure that metadata already exists
             // For tables and indexes, this will create the metadata if it 
doesn't already exist
             ensureTableCreated(physicalTableNameBytes, null, tableType, 
tableProps, families, splits, true,
@@ -4096,19 +4098,22 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
         if (currentServerSideTableTimeStamp < 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0) {
             metaConnection = addColumnsIfNotExists(metaConnection,
-                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -3,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 4,
                     PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME
                             + " " + PVarchar.INSTANCE.getSqlTypeName());
 
             metaConnection = addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -2,
+                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 3,
                     PhoenixDatabaseMetaData.SCHEMA_VERSION + " " + 
PVarchar.INSTANCE.getSqlTypeName());
             metaConnection = addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -1,
+                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 2,
                 PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " " + 
PVarchar.INSTANCE.getSqlTypeName());
             metaConnection = addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0,
+                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 1,
                 PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " " + 
PVarchar.INSTANCE.getSqlTypeName());
+            metaConnection = addColumnsIfNotExists(metaConnection,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0,
+                    PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE + " " + 
PVarchar.INSTANCE.getSqlTypeName());
             UpgradeUtil.bootstrapLastDDLTimestampForIndexes(metaConnection);
         }
         return metaConnection;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 52326c189d..b3bab6b637 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -47,6 +47,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHAR_OCTET_LENGTH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
@@ -303,6 +304,9 @@ public interface QueryConstants {
 
     // custom TagType
     byte VIEW_MODIFIED_PROPERTY_TAG_TYPE = (byte) 70;
+
+    String CDC_JSON_COL_NAME = "CDC JSON";
+
     /**
      * We mark counter values 0 to 10 as reserved. Value 0 is used by
      * {@link #ENCODED_EMPTY_COLUMN_NAME}. Values 1-10
@@ -342,6 +346,7 @@ public interface QueryConstants {
             SCHEMA_VERSION + " VARCHAR, \n" +
             EXTERNAL_SCHEMA_ID + " VARCHAR, \n" +
             STREAMING_TOPIC_NAME + " VARCHAR, \n" +
+            CDC_INCLUDE_TABLE + " VARCHAR, \n" +
             // Column metadata (will be null for table row)
             DATA_TYPE + " INTEGER," +
             COLUMN_SIZE + " INTEGER," +
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index b79691b335..54a959f177 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.schema;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -397,6 +398,11 @@ public class DelegateTable implements PTable {
     @Override
     public String getStreamingTopicName() { return 
delegate.getStreamingTopicName(); }
 
+    @Override
+    public Set<CDCChangeScope> getCDCIncludeScopes() {
+        return delegate.getCDCIncludeScopes();
+    }
+
     @Override public Map<String, String> getPropertyValues() { return 
delegate.getPropertyValues(); }
 
     @Override public Map<String, String> getDefaultPropertyValues() { return 
delegate.getDefaultPropertyValues(); }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 4fbee3cc24..ca30b9a215 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.schema;
 
 import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
+import static 
org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
 import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
@@ -44,7 +46,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_ENCODED_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME;
@@ -64,7 +65,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ARRAY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_CONSTANT;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_NAMESPACE_MAPPED;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_ROW_TIMESTAMP;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IS_VIEW_REFERENCED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.JAR_PATH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.KEY_SEQ;
@@ -121,7 +121,6 @@ import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_P
 import static 
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static 
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
 import static org.apache.phoenix.schema.PTable.ViewType.MAPPED;
-import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.TABLE;
 import static org.apache.phoenix.schema.PTableType.VIEW;
@@ -139,6 +138,7 @@ import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
@@ -155,6 +155,7 @@ import java.util.Set;
 import java.util.HashSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.parse.CreateCDCStatement;
 import org.apache.phoenix.schema.task.SystemTaskParams;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.HConstants;
@@ -190,6 +191,8 @@ import 
org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.SharedTableState;
 import org.apache.phoenix.schema.stats.GuidePostsInfo;
 import org.apache.phoenix.schema.transform.Transform;
+import 
org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.TaskMetaDataServiceCallBack;
 import org.apache.phoenix.util.ViewUtil;
 import org.apache.phoenix.util.JacksonUtil;
@@ -290,7 +293,6 @@ import 
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints;
 
-
 public class MetaDataClient {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MetaDataClient.class);
 
@@ -347,9 +349,10 @@ public class MetaDataClient {
                     CHANGE_DETECTION_ENABLED + "," +
                     PHYSICAL_TABLE_NAME + "," +
                     SCHEMA_VERSION + "," +
-                    STREAMING_TOPIC_NAME +
+                    STREAMING_TOPIC_NAME + "," +
+                    CDC_INCLUDE_TABLE +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, " +
-                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
     private static final String CREATE_SCHEMA = "UPSERT INTO " + 
SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
             + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -1019,7 +1022,9 @@ public class MetaDataClient {
                         true, NamedTableNode.create(statement.getTableName()), 
statement.getTableType(), false, null);
             }
         }
-        table = createTableInternal(statement, splits, parent, viewStatement, 
viewType, viewIndexIdType, viewColumnConstants, isViewColumnReferenced, false, 
null, null, tableProps, commonFamilyProps);
+        table = createTableInternal(statement, splits, parent, viewStatement, 
viewType,
+                viewIndexIdType, viewColumnConstants, isViewColumnReferenced, 
false, null,
+                null, null, tableProps, commonFamilyProps);
 
         if (table == null || table.getType() == PTableType.VIEW
                 || statement.isNoVerify() /*|| table.isTransactional()*/) {
@@ -1177,9 +1182,10 @@ public class MetaDataClient {
                 + " IS NULL AND " + LAST_STATS_UPDATE_TIME + " IS NOT NULL";
             try (PreparedStatement selectStatsStmt = 
connection.prepareStatement(query)) {
                 selectStatsStmt.setString(1, physicalName.getString());
-                ResultSet rs = selectStatsStmt.executeQuery(query);
-                if (rs.next()) {
-                    msSinceLastUpdate = rs.getLong(1) - rs.getLong(2);
+                try (ResultSet rs = selectStatsStmt.executeQuery(query)) {
+                    if (rs.next()) {
+                        msSinceLastUpdate = rs.getLong(1) - rs.getLong(2);
+                    }
                 }
             }
         }
@@ -1417,10 +1423,11 @@ public class MetaDataClient {
      *    listed as an index column.
      * @param statement
      * @param splits
+     * @param indexPKExpresionsType If non-{@code null}, all PK expressions 
should be of this specific type.
      * @return MutationState from population of index table from data table
      * @throws SQLException
      */
-    public MutationState createIndex(CreateIndexStatement statement, byte[][] 
splits) throws SQLException {
+    public MutationState createIndex(CreateIndexStatement statement, byte[][] 
splits, PDataType indexPKExpresionsType) throws SQLException {
         IndexKeyConstraint ik = statement.getIndexConstraint();
         TableName indexTableName = statement.getIndexTableName();
 
@@ -1535,6 +1542,9 @@ public class MetaDataClient {
                 if (expression.isStateless()) {
                     throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
                 }
+                if (indexPKExpresionsType != null && expression.getDataType() 
!= indexPKExpresionsType) {
+                    throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION).build().buildException();
+                }
                 unusedPkColumns.remove(expression);
 
                 // Go through parse node to get string as otherwise we
@@ -1666,7 +1676,9 @@ public class MetaDataClient {
 
             tableProps.put(MetaDataUtil.DATA_TABLE_NAME_PROP_NAME, 
dataTable.getPhysicalName().getString());
             CreateTableStatement tableStatement = 
FACTORY.createTable(indexTableName, statement.getProps(), columnDefs, pk, 
statement.getSplitNodes(), PTableType.INDEX, statement.ifNotExists(), null, 
null, statement.getBindCount(), null);
-            table = createTableInternal(tableStatement, splits, dataTable, 
null, null, getViewIndexDataType() ,null, null, allocateIndexId, 
statement.getIndexType(), asyncCreatedDate, tableProps, commonFamilyProps);
+            table = createTableInternal(tableStatement, splits, dataTable, 
null, null,
+                    getViewIndexDataType() ,null, null, allocateIndexId,
+                    statement.getIndexType(), asyncCreatedDate, null, 
tableProps, commonFamilyProps);
         }
         finally {
             deleteMutexCells(physicalSchemaName, physicalTableName, 
acquiredColumnMutexSet);
@@ -1698,6 +1710,83 @@ public class MetaDataClient {
         return buildIndex(table, tableRef);
     }
 
+    public MutationState createCDC(CreateCDCStatement statement) throws 
SQLException {
+        Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize(
+                statement.getProps().size());
+        Map<String,Object> commonFamilyProps = Maps.newHashMapWithExpectedSize(
+                statement.getProps().size() + 1);
+        populatePropertyMaps(statement.getProps(), tableProps, 
commonFamilyProps, PTableType.CDC);
+
+        NamedNode indexName = FACTORY.indexName(CDCUtil.getCDCIndexName(
+                statement.getCdcObjName().getName()));
+        String timeIdxColName = statement.getTimeIdxColumn() != null ?
+                statement.getTimeIdxColumn().getColumnName() : null;
+        IndexKeyConstraint indexKeyConstraint =
+                FACTORY.indexKey(Arrays.asList(new Pair[] { Pair.newPair(
+                        timeIdxColName != null ? 
FACTORY.column(statement.getDataTable(),
+                                timeIdxColName, timeIdxColName) : 
statement.getTimeIdxFunc(),
+                        SortOrder.getDefault()) }));
+        IndexType indexType = (IndexType) 
TableProperty.INDEX_TYPE.getValue(tableProps);
+        ListMultimap<String, Pair<String, Object>> indexProps = 
ArrayListMultimap.create();
+        // TODO: Transfer TTL and MaxLookback from statement.getProps() to 
indexProps.
+        CreateIndexStatement indexStatement = FACTORY.createIndex(indexName, 
FACTORY.namedTable(null,
+                        statement.getDataTable(), (Double) null), 
indexKeyConstraint, null, null,
+                        indexProps, statement.isIfNotExists(), indexType, 
false, 0,
+                        new HashMap<>());
+        // TODO: Currently the index can be dropped, leaving the CDC object dangling;
+        //  DROP INDEX needs to protect against this based on CDCUtil.isACDCIndex().
+        // TODO: Should we also allow PTimestamp here?
+        MutationState indexMutationState;
+        try {
+            indexMutationState = createIndex(indexStatement, null, 
PDate.INSTANCE);
+        }
+        catch(SQLException e) {
+            if (e.getErrorCode() == TABLE_ALREADY_EXIST.getErrorCode()) {
+                throw new 
SQLExceptionInfo.Builder(TABLE_ALREADY_EXIST).setTableName(
+                        
statement.getCdcObjName().getName()).build().buildException();
+            }
+            // TODO: What about translating other index creation failures? 
E.g., bad TS column.
+            throw e;
+        }
+
+        // TODO: Do we need to borrow the schema name of the table?
+        ColumnResolver resolver = 
FromCompiler.getResolver(NamedTableNode.create(statement.getDataTable()), 
connection);
+        TableRef tableRef = resolver.getTables().get(0);
+        PTable dataTable = tableRef.getTable();
+        List<PColumn> pkColumns = dataTable.getPKColumns();
+        List<ColumnDef> columnDefs = new ArrayList<>();
+        List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>();
+        // TODO: toString() on the function adds an extra leading space, but this may
+        //  be OK since indexes exhibit exactly the same behavior.
+        ColumnName timeIdxCol = statement.getTimeIdxColumn() != null ?
+                statement.getTimeIdxColumn() :
+                FACTORY.columnName(statement.getTimeIdxFunc().toString());
+        columnDefs.add(FACTORY.columnDef(timeIdxCol, 
PTimestamp.INSTANCE.getSqlTypeName(), false, null, false,
+                PTimestamp.INSTANCE.getMaxLength(null), 
PTimestamp.INSTANCE.getScale(null), false,
+                SortOrder.getDefault(), "", null, false));
+        pkColumnDefs.add(FACTORY.columnDefInPkConstraint(timeIdxCol, 
SortOrder.getDefault(), false));
+        for (PColumn pcol: pkColumns) {
+            // TODO: Cross check with the ColumnName creation logic in 
createIndex (line ~1578).
+            
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()),
+                    pcol.getDataType().getSqlTypeName(), false, null, false, 
pcol.getMaxLength(),
+                    pcol.getScale(), false, pcol.getSortOrder(), "", null, 
false));
+            
pkColumnDefs.add(FACTORY.columnDefInPkConstraint(FACTORY.columnName(
+                    pcol.getName().getString()), pcol.getSortOrder(), 
pcol.isRowTimestamp()));
+        }
+        
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME),
+                PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null,
+                null, false, SortOrder.getDefault(), "", null, false));
+        CreateTableStatement tableStatement = FACTORY.createTable(
+                FACTORY.table(dataTable.getSchemaName().getString(), 
statement.getCdcObjName().getName()),
+                statement.getProps(), columnDefs, FACTORY.primaryKey(null, 
pkColumnDefs),
+                Collections.emptyList(), PTableType.CDC, 
statement.isIfNotExists(), null, null,
+                statement.getBindCount(), null);
+        createTableInternal(tableStatement, null, dataTable, null, null, null,
+                null, null, false, null,
+                null, statement.getIncludeScopes(), tableProps, 
commonFamilyProps);
+        return indexMutationState;
+    }
+
     public MutationState dropSequence(DropSequenceStatement statement) throws 
SQLException {
         Long scn = connection.getSCN();
         long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
@@ -1933,11 +2022,12 @@ public class MetaDataClient {
     }
 
     private PTable createTableInternal(CreateTableStatement statement, 
byte[][] splits,
-            final PTable parent, String viewStatement, ViewType viewType, 
PDataType viewIndexIdType,
-            final byte[][] viewColumnConstants, final BitSet 
isViewColumnReferenced, boolean allocateIndexId,
-            IndexType indexType, Date asyncCreatedDate,
-            Map<String,Object> tableProps,
-            Map<String,Object> commonFamilyProps) throws SQLException {
+                                       final PTable parent, String 
viewStatement, ViewType viewType, PDataType viewIndexIdType,
+                                       final byte[][] viewColumnConstants, 
final BitSet isViewColumnReferenced, boolean allocateIndexId,
+                                       IndexType indexType, Date 
asyncCreatedDate,
+                                       Set<PTable.CDCChangeScope> 
cdcIncludeScopes,
+                                       Map<String, Object> tableProps,
+                                       Map<String, Object> commonFamilyProps) 
throws SQLException {
         final PTableType tableType = statement.getTableType();
         boolean wasAutoCommit = connection.getAutoCommit();
         TableName tableNameNode = null;
@@ -2036,6 +2126,8 @@ public class MetaDataClient {
 
             String schemaVersion = (String) 
TableProperty.SCHEMA_VERSION.getValue(tableProps);
             String streamingTopicName = (String) 
TableProperty.STREAMING_TOPIC_NAME.getValue(tableProps);
+            String cdcIncludeScopesStr = cdcIncludeScopes == null ? null :
+                    CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);
 
             if (parent != null && tableType == PTableType.INDEX) {
                 timestamp = TransactionUtil.getTableTimestamp(connection, 
transactionProvider != null, transactionProvider);
@@ -2075,37 +2167,40 @@ public class MetaDataClient {
                 storeNulls = parent.getStoreNulls();
                 parentTableName = parent.getTableName().getString();
                 // Pass through data table sequence number so we can check it 
hasn't changed
-                PreparedStatement incrementStatement = 
connection.prepareStatement(INCREMENT_SEQ_NUM);
-                incrementStatement.setString(1, tenantIdStr);
-                incrementStatement.setString(2, schemaName);
-                incrementStatement.setString(3, parentTableName);
-                incrementStatement.setLong(4, parent.getSequenceNumber());
-                incrementStatement.execute();
-                // Get list of mutations and add to table meta data that will 
be passed to server
-                // to guarantee order. This row will always end up last
-                
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
-                connection.rollback();
+                try (PreparedStatement incrementStatement = 
connection.prepareStatement(INCREMENT_SEQ_NUM)) {
+                    incrementStatement.setString(1, tenantIdStr);
+                    incrementStatement.setString(2, schemaName);
+                    incrementStatement.setString(3, parentTableName);
+                    incrementStatement.setLong(4, parent.getSequenceNumber());
+                    incrementStatement.execute();
+                    // Get list of mutations and add to table meta data that 
will be passed to server
+                    // to guarantee order. This row will always end up last
+                    
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
+                    connection.rollback();
+                }
 
                 // Add row linking from data table row to index table row
-                PreparedStatement linkStatement = 
connection.prepareStatement(CREATE_LINK);
-                linkStatement.setString(1, tenantIdStr);
-                linkStatement.setString(2, schemaName);
-                linkStatement.setString(3, parentTableName);
-                linkStatement.setString(4, tableName);
-                linkStatement.setByte(5, 
LinkType.INDEX_TABLE.getSerializedValue());
-                linkStatement.setLong(6, parent.getSequenceNumber());
-                linkStatement.setString(7, 
PTableType.INDEX.getSerializedValue());
-                linkStatement.execute();
+                try (PreparedStatement linkStatement = 
connection.prepareStatement(CREATE_LINK)) {
+                    linkStatement.setString(1, tenantIdStr);
+                    linkStatement.setString(2, schemaName);
+                    linkStatement.setString(3, parentTableName);
+                    linkStatement.setString(4, tableName);
+                    linkStatement.setByte(5, 
LinkType.INDEX_TABLE.getSerializedValue());
+                    linkStatement.setLong(6, parent.getSequenceNumber());
+                    linkStatement.setString(7, 
PTableType.INDEX.getSerializedValue());
+                    linkStatement.execute();
+                }
 
                 // Add row linking index table to parent table for indexes on 
views
                 if (parent.getType() == PTableType.VIEW) {
-                       linkStatement = 
connection.prepareStatement(CREATE_VIEW_INDEX_PARENT_LINK);
-                       linkStatement.setString(1, tenantIdStr);
-                       linkStatement.setString(2, schemaName);
-                       linkStatement.setString(3, tableName);
-                       linkStatement.setString(4, 
parent.getName().getString());
-                       linkStatement.setByte(5, 
LinkType.VIEW_INDEX_PARENT_TABLE.getSerializedValue());
-                       linkStatement.execute();
+                    try (PreparedStatement linkStatement = 
connection.prepareStatement(CREATE_VIEW_INDEX_PARENT_LINK)) {
+                        linkStatement.setString(1, tenantIdStr);
+                        linkStatement.setString(2, schemaName);
+                        linkStatement.setString(3, tableName);
+                        linkStatement.setString(4, 
parent.getName().getString());
+                        linkStatement.setByte(5, 
LinkType.VIEW_INDEX_PARENT_TABLE.getSerializedValue());
+                        linkStatement.execute();
+                    }
                 }
             }
 
@@ -2375,24 +2470,26 @@ public class MetaDataClient {
                     pkColumns = newLinkedHashSet(parent.getPKColumns());
 
                     // Add row linking view to its parent 
-                    PreparedStatement linkStatement = 
connection.prepareStatement(CREATE_VIEW_LINK);
-                    linkStatement.setString(1, tenantIdStr);
-                    linkStatement.setString(2, schemaName);
-                    linkStatement.setString(3, tableName);
-                    linkStatement.setString(4, parent.getName().getString());
-                    linkStatement.setByte(5, 
LinkType.PARENT_TABLE.getSerializedValue());
-                    linkStatement.setString(6, parent.getTenantId() == null ? 
null : parent.getTenantId().getString());
-                    linkStatement.execute();
+                    try (PreparedStatement linkStatement = 
connection.prepareStatement(CREATE_VIEW_LINK)) {
+                        linkStatement.setString(1, tenantIdStr);
+                        linkStatement.setString(2, schemaName);
+                        linkStatement.setString(3, tableName);
+                        linkStatement.setString(4, 
parent.getName().getString());
+                        linkStatement.setByte(5, 
LinkType.PARENT_TABLE.getSerializedValue());
+                        linkStatement.setString(6, parent.getTenantId() == 
null ? null : parent.getTenantId().getString());
+                        linkStatement.execute();
+                    }
                     // Add row linking parent to view
-                    // TODO From 4.16 write the child links to 
SYSTEM.CHILD_LINK directly 
-                    linkStatement = 
connection.prepareStatement(CREATE_CHILD_LINK);
-                    linkStatement.setString(1, parent.getTenantId() == null ? 
null : parent.getTenantId().getString());
-                    linkStatement.setString(2, parent.getSchemaName() == null 
? null : parent.getSchemaName().getString());
-                    linkStatement.setString(3, 
parent.getTableName().getString());
-                    linkStatement.setString(4, tenantIdStr);
-                    linkStatement.setString(5, 
SchemaUtil.getTableName(schemaName, tableName));
-                    linkStatement.setByte(6, 
LinkType.CHILD_TABLE.getSerializedValue());
-                    linkStatement.execute();
+                    // TODO From 4.16 write the child links to 
SYSTEM.CHILD_LINK directly
+                    try (PreparedStatement linkStatement = 
connection.prepareStatement(CREATE_CHILD_LINK)) {
+                        linkStatement.setString(1, parent.getTenantId() == 
null ? null : parent.getTenantId().getString());
+                        linkStatement.setString(2, parent.getSchemaName() == 
null ? null : parent.getSchemaName().getString());
+                        linkStatement.setString(3, 
parent.getTableName().getString());
+                        linkStatement.setString(4, tenantIdStr);
+                        linkStatement.setString(5, 
SchemaUtil.getTableName(schemaName, tableName));
+                        linkStatement.setByte(6, 
LinkType.CHILD_TABLE.getSerializedValue());
+                        linkStatement.execute();
+                    }
                 }
             } else {
                 columns = new LinkedHashMap<PColumn,PColumn>(colDefs.size());
@@ -2409,29 +2506,30 @@ public class MetaDataClient {
                     && 
!physicalNames.get(0).getString().equals(SchemaUtil.getPhysicalHBaseTableName(
                         schemaName, tableName, 
isNamespaceMapped).getString()))) {
                     // Add row linking from data table row to physical table 
row
-                    PreparedStatement linkStatement = 
connection.prepareStatement(CREATE_LINK);
-                    for (PName physicalName : physicalNames) {
-                        linkStatement.setString(1, tenantIdStr);
-                        linkStatement.setString(2, schemaName);
-                        linkStatement.setString(3, tableName);
-                        linkStatement.setString(4, physicalName.getString());
-                        linkStatement.setByte(5, 
LinkType.PHYSICAL_TABLE.getSerializedValue());
-                        if (tableType == PTableType.VIEW) {
-                            if (parent.getType() == PTableType.TABLE) {
-                                linkStatement.setString(4, 
SchemaUtil.getTableName(parent.getSchemaName().getString(),parent.getTableName().getString()));
+                    try (PreparedStatement linkStatement = 
connection.prepareStatement(CREATE_LINK)) {
+                        for (PName physicalName : physicalNames) {
+                            linkStatement.setString(1, tenantIdStr);
+                            linkStatement.setString(2, schemaName);
+                            linkStatement.setString(3, tableName);
+                            linkStatement.setString(4, 
physicalName.getString());
+                            linkStatement.setByte(5, 
LinkType.PHYSICAL_TABLE.getSerializedValue());
+                            if (tableType == PTableType.VIEW) {
+                                if (parent.getType() == PTableType.TABLE) {
+                                    linkStatement.setString(4, 
SchemaUtil.getTableName(parent.getSchemaName().getString(), 
parent.getTableName().getString()));
+                                    linkStatement.setLong(6, 
parent.getSequenceNumber());
+                                } else { //This is a grandchild view, find the 
physical base table
+                                    PTable logicalTable = 
connection.getTable(new PTableKey(null, 
SchemaUtil.replaceNamespaceSeparator(physicalName)));
+                                    linkStatement.setString(4, 
SchemaUtil.getTableName(logicalTable.getSchemaName().getString(), 
logicalTable.getTableName().getString()));
+                                    linkStatement.setLong(6, 
logicalTable.getSequenceNumber());
+                                }
+                                // Set link to logical name
+                                linkStatement.setString(7, null);
+                            } else {
                                 linkStatement.setLong(6, 
parent.getSequenceNumber());
-                            } else { //This is a grandchild view, find the 
physical base table
-                                PTable logicalTable = connection.getTable(new 
PTableKey(null, SchemaUtil.replaceNamespaceSeparator(physicalName)));
-                                linkStatement.setString(4, 
SchemaUtil.getTableName(logicalTable.getSchemaName().getString(),logicalTable.getTableName().getString()));
-                                linkStatement.setLong(6, 
logicalTable.getSequenceNumber());
+                                linkStatement.setString(7, 
PTableType.INDEX.getSerializedValue());
                             }
-                            // Set link to logical name
-                            linkStatement.setString(7, null);
-                        } else {
-                            linkStatement.setLong(6, 
parent.getSequenceNumber());
-                            linkStatement.setString(7, 
PTableType.INDEX.getSerializedValue());
+                            linkStatement.execute();
                         }
-                        linkStatement.execute();
                     }
                     
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
                     connection.rollback();
@@ -2845,12 +2943,13 @@ public class MetaDataClient {
                     }
                 }
                 if (tableType == VIEW && !changedCqCounters.isEmpty()) {
-                    PreparedStatement incrementStatement = 
connection.prepareStatement(INCREMENT_SEQ_NUM);
-                    incrementStatement.setString(1, null);
-                    incrementStatement.setString(2, 
viewPhysicalTable.getSchemaName().getString());
-                    incrementStatement.setString(3, 
viewPhysicalTable.getTableName().getString());
-                    incrementStatement.setLong(4, 
viewPhysicalTable.getSequenceNumber() + 1);
-                    incrementStatement.execute();
+                    try (PreparedStatement incrementStatement = 
connection.prepareStatement(INCREMENT_SEQ_NUM)) {
+                        incrementStatement.setString(1, null);
+                        incrementStatement.setString(2, 
viewPhysicalTable.getSchemaName().getString());
+                        incrementStatement.setString(3, 
viewPhysicalTable.getTableName().getString());
+                        incrementStatement.setLong(4, 
viewPhysicalTable.getSequenceNumber() + 1);
+                        incrementStatement.execute();
+                    }
                 }
                 if 
(connection.getMutationState().toMutations(timestamp).hasNext()) {
                     
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
@@ -3072,23 +3171,31 @@ public class MetaDataClient {
                 tableUpsert.setString(35, streamingTopicName);
             }
 
+            if (cdcIncludeScopesStr == null) {
+                tableUpsert.setNull(36, Types.VARCHAR);
+            } else {
+                tableUpsert.setString(36, cdcIncludeScopesStr);
+            }
+
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
-                PreparedStatement setAsync = 
connection.prepareStatement(SET_ASYNC_CREATED_DATE);
-                setAsync.setString(1, tenantIdStr);
-                setAsync.setString(2, schemaName);
-                setAsync.setString(3, tableName);
-                setAsync.setDate(4, asyncCreatedDate);
-                setAsync.execute();
+                try (PreparedStatement setAsync = 
connection.prepareStatement(SET_ASYNC_CREATED_DATE)) {
+                    setAsync.setString(1, tenantIdStr);
+                    setAsync.setString(2, schemaName);
+                    setAsync.setString(3, tableName);
+                    setAsync.setDate(4, asyncCreatedDate);
+                    setAsync.execute();
+                }
             } else {
                 Date syncCreatedDate = new 
Date(EnvironmentEdgeManager.currentTimeMillis());
-                PreparedStatement setSync = 
connection.prepareStatement(SET_INDEX_SYNC_CREATED_DATE);
-                setSync.setString(1, tenantIdStr);
-                setSync.setString(2, schemaName);
-                setSync.setString(3, tableName);
-                setSync.setDate(4, syncCreatedDate);
-                setSync.execute();
+                try (PreparedStatement setSync = 
connection.prepareStatement(SET_INDEX_SYNC_CREATED_DATE)) {
+                    setSync.setString(1, tenantIdStr);
+                    setSync.setString(2, schemaName);
+                    setSync.setString(3, tableName);
+                    setSync.setDate(4, syncCreatedDate);
+                    setSync.execute();
+                }
             }
             
tableMetaData.addAll(connection.getMutationState().toMutations(timestamp).next().getSecond());
             connection.rollback();
@@ -3219,6 +3326,7 @@ public class MetaDataClient {
                         .setExternalSchemaId(result.getTable() != null ?
                         result.getTable().getExternalSchemaId() : null)
                         .setStreamingTopicName(streamingTopicName)
+                        .setCDCIncludeScopes(cdcIncludeScopes)
                         .build();
                 result = new MetaDataMutationResult(code, 
result.getMutationTime(), table, true);
                 addTableToCache(result);
@@ -3700,8 +3808,8 @@ public class MetaDataClient {
         // Ordinal position is 1-based and we don't count SALT column in 
ordinal position
         int totalColumnCount = table.getColumns().size() + 
(table.getBucketNum() == null ? 0 : -1);
         final long seqNum = table.getSequenceNumber() + 1;
-        PreparedStatement tableUpsert = 
connection.prepareStatement(MUTATE_TABLE);
         String tenantId = connection.getTenantId() == null ? null : 
connection.getTenantId().getString();
+        PreparedStatement tableUpsert = 
connection.prepareStatement(MUTATE_TABLE);
         try {
             tableUpsert.setString(1, tenantId);
             tableUpsert.setString(2, schemaName);
@@ -4190,9 +4298,8 @@ public class MetaDataClient {
                 // Add column metadata afterwards, maintaining the order so 
columns have more predictable ordinal position
                 tableMetaData.addAll(columnMetaData);
                 if (!changedCqCounters.isEmpty()) {
-                    PreparedStatement linkStatement;
-                        linkStatement = 
connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER);
-                        for (Entry<String, Integer> entry : 
changedCqCounters.entrySet()) {    
+                    try (PreparedStatement linkStatement = 
connection.prepareStatement(UPDATE_ENCODED_COLUMN_COUNTER)) {
+                        for (Entry<String, Integer> entry : 
changedCqCounters.entrySet()) {
                             linkStatement.setString(1, tenantIdToUse);
                             linkStatement.setString(2, 
tableForCQCounters.getSchemaName().getString());
                             linkStatement.setString(3, 
tableForCQCounters.getTableName().getString());
@@ -4200,16 +4307,18 @@ public class MetaDataClient {
                             linkStatement.setInt(5, entry.getValue());
                             linkStatement.execute();
                         }
+                    }
 
                     // When a view adds its own columns, then we need to 
increase the sequence number of the base table
                     // too since we want clients to get the latest PTable of 
the base table.
                     if (tableType == VIEW) {
-                        PreparedStatement incrementStatement = 
connection.prepareStatement(INCREMENT_SEQ_NUM);
-                        incrementStatement.setString(1, null);
-                        incrementStatement.setString(2, 
tableForCQCounters.getSchemaName().getString());
-                        incrementStatement.setString(3, 
tableForCQCounters.getTableName().getString());
-                        incrementStatement.setLong(4, 
tableForCQCounters.getSequenceNumber() + 1);
-                        incrementStatement.execute();
+                        try (PreparedStatement incrementStatement = 
connection.prepareStatement(INCREMENT_SEQ_NUM)) {
+                            incrementStatement.setString(1, null);
+                            incrementStatement.setString(2, 
tableForCQCounters.getSchemaName().getString());
+                            incrementStatement.setString(3, 
tableForCQCounters.getTableName().getString());
+                            incrementStatement.setLong(4, 
tableForCQCounters.getSequenceNumber() + 1);
+                            incrementStatement.execute();
+                        }
                     }
                     
tableMetaData.addAll(connection.getMutationState().toMutations(timeStamp).next().getSecond());
                     connection.rollback();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 890ce22909..fcaaea5301 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import javax.annotation.Nullable;
 
@@ -962,6 +963,11 @@ public interface PTable extends PMetaDataEntity {
      */
     String getStreamingTopicName();
 
+    /**
+     * @return Optional string that represents the default include scopes to 
be used for CDC queries.
+     */
+    Set<CDCChangeScope> getCDCIncludeScopes();
+
     /**
      * Class to help track encoded column qualifier counters per column family.
      */
@@ -1049,6 +1055,11 @@ public interface PTable extends PMetaDataEntity {
     }
 
     enum CDCChangeScope {
+        /**
+         * Include only the actual change in image.
+         */
+        CHANGE,
+
         /**
          * Include only the pre image (state prior to the change) of the row.
          */
@@ -1063,11 +1074,6 @@ public interface PTable extends PMetaDataEntity {
          * Include only the latest image of the row.
          */
         LATEST,
-
-        /**
-         * Include all images.
-         */
-        ALL,
+        ;
     }
-
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index d332a95269..1e32c60900 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -64,6 +64,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import javax.annotation.Nonnull;
 
@@ -107,6 +108,7 @@ import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -211,6 +213,7 @@ public class PTableImpl implements PTable {
     private String schemaVersion;
     private String externalSchemaId;
     private String streamingTopicName;
+    private Set<CDCChangeScope> cdcIncludeScopes;
 
     public static class Builder {
         private PTableKey key;
@@ -276,6 +279,7 @@ public class PTableImpl implements PTable {
         private String schemaVersion;
         private String externalSchemaId;
         private String streamingTopicName;
+        private Set<CDCChangeScope> cdcIncludeScopes;
 
         // Used to denote which properties a view has explicitly modified
         private BitSet viewModifiedPropSet = new BitSet(3);
@@ -696,6 +700,13 @@ public class PTableImpl implements PTable {
             return this;
          }
 
+        /**
+         * Sets the default CDC change scopes for this table. A null argument is silently
+         * ignored so that any previously set value is retained.
+         */
+        public Builder setCDCIncludeScopes(Set<CDCChangeScope> 
cdcIncludeScopes) {
+            if (cdcIncludeScopes != null) {
+                this.cdcIncludeScopes = cdcIncludeScopes;
+            }
+            return this;
+        }
+
         /**
          * Populate derivable attributes of the PTable
          * @return PTableImpl.Builder object
@@ -986,6 +997,7 @@ public class PTableImpl implements PTable {
         this.schemaVersion = builder.schemaVersion;
         this.externalSchemaId = builder.externalSchemaId;
         this.streamingTopicName = builder.streamingTopicName;
+        this.cdcIncludeScopes = builder.cdcIncludeScopes;
     }
 
     // When cloning table, ignore the salt column as it will be added back in 
the constructor
@@ -1999,6 +2011,11 @@ public class PTableImpl implements PTable {
             streamingTopicName =
                 (String) 
PVarchar.INSTANCE.toObject(table.getStreamingTopicName().toByteArray());
         }
+        String cdcIncludeScopesStr = null;
+        if (table.hasCDCIncludeScopes()) {
+            cdcIncludeScopesStr =
+                    (String) 
PVarchar.INSTANCE.toObject(table.getCDCIncludeScopes().toByteArray());
+        }
         try {
             return new PTableImpl.Builder()
                     .setType(tableType)
@@ -2056,6 +2073,8 @@ public class PTableImpl implements PTable {
                     .setSchemaVersion(schemaVersion)
                     .setExternalSchemaId(externalSchemaId)
                     .setStreamingTopicName(streamingTopicName)
+                    .setCDCIncludeScopes(
+                            
CDCUtil.makeChangeScopeEnumsFromString(cdcIncludeScopesStr))
                     .build();
         } catch (SQLException e) {
             throw new RuntimeException(e); // Impossible
@@ -2337,6 +2356,11 @@ public class PTableImpl implements PTable {
         return streamingTopicName;
     }
 
+    /** @return the default CDC change scopes for this table; may be null if never set. */
+    @Override
+    public Set<CDCChangeScope> getCDCIncludeScopes() {
+        return cdcIncludeScopes;
+    }
+
     private static final class KVColumnFamilyQualifier {
         @Nonnull
         private final String colFamilyName;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java
index 8d6281edd4..d89f1bd36a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java
@@ -29,7 +29,8 @@ public enum PTableType {
     INDEX("i", "INDEX"),
     PROJECTED("p", "PROJECTED"),
     CDC("c", "CDC"),
-    SUBQUERY("q", "SUBQUERY"); 
+    SUBQUERY("q", "SUBQUERY"),
+    ;
 
     private final PName value;
     private final String serializedValue;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 24d0432543..40bef055a7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -25,6 +25,7 @@ import static 
org.apache.phoenix.exception.SQLExceptionCode.SALT_ONLY_ON_CREATE_
 import static 
org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static org.apache.phoenix.util.CDCUtil.CDC_INDEX_TYPE_LOCAL;
 
 import java.sql.SQLException;
 import java.util.Map;
@@ -336,6 +337,38 @@ public enum TableProperty {
         @Override public Object getPTableValue(PTable table) {
             return table.getStreamingTopicName();
         }
+    },
+
+    INDEX_TYPE(PhoenixDatabaseMetaData.CDC_INDEX_TYPE_NAME, 
COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, false, false, false) {
+        @Override
+        public Object getValue(Object value) {
+            return value != null && 
value.toString().toUpperCase().equals(CDC_INDEX_TYPE_LOCAL) ?
+                    PTable.IndexType.LOCAL :
+                    PTable.IndexType.UNCOVERED_GLOBAL;
+        }
+
+        @Override
+        public Object getPTableValue(PTable table) {
+            return null;
+        }
+    },
+
+    INCLUDE(PhoenixDatabaseMetaData.CDC_INCLUDE_NAME, 
COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, false, false) {
+        @Override
+        public Object getValue(Object value) {
+            try {
+                return value == null ? PTable.CDCChangeScope.CHANGE : 
PTable.CDCChangeScope.valueOf(value.toString().toUpperCase());
+            } catch (IllegalArgumentException e) {
+                throw new RuntimeException(new 
SQLExceptionInfo.Builder(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE)
+                        .setMessage(value.toString())
+                        .build().buildException());
+            }
+        }
+
+        @Override
+        public Object getPTableValue(PTable table) {
+            return table.getCDCIncludeScopes();
+        }
     };
 
     private final String propertyName;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
new file mode 100644
index 0000000000..d015aaf422
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.util;
+
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.stream.Collectors;

import org.apache.hadoop.util.StringUtils;

import org.apache.phoenix.schema.PTable;
+
+public class CDCUtil {
+    public static final String CDC_INDEX_PREFIX = "__CDC__";
+    public static final String CDC_INDEX_TYPE_LOCAL = "L";
+
+    /**
+     * Make a set of CDC change scope enums from the given string containing 
comma separated scope
+     * names.
+     *
+     * @param includeScopes Comma-separated scope names.
+     * @return the set of enums, which can be empty if the string is empty or 
has no valid names.
+     */
+    public static Set<PTable.CDCChangeScope> 
makeChangeScopeEnumsFromString(String includeScopes) {
+        Set<PTable.CDCChangeScope> cdcChangeScopes = new HashSet<>();
+        if (includeScopes != null) {
+            StringTokenizer st  = new StringTokenizer(includeScopes, ",");
+            while (st.hasMoreTokens()) {
+                String tok = st.nextToken();
+                try {
+                    
cdcChangeScopes.add(PTable.CDCChangeScope.valueOf(tok.trim().toUpperCase()));
+                }
+                catch (IllegalArgumentException e) {
+                    // Just ignore unrecognized scopes.
+                }
+            }
+        }
+        return cdcChangeScopes;
+    }
+
+    /**
+     * Make a string of comma-separated scope names from the specified set of 
enums.
+     *
+     * @param includeScopes Set of scope enums
+     * @return the comma-separated string of scopes, which can be an empty 
string in case the set is empty.
+     */
+    public static String 
makeChangeScopeStringFromEnums(Set<PTable.CDCChangeScope> includeScopes) {
+        String cdcChangeScopes = null;
+        if (includeScopes != null) {
+            Iterable<String> tmpStream = () -> includeScopes.stream().sorted()
+                    .map(s -> s.name()).iterator();
+            cdcChangeScopes = StringUtils.join(",", tmpStream);
+        }
+        return cdcChangeScopes;
+    }
+
+    public static String getCDCIndexName(String cdcName) {
+        return CDC_INDEX_PREFIX + cdcName;
+    }
+
+    public static String getCDCNameFromIndexName(String indexName) {
+        assert(indexName.startsWith(CDC_INDEX_PREFIX));
+        return indexName.substring(CDC_INDEX_PREFIX.length());
+    }
+
+    public static boolean isACDCIndex(String indexName) {
+        return indexName.startsWith(CDC_INDEX_PREFIX);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index fbca85b3b9..14a48a62e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -115,6 +115,7 @@ public class SchemaUtil {
     private static final int VAR_KV_LENGTH_ESTIMATE = 50;
     public static final String ESCAPE_CHARACTER = "\"";
     public static final DataBlockEncoding DEFAULT_DATA_BLOCK_ENCODING = 
DataBlockEncoding.FAST_DIFF;
+
     public static final PDatum VAR_BINARY_DATUM = new PDatum() {
     
         @Override
@@ -239,7 +240,7 @@ public class SchemaUtil {
     }
 
     /**
-     * Normalizes the fulltableName . Uses {@linkplain normalizeIdentifier}
+     * Normalizes the fulltableName . Uses {@linkplain #normalizeIdentifier}
      * @param fullTableName
      * @return
      */
diff --git a/phoenix-core/src/main/protobuf/PTable.proto 
b/phoenix-core/src/main/protobuf/PTable.proto
index 6f6a663a99..4019a63695 100644
--- a/phoenix-core/src/main/protobuf/PTable.proto
+++ b/phoenix-core/src/main/protobuf/PTable.proto
@@ -30,6 +30,7 @@ enum PTableType {
   VIEW = 2;
   INDEX = 3;
   JOIN = 4;
+  CDC = 5;
 }
 
 message PColumn {
@@ -117,6 +118,7 @@ message PTable {
   optional bytes externalSchemaId=50;
   optional PTable transformingNewTable=51;
   optional bytes streamingTopicName=52;
+  optional bytes CDCIncludeScopes=55;
 }
 
 message EncodedCQCounter {
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
index cfbb4a4572..829cb73ed0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
@@ -497,7 +497,8 @@ public class QueryParserTest {
         }
     }
 
-    private CreateCDCStatement parseCreateCDCSimple(String sql, boolean 
ifNotExists, String tsCol) throws Exception {
+    private CreateCDCStatement parseCreateCDCSimple(String sql, boolean 
ifNotExists, String tsCol)
+            throws Exception {
         CreateCDCStatement stmt = parseQuery(sql, CreateCDCStatement.class);
         assertEquals("FOO", stmt.getCdcObjName().getName());
         assertEquals("BAR", stmt.getDataTable().getTableName());
@@ -517,17 +518,37 @@ public class QueryParserTest {
         parseCreateCDCSimple("create cdc foo on bar(ts)", false, "TS");
         parseCreateCDCSimple("create cdc foo on s.bar(ts)", false, "TS");
         parseCreateCDCSimple("create cdc if not exists foo on bar(ts)", true, 
"TS");
-        stmt = parseCreateCDCSimple("create cdc foo on 
bar(PHOENIX_ROW_TIMESTAMP())", false, null);
-        assertEquals("PHOENIX_ROW_TIMESTAMP", stmt.getTimeIdxFunc().getName());
-        assertEquals(" PHOENIX_ROW_TIMESTAMP()", 
stmt.getTimeIdxFunc().toString());
+        parseCreateCDCSimple("create cdc foo on bar(t) index_type=g", false, 
"T");
+        parseCreateCDCSimple("create cdc foo on bar(t) index_type=l", false, 
"T");
+        stmt = parseCreateCDCSimple("create cdc foo on bar(TS_FUNC()) TTL=100, 
INDEX_TYPE=g",
+                false, null);
+        assertEquals("TS_FUNC", stmt.getTimeIdxFunc().getName());
+        assertEquals(" TS_FUNC()", stmt.getTimeIdxFunc().toString());
+        assertEquals(Arrays.asList(new Pair("TTL", 100), new 
Pair("INDEX_TYPE", "g")),
+                stmt.getProps().get(""));
         stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre)", 
false, "TS");
-        assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE)), 
stmt.getIncludeScopes());
-        stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre, 
pre, post)", false, "TS");
-        assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE, 
PTable.CDCChangeScope.POST)), stmt.getIncludeScopes());
-        stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) 
abc=def", true, "TS");
+        assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE)),
+                stmt.getIncludeScopes());
+        stmt = parseCreateCDCSimple("create cdc foo on bar(ts) include (pre, 
pre, post)",
+                false, "TS");
+        assertEquals(new HashSet<>(Arrays.asList(PTable.CDCChangeScope.PRE,
+                PTable.CDCChangeScope.POST)), stmt.getIncludeScopes());
+        stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) 
abc=def",
+                true, "TS");
         assertEquals(Arrays.asList(new Pair("ABC", "def")), 
stmt.getProps().get(""));
-        stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) 
abc=def, prop=val", true, "TS");
-        assertEquals(Arrays.asList(new Pair("ABC", "def"), new Pair("PROP", 
"val")), stmt.getProps().get(""));
+        stmt = parseCreateCDCSimple("create cdc if not exists foo on bar(ts) 
abc=def, prop=val",
+                true, "TS");
+        assertEquals(Arrays.asList(new Pair("ABC", "def"), new Pair("PROP", 
"val")),
+                stmt.getProps().get(""));
+    }
+
+    @Test
+    public void testCreateCDCWithErrors() throws Exception {
+        parseQueryThatShouldFail("create cdc foo");
+        parseQueryThatShouldFail("create cdc foo on bar");
+        parseQueryThatShouldFail("create cdc foo on bar(ts integer)");
+        parseQueryThatShouldFail("create cdc foo on bar(ts1, ts2)");
+        parseQueryThatShouldFail("create cdc foo on bar(ts) include (abc)");
     }
 
     private void parseInvalidCreateCDC(String sql, int expRrrorCode) throws 
IOException {
@@ -1106,5 +1127,4 @@ public class QueryParserTest {
         parseQueryThatShouldFail("SELECT b, x from x WHERE x = "
                 + "b'0 10 ' --comment \n /* comment */ '00 000' \n \n ''");
     }
-
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
new file mode 100644
index 0000000000..3282ba62c9
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/CDCUtilTest.java
@@ -0,0 +1,39 @@
+package org.apache.phoenix.util;
+
+import org.apache.phoenix.schema.PTable;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.apache.phoenix.schema.PTable.CDCChangeScope.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CDCUtilTest {
+    @Test
+    public void testScopeSetConstruction() throws Exception {
+        assertEquals(new HashSet<>(Arrays.asList(PRE)), 
CDCUtil.makeChangeScopeEnumsFromString(
+                "PRE"));
+        assertEquals(new HashSet<>(Arrays.asList(PRE)),
+                CDCUtil.makeChangeScopeEnumsFromString("PRE,"));
+        assertEquals(new HashSet<>(Arrays.asList(PRE)),
+                CDCUtil.makeChangeScopeEnumsFromString("PRE, PRE"));
+        assertEquals(new HashSet<>(Arrays.asList(PRE)),
+                CDCUtil.makeChangeScopeEnumsFromString("PRE,DUMMY"));
+        assertEquals(new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST)),
+                
CDCUtil.makeChangeScopeEnumsFromString("POST,PRE,CHANGE,LATEST"));
+    }
+
+    @Test
+    public void testScopeStringConstruction() throws Exception {
+        assertEquals(null, CDCUtil.makeChangeScopeStringFromEnums(null));
+        assertEquals("", CDCUtil.makeChangeScopeStringFromEnums(
+                new HashSet<PTable.CDCChangeScope>()));
+        assertEquals("CHANGE,PRE,POST,LATEST", 
CDCUtil.makeChangeScopeStringFromEnums(
+                new HashSet<>(Arrays.asList(CHANGE, PRE, POST, LATEST))));
+        assertEquals("CHANGE,PRE,POST,LATEST", 
CDCUtil.makeChangeScopeStringFromEnums(
+                new HashSet<>(Arrays.asList(PRE, LATEST, POST, CHANGE))));
+    }
+}

Reply via email to