This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new afba019 Revert "Fix some problems with the catalog (#58)" (#60)
afba019 is described below
commit afba019497e090d4ec2301dfd32a0dbd48cae619
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu Sep 29 20:56:28 2022 +0800
Revert "Fix some problems with the catalog (#58)" (#60)
This reverts commit 6909cbc5d2c3c6a0590d5a7bd32bd9f45cf18f97.
---
.../org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java | 2 --
.../rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java | 2 +-
.../apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java | 6 ++----
3 files changed, 3 insertions(+), 7 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
index 609708f..37d0b3c 100644
--- a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
@@ -61,8 +61,6 @@ public class RocketMQCatalogFactory implements CatalogFactory {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(DEFAULT_DATABASE);
- options.add(NAME_SERVER_ADDR);
- options.add(SCHEMA_REGISTRY_BASE_URL);
return options;
}
}
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
index 624539b..25226b2 100644
--- a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.catalog.CommonCatalogOptions;
@Internal
public final class RocketMQCatalogFactoryOptions {
- public static final String IDENTIFIER = "rocketmq_catalog";
+ public static final String IDENTIFIER = "rocketmq-catalog";
public static final ConfigOption<String> DEFAULT_DATABASE =
ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index 366991d..f61cbda 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -242,10 +242,8 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
Properties producerProps = new Properties();
producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup);
producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
- if (accessKey != null && secretKey != null) {
- producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
- producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
- }
+ producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
+ producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
return producerProps;
}