This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 7059330fc645e369ea79b9da867b1f3c01c6e4dc Author: Heng Du <[email protected]> AuthorDate: Wed Aug 21 10:11:34 2019 +0800 [ISSUE #385]Add acl feature support for rocketmq-flink (#384) * Add acl support for rocketmq flink connector * Fix unit test error --- pom.xml | 7 +++- .../org/apache/rocketmq/flink/RocketMQConfig.java | 21 ++++++++++ .../org/apache/rocketmq/flink/RocketMQSink.java | 2 +- .../org/apache/rocketmq/flink/RocketMQSource.java | 47 +++++++++++----------- 4 files changed, 51 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index c715f7f..0d68314 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ <!-- compiler settings properties --> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> - <rocketmq.version>4.2.0</rocketmq.version> + <rocketmq.version>4.5.2</rocketmq.version> <flink.version>1.7.0</flink.version> <commons-lang.version>2.5</commons-lang.version> <scala.binary.version>2.11</scala.binary.version> @@ -67,6 +67,11 @@ </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-acl</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>${rocketmq.version}</version> <exclusions> diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java index 5b43b31..5a0784b 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java @@ -23,6 +23,8 @@ import java.util.UUID; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.Validate; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -53,6 +55,9 @@ public class RocketMQConfig { public static final String PRODUCER_TIMEOUT = "producer.timeout"; public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds + public static final String ACCESS_KEY = "access.key"; + public static final String SECRET_KEY = "secret.key"; + // Consumer related config public static final String CONSUMER_GROUP = "consumer.group"; // Required @@ -152,4 +157,20 @@ public class RocketMQConfig { client.setHeartbeatBrokerInterval(getInteger(props, BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL)); } + + + /** + * Build credentials for client. + * @param props + * @return + */ + public static AclClientRPCHook buildAclRPCHook(Properties props) { + String accessKey = props.getProperty(ACCESS_KEY); + String secretKey = props.getProperty(SECRET_KEY); + if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) { + AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + return aclClientRPCHook; + } + return null; + } } diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java index e8f237f..f3e200d 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java @@ -87,7 +87,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint Validate.notNull(topicSelector, "TopicSelector can not be null"); Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null"); - producer = new DefaultMQProducer(); + producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props)); producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID()); RocketMQConfig.buildProducerConfigs(props, producer); diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java index 06eecfb..e289b49 100644 --- a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java +++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java @@ -1,19 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.rocketmq.flink; @@ -26,7 +21,6 @@ import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; - import org.apache.commons.collections.map.LinkedMap; import org.apache.commons.lang.Validate; import org.apache.flink.api.common.state.ListState; @@ -47,7 +41,6 @@ import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullTaskCallback; import org.apache.rocketmq.client.consumer.PullTaskContext; -import org.apache.rocketmq.client.consumer.store.OffsetStore; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -62,9 +55,8 @@ import static org.apache.rocketmq.flink.RocketMQUtils.getInteger; import static org.apache.rocketmq.flink.RocketMQUtils.getLong; /** - * The RocketMQSource is based on RocketMQ pull consumer mode, - * and provides exactly once reliability guarantees when checkpoints are enabled. - * Otherwise, the source doesn't provide any reliability guarantees. + * The RocketMQSource is based on RocketMQ pull consumer mode, and provides exactly once reliability guarantees when + * checkpoints are enabled. Otherwise, the source doesn't provide any reliability guarantees. */ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> { @@ -126,7 +118,8 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> runningChecker = new RunningChecker(); - pullConsumerScheduleService = new MQPullConsumerScheduleService(group); + //Wait for lite pull consumer + pullConsumerScheduleService = new MQPullConsumerScheduleService(group, RocketMQConfig.buildAclRPCHook(props)); consumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID()); @@ -270,10 +263,15 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> if (pullConsumerScheduleService != null) { pullConsumerScheduleService.shutdown(); } - - offsetTable.clear(); - restoredOffsets.clear(); - pendingOffsetsToCommit.clear(); + if (offsetTable != null) { + offsetTable.clear(); + } + if (restoredOffsets != null) { + restoredOffsets.clear(); + } + if (pendingOffsetsToCommit != null) { + pendingOffsetsToCommit.clear(); + } } @Override @@ -331,8 +329,9 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> LOG.debug("initialize State ..."); this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>( - OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { }))); + OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() { + }))); this.restored = context.isRestored(); if (restored) { @@ -369,7 +368,7 @@ public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> return; } - Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>)pendingOffsetsToCommit.remove(posInMap); + Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap); // remove older checkpoints in map for (int i = 0; i < posInMap; i++) {
