This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 37364634de PHOENIX-7458 : Create new SYSTEM tables for tracking CDC Stream metadata (#2025)
37364634de is described below

commit 37364634de8cdf1b58194c628b10b8de916ff777
Author: Palash Chauhan <[email protected]>
AuthorDate: Mon Nov 18 14:35:05 2024 -0800

    PHOENIX-7458 : Create new SYSTEM tables for tracking CDC Stream metadata (#2025)
    
    Co-authored-by: Palash Chauhan <[email protected]>
---
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      | 17 +++++++++
 .../phoenix/query/ConnectionQueryServicesImpl.java | 34 ++++++++++++++++++
 .../query/ConnectionlessQueryServicesImpl.java     | 14 ++++++++
 .../org/apache/phoenix/query/QueryConstants.java   | 41 ++++++++++++++++++++++
 .../apache/phoenix/end2end/BasePermissionsIT.java  |  6 ++--
 .../MigrateSystemTablesToSystemNamespaceIT.java    |  6 ++--
 .../phoenix/end2end/ParallelStatsEnabledIT.java    |  1 +
 .../phoenix/end2end/QueryDatabaseMetaDataIT.java   | 10 +++++-
 .../SystemTablesCreationOnConnectionIT.java        |  6 ++--
 .../phoenix/end2end/TenantSpecificTablesDDLIT.java |  4 +++
 .../ConnectionQueryServicesMetricsIT.java          |  4 +++
 11 files changed, 136 insertions(+), 7 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index e46eb927d5..ff250f40ba 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -451,6 +451,23 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final String SYSTEM_LOG_TABLE = "LOG";
     public static final String SYSTEM_LOG_NAME =
             SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE);
+
+    public static final String SYSTEM_CDC_STREAM_STATUS_TABLE = 
"CDC_STREAM_STATUS";
+    public static final String SYSTEM_CDC_STREAM_STATUS_NAME =
+            SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, 
SYSTEM_CDC_STREAM_STATUS_TABLE);
+
+    public static final String SYSTEM_CDC_STREAM_TABLE = "CDC_STREAM";
+    public static final String SYSTEM_CDC_STREAM_NAME =
+            SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, 
SYSTEM_CDC_STREAM_TABLE);
+    public static final String STREAM_NAME = "STREAM_NAME";
+    public static final String STREAM_STATUS = "STREAM_STATUS";
+    public static final String PARTITION_ID = "PARTITION_ID";
+    public static final String PARENT_PARTITION_ID = "PARENT_PARTITION_ID";
+    public static final String PARTITION_START_TIME = "PARTITION_START_TIME";
+    public static final String PARTITION_END_TIME = "PARTITION_END_TIME";
+    public static final String PARTITION_START_KEY = "PARTITION_START_KEY";
+    public static final String PARTITION_END_KEY = "PARTITION_END_KEY";
+
     public static final String QUERY_ID = "QUERY_ID";
     public static final String USER = "USER";
     public static final String CLIENT_IP = "CLIENT_IP";
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b69b2c06d3..bb6a648c7d 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3715,6 +3715,14 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         return 
setSystemDDLProperties(QueryConstants.CREATE_TRANSFORM_METADATA);
     }
 
+    protected String getCDCStreamStatusDDL() {
+        return 
setSystemDDLProperties(QueryConstants.CREATE_CDC_STREAM_STATUS_METADATA);
+    }
+
+    protected String getCDCStreamDDL() {
+        return 
setSystemDDLProperties(QueryConstants.CREATE_CDC_STREAM_METADATA);
+    }
+
     private String setSystemDDLProperties(String ddl) {
         return String.format(ddl,
           props.getInt(DEFAULT_SYSTEM_MAX_VERSIONS_ATTRIB, 
QueryServicesOptions.DEFAULT_SYSTEM_MAX_VERSIONS),
@@ -4011,6 +4019,12 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         try {
             metaConnection.createStatement().executeUpdate(getTransformDDL());
         } catch (TableAlreadyExistsException ignore) {}
+        try {
+            
metaConnection.createStatement().executeUpdate(getCDCStreamStatusDDL());
+        } catch (TableAlreadyExistsException ignore) {}
+        try {
+            metaConnection.createStatement().executeUpdate(getCDCStreamDDL());
+        } catch (TableAlreadyExistsException ignore) {}
     }
 
     /**
@@ -4647,6 +4661,8 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         metaConnection = upgradeSystemTransform(metaConnection, 
systemTableToSnapshotMap);
         metaConnection = upgradeSystemLog(metaConnection, 
systemTableToSnapshotMap);
         metaConnection = upgradeSystemMutex(metaConnection);
+        metaConnection = upgradeSystemCDCStreamStatus(metaConnection);
+        metaConnection = upgradeSystemCDCStream(metaConnection);
 
         // As this is where the most time will be spent during an upgrade,
         // especially when there are large number of views.
@@ -4982,6 +4998,24 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         return metaConnection;
     }
 
+    private PhoenixConnection upgradeSystemCDCStreamStatus(PhoenixConnection 
metaConnection)
+            throws SQLException {
+        try {
+            
metaConnection.createStatement().executeUpdate(getCDCStreamStatusDDL());
+        } catch (TableAlreadyExistsException ignored) {
+        }
+        return metaConnection;
+    }
+
+    private PhoenixConnection upgradeSystemCDCStream(PhoenixConnection 
metaConnection)
+            throws SQLException {
+        try {
+            metaConnection.createStatement().executeUpdate(getCDCStreamDDL());
+        } catch (TableAlreadyExistsException ignored) {
+        }
+        return metaConnection;
+    }
+
 
     // Special method for adding the column qualifier column for 4.10. 
     private PhoenixConnection addColumnQualifierColumn(PhoenixConnection 
oldMetaConnection, Long timestamp) throws SQLException {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 6b39eeb50b..e2de008e73 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -208,6 +208,14 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
         return 
setSystemDDLProperties(QueryConstants.CREATE_TRANSFORM_METADATA);
     }
 
+    protected String getCDCStreamStatusDDL() {
+        return 
setSystemDDLProperties(QueryConstants.CREATE_CDC_STREAM_STATUS_METADATA);
+    }
+
+    protected String getCDCStreamDDL() {
+        return 
setSystemDDLProperties(QueryConstants.CREATE_CDC_STREAM_METADATA);
+    }
+
     private String setSystemDDLProperties(String ddl) {
         return String.format(ddl,
           props.getInt(DEFAULT_SYSTEM_MAX_VERSIONS_ATTRIB, 
QueryServicesOptions.DEFAULT_SYSTEM_MAX_VERSIONS),
@@ -472,6 +480,12 @@ public class ConnectionlessQueryServicesImpl extends 
DelegateQueryServices imple
                             .executeUpdate(getTransformDDL());
                 } catch (NewerTableAlreadyExistsException ignore) {
                 }
+                try {
+                    
metaConnection.createStatement().executeUpdate(getCDCStreamStatusDDL());
+                } catch (TableAlreadyExistsException ignore) {}
+                try {
+                    
metaConnection.createStatement().executeUpdate(getCDCStreamDDL());
+                } catch (TableAlreadyExistsException ignore) {}
             } catch (SQLException e) {
                 sqlE = e;
             } finally {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index ccadb5be19..5154bf67e5 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -105,6 +105,12 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.OLD_METADATA;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_PARTITION_ID;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_END_KEY;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_END_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_ID;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_START_KEY;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_START_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_HWM;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
@@ -134,8 +140,12 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_STATUS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_TABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
@@ -640,4 +650,35 @@ public interface QueryConstants {
             + SYSTEM_TASK_SPLIT_POLICY_CLASSNAME + "',\n" +
             TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
             STORE_NULLS + "=" + Boolean.TRUE;
+
+    String CREATE_CDC_STREAM_STATUS_METADATA = "CREATE TABLE " + 
SYSTEM_CATALOG_SCHEMA + ".\"" +
+            SYSTEM_CDC_STREAM_STATUS_TABLE + "\"(\n" +
+            // PK columns
+            TABLE_NAME + " VARCHAR NOT NULL," +
+            STREAM_STATUS + " VARCHAR NOT NULL," +
+            // Non-PK columns
+            STREAM_NAME + " VARCHAR,\n" +
+            "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
+            TABLE_NAME + "," + STREAM_STATUS + "))\n" +
+            HConstants.VERSIONS + "=%s,\n" +
+            ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
+            TRANSACTIONAL + "=" + Boolean.FALSE;
+
+    String CREATE_CDC_STREAM_METADATA = "CREATE TABLE " + 
SYSTEM_CATALOG_SCHEMA + ".\"" +
+            SYSTEM_CDC_STREAM_TABLE + "\"(\n" +
+            // PK columns
+            TABLE_NAME + " VARCHAR NOT NULL," +
+            STREAM_NAME + " VARCHAR NOT NULL," +
+            PARTITION_ID + " VARCHAR NOT NULL," +
+            // Non-PK columns
+            PARENT_PARTITION_ID + " VARCHAR," +
+            PARTITION_START_TIME + " BIGINT," +
+            PARTITION_END_TIME + " BIGINT," +
+            PARTITION_START_KEY + " VARBINARY_ENCODED," +
+            PARTITION_END_KEY + " VARBINARY_ENCODED,\n" +
+            "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
+            TABLE_NAME + "," + STREAM_NAME + "," + PARTITION_ID + "))\n" +
+            HConstants.VERSIONS + "=%s,\n" +
+            ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
+            TRANSACTIONAL + "=" + Boolean.FALSE;
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
index 397c0b4992..53e8cb6894 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -103,7 +103,8 @@ public abstract class BasePermissionsIT extends BaseTest {
     static HBaseTestingUtility testUtil;
     private static final Set<String> PHOENIX_SYSTEM_TABLES =
             new HashSet<>(Arrays.asList("SYSTEM.CATALOG", "SYSTEM.SEQUENCE", 
"SYSTEM.STATS",
-                "SYSTEM.FUNCTION", "SYSTEM.MUTEX", "SYSTEM.CHILD_LINK", 
"SYSTEM.TRANSFORM"));
+                "SYSTEM.FUNCTION", "SYSTEM.MUTEX", "SYSTEM.CHILD_LINK", 
"SYSTEM.TRANSFORM",
+                    "SYSTEM.CDC_STREAM_STATUS", "SYSTEM.CDC_STREAM"));
 
     private static final Set<String> PHOENIX_SYSTEM_TABLES_IDENTIFIERS =
             new HashSet<>(Arrays.asList("SYSTEM.\"CATALOG\"", 
"SYSTEM.\"SEQUENCE\"",
@@ -117,7 +118,8 @@ public abstract class BasePermissionsIT extends BaseTest {
                     + PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME + "\"";
 
     static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new 
HashSet<>(Arrays.asList(
-            "SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", 
"SYSTEM:FUNCTION", "SYSTEM:MUTEX", "SYSTEM:CHILD_LINK","SYSTEM:TRANSFORM"));
+            "SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", 
"SYSTEM:FUNCTION", "SYSTEM:MUTEX",
+            "SYSTEM:CHILD_LINK","SYSTEM:TRANSFORM", 
"SYSTEM:CDC_STREAM_STATUS", "SYSTEM:CDC_STREAM"));
 
     // Create Multiple users so that we can use Hadoop UGI to run tasks as 
various users
     // Permissions can be granted or revoke by superusers and admins only
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
index 918f7a1928..f9ceccfe65 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
@@ -66,10 +66,12 @@ public class MigrateSystemTablesToSystemNamespaceIT extends 
BaseTest {
 
     private static final Set<String> PHOENIX_SYSTEM_TABLES = new 
HashSet<>(Arrays.asList(
             "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", 
"SYSTEM.FUNCTION",
-            "SYSTEM.MUTEX","SYSTEM.LOG", "SYSTEM.CHILD_LINK", "SYSTEM.TASK", 
"SYSTEM.TRANSFORM"));
+            "SYSTEM.MUTEX","SYSTEM.LOG", "SYSTEM.CHILD_LINK", "SYSTEM.TASK", 
"SYSTEM.TRANSFORM",
+            "SYSTEM.CDC_STREAM_STATUS", "SYSTEM.CDC_STREAM"));
     private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = 
new HashSet<>(
             Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", 
"SYSTEM:FUNCTION",
-                    "SYSTEM:MUTEX","SYSTEM:LOG", "SYSTEM:CHILD_LINK", 
"SYSTEM:TASK", "SYSTEM:TRANSFORM"));
+                    "SYSTEM:MUTEX","SYSTEM:LOG", "SYSTEM:CHILD_LINK", 
"SYSTEM:TASK", "SYSTEM:TRANSFORM",
+                    "SYSTEM:CDC_STREAM_STATUS", "SYSTEM:CDC_STREAM"));
     private static final String SCHEMA_NAME = "MIGRATETEST";
     private static final String TABLE_NAME =
             SCHEMA_NAME + "." + 
MigrateSystemTablesToSystemNamespaceIT.class.getSimpleName().toUpperCase();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
index 50c0ad2359..e33b2e2b3f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
@@ -48,6 +48,7 @@ public abstract class ParallelStatsEnabledIT extends BaseTest 
{
         props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
         
props.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, 
Long.toString(5));
         props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, 
Boolean.toString(true));
+        props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, 
Long.toString(Long.MAX_VALUE));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
 
         TaskRegionEnvironment =
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index abdec219d4..2549517fd0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -175,6 +175,14 @@ public class QueryDatabaseMetaDataIT extends 
ParallelStatsDisabledIT {
             assertEquals(PTableType.SYSTEM.toString(), 
rs.getString("TABLE_TYPE"));
             assertTrue(rs.next());
             assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
+            assertEquals(PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_TABLE, 
rs.getString("TABLE_NAME"));
+            assertEquals(PTableType.SYSTEM.toString(), 
rs.getString("TABLE_TYPE"));
+            assertTrue(rs.next());
+            assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
+            
assertEquals(PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE, 
rs.getString("TABLE_NAME"));
+            assertEquals(PTableType.SYSTEM.toString(), 
rs.getString("TABLE_TYPE"));
+            assertTrue(rs.next());
+            assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
             assertEquals(SYSTEM_CHILD_LINK_TABLE, rs.getString("TABLE_NAME"));
             assertEquals(PTableType.SYSTEM.toString(), 
rs.getString("TABLE_TYPE"));
             assertTrue(rs.next());
@@ -422,7 +430,7 @@ public class QueryDatabaseMetaDataIT extends 
ParallelStatsDisabledIT {
                 tables.add(rs.getString("TABLE_NAME"));
                 assertEquals("SYSTEM", rs.getString("TABLE_SCHEM"));
             }
-            assertEquals(9, tables.size());
+            assertEquals(11, tables.size());
             assertTrue(tables.contains("CATALOG"));
             assertTrue(tables.contains("FUNCTION"));
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
index 3faaaf9783..84074c349a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
@@ -111,11 +111,13 @@ public class SystemTablesCreationOnConnectionIT {
 
     private static final Set<String> PHOENIX_SYSTEM_TABLES = new 
HashSet<>(Arrays.asList(
             "SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS", 
"SYSTEM.FUNCTION",
-            "SYSTEM.MUTEX", "SYSTEM.LOG", "SYSTEM.CHILD_LINK", 
"SYSTEM.TASK","SYSTEM.TRANSFORM"));
+            "SYSTEM.MUTEX", "SYSTEM.LOG", "SYSTEM.CHILD_LINK", 
"SYSTEM.TASK","SYSTEM.TRANSFORM",
+            "SYSTEM.CDC_STREAM_STATUS", "SYSTEM.CDC_STREAM"));
 
     private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = 
new HashSet<>(
             Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS", 
"SYSTEM:FUNCTION",
-                    "SYSTEM:MUTEX", "SYSTEM:LOG", "SYSTEM:CHILD_LINK", 
"SYSTEM:TASK", "SYSTEM:TRANSFORM"));
+                    "SYSTEM:MUTEX", "SYSTEM:LOG", "SYSTEM:CHILD_LINK", 
"SYSTEM:TASK", "SYSTEM:TRANSFORM",
+                    "SYSTEM:CDC_STREAM_STATUS", "SYSTEM:CDC_STREAM"));
 
     private static class PhoenixSysCatCreationServices extends 
ConnectionQueryServicesImpl {
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index 5a66f9c5d4..ab6e183103 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -553,6 +553,10 @@ public class TenantSpecificTablesDDLIT extends 
BaseTenantSpecificTablesIT {
             assertTrue(rs.next());
             assertTableMetaData(rs, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE, PTableType.SYSTEM);
             assertTrue(rs.next());
+            assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, 
PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_TABLE, PTableType.SYSTEM);
+            assertTrue(rs.next());
+            assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, 
PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE, PTableType.SYSTEM);
+            assertTrue(rs.next());
             assertTableMetaData(rs, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA, 
PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE, PTableType.SYSTEM);
             assertTrue(rs.next());
             assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA, 
SYSTEM_FUNCTION_TABLE, SYSTEM);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
index 0b3b6236a0..db0f48450c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
@@ -109,6 +109,10 @@ public class ConnectionQueryServicesMetricsIT extends 
BaseTest {
             }
         });
         Configuration conf = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        conf.set(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+                Long.toString(Long.MAX_VALUE));
+        conf.set(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+                Long.toString(Long.MAX_VALUE));
         hbaseTestUtil = new HBaseTestingUtility(conf);
         setUpConfigForMiniCluster(conf);
         conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,

Reply via email to