This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new b7c00f3 [issues-32] Implement of Apache RocketMQ Flink Catalog. (#49)
b7c00f3 is described below
commit b7c00f3ea71b7145e6ad28c7f71b0eb507adb332
Author: 李晓双 Li Xiao Shuang <[email protected]>
AuthorDate: Tue Aug 23 12:45:50 2022 +0800
[issues-32] Implement of Apache RocketMQ Flink Catalog. (#49)
Co-authored-by: lixiaoshuang <[email protected]>
Co-authored-by: gongzhongqiang <[email protected]>
---
README.md | 4 +-
pom.xml | 38 +-
.../rocketmq/flink/catalog/RocketMQCatalog.java | 491 +++++++++++++++++++++
.../flink/catalog/RocketMQCatalogFactory.java | 66 +++
.../catalog/RocketMQCatalogFactoryOptions.java | 53 +++
.../common/constant/RocketMqCatalogConstant.java | 28 ++
.../common/constant/SchemaRegistryConstant.java | 27 ++
.../org.apache.flink.table.factories.Factory | 3 +-
.../flink/catalog/RocketMQCatalogFactoryTest.java | 67 +++
.../flink/catalog/RocketMQCatalogTest.java | 398 +++++++++++++++++
10 files changed, 1158 insertions(+), 17 deletions(-)
diff --git a/README.md b/README.md
index 9ecf41b..fa53dd7 100644
--- a/README.md
+++ b/README.md
@@ -145,7 +145,7 @@ CREATE TABLE rocketmq_source (
) WITH (
'connector' = 'rocketmq',
'topic' = 'user_behavior',
- 'consumeGroup' = 'behavior_consume_group',
+ 'consumerGroup' = 'behavior_consumer_group',
'nameServerAddress' = '127.0.0.1:9876'
);
@@ -183,7 +183,7 @@ CREATE TABLE rocketmq_source (
) WITH (
'connector' = 'rocketmq',
'topic' = 'user_behavior',
- 'consumeGroup' = 'behavior_consume_group',
+ 'consumerGroup' = 'behavior_consumer_group',
'nameServerAddress' = '127.0.0.1:9876'
);
```
diff --git a/pom.xml b/pom.xml
index 72268c9..1b725e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,8 @@
<flink.version>1.15.0</flink.version>
<commons-lang.version>2.6</commons-lang.version>
<spotless.version>2.4.2</spotless.version>
- <jaxb-api.version>2.3.1</jaxb-api.version>
+ <jaxb-api.version>2.3.1</jaxb-api.version>
+
<rocketmq.schema.registry.version>0.0.4-SNAPSHOT</rocketmq.schema.registry.version>
</properties>
<dependencies>
@@ -83,6 +84,11 @@
<artifactId>flink-queryable-state-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
@@ -104,12 +110,27 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-namesrv</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-broker</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-test</artifactId>
<version>${rocketmq.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>schema-registry-client</artifactId>
+ <version>${rocketmq.schema.registry.version}</version>
+ </dependency>
+
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
@@ -135,18 +156,7 @@
<version>1.5.5</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-namesrv</artifactId>
- <version>${rocketmq.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-broker</artifactId>
- <version>${rocketmq.version}</version>
- <scope>test</scope>
- </dependency>
+
</dependencies>
<build>
diff --git
a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalog.java
b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalog.java
new file mode 100644
index 0000000..3636995
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalog.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.flink.common.constant.RocketMqCatalogConstant;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** A catalog implementation for RocketMQ. */
+public class RocketMQCatalog extends AbstractCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RocketMQCatalog.class);
+ public static final String DEFAULT_DB = "default";
+ public final String namesrvAddr;
+ private final String schemaRegistryUrl;
+ private DefaultMQAdminExt mqAdminExt;
+ private SchemaRegistryClient schemaRegistryClient;
+
+    /**
+     * Creates a catalog that reads table metadata from a RocketMQ schema registry.
+     *
+     * @param catalogName name of the catalog, forwarded to {@link AbstractCatalog}
+     * @param database default database, forwarded to {@link AbstractCatalog}
+     * @param namesrvAddr RocketMQ name server address; must not be null or blank
+     * @param schemaRegistryUrl schema registry URL; must not be null or blank
+     */
+    public RocketMQCatalog(
+            String catalogName, String database, String namesrvAddr, String schemaRegistryUrl) {
+        super(catalogName, database);
+
+        // Validate both addresses up front, in the same order as they are declared.
+        final boolean hasNamesrvAddr = !isNullOrWhitespaceOnly(namesrvAddr);
+        checkArgument(hasNamesrvAddr, "namesrvAddr cannot be null or empty");
+        final boolean hasSchemaRegistryUrl = !isNullOrWhitespaceOnly(schemaRegistryUrl);
+        checkArgument(hasSchemaRegistryUrl, "schemaRegistryUrl cannot be null or empty");
+
+        this.schemaRegistryUrl = schemaRegistryUrl;
+        this.namesrvAddr = namesrvAddr;
+        LOG.info("Created RocketMQ Catalog {}", catalogName);
+    }
+
+    /** Returns a new {@link RocketMQCatalogFactory} wrapped in an {@link Optional}. */
+    @Override
+    public Optional<Factory> getFactory() {
+        final Factory catalogFactory = new RocketMQCatalogFactory();
+        return Optional.of(catalogFactory);
+    }
+
+    /**
+     * Starts the RocketMQ admin client and creates the schema registry client if they have not
+     * been created yet.
+     *
+     * @throws CatalogException if the admin client cannot be started
+     */
+    @Override
+    public void open() throws CatalogException {
+        if (mqAdminExt == null) {
+            final DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
+            adminExt.setNamesrvAddr(namesrvAddr);
+            adminExt.setLanguage(LanguageCode.JAVA);
+            try {
+                adminExt.start();
+            } catch (MQClientException e) {
+                throw new CatalogException(
+                        "Failed to create RocketMQ admin using :" + namesrvAddr, e);
+            }
+            // Publish the admin only after a successful start; otherwise a failed open()
+            // would leave a non-null but unstarted admin and a retry would skip initialisation.
+            mqAdminExt = adminExt;
+        }
+        if (schemaRegistryClient == null) {
+            schemaRegistryClient = SchemaRegistryClientFactory.newClient(schemaRegistryUrl, null);
+        }
+    }
+
+    /** Shuts down the admin client, if any, and drops both client references. */
+    @Override
+    public void close() throws CatalogException {
+        if (mqAdminExt != null) {
+            mqAdminExt.shutdown();
+            mqAdminExt = null;
+        }
+        // Clearing an already-null reference is a no-op, so no guard is needed here.
+        schemaRegistryClient = null;
+    }
+
+    /** Returns a single-element list containing the default database of this catalog. */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        final String defaultDatabase = getDefaultDatabase();
+        return Collections.singletonList(defaultDatabase);
+    }
+
+    /**
+     * Returns an empty {@link CatalogDatabaseImpl} for the default database.
+     *
+     * @throws CatalogException if {@code databaseName} is null or empty
+     * @throws DatabaseNotExistException if {@code databaseName} is not the default database
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (StringUtils.isEmpty(databaseName)) {
+            throw new CatalogException("Database name can not be null or empty.");
+        }
+        if (databaseExists(databaseName)) {
+            return new CatalogDatabaseImpl(new HashMap<>(), null);
+        }
+        throw new DatabaseNotExistException(getName(), databaseName);
+    }
+
+    /** Returns {@code true} only when {@code databaseName} equals the default database. */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        final String defaultDatabase = getDefaultDatabase();
+        return defaultDatabase.equals(databaseName);
+    }
+
+    /**
+     * Lists the subjects registered under the "default" tenant of the schema registry.
+     *
+     * @throws DatabaseNotExistException if {@code databaseName} is not the default database
+     * @throws CatalogException if the schema registry lookup fails
+     */
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        final boolean isDefaultDatabase = getDefaultDatabase().equals(databaseName);
+        if (!isDefaultDatabase) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        try {
+            return schemaRegistryClient.getSubjectsByTenant("default", "default");
+        } catch (Exception e) {
+            final String message =
+                    String.format(
+                            "Failed to get topics of namespace %s from schema registry client.",
+                            databaseName);
+            throw new CatalogException(message, e);
+        }
+    }
+
+    /**
+     * Loads the schema registered for the table's subject and converts it to a catalog table.
+     *
+     * @param tablePath path whose object name is used as the schema registry subject
+     * @throws TableNotExistException if {@link #tableExists(ObjectPath)} reports the table missing
+     * @throws CatalogException if the schema is not Avro, or if the lookup or conversion fails
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        String subject = tablePath.getObjectName();
+        try {
+            GetSchemaResponse getSchemaResponse = schemaRegistryClient.getSchemaBySubject(subject);
+            if (getSchemaResponse.getType() != SchemaType.AVRO) {
+                throw new CatalogException("Only support avro schema.");
+            }
+            return getCatalogTableForSchema(subject, getSchemaResponse);
+        } catch (CatalogException e) {
+            // Keep catalog errors (e.g. the unsupported schema type above) intact instead of
+            // hiding them behind the generic "failed to get schema" wrapper below.
+            throw e;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to get schema of table %s from schema registry client.",
+                            tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    /**
+     * Builds a {@link CatalogTable} from the Avro IDL carried by a schema registry response.
+     *
+     * <p>Columns are only added when the converted type is a {@link FieldsDataType}; the table
+     * options point the "rocketmq" connector at {@code topic} and the admin's name server.
+     */
+    private CatalogTable getCatalogTableForSchema(
+            String topic, GetSchemaResponse getSchemaResponse) {
+        final DataType converted =
+                AvroSchemaConverter.convertToDataType(getSchemaResponse.getIdl());
+        final Schema.Builder schemaBuilder = Schema.newBuilder();
+        if (converted instanceof FieldsDataType) {
+            final RowType row = (RowType) ((FieldsDataType) converted).getLogicalType();
+            row.getFields()
+                    .forEach(
+                            field ->
+                                    schemaBuilder.column(
+                                            field.getName(),
+                                            TypeConversions.fromLogicalToDataType(
+                                                    field.getType())));
+        }
+
+        final Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(
+                RocketMqCatalogConstant.CONNECTOR, RocketMqCatalogConstant.ROCKETMQ_CONNECTOR);
+        tableOptions.put(RocketMqCatalogConstant.TOPIC, topic);
+        tableOptions.put(
+                RocketMqCatalogConstant.NAME_SERVER_ADDRESS, mqAdminExt.getNamesrvAddr());
+
+        return CatalogTable.of(schemaBuilder.build(), null, Collections.emptyList(), tableOptions);
+    }
+
+    /**
+     * Checks whether the schema registry returns a schema for the table's subject.
+     *
+     * @return {@code false} for an empty table name or a null registry response
+     * @throws CatalogException if the database is not the default one or the lookup fails
+     */
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        final String databaseName = tablePath.getDatabaseName();
+        if (!getDefaultDatabase().equals(databaseName)) {
+            throw new CatalogException("Database name is not default.");
+        }
+        final String subject = tablePath.getObjectName();
+        if (StringUtils.isEmpty(subject)) {
+            return false;
+        }
+        try {
+            return schemaRegistryClient.getSchemaBySubject(subject) != null;
+        } catch (Exception e) {
+            final String message =
+                    String.format(
+                            "Failed to get schema of table %s from schema registry client.",
+                            tablePath.getFullName());
+            throw new CatalogException(message, e);
+        }
+    }
+
+    /**
+     * Lists one partition spec per message queue of the topic named by the table.
+     *
+     * <p>Each spec holds a single entry mapping {@code __queue_id__} to the queue id.
+     *
+     * @param tablePath path whose object name is used as the topic; must not be null
+     * @throws CatalogException if the topic statistics cannot be fetched
+     */
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        try {
+            TopicStatsTable topicStatsTable =
+                    mqAdminExt.examineTopicStats(tablePath.getObjectName());
+            // singletonMap replaces the former double-brace HashMap, which created an
+            // anonymous subclass per partition that kept a reference to this catalog.
+            return topicStatsTable.getOffsetTable().keySet().stream()
+                    .map(
+                            topicOffset ->
+                                    new CatalogPartitionSpec(
+                                            Collections.singletonMap(
+                                                    "__queue_id__",
+                                                    String.valueOf(topicOffset.getQueueId()))))
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "Failed to list partitions of table %s by defaultMQAdminExt.",
+                            tablePath.getFullName()),
+                    e);
+        }
+    }
+
+    /**
+     * Lists the partitions of the table that match the given partial partition spec.
+     *
+     * <p>A null or empty {@code partitionSpec} matches every partition. Otherwise only
+     * partitions containing all entries of {@code partitionSpec} are returned; previously the
+     * spec was ignored and every partition was returned.
+     *
+     * @throws CatalogException if the partitions cannot be listed
+     */
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        final List<CatalogPartitionSpec> allPartitions = listPartitions(tablePath);
+        if (partitionSpec == null || partitionSpec.getPartitionSpec().isEmpty()) {
+            return allPartitions;
+        }
+        final Map<String, String> wanted = partitionSpec.getPartitionSpec();
+        return allPartitions.stream()
+                .filter(
+                        candidate ->
+                                candidate
+                                        .getPartitionSpec()
+                                        .entrySet()
+                                        .containsAll(wanted.entrySet()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Wraps the entries of {@code partitionSpec} in a {@link CatalogPartitionImpl} with a null
+     * comment. No lookup against RocketMQ is performed here.
+     */
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        final Map<String, String> properties = partitionSpec.getPartitionSpec();
+        return new CatalogPartitionImpl(properties, null);
+    }
+
+    /**
+     * Reports whether {@code partitionSpec} is among the partitions listed for the table.
+     *
+     * @throws CatalogException if listing the partitions fails
+     */
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        try {
+            return listPartitions(tablePath).contains(partitionSpec);
+        } catch (Exception e) {
+            final String message =
+                    String.format(
+                            "Failed to judge partition %s of table %s exists by defaultMQAdminExt.",
+                            partitionSpec, tablePath.getFullName());
+            throw new CatalogException(message, e);
+        }
+    }
+
+ // ------------------------------------------------------------------------
+ // Unsupported catalog operations for RocketMQ.
+ // The connector deliberately does not expose these operations, because
+ // granting such permissions through a catalog would be very dangerous.
+ // ------------------------------------------------------------------------
+
+    /** Not supported: this catalog does not create databases. */
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support creating databases.");
+    }
+
+    /** Not supported: this catalog does not drop databases. */
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support dropping databases.");
+    }
+
+    /** Not supported: this catalog does not create tables. */
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support creating tables.");
+    }
+
+    /** Not supported: this catalog does not drop tables. */
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support dropping tables.");
+    }
+
+    /**
+     * Returns an empty list: this catalog stores no functions.
+     *
+     * <p>Previously threw {@link UnsupportedOperationException}, which turned a plain listing
+     * request into a failure.
+     */
+    @Override
+    public List<String> listFunctions(String dbName)
+            throws DatabaseNotExistException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Always fails, because this catalog stores no functions.
+     *
+     * @throws FunctionNotExistException always; it carries this catalog's name and the path
+     */
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath)
+            throws FunctionNotExistException, CatalogException {
+        // The first constructor argument is the catalog name, not a free-form message.
+        throw new FunctionNotExistException(getName(), functionPath);
+    }
+
+    /** Returns {@code false}: this catalog stores no functions. */
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        return false;
+    }
+
+    /** Not supported: this catalog does not create functions. */
+    @Override
+    public void createFunction(
+            ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support creating functions.");
+    }
+
+    /** Not supported: this catalog does not alter functions. */
+    @Override
+    public void alterFunction(
+            ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support altering functions.");
+    }
+
+    /** Not supported: this catalog does not drop functions. */
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support dropping functions.");
+    }
+
+    /** Not supported: this catalog does not alter databases. */
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support altering databases.");
+    }
+
+    /**
+     * Returns an empty list: this catalog exposes no views.
+     *
+     * <p>Previously threw {@link UnsupportedOperationException}, which turned a plain listing
+     * request into a failure.
+     */
+    @Override
+    public List<String> listViews(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    /** Not supported: this catalog does not alter tables. */
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support altering tables.");
+    }
+
+    /** Not supported: this catalog does not rename tables. */
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support renaming tables.");
+    }
+
+    /** Not supported: this catalog cannot list partitions by filter expressions. */
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(
+            ObjectPath tablePath, List<Expression> expressions)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support listing partitions by filter.");
+    }
+
+    /** Not supported: this catalog does not create partitions. */
+    @Override
+    public void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, PartitionAlreadyExistsException,
+                    CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support creating partitions.");
+    }
+
+    /** Not supported: this catalog does not drop partitions. */
+    @Override
+    public void dropPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support dropping partitions.");
+    }
+
+    /** Not supported: this catalog does not alter partitions. */
+    @Override
+    public void alterPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition newPartition,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support altering partitions.");
+    }
+
+    /** Returns {@link CatalogTableStatistics#UNKNOWN}; no table statistics are tracked. */
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    /** Returns {@link CatalogColumnStatistics#UNKNOWN}; no column statistics are tracked. */
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    /** Returns {@link CatalogTableStatistics#UNKNOWN}; no partition statistics are tracked. */
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    /** Returns {@link CatalogColumnStatistics#UNKNOWN}; no column statistics are tracked. */
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    /** Not supported: this catalog does not store table statistics. */
+    @Override
+    public void alterTableStatistics(
+            ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support altering table statistics.");
+    }
+
+    /** Not supported: this catalog does not store table column statistics. */
+    @Override
+    public void alterTableColumnStatistics(
+            ObjectPath tablePath,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support altering table column statistics.");
+    }
+
+    /** Not supported: this catalog does not store partition statistics. */
+    @Override
+    public void alterPartitionStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support altering partition statistics.");
+    }
+
+    /** Not supported: this catalog does not store partition column statistics. */
+    @Override
+    public void alterPartitionColumnStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "RocketMQ catalog does not support altering partition column statistics.");
+    }
+}
diff --git
a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
new file mode 100644
index 0000000..37d0b3c
--- /dev/null
+++
b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static
org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.DEFAULT_DATABASE;
+import static
org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.IDENTIFIER;
+import static
org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.NAME_SERVER_ADDR;
+import static
org.apache.rocketmq.flink.catalog.RocketMQCatalogFactoryOptions.SCHEMA_REGISTRY_BASE_URL;
+
+/** The {@link CatalogFactory} implementation of RocketMQ. */
+public class RocketMQCatalogFactory implements CatalogFactory {
+
+    /**
+     * Validates the supplied options and creates a {@link RocketMQCatalog} from them.
+     *
+     * @param context catalog name and options supplied when the catalog is created
+     * @return a catalog configured with the default database, name server address and schema
+     *     registry URL read from the options
+     */
+    @Override
+    public Catalog createCatalog(Context context) {
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
+        return new RocketMQCatalog(
+                context.getName(),
+                helper.getOptions().get(DEFAULT_DATABASE),
+                helper.getOptions().get(NAME_SERVER_ADDR),
+                helper.getOptions().get(SCHEMA_REGISTRY_BASE_URL));
+    }
+
+    /** Returns the identifier under which this factory is looked up. */
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** Returns an empty set: every option of this factory has a default value. */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    /**
+     * Returns every option read by {@link #createCatalog(Context)}.
+     *
+     * <p>All three options must be declared here. The factory helper's validation rejects any
+     * user-supplied key that is neither required nor optional, so omitting the name server
+     * address or the schema registry URL would make them impossible to configure.
+     */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        options.add(NAME_SERVER_ADDR);
+        options.add(SCHEMA_REGISTRY_BASE_URL);
+        return options;
+    }
+}
diff --git
a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
new file mode 100644
index 0000000..25226b2
--- /dev/null
+++
b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.rocketmq.flink.common.constant.SchemaRegistryConstant;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+
+/** {@link ConfigOption}s for {@link RocketMQCatalog}. */
+@Internal
+public final class RocketMQCatalogFactoryOptions {
+
+    /** Identifier of the RocketMQ catalog factory. */
+    public static final String IDENTIFIER = "rocketmq-catalog";
+
+    /** Default database of the catalog; falls back to {@link RocketMQCatalog#DEFAULT_DB}. */
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(RocketMQCatalog.DEFAULT_DB);
+
+    // The descriptions below no longer say "Required": both options carry a default value and
+    // the factory declares no required options.
+
+    /** Address of the RocketMQ name server. */
+    public static final ConfigOption<String> NAME_SERVER_ADDR =
+            ConfigOptions.key(RocketMQConfig.NAME_SERVER_ADDR)
+                    .stringType()
+                    .defaultValue("http://localhost:9876")
+                    .withDescription("RocketMQ name server address");
+
+    /** Base URL of the schema registry server. */
+    public static final ConfigOption<String> SCHEMA_REGISTRY_BASE_URL =
+            ConfigOptions.key(SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL_KEY)
+                    .stringType()
+                    .defaultValue(SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL)
+                    .withDescription("Schema registry server address");
+
+    private RocketMQCatalogFactoryOptions() {}
+}
diff --git
a/src/main/java/org/apache/rocketmq/flink/common/constant/RocketMqCatalogConstant.java
b/src/main/java/org/apache/rocketmq/flink/common/constant/RocketMqCatalogConstant.java
new file mode 100644
index 0000000..be3d9da
--- /dev/null
+++
b/src/main/java/org/apache/rocketmq/flink/common/constant/RocketMqCatalogConstant.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.constant;
+
+/** Option keys and values used when the RocketMQ catalog builds table options. */
+public final class RocketMqCatalogConstant {
+
+    /** Key of the table option that selects the connector. */
+    public static final String CONNECTOR = "connector";
+
+    /** Key of the table option that holds the topic name. */
+    public static final String TOPIC = "topic";
+
+    /** Key of the table option that holds the name server address. */
+    public static final String NAME_SERVER_ADDRESS = "nameServerAddress";
+
+    /** Connector identifier stored under {@link #CONNECTOR}. */
+    public static final String ROCKETMQ_CONNECTOR = "rocketmq";
+
+    /** Constants holder; not meant to be instantiated. */
+    private RocketMqCatalogConstant() {}
+}
diff --git
a/src/main/java/org/apache/rocketmq/flink/common/constant/SchemaRegistryConstant.java
b/src/main/java/org/apache/rocketmq/flink/common/constant/SchemaRegistryConstant.java
new file mode 100644
index 0000000..8584cb3
--- /dev/null
+++
b/src/main/java/org/apache/rocketmq/flink/common/constant/SchemaRegistryConstant.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.constant;
+
+/** Constants for configuring access to the schema registry. */
+public final class SchemaRegistryConstant {
+
+    /** Default base URL of the schema registry. */
+    public static final String SCHEMA_REGISTRY_BASE_URL = "http://localhost:8080";
+
+    /** Configuration key under which the schema registry base URL is supplied. */
+    public static final String SCHEMA_REGISTRY_BASE_URL_KEY = "schema.registry.base.url";
+
+    /** Constants holder; not meant to be instantiated. */
+    private SchemaRegistryConstant() {}
+}
diff --git
a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index b164722..9e866fc 100644
---
a/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.rocketmq.flink.source.table.RocketMQDynamicTableSourceFactory
-org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory
\ No newline at end of file
+org.apache.rocketmq.flink.sink.table.RocketMQDynamicTableSinkFactory
+org.apache.rocketmq.flink.catalog.RocketMQCatalogFactory
diff --git
a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
new file mode 100644
index 0000000..ad595d7
--- /dev/null
+++
b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** Unit tests for {@link RocketMQCatalogFactory}. */
+public class RocketMQCatalogFactoryTest {
+
+    /** The factory builds a non-null catalog from a context with an empty option map. */
+    @Test
+    public void testCreateCatalog() {
+        RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+        FactoryUtil.DefaultCatalogContext context =
+                new FactoryUtil.DefaultCatalogContext(
+                        "rocketmq-catalog",
+                        new HashMap<>(),
+                        null,
+                        this.getClass().getClassLoader());
+        Catalog catalog = factory.createCatalog(context);
+        assertNotNull(catalog);
+    }
+
+    /** The factory reports {@code rocketmq-catalog} as its identifier. */
+    @Test
+    public void testFactoryIdentifier() {
+        RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+        // JUnit's contract is assertEquals(expected, actual).
+        assertEquals("rocketmq-catalog", factory.factoryIdentifier());
+    }
+
+    /** The set of required options is never null. */
+    @Test
+    public void testRequiredOptions() {
+        RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+        Set<ConfigOption<?>> options = factory.requiredOptions();
+        assertNotNull(options);
+    }
+
+    /** The factory exposes exactly one optional option. */
+    @Test
+    public void testOptionalOptions() {
+        RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
+        Set<ConfigOption<?>> options = factory.optionalOptions();
+        assertEquals(1, options.size());
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java
new file mode 100644
index 0000000..4a36c77
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.flink.catalog;
+
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.flink.common.constant.SchemaRegistryConstant;
+import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
+import org.apache.rocketmq.schema.registry.common.dto.GetSchemaResponse;
+import org.apache.rocketmq.schema.registry.common.model.SchemaType;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.factories.Factory;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link RocketMQCatalog}.
+ *
+ * <p>The catalog's private {@code schemaRegistryClient} and {@code mqAdminExt} fields are replaced
+ * with Mockito mocks via reflection before every test.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RocketMQCatalogTest {
+    @Mock private SchemaRegistryClient schemaRegistryClient;
+    @Mock private DefaultMQAdminExt mqAdminExt;
+    @Mock private GetSchemaResponse getSchemaResponse;
+    private RocketMQCatalog rocketMQCatalog;
+
+    /**
+     * Creates the catalog under test, injects the mocks and stubs one subject/topic named {@code
+     * test} that has an Avro schema and two message queues (ids 0 and 1).
+     */
+    @Before
+    public void setUp() throws Exception {
+        rocketMQCatalog =
+                new RocketMQCatalog(
+                        "rocketmq-catalog",
+                        "default",
+                        "http://localhost:9876",
+                        SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL);
+
+        // Replace the catalog's private clients with the mocks.
+        writeField("schemaRegistryClient", schemaRegistryClient);
+        writeField("mqAdminExt", mqAdminExt);
+
+        List<String> subjects = new ArrayList<>();
+        subjects.add("test");
+        Mockito.when(schemaRegistryClient.getSubjectsByTenant("default", "default"))
+                .thenReturn(subjects);
+
+        Mockito.when(mqAdminExt.getNamesrvAddr()).thenReturn("localhost:9876");
+        Mockito.when(schemaRegistryClient.getSchemaBySubject("test")).thenReturn(getSchemaResponse);
+        Mockito.when(getSchemaResponse.getType()).thenReturn(SchemaType.AVRO);
+        Mockito.when(getSchemaResponse.getIdl())
+                .thenReturn(
+                        "{\"type\":\"record\",\"name\":\"Charge\","
+                                + "\"namespace\":\"org.apache.rocketmq.schema.registry.example.serde\",\"fields\":[{\"name\":\"item\","
+                                + "\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
+
+        // Plain map instead of a double-brace initialised anonymous subclass.
+        HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<>(2);
+        offsetTable.put(new MessageQueue("test", "default", 0), new TopicOffset());
+        offsetTable.put(new MessageQueue("test", "default", 1), new TopicOffset());
+        TopicStatsTable topicStatsTable = new TopicStatsTable();
+        topicStatsTable.setOffsetTable(offsetTable);
+
+        Mockito.when(mqAdminExt.examineTopicStats("test")).thenReturn(topicStatsTable);
+    }
+
+    /** The catalog exposes a factory. */
+    @Test
+    public void testGetFactory() {
+        Optional<Factory> factory = rocketMQCatalog.getFactory();
+        // Checking presence fails as an assertion; Optional.get() would throw instead.
+        assertTrue(factory.isPresent());
+    }
+
+    /** After {@code open()} both client fields are non-null. */
+    @Test
+    public void testOpen() throws NoSuchFieldException, IllegalAccessException {
+        rocketMQCatalog.open();
+
+        assertNotNull(readField("mqAdminExt"));
+        assertNotNull(readField("schemaRegistryClient"));
+    }
+
+    /** After {@code close()} the schema registry client field is null. */
+    @Test
+    public void testClose() throws NoSuchFieldException, IllegalAccessException {
+        rocketMQCatalog.close();
+
+        // NOTE(review): the original test read mqAdminExt too but never asserted on it;
+        // confirm whether close() is expected to clear that field as well.
+        assertNull(readField("schemaRegistryClient"));
+    }
+
+    /** Only the {@code default} database is listed. */
+    @Test
+    public void testListDatabases() {
+        List<String> databases = rocketMQCatalog.listDatabases();
+        assertEquals(1, databases.size());
+        assertEquals("default", databases.get(0));
+    }
+
+    @Test
+    public void testGetDatabase() throws DatabaseNotExistException {
+        CatalogDatabase database = rocketMQCatalog.getDatabase("default");
+        assertNotNull(database);
+    }
+
+    @Test
+    public void testDatabaseExists() {
+        assertTrue(rocketMQCatalog.databaseExists("default"));
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCreateDatabase() throws DatabaseAlreadyExistException {
+        rocketMQCatalog.createDatabase("test", null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDropDatabase() throws DatabaseNotEmptyException, DatabaseNotExistException {
+        rocketMQCatalog.dropDatabase("test", false, false);
+    }
+
+    /** The stubbed subject {@code test} is the only table in the {@code default} database. */
+    @Test
+    public void testListTables() throws DatabaseNotExistException {
+        List<String> tables = rocketMQCatalog.listTables("default");
+        assertEquals(1, tables.size());
+        assertEquals("test", tables.get(0));
+    }
+
+    @Test
+    public void testGetTable() throws TableNotExistException {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        CatalogBaseTable catalogBaseTable = rocketMQCatalog.getTable(objectPath);
+        assertNotNull(catalogBaseTable);
+    }
+
+    @Test
+    public void testTableExists() {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        assertTrue(rocketMQCatalog.tableExists(objectPath));
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCreateTable() throws TableAlreadyExistException, DatabaseNotExistException {
+        rocketMQCatalog.createTable(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDropTable() throws TableNotExistException {
+        rocketMQCatalog.dropTable(null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testListFunctions() throws DatabaseNotExistException {
+        rocketMQCatalog.listFunctions("default");
+    }
+
+    @Test(expected = FunctionNotExistException.class)
+    public void testGetFunction() throws FunctionNotExistException {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        rocketMQCatalog.getFunction(objectPath);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testFunctionExists() {
+        rocketMQCatalog.functionExists(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCreateFunction()
+            throws FunctionAlreadyExistException, DatabaseNotExistException {
+        rocketMQCatalog.createFunction(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterFunction() throws FunctionNotExistException {
+        rocketMQCatalog.alterFunction(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDropFunction() throws FunctionNotExistException {
+        rocketMQCatalog.dropFunction(null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterDatabase() throws DatabaseNotExistException {
+        rocketMQCatalog.alterDatabase(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testListViews() throws DatabaseNotExistException {
+        rocketMQCatalog.listViews("default");
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterTable() throws TableNotExistException {
+        rocketMQCatalog.alterTable(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testRenameTable() throws TableAlreadyExistException, TableNotExistException {
+        rocketMQCatalog.renameTable(null, null, false);
+    }
+
+    /** Each of the two stubbed message queues is listed as one partition, keyed by queue id. */
+    @Test
+    public void testListPartitions() throws TableNotPartitionedException, TableNotExistException {
+        List<CatalogPartitionSpec> catalogPartitionSpecs =
+                rocketMQCatalog.listPartitions(new ObjectPath("default", "test"));
+        assertEquals(2, catalogPartitionSpecs.size());
+
+        List<CatalogPartitionSpec> expected = new ArrayList<>();
+        expected.add(queueSpec(0));
+        expected.add(queueSpec(1));
+        assertEquals(expected, catalogPartitionSpecs);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testListPartitionsByFilter()
+            throws TableNotPartitionedException, TableNotExistException {
+        rocketMQCatalog.listPartitionsByFilter(null, null);
+    }
+
+    /** The partition for queue 0 carries the queue id as its only property. */
+    @Test
+    public void testGetPartition() throws PartitionNotExistException {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        CatalogPartition partition = rocketMQCatalog.getPartition(objectPath, queueSpec(0));
+
+        assertEquals(queueProperties(0), partition.getProperties());
+    }
+
+    /** The partition for the stubbed queue 0 is reported as existing. */
+    @Test
+    public void testPartitionExists() {
+        ObjectPath objectPath = new ObjectPath("default", "test");
+        boolean exists = rocketMQCatalog.partitionExists(objectPath, queueSpec(0));
+        // assertNotNull on a primitive boolean autoboxes and can never fail.
+        assertTrue(exists);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testCreatePartition()
+            throws TableNotPartitionedException, TableNotExistException,
+                    PartitionSpecInvalidException, PartitionAlreadyExistsException {
+        rocketMQCatalog.createPartition(null, null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testDropPartition() throws PartitionNotExistException {
+        rocketMQCatalog.dropPartition(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterPartition() throws PartitionNotExistException {
+        rocketMQCatalog.alterPartition(null, null, null, false);
+    }
+
+    @Test
+    public void testGetTableStatistics() throws TableNotExistException {
+        CatalogTableStatistics statistics = rocketMQCatalog.getTableStatistics(null);
+        assertEquals(CatalogTableStatistics.UNKNOWN, statistics);
+    }
+
+    @Test
+    public void testGetTableColumnStatistics() throws TableNotExistException {
+        CatalogColumnStatistics statistics = rocketMQCatalog.getTableColumnStatistics(null);
+        assertEquals(CatalogColumnStatistics.UNKNOWN, statistics);
+    }
+
+    @Test
+    public void testGetPartitionStatistics() throws PartitionNotExistException {
+        CatalogTableStatistics statistics = rocketMQCatalog.getPartitionStatistics(null, null);
+        assertEquals(CatalogTableStatistics.UNKNOWN, statistics);
+    }
+
+    @Test
+    public void testGetPartitionColumnStatistics() throws PartitionNotExistException {
+        CatalogColumnStatistics statistics =
+                rocketMQCatalog.getPartitionColumnStatistics(null, null);
+        assertEquals(CatalogColumnStatistics.UNKNOWN, statistics);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterTableStatistics() throws TableNotExistException {
+        rocketMQCatalog.alterTableStatistics(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterTableColumnStatistics() throws TableNotExistException {
+        rocketMQCatalog.alterTableColumnStatistics(null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterPartitionStatistics() throws PartitionNotExistException {
+        rocketMQCatalog.alterPartitionStatistics(null, null, null, false);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testAlterPartitionColumnStatistics() throws PartitionNotExistException {
+        rocketMQCatalog.alterPartitionColumnStatistics(null, null, null, false);
+    }
+
+    /** Builds the single-entry property map {@code __queue_id__ -> queueId}. */
+    private static HashMap<String, String> queueProperties(int queueId) {
+        HashMap<String, String> properties = new HashMap<>(1);
+        properties.put("__queue_id__", String.valueOf(queueId));
+        return properties;
+    }
+
+    /** Builds the partition spec that identifies the given queue id. */
+    private static CatalogPartitionSpec queueSpec(int queueId) {
+        return new CatalogPartitionSpec(queueProperties(queueId));
+    }
+
+    /** Reads a private field declared on the catalog's class via reflection. */
+    private Object readField(String name) throws NoSuchFieldException, IllegalAccessException {
+        Field field = rocketMQCatalog.getClass().getDeclaredField(name);
+        field.setAccessible(true);
+        return field.get(rocketMQCatalog);
+    }
+
+    /** Overwrites a private field declared on the catalog's class via reflection. */
+    private void writeField(String name, Object value)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field field = rocketMQCatalog.getClass().getDeclaredField(name);
+        field.setAccessible(true);
+        field.set(rocketMQCatalog, value);
+    }
+}