This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 460694b  [ISSUE #57] Fix some problems with the catalog (#61)
460694b is described below

commit 460694bf25ee18018760320543bbc14c787d664b
Author: 李晓双 Li Xiao Shuang <[email protected]>
AuthorDate: Fri Sep 30 08:39:56 2022 +0800

    [ISSUE #57] Fix some problems with the catalog (#61)
---
 .../org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java   | 2 ++
 .../rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java       | 2 +-
 .../apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java  | 6 ++++--
 .../apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java   | 4 ++--
 .../java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java | 2 +-
 5 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
index 37d0b3c..609708f 100644
--- a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactory.java
@@ -61,6 +61,8 @@ public class RocketMQCatalogFactory implements CatalogFactory {
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(DEFAULT_DATABASE);
+        options.add(NAME_SERVER_ADDR);
+        options.add(SCHEMA_REGISTRY_BASE_URL);
         return options;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
index 25226b2..624539b 100644
--- a/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
+++ b/src/main/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryOptions.java
@@ -30,7 +30,7 @@ import org.apache.flink.table.catalog.CommonCatalogOptions;
 @Internal
 public final class RocketMQCatalogFactoryOptions {
 
-    public static final String IDENTIFIER = "rocketmq-catalog";
+    public static final String IDENTIFIER = "rocketmq_catalog";
 
     public static final ConfigOption<String> DEFAULT_DATABASE =
             ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index f61cbda..366991d 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -242,8 +242,10 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
         Properties producerProps = new Properties();
         producerProps.setProperty(RocketMQConfig.PRODUCER_GROUP, producerGroup);
         producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, nameServerAddress);
-        producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
-        producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
+        if (accessKey != null && secretKey != null) {
+            producerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
+            producerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
+        }
         return producerProps;
     }
 
diff --git a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
index ad595d7..0766efc 100644
--- a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogFactoryTest.java
@@ -48,7 +48,7 @@ public class RocketMQCatalogFactoryTest {
     @Test
     public void testFactoryIdentifier() {
         RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
-        assertEquals(factory.factoryIdentifier(), "rocketmq-catalog");
+        assertEquals(factory.factoryIdentifier(), "rocketmq_catalog");
     }
 
     @Test
@@ -62,6 +62,6 @@ public class RocketMQCatalogFactoryTest {
     public void testOptionalOptions() {
         RocketMQCatalogFactory factory = new RocketMQCatalogFactory();
         Set<ConfigOption<?>> options = factory.optionalOptions();
-        assertEquals(options.size(), 1);
+        assertEquals(options.size(), 3);
     }
 }
diff --git a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java
index 4a36c77..6801ae8 100644
--- a/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/catalog/RocketMQCatalogTest.java
@@ -75,7 +75,7 @@ public class RocketMQCatalogTest {
     public void setUp() throws Exception {
         rocketMQCatalog =
                 new RocketMQCatalog(
-                        "rocketmq-catalog",
+                        "rocketmq_catalog",
                         "default",
                         "http://localhost:9876",
                         SchemaRegistryConstant.SCHEMA_REGISTRY_BASE_URL);

Reply via email to