This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 017db20 [issues-44] support access control (#47)
017db20 is described below
commit 017db201f7bf69ba7461f5520d1e009b7c4d07c9
Author: lizhimins <[email protected]>
AuthorDate: Wed Jul 27 04:49:26 2022 +0800
[issues-44] support access control (#47)
---
.../apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java | 4 ++--
.../flink/source/table/RocketMQDynamicTableSourceFactory.java | 4 ++++
.../apache/rocketmq/flink/source/table/RocketMQScanTableSource.java | 4 ++++
3 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
index 816449f..f61cbda 100644
--- a/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/sink/table/RocketMQDynamicTableSink.java
@@ -92,10 +92,10 @@ public class RocketMQDynamicTableSink implements DynamicTableSink, SupportsWriti
properties,
schema,
topic,
- null,
- null,
producerGroup,
nameServerAddress,
+ null,
+ null,
tag,
dynamicColumn,
fieldDelimiter,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
index 4117cc1..e8693f7 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQDynamicTableSourceFactory.java
@@ -122,6 +122,8 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
long startTimeMs = configuration.getLong(OPTIONAL_START_TIME_MILLS);
String startDateTime = configuration.getString(OPTIONAL_START_TIME);
String timeZone = configuration.getString(OPTIONAL_TIME_ZONE);
+ String accessKey = configuration.getString(OPTIONAL_ACCESS_KEY);
+ String secretKey = configuration.getString(OPTIONAL_SECRET_KEY);
long startTime = startTimeMs;
if (startTime == -1) {
if (!StringUtils.isNullOrWhitespaceOnly(startDateTime)) {
@@ -167,6 +169,8 @@ public class RocketMQDynamicTableSourceFactory implements DynamicTableSourceFact
topic,
consumerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
tag,
sql,
stopInMs,
diff --git a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
index e75bfaa..7a96d08 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/table/RocketMQScanTableSource.java
@@ -149,6 +149,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
topic,
consumerGroup,
nameServerAddress,
+ accessKey,
+ secretKey,
tag,
sql,
stopInMs,
@@ -238,6 +240,8 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR,
nameServerAddress);
consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, tag);
consumerProps.setProperty(RocketMQConfig.CONSUMER_SQL, sql);
+ consumerProps.setProperty(RocketMQConfig.ACCESS_KEY, accessKey);
+ consumerProps.setProperty(RocketMQConfig.SECRET_KEY, secretKey);
return consumerProps;
}