This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 37364634de PHOENIX-7458 : Create new SYSTEM tables for tracking CDC
Stream metadata (#2025)
37364634de is described below
commit 37364634de8cdf1b58194c628b10b8de916ff777
Author: Palash Chauhan <[email protected]>
AuthorDate: Mon Nov 18 14:35:05 2024 -0800
PHOENIX-7458 : Create new SYSTEM tables for tracking CDC Stream metadata
(#2025)
Co-authored-by: Palash Chauhan
<[email protected]>
---
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 17 +++++++++
.../phoenix/query/ConnectionQueryServicesImpl.java | 34 ++++++++++++++++++
.../query/ConnectionlessQueryServicesImpl.java | 14 ++++++++
.../org/apache/phoenix/query/QueryConstants.java | 41 ++++++++++++++++++++++
.../apache/phoenix/end2end/BasePermissionsIT.java | 6 ++--
.../MigrateSystemTablesToSystemNamespaceIT.java | 6 ++--
.../phoenix/end2end/ParallelStatsEnabledIT.java | 1 +
.../phoenix/end2end/QueryDatabaseMetaDataIT.java | 10 +++++-
.../SystemTablesCreationOnConnectionIT.java | 6 ++--
.../phoenix/end2end/TenantSpecificTablesDDLIT.java | 4 +++
.../ConnectionQueryServicesMetricsIT.java | 4 +++
11 files changed, 136 insertions(+), 7 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index e46eb927d5..ff250f40ba 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -451,6 +451,23 @@ public class PhoenixDatabaseMetaData implements
DatabaseMetaData {
public static final String SYSTEM_LOG_TABLE = "LOG";
public static final String SYSTEM_LOG_NAME =
SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_LOG_TABLE);
+
+ public static final String SYSTEM_CDC_STREAM_STATUS_TABLE =
"CDC_STREAM_STATUS";
+ public static final String SYSTEM_CDC_STREAM_STATUS_NAME =
+ SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA,
SYSTEM_CDC_STREAM_STATUS_TABLE);
+
+ public static final String SYSTEM_CDC_STREAM_TABLE = "CDC_STREAM";
+ public static final String SYSTEM_CDC_STREAM_NAME =
+ SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA,
SYSTEM_CDC_STREAM_TABLE);
+ public static final String STREAM_NAME = "STREAM_NAME";
+ public static final String STREAM_STATUS = "STREAM_STATUS";
+ public static final String PARTITION_ID = "PARTITION_ID";
+ public static final String PARENT_PARTITION_ID = "PARENT_PARTITION_ID";
+ public static final String PARTITION_START_TIME = "PARTITION_START_TIME";
+ public static final String PARTITION_END_TIME = "PARTITION_END_TIME";
+ public static final String PARTITION_START_KEY = "PARTITION_START_KEY";
+ public static final String PARTITION_END_KEY = "PARTITION_END_KEY";
+
public static final String QUERY_ID = "QUERY_ID";
public static final String USER = "USER";
public static final String CLIENT_IP = "CLIENT_IP";
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b69b2c06d3..bb6a648c7d 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -3715,6 +3715,14 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
return
setSystemDDLProperties(QueryConstants.CREATE_TRANSFORM_METADATA);
}
+ protected String getCDCStreamStatusDDL() {
+ return
setSystemDDLProperties(QueryConstants.CREATE_CDC_STREAM_STATUS_METADATA);
+ }
+
+ protected String getCDCStreamDDL() {
+ return
setSystemDDLProperties(QueryConstants.CREATE_CDC_STREAM_METADATA);
+ }
+
private String setSystemDDLProperties(String ddl) {
return String.format(ddl,
props.getInt(DEFAULT_SYSTEM_MAX_VERSIONS_ATTRIB,
QueryServicesOptions.DEFAULT_SYSTEM_MAX_VERSIONS),
@@ -4011,6 +4019,12 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
try {
metaConnection.createStatement().executeUpdate(getTransformDDL());
} catch (TableAlreadyExistsException ignore) {}
+ try {
+ metaConnection.createStatement().executeUpdate(getCDCStreamStatusDDL());
+ } catch (TableAlreadyExistsException ignore) {}
+ try {
+ metaConnection.createStatement().executeUpdate(getCDCStreamDDL());
+ } catch (TableAlreadyExistsException ignore) {}
}
/**
@@ -4647,6 +4661,8 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
metaConnection = upgradeSystemTransform(metaConnection,
systemTableToSnapshotMap);
metaConnection = upgradeSystemLog(metaConnection,
systemTableToSnapshotMap);
metaConnection = upgradeSystemMutex(metaConnection);
+ metaConnection = upgradeSystemCDCStreamStatus(metaConnection);
+ metaConnection = upgradeSystemCDCStream(metaConnection);
// As this is where the most time will be spent during an upgrade,
// especially when there are large number of views.
@@ -4982,6 +4998,24 @@ public class ConnectionQueryServicesImpl extends
DelegateQueryServices implement
return metaConnection;
}
+ private PhoenixConnection upgradeSystemCDCStreamStatus(PhoenixConnection
metaConnection)
+ throws SQLException {
+ try {
+ metaConnection.createStatement().executeUpdate(getCDCStreamStatusDDL());
+ } catch (TableAlreadyExistsException ignored) {
+ }
+ return metaConnection;
+ }
+
+ private PhoenixConnection upgradeSystemCDCStream(PhoenixConnection
metaConnection)
+ throws SQLException {
+ try {
+ metaConnection.createStatement().executeUpdate(getCDCStreamDDL());
+ } catch (TableAlreadyExistsException ignored) {
+ }
+ return metaConnection;
+ }
+
// Special method for adding the column qualifier column for 4.10.
private PhoenixConnection addColumnQualifierColumn(PhoenixConnection
oldMetaConnection, Long timestamp) throws SQLException {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 6b39eeb50b..e2de008e73 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -208,6 +208,14 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
return
setSystemDDLProperties(QueryConstants.CREATE_TRANSFORM_METADATA);
}
+ protected String getCDCStreamStatusDDL() {
+ return
setSystemDDLProperties(QueryConstants.CREATE_CDC_STREAM_STATUS_METADATA);
+ }
+
+ protected String getCDCStreamDDL() {
+ return
setSystemDDLProperties(QueryConstants.CREATE_CDC_STREAM_METADATA);
+ }
+
private String setSystemDDLProperties(String ddl) {
return String.format(ddl,
props.getInt(DEFAULT_SYSTEM_MAX_VERSIONS_ATTRIB,
QueryServicesOptions.DEFAULT_SYSTEM_MAX_VERSIONS),
@@ -472,6 +480,12 @@ public class ConnectionlessQueryServicesImpl extends
DelegateQueryServices imple
.executeUpdate(getTransformDDL());
} catch (NewerTableAlreadyExistsException ignore) {
}
+ try {
+ metaConnection.createStatement().executeUpdate(getCDCStreamStatusDDL());
+ } catch (TableAlreadyExistsException ignore) {}
+ try {
+ metaConnection.createStatement().executeUpdate(getCDCStreamDDL());
+ } catch (TableAlreadyExistsException ignore) {}
} catch (SQLException e) {
sqlE = e;
} finally {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index ccadb5be19..5154bf67e5 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -105,6 +105,12 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_PREC_RADIX;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.OLD_METADATA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_PARTITION_ID;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_END_KEY;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_END_TIME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_ID;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_START_KEY;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARTITION_START_TIME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_HWM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
@@ -134,8 +140,12 @@ import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAM_STATUS;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_TABLE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_LOG_TABLE;
@@ -640,4 +650,35 @@ public interface QueryConstants {
+ SYSTEM_TASK_SPLIT_POLICY_CLASSNAME + "',\n" +
TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
STORE_NULLS + "=" + Boolean.TRUE;
+
+ String CREATE_CDC_STREAM_STATUS_METADATA = "CREATE TABLE " +
SYSTEM_CATALOG_SCHEMA + ".\"" +
+ SYSTEM_CDC_STREAM_STATUS_TABLE + "\"(\n" +
+ // PK columns
+ TABLE_NAME + " VARCHAR NOT NULL," +
+ STREAM_STATUS + " VARCHAR NOT NULL," +
+ // Non-PK columns
+ STREAM_NAME + " VARCHAR,\n" +
+ "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
+ TABLE_NAME + "," + STREAM_STATUS + "))\n" +
+ HConstants.VERSIONS + "=%s,\n" +
+ ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
+ TRANSACTIONAL + "=" + Boolean.FALSE;
+
+ String CREATE_CDC_STREAM_METADATA = "CREATE TABLE " +
SYSTEM_CATALOG_SCHEMA + ".\"" +
+ SYSTEM_CDC_STREAM_TABLE + "\"(\n" +
+ // PK columns
+ TABLE_NAME + " VARCHAR NOT NULL," +
+ STREAM_NAME + " VARCHAR NOT NULL," +
+ PARTITION_ID + " VARCHAR NOT NULL," +
+ // Non-PK columns
+ PARENT_PARTITION_ID + " VARCHAR," +
+ PARTITION_START_TIME + " BIGINT," +
+ PARTITION_END_TIME + " BIGINT," +
+ PARTITION_START_KEY + " VARBINARY_ENCODED," +
+ PARTITION_END_KEY + " VARBINARY_ENCODED,\n" +
+ "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" +
+ TABLE_NAME + "," + STREAM_NAME + "," + PARTITION_ID + "))\n" +
+ HConstants.VERSIONS + "=%s,\n" +
+ ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
+ TRANSACTIONAL + "=" + Boolean.FALSE;
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
index 397c0b4992..53e8cb6894 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePermissionsIT.java
@@ -103,7 +103,8 @@ public abstract class BasePermissionsIT extends BaseTest {
static HBaseTestingUtility testUtil;
private static final Set<String> PHOENIX_SYSTEM_TABLES =
new HashSet<>(Arrays.asList("SYSTEM.CATALOG", "SYSTEM.SEQUENCE",
"SYSTEM.STATS",
- "SYSTEM.FUNCTION", "SYSTEM.MUTEX", "SYSTEM.CHILD_LINK",
"SYSTEM.TRANSFORM"));
+ "SYSTEM.FUNCTION", "SYSTEM.MUTEX", "SYSTEM.CHILD_LINK",
"SYSTEM.TRANSFORM",
+ "SYSTEM.CDC_STREAM_STATUS", "SYSTEM.CDC_STREAM"));
private static final Set<String> PHOENIX_SYSTEM_TABLES_IDENTIFIERS =
new HashSet<>(Arrays.asList("SYSTEM.\"CATALOG\"",
"SYSTEM.\"SEQUENCE\"",
@@ -117,7 +118,8 @@ public abstract class BasePermissionsIT extends BaseTest {
+ PhoenixDatabaseMetaData.SYSTEM_MUTEX_TABLE_NAME + "\"";
static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES = new
HashSet<>(Arrays.asList(
- "SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS",
"SYSTEM:FUNCTION", "SYSTEM:MUTEX", "SYSTEM:CHILD_LINK","SYSTEM:TRANSFORM"));
+ "SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS",
"SYSTEM:FUNCTION", "SYSTEM:MUTEX",
+ "SYSTEM:CHILD_LINK","SYSTEM:TRANSFORM",
"SYSTEM:CDC_STREAM_STATUS", "SYSTEM:CDC_STREAM"));
// Create Multiple users so that we can use Hadoop UGI to run tasks as
various users
// Permissions can be granted or revoke by superusers and admins only
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
index 918f7a1928..f9ceccfe65 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MigrateSystemTablesToSystemNamespaceIT.java
@@ -66,10 +66,12 @@ public class MigrateSystemTablesToSystemNamespaceIT extends
BaseTest {
private static final Set<String> PHOENIX_SYSTEM_TABLES = new
HashSet<>(Arrays.asList(
"SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS",
"SYSTEM.FUNCTION",
- "SYSTEM.MUTEX","SYSTEM.LOG", "SYSTEM.CHILD_LINK", "SYSTEM.TASK",
"SYSTEM.TRANSFORM"));
+ "SYSTEM.MUTEX","SYSTEM.LOG", "SYSTEM.CHILD_LINK", "SYSTEM.TASK",
"SYSTEM.TRANSFORM",
+ "SYSTEM.CDC_STREAM_STATUS", "SYSTEM.CDC_STREAM"));
private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES =
new HashSet<>(
Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS",
"SYSTEM:FUNCTION",
- "SYSTEM:MUTEX","SYSTEM:LOG", "SYSTEM:CHILD_LINK",
"SYSTEM:TASK", "SYSTEM:TRANSFORM"));
+ "SYSTEM:MUTEX","SYSTEM:LOG", "SYSTEM:CHILD_LINK",
"SYSTEM:TASK", "SYSTEM:TRANSFORM",
+ "SYSTEM:CDC_STREAM_STATUS", "SYSTEM:CDC_STREAM"));
private static final String SCHEMA_NAME = "MIGRATETEST";
private static final String TABLE_NAME =
SCHEMA_NAME + "." +
MigrateSystemTablesToSystemNamespaceIT.class.getSimpleName().toUpperCase();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
index 50c0ad2359..e33b2e2b3f 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsEnabledIT.java
@@ -48,6 +48,7 @@ public abstract class ParallelStatsEnabledIT extends BaseTest
{
props.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
props.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB,
Long.toString(5));
props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION,
Boolean.toString(true));
+ props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
TaskRegionEnvironment =
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index abdec219d4..2549517fd0 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -175,6 +175,14 @@ public class QueryDatabaseMetaDataIT extends
ParallelStatsDisabledIT {
assertEquals(PTableType.SYSTEM.toString(),
rs.getString("TABLE_TYPE"));
assertTrue(rs.next());
assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
+ assertEquals(PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_TABLE,
rs.getString("TABLE_NAME"));
+ assertEquals(PTableType.SYSTEM.toString(),
rs.getString("TABLE_TYPE"));
+ assertTrue(rs.next());
+ assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
+ assertEquals(PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE, rs.getString("TABLE_NAME"));
+ assertEquals(PTableType.SYSTEM.toString(),
rs.getString("TABLE_TYPE"));
+ assertTrue(rs.next());
+ assertEquals(SYSTEM_CATALOG_SCHEMA, rs.getString("TABLE_SCHEM"));
assertEquals(SYSTEM_CHILD_LINK_TABLE, rs.getString("TABLE_NAME"));
assertEquals(PTableType.SYSTEM.toString(),
rs.getString("TABLE_TYPE"));
assertTrue(rs.next());
@@ -422,7 +430,7 @@ public class QueryDatabaseMetaDataIT extends
ParallelStatsDisabledIT {
tables.add(rs.getString("TABLE_NAME"));
assertEquals("SYSTEM", rs.getString("TABLE_SCHEM"));
}
- assertEquals(9, tables.size());
+ assertEquals(11, tables.size());
assertTrue(tables.contains("CATALOG"));
assertTrue(tables.contains("FUNCTION"));
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
index 3faaaf9783..84074c349a 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemTablesCreationOnConnectionIT.java
@@ -111,11 +111,13 @@ public class SystemTablesCreationOnConnectionIT {
private static final Set<String> PHOENIX_SYSTEM_TABLES = new
HashSet<>(Arrays.asList(
"SYSTEM.CATALOG", "SYSTEM.SEQUENCE", "SYSTEM.STATS",
"SYSTEM.FUNCTION",
- "SYSTEM.MUTEX", "SYSTEM.LOG", "SYSTEM.CHILD_LINK",
"SYSTEM.TASK","SYSTEM.TRANSFORM"));
+ "SYSTEM.MUTEX", "SYSTEM.LOG", "SYSTEM.CHILD_LINK",
"SYSTEM.TASK","SYSTEM.TRANSFORM",
+ "SYSTEM.CDC_STREAM_STATUS", "SYSTEM.CDC_STREAM"));
private static final Set<String> PHOENIX_NAMESPACE_MAPPED_SYSTEM_TABLES =
new HashSet<>(
Arrays.asList("SYSTEM:CATALOG", "SYSTEM:SEQUENCE", "SYSTEM:STATS",
"SYSTEM:FUNCTION",
- "SYSTEM:MUTEX", "SYSTEM:LOG", "SYSTEM:CHILD_LINK",
"SYSTEM:TASK", "SYSTEM:TRANSFORM"));
+ "SYSTEM:MUTEX", "SYSTEM:LOG", "SYSTEM:CHILD_LINK",
"SYSTEM:TASK", "SYSTEM:TRANSFORM",
+ "SYSTEM:CDC_STREAM_STATUS", "SYSTEM:CDC_STREAM"));
private static class PhoenixSysCatCreationServices extends
ConnectionQueryServicesImpl {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index 5a66f9c5d4..ab6e183103 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -553,6 +553,10 @@ public class TenantSpecificTablesDDLIT extends
BaseTenantSpecificTablesIT {
assertTrue(rs.next());
assertTableMetaData(rs,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE, PTableType.SYSTEM);
assertTrue(rs.next());
+ assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_TABLE, PTableType.SYSTEM);
+ assertTrue(rs.next());
+ assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE, PTableType.SYSTEM);
+ assertTrue(rs.next());
assertTableMetaData(rs,
PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA,
PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE, PTableType.SYSTEM);
assertTrue(rs.next());
assertTableMetaData(rs, SYSTEM_CATALOG_SCHEMA,
SYSTEM_FUNCTION_TABLE, SYSTEM);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
index 0b3b6236a0..db0f48450c 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/connectionqueryservice/ConnectionQueryServicesMetricsIT.java
@@ -109,6 +109,10 @@ public class ConnectionQueryServicesMetricsIT extends
BaseTest {
}
});
Configuration conf =
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+ conf.set(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+ Long.toString(Long.MAX_VALUE));
+ conf.set(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+ Long.toString(Long.MAX_VALUE));
hbaseTestUtil = new HBaseTestingUtility(conf);
setUpConfigForMiniCluster(conf);
conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,