This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch revert-58-feature_catalog_fix
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 9591c0c67fe092f158f482add411f7be2c417f6d
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu Sep 29 20:56:02 2022 +0800

    Revert "Fix some problems with the catalog (#58)"

    This reverts commit 6909cbc5d2c3c6a0590d5a7bd32bd9f45cf18f97.
---
 .../org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java | 2 --
 .../rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java | 2 +-
 .../apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java | 6 ++----
 3 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
index 609708f..37d0b3c 100644
--- a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
@@ -61,8 +61,6 @@ public class RocketMQCatalogFactory implements CatalogFactory {
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(DEFAULT_DATABASE);
-        options.add(NAME_SERVER_ADDR);
-        options.add(SCHEMA_REGISTRY_BASE_URL);
         return options;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
index 624539b..25226b2 100644
--- a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.catalog.CommonCatalogOptions;
 @Internal
 public final class RocketMQCatalogFactoryOptions {
 
-    public static final String IDENTIFIER = "rocketmq_catalog";
+    public static final String IDENTIFIER = "rocketmq-catalog";
 
     public static final ConfigOption<String> DEFAULT_DATABASE =
             ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index 366991d..f61cbda 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -242,10 +242,8 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
         Properties producerProps = new Properties();
         producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup);
         producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
-        if (accessKey != null && secretKey != null) {
-            producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
-            producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
-        }
+        producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
+        producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
         return producerProps;
     }
