This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 7059330fc645e369ea79b9da867b1f3c01c6e4dc
Author: Heng Du <[email protected]>
AuthorDate: Wed Aug 21 10:11:34 2019 +0800

    [ISSUE #385]Add acl feature support for rocketmq-flink (#384)
    
    * Add acl support for rocketmq flink connector
    
    * Fix unit test error
---
 pom.xml                                            |  7 +++-
 .../org/apache/rocketmq/flink/RocketMQConfig.java  | 21 ++++++++++
 .../org/apache/rocketmq/flink/RocketMQSink.java    |  2 +-
 .../org/apache/rocketmq/flink/RocketMQSource.java  | 47 +++++++++++-----------
 4 files changed, 51 insertions(+), 26 deletions(-)

diff --git a/pom.xml b/pom.xml
index c715f7f..0d68314 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
         <!-- compiler settings properties -->
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
-        <rocketmq.version>4.2.0</rocketmq.version>
+        <rocketmq.version>4.5.2</rocketmq.version>
         <flink.version>1.7.0</flink.version>
         <commons-lang.version>2.5</commons-lang.version>
         <scala.binary.version>2.11</scala.binary.version>
@@ -67,6 +67,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-acl</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-common</artifactId>
             <version>${rocketmq.version}</version>
             <exclusions>
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
index 5b43b31..5a0784b 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
@@ -23,6 +23,8 @@ import java.util.UUID;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -53,6 +55,9 @@ public class RocketMQConfig {
     public static final String PRODUCER_TIMEOUT = "producer.timeout";
     public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
 
+    public static final String ACCESS_KEY = "access.key";
+    public static final String SECRET_KEY = "secret.key";
+
 
     // Consumer related config
     public static final String CONSUMER_GROUP = "consumer.group"; // Required
@@ -152,4 +157,20 @@ public class RocketMQConfig {
         client.setHeartbeatBrokerInterval(getInteger(props,
             BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
     }
+
+
+    /**
+     * Build credentials for client.
+     * @param props
+     * @return
+     */
+    public static AclClientRPCHook buildAclRPCHook(Properties props) {
+        String accessKey = props.getProperty(ACCESS_KEY);
+        String secretKey = props.getProperty(SECRET_KEY);
+        if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
+            AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+            return aclClientRPCHook;
+        }
+        return null;
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index e8f237f..f3e200d 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -87,7 +87,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
         Validate.notNull(topicSelector, "TopicSelector can not be null");
         Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null");
 
-        producer = new DefaultMQProducer();
+        producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props));
         producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
         RocketMQConfig.buildProducerConfigs(props, producer);
 
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
index 06eecfb..e289b49 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -1,19 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 
 package org.apache.rocketmq.flink;
@@ -26,7 +21,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.commons.collections.map.LinkedMap;
 import org.apache.commons.lang.Validate;
 import org.apache.flink.api.common.state.ListState;
@@ -47,7 +41,6 @@ import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullTaskCallback;
 import org.apache.rocketmq.client.consumer.PullTaskContext;
-import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -62,9 +55,8 @@ import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
 import static org.apache.rocketmq.flink.RocketMQUtils.getLong;
 
 /**
- * The RocketMQSource is based on RocketMQ pull consumer mode,
- * and provides exactly once reliability guarantees when checkpoints are enabled.
- * Otherwise, the source doesn't provide any reliability guarantees.
+ * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability guarantees when
+ * checkpoints are enabled. Otherwise, the source doesn't provide any reliability guarantees.
  */
 public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
     implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
@@ -126,7 +118,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
 
         runningChecker = new RunningChecker();
 
-        pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
+        //Wait for lite pull consumer
+        pullConsumerScheduleService = new MQPullConsumerScheduleService(group, RocketMQConfig.buildAclRPCHook(props));
         consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
 
         consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
@@ -270,10 +263,15 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         if (pullConsumerScheduleService != null) {
             pullConsumerScheduleService.shutdown();
         }
-
-        offsetTable.clear();
-        restoredOffsets.clear();
-        pendingOffsetsToCommit.clear();
+        if (offsetTable != null) {
+            offsetTable.clear();
+        }
+        if (restoredOffsets != null) {
+            restoredOffsets.clear();
+        }
+        if (pendingOffsetsToCommit != null) {
+            pendingOffsetsToCommit.clear();
+        }
     }
 
     @Override
@@ -331,8 +329,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
         LOG.debug("initialize State ...");
 
         this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
-            OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { })));
+                OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
 
+        })));
         this.restored = context.isRestored();
 
         if (restored) {
@@ -369,7 +368,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
             return;
         }
 
-        Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>)pendingOffsetsToCommit.remove(posInMap);
+        Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
 
         // remove older checkpoints in map
         for (int i = 0; i < posInMap; i++) {

Reply via email to