This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch OpenMessaging in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git
commit 51e20e3325f79dd0b42bd773b156ac4290e395a7 Author: duhenglucky <[email protected]> AuthorDate: Wed Dec 4 17:26:57 2019 +0800 feat(ons-client) remove the dependency of ons-auth4client --- ons-core/ons-client/pom.xml | 21 +++++-- .../org/apache/rocketmq/ons/api/Constants.java | 2 + .../exception/AuthenticationException.java | 68 ++++++++++++++++++++ .../rocketmq/ONSChannel.java} | 12 ++-- .../ons/api/impl/rocketmq/ONSClientAbstract.java | 33 ++-------- .../ons/api/impl/rocketmq/ONSConsumerAbstract.java | 11 ++-- .../ons/api/impl/rocketmq/OnsClientRPCHook.java | 28 +++++++-- .../ons/api/impl/rocketmq/OrderProducerImpl.java | 12 ++-- .../ons/api/impl/rocketmq/ProducerImpl.java | 7 ++- .../ons/api/impl/rocketmq/PullConsumerImpl.java | 3 +- .../api/impl/rocketmq/TransactionProducerImpl.java | 9 ++- ons-core/ons-trace-core/pom.xml | 5 -- .../core/dispatch/impl/AsyncArrayDispatcher.java | 30 +++------ .../core/dispatch/impl/TraceProducerFactory.java | 55 +---------------- .../ons/open/trace/core/hook/AbstractRPCHook.java | 72 ---------------------- ons-core/pom.xml | 5 ++ 16 files changed, 156 insertions(+), 217 deletions(-) diff --git a/ons-core/ons-client/pom.xml b/ons-core/ons-client/pom.xml index b9558d8..fde3dc5 100644 --- a/ons-core/ons-client/pom.xml +++ b/ons-core/ons-client/pom.xml @@ -33,6 +33,11 @@ <version>${rocketmq.version}</version> </dependency> <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-acl</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </dependency> @@ -45,12 +50,18 @@ <groupId>${project.groupId}</groupId> <artifactId>ons-trace-core</artifactId> <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>${project.groupId}</groupId> + <artifactId>ons-auth4client</artifactId> + </exclusion> + </exclusions> </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>ons-auth4client</artifactId> - <version>${project.version}</version> - </dependency> + <!--<dependency>--> + <!--<groupId>${project.groupId}</groupId>--> + <!--<artifactId>ons-auth4client</artifactId>--> + <!--<version>${project.version}</version>--> + <!--</dependency>--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java index a82c405..2a82a0b 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java @@ -21,4 +21,6 @@ public class Constants { public static final String TRANSACTION_ID = "__transactionId__"; public static final String TOPIC_PARTITION_SEPARATOR = "#"; + + public static final String ONS_CHANNEL_KEY = "OnsChannel"; } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/AuthenticationException.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/AuthenticationException.java new file mode 100644 index 0000000..b4c57d6 --- /dev/null +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/AuthenticationException.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.ons.api.impl.authority.exception; + +public class AuthenticationException extends RuntimeException { + private static final long serialVersionUID = 1L; + + private String status; + private int code; + + + public AuthenticationException(String status, int code) { + super(); + this.status = status; + this.code = code; + } + + + public AuthenticationException(String status, int code, String message) { + super(message); + this.status = status; + this.code = code; + } + + + public AuthenticationException(String status, int code, Throwable throwable) { + super(throwable); + this.status = status; + this.code = code; + } + + + public AuthenticationException(String status, int code, String message, Throwable throwable) { + super(message, throwable); + this.status = status; + this.code = code; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } +} diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java similarity index 80% copy from ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java copy to ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java index a82c405..e4acc99 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java @@ -15,10 +15,14 @@ * limitations under the License. */ -package org.apache.rocketmq.ons.api; +package org.apache.rocketmq.ons.api.impl.rocketmq; -public class Constants { - public static final String TRANSACTION_ID = "__transactionId__"; - public static final String TOPIC_PARTITION_SEPARATOR = "#"; +public enum ONSChannel { + + CLOUD, + + ALIYUN, + + ALL } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java index 6b78740..984ba6f 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; - import io.openmessaging.api.Credentials; import io.openmessaging.api.LifeCycle; import java.util.Properties; @@ -28,14 +27,15 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Generated; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.ons.api.Constants; import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.api.exception.ONSClientException; -import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; import org.apache.rocketmq.ons.api.impl.util.NameAddrUtils; import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher; @@ -74,27 +74,17 @@ public abstract class ONSClientAbstract implements LifeCycle, Credentials { public ONSClientAbstract(Properties properties) { this.properties = properties; this.sessionCredentials.updateContent(properties); - if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) && - (null == this.sessionCredentials.getAccessKey() || "".equals(this.sessionCredentials.getAccessKey()))) { - throw new ONSClientException("please set access key"); - } - - if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) && - (null == this.sessionCredentials.getSecretKey() || "".equals(this.sessionCredentials.getSecretKey()))) { - throw new ONSClientException("please set secret key"); - } - - if (null == this.sessionCredentials.getOnsChannel()) { - throw new ONSClientException("please set ons channel"); - } + ONSChannel onsChannle = ONSChannel.valueOf(this.properties.getProperty(Constants.ONS_CHANNEL_KEY, "ALIYUN")); this.nameServerAddr = getNameSrvAddrFromProperties(); if (nameServerAddr != null) { return; } - if (nameServerAddr == null && !this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN)) { + + if (nameServerAddr == null && !onsChannle.equals(ONSChannel.ALIYUN)) { return; } + this.nameServerAddr = fetchNameServerAddr(); if (null == nameServerAddr) { throw new ONSClientException(FAQ.errorMessage("Can not find name server, May be your network problem.", FAQ.FIND_NS_FAILED)); @@ -239,17 +229,6 @@ public abstract class ONSClientAbstract implements LifeCycle, Credentials { @Override public void updateCredential(Properties credentialProperties) { - if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) && - (null == credentialProperties.getProperty(SessionCredentials.AccessKey) - || "".equals(credentialProperties.getProperty(SessionCredentials.AccessKey)))) { - throw new ONSClientException("update credential failed. please set access key."); - } - - if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) && - (null == credentialProperties.getProperty(SessionCredentials.SecretKey) - || "".equals(credentialProperties.getProperty(SessionCredentials.SecretKey)))) { - throw new ONSClientException("update credential failed. please set secret key"); - } this.sessionCredentials.updateContent(credentialProperties); } diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java index 3c6cae2..6cc4220 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java @@ -17,14 +17,15 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; - import io.openmessaging.api.MessageSelector; import java.util.Properties; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.ons.api.Constants; import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; @@ -55,8 +56,8 @@ public class ONSConsumerAbstract extends ONSClientAbstract { } this.defaultMQPushConsumer = - new DefaultMQPushConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials)); - + new DefaultMQPushConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials, + properties.getProperty(Constants.ONS_CHANNEL_KEY))); String maxReconsumeTimes = properties.getProperty(PropertyKeyConst.MaxReconsumeTimes); if (!UtilAll.isBlank(maxReconsumeTimes)) { @@ -117,15 +118,13 @@ public class ONSConsumerAbstract extends ONSClientAbstract { } else { try { Properties tempProperties = new Properties(); - tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey()); - tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey()); tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000"); tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048"); tempProperties.put(OnsTraceConstants.MaxBatchNum, "100"); tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name()); - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, new AclClientRPCHook(sessionCredentials)); dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl()); traceDispatcher = dispatcher; this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java index a145c25..b117a00 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java @@ -17,23 +17,39 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.AclException; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.ons.api.Constants; import org.apache.rocketmq.ons.api.impl.MQClientInfo; -import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; +import org.apache.rocketmq.ons.api.impl.authority.exception.AuthenticationException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; -public class OnsClientRPCHook extends ClientRPCHook { +public class OnsClientRPCHook extends AclClientRPCHook { + private static final int CAL_SIGNATURE_FAILED = 10015; + private final ONSChannel onsChannel; - public OnsClientRPCHook(SessionCredentials sessionCredentials) { +// public OnsClientRPCHook(SessionCredentials sessionCredentials) { +// super(sessionCredentials); +// this.onsChannel = ONSChannel.ALIYUN; +// } + + public OnsClientRPCHook(SessionCredentials sessionCredentials, String channel) { super(sessionCredentials); + this.onsChannel = ONSChannel.valueOf(channel); } @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { - super.doBeforeRequest(remoteAddr, request); + try { + super.doBeforeRequest(remoteAddr, request); + } catch (AclException aclException) { + throw new AuthenticationException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, aclException.getMessage(), aclException); + } + request.addExtField(Constants.ONS_CHANNEL_KEY, onsChannel.name()); request.setVersion(MQClientInfo.versionCode); } - @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { super.doAfterResponse(remoteAddr, request, response); diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java index 2808444..1164ad1 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java @@ -17,18 +17,18 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; - import io.openmessaging.api.Message; import io.openmessaging.api.SendResult; import io.openmessaging.api.order.OrderProducer; import java.util.List; import java.util.Properties; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; - +import org.apache.rocketmq.ons.api.Constants; import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; @@ -50,8 +50,8 @@ public class OrderProducerImpl extends ONSClientAbstract implements OrderProduce } this.defaultMQProducer = - new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials)); - + new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials, + properties.getProperty(Constants.ONS_CHANNEL_KEY))); this.defaultMQProducer.setProducerGroup(producerGroup); @@ -78,15 +78,13 @@ public class OrderProducerImpl extends ONSClientAbstract implements OrderProduce } else { try { Properties tempProperties = new Properties(); - tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey()); - tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey()); tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000"); tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048"); tempProperties.put(OnsTraceConstants.MaxBatchNum, "100"); tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name()); - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, new AclClientRPCHook(sessionCredentials)); dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook( diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java index 80b3fe6..97107ed 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java @@ -32,6 +32,7 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.ons.api.Constants; import org.apache.rocketmq.ons.api.PropertyKeyConst; import org.apache.rocketmq.ons.api.exception.ONSClientException; import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil; @@ -56,7 +57,8 @@ public class ProducerImpl extends ONSClientAbstract implements Producer { } this.defaultMQProducer = - new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials)); + new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials, + properties.getProperty(Constants.ONS_CHANNEL_KEY))); this.defaultMQProducer.setProducerGroup(producerGroup); @@ -96,7 +98,8 @@ public class ProducerImpl extends ONSClientAbstract implements Producer { tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name()); - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, + new OnsClientRPCHook(sessionCredentials, properties.getProperty(Constants.ONS_CHANNEL_KEY))); dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook( diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java index d364b88..233d637 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java @@ -63,7 +63,8 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer } this.litePullConsumer = - new DefaultLitePullConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials)); + new DefaultLitePullConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials, + properties.getProperty(Constants.ONS_CHANNEL_KEY))); String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); this.litePullConsumer.setMessageModel(MessageModel.valueOf(messageModel)); diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java index f56a460..45a6839 100644 --- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java +++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.ons.api.impl.rocketmq; - import io.openmessaging.api.Message; import io.openmessaging.api.SendResult; import io.openmessaging.api.transaction.LocalTransactionExecuter; @@ -25,6 +24,7 @@ import io.openmessaging.api.transaction.TransactionProducer; import io.openmessaging.api.transaction.TransactionStatus; import java.util.Properties; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionCheckListener; @@ -53,7 +53,8 @@ public class TransactionProducerImpl extends ONSClientAbstract implements Transa producerGroup = "__ONS_PRODUCER_DEFAULT_GROUP"; } transactionMQProducer = - new TransactionMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials)); + new TransactionMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials, + properties.getProperty(Constants.ONS_CHANNEL_KEY))); boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled, "false")); transactionMQProducer.setVipChannelEnabled(isVipChannelEnabled); @@ -75,15 +76,13 @@ public class TransactionProducerImpl extends ONSClientAbstract implements Transa } else { try { Properties tempProperties = new Properties(); - tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey()); - tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey()); tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000"); tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048"); tempProperties.put(OnsTraceConstants.MaxBatchNum, "100"); tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr()); tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER"); tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name()); - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, new AclClientRPCHook(sessionCredentials)); dispatcher.setHostProducer(transactionMQProducer.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; this.transactionMQProducer.getDefaultMQProducerImpl().registerSendMessageHook( diff --git a/ons-core/ons-trace-core/pom.xml b/ons-core/ons-trace-core/pom.xml index 422b192..32317fc 100644 --- a/ons-core/ons-trace-core/pom.xml +++ b/ons-core/ons-trace-core/pom.xml @@ -29,11 +29,6 @@ <name>ons-trace-core ${project.version}</name> <dependencies> <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>ons-auth4client</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> </dependency> diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java index f3a456d..601f6df 100644 --- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java @@ -46,7 +46,6 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.NamespaceUtil; import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDataEncoder; @@ -75,20 +74,13 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { private String dispatcherId = UUID.randomUUID().toString(); private String customizedTraceTopic; + /** + * Create AsyncArrayDispatcher with acl RPC hook. + * + * @param properties + * @param rpcHook RPC hook only can be set with AclRPCHook + */ public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) { - this(properties, null, rpcHook); - traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); - } - - public AsyncArrayDispatcher(Properties properties) { - this(properties, null, null); - } - - public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials) { - this(properties, sessionCredentials, null); - } - - public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials, RPCHook rpcHook) { dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType); this.customizedTraceTopic = properties.getProperty(OnsTraceConstants.CustomizedTraceTopic); int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048")); @@ -106,15 +98,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); - if (sessionCredentials == null && rpcHook == null) { - traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties); - } - if (properties != null && rpcHook != null) { - traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); - } - if (properties != null && sessionCredentials != null) { - traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, sessionCredentials); - } + traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); } public DefaultMQProducerImpl getHostProducer() { diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java index 187637f..4a75f7b 100644 --- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java +++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java @@ -24,8 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.namesrv.TopAddressing; -import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials; -import org.apache.rocketmq.ons.api.impl.rocketmq.ClientRPCHook; import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants; import org.apache.rocketmq.remoting.RPCHook; @@ -34,39 +32,9 @@ public class TraceProducerFactory { private static AtomicBoolean isStarted = new AtomicBoolean(false); private static DefaultMQProducer traceProducer; - public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) { - if (traceProducer == null) { - String accessKey = properties.getProperty(OnsTraceConstants.AccessKey); - String secretKey = properties.getProperty(OnsTraceConstants.SecretKey); - if (accessKey != null && secretKey != null) { - SessionCredentials sessionCredentials = new SessionCredentials(); - Properties sessionProperties = new Properties(); - sessionProperties.put(OnsTraceConstants.AccessKey, accessKey); - sessionProperties.put(OnsTraceConstants.SecretKey, secretKey); - sessionCredentials.updateContent(sessionProperties); - traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials)); - } else { - traceProducer = new DefaultMQProducer(); - } - traceProducer.setProducerGroup(accessKey + OnsTraceConstants.groupName); - traceProducer.setSendMsgTimeout(5000); - traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis()))); - String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR); - if (nameSrv == null) { - TopAddressing topAddressing = new TopAddressing(properties.getProperty(OnsTraceConstants.ADDRSRV_URL)); - nameSrv = topAddressing.fetchNSAddr(); - } - traceProducer.setNamesrvAddr(nameSrv); - traceProducer.setVipChannelEnabled(false); - int maxSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxMsgSize, "128000")); - traceProducer.setMaxMessageSize(maxSize - 10 * 1000); - } - return traceProducer; - } - public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) { if (traceProducer == null) { - traceProducer = new DefaultMQProducer(rpcHook); + traceProducer = new DefaultMQProducer(rpcHook); //RPC hook only can be set with AclRPCHook traceProducer.setProducerGroup(OnsTraceConstants.groupName); traceProducer.setSendMsgTimeout(5000); traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis()))); @@ -83,27 +51,6 @@ public class TraceProducerFactory { return traceProducer; } - public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, - SessionCredentials sessionCredentials) { - if (traceProducer == null) { - String accessKey = properties.getProperty(OnsTraceConstants.AccessKey); - traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials)); - traceProducer.setProducerGroup(accessKey.replace('.', '-') + OnsTraceConstants.groupName); - traceProducer.setSendMsgTimeout(5000); - traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis()))); - String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR); - if (nameSrv == null) { - TopAddressing topAddressing = new TopAddressing(properties.getProperty(OnsTraceConstants.ADDRSRV_URL)); - nameSrv = topAddressing.fetchNSAddr(); - } - traceProducer.setNamesrvAddr(nameSrv); - traceProducer.setVipChannelEnabled(false); - int maxSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxMsgSize, "128000")); - traceProducer.setMaxMessageSize(maxSize - 10 * 1000); - } - return traceProducer; - } - public static void registerTraceDispatcher(String dispatcherId) throws MQClientException { dispatcherTable.put(dispatcherId, new Object()); if (traceProducer != null && isStarted.compareAndSet(false, true)) { diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java deleted file mode 100644 index 9e78880..0000000 --- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.ons.open.trace.core.hook; - -import java.lang.reflect.Field; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.RPCHook; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; - -import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.AccessKey; -import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.ONSChannelKey; - -public abstract class AbstractRPCHook implements RPCHook { - protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache = - new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>(); - - - protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String onsChannel) { - CommandCustomHeader header = request.readCustomHeader(); - // sort property - SortedMap<String, String> map = new TreeMap<String, String>(); - map.put(AccessKey, ak); - map.put(ONSChannelKey, onsChannel); - try { - // add header properties - if (null != header) { - Field[] fields = fieldCache.get(header.getClass()); - if (null == fields) { - fields = header.getClass().getDeclaredFields(); - for (Field field : fields) { - field.setAccessible(true); - } - Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields); - if (null != tmp) { - fields = tmp; - } - } - - for (Field field : fields) { - Object value = field.get(header); - if (null != value && !field.isSynthetic()) { - map.put(field.getName(), value.toString()); - } - } - } - return map; - } - catch (Exception e) { - throw new RuntimeException("incompatible exception.", e); - } - } - -} diff --git a/ons-core/pom.xml b/ons-core/pom.xml index cd82803..dcd2f46 100644 --- a/ons-core/pom.xml +++ b/ons-core/pom.xml @@ -246,6 +246,11 @@ <version>${rocketmq.version}</version> </dependency> <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-acl</artifactId> + <version>${rocketmq.version}</version> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>ons-auth4client</artifactId> <version>${project.version}</version>
