This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch TUBEMQ-570 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 1c89454111c021ab58dae7629ec346da6be87449 Author: gosonzhang <[email protected]> AuthorDate: Fri May 28 16:53:13 2021 +0800 [INLONG-618] Add unit tests for metastore.dao.entity.* --- .../metastore/dao/entity/BaseEntity.java | 27 +++--- .../metastore/dao/entity/BrokerConfEntity.java | 37 ++++---- .../dao/entity/GroupConsumeCtrlEntity.java | 19 ++-- .../metastore/dao/entity/GroupResCtrlEntity.java | 23 +++-- .../metastore/dao/entity/TopicCtrlEntity.java | 19 ++-- .../metastore/dao/entity/TopicDeployEntity.java | 25 +++--- .../metastore/dao/entity/TopicPropGroup.java | 47 +++++----- .../master/metamanage/online/BrokerRunData.java | 100 --------------------- .../metastore/dao/entity/BaseEntityTest.java | 2 - 9 files changed, 92 insertions(+), 207 deletions(-) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java index fab9544..2601083 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntity.java @@ -176,7 +176,15 @@ public class BaseEntity implements Serializable, Cloneable { } protected void updSerialId() { - this.serialId.set(System.currentTimeMillis()); + long curSerialId; + long newSerialId; + do { + curSerialId = this.serialId.get(); + newSerialId = System.currentTimeMillis(); + if (newSerialId == curSerialId) { + newSerialId++; + } + } while (!this.serialId.compareAndSet(curSerialId, newSerialId)); } public String getModifyUser() { @@ -215,15 +223,12 @@ public class BaseEntity implements Serializable, Cloneable { if (target == null) { return true; } - if ((target.getDataVerId() != TBaseConstants.META_VALUE_UNDEFINED - && this.getDataVerId() != target.getDataVerId()) - || (TStringUtils.isNotBlank(target.getCreateUser()) - && !target.getCreateUser().equals(createUser)) - || (TStringUtils.isNotBlank(target.getModifyUser()) - && !target.getModifyUser().equals(modifyUser))) { - return false; - } - return true; + return (target.getDataVerId() == TBaseConstants.META_VALUE_UNDEFINED + || this.getDataVerId() == target.getDataVerId()) + && (TStringUtils.isBlank(target.getCreateUser()) + || target.getCreateUser().equals(createUser)) + && (TStringUtils.isBlank(target.getModifyUser()) + || target.getModifyUser().equals(modifyUser)); } /** @@ -233,7 +238,7 @@ public class BaseEntity implements Serializable, Cloneable { * @param isLongName if return field key is long name * @return */ - StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) { + public StringBuilder toWebJsonStr(StringBuilder sBuilder, boolean isLongName) { if (isLongName) { sBuilder.append(",\"dataVersionId\":").append(dataVersionId) .append(",\"serialId\":").append(serialId.get()) diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java index 05216b1..7f49e68 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BrokerConfEntity.java @@ -309,26 +309,23 @@ public class BrokerConfEntity extends BaseEntity implements Cloneable { if (!super.isMatched(target)) { return false; } - if ((target.getBrokerId() != TBaseConstants.META_VALUE_UNDEFINED - && target.getBrokerId() != this.brokerId) - || (TStringUtils.isNotBlank(target.getBrokerIp()) - && !target.getBrokerIp().equals(this.brokerIp)) - || (target.getBrokerPort() != TBaseConstants.META_VALUE_UNDEFINED - && target.getBrokerPort() != this.brokerPort) - || (target.getBrokerTLSPort() != TBaseConstants.META_VALUE_UNDEFINED - && target.getBrokerTLSPort() != this.brokerTLSPort) - || (target.getRegionId() != TBaseConstants.META_VALUE_UNDEFINED - && target.getRegionId() != this.regionId) - || (target.getGroupId() != TBaseConstants.META_VALUE_UNDEFINED - && target.getGroupId() != this.groupId) - || (target.getManageStatus() != ManageStatus.STATUS_MANAGE_UNDEFINED - && target.getManageStatus() != this.manageStatus) - || (target.getBrokerWebPort() != TBaseConstants.META_VALUE_UNDEFINED - && target.getBrokerWebPort() != this.brokerWebPort) - || !this.topicProps.isMatched(target.getTopicProps())) { - return false; - } - return true; + return (target.getBrokerId() == TBaseConstants.META_VALUE_UNDEFINED + || target.getBrokerId() == this.brokerId) + && (TStringUtils.isBlank(target.getBrokerIp()) + || target.getBrokerIp().equals(this.brokerIp)) + && (target.getBrokerPort() == TBaseConstants.META_VALUE_UNDEFINED + || target.getBrokerPort() == this.brokerPort) + && (target.getBrokerTLSPort() == TBaseConstants.META_VALUE_UNDEFINED + || target.getBrokerTLSPort() == this.brokerTLSPort) + && (target.getRegionId() == TBaseConstants.META_VALUE_UNDEFINED + || target.getRegionId() == this.regionId) + && (target.getGroupId() == TBaseConstants.META_VALUE_UNDEFINED + || target.getGroupId() == this.groupId) + && (target.getManageStatus() == ManageStatus.STATUS_MANAGE_UNDEFINED + || target.getManageStatus() == this.manageStatus) + && (target.getBrokerWebPort() == TBaseConstants.META_VALUE_UNDEFINED + || target.getBrokerWebPort() == this.brokerWebPort) + && this.topicProps.isMatched(target.getTopicProps()); } /** diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java index 477bab8..a0bc7bc 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupConsumeCtrlEntity.java @@ -216,17 +216,14 @@ public class GroupConsumeCtrlEntity extends BaseEntity implements Cloneable { if (!super.isMatched(target)) { return false; } - if ((TStringUtils.isNotBlank(target.getTopicName()) - && !target.getTopicName().equals(this.topicName)) - || (TStringUtils.isNotBlank(target.getGroupName()) - && !target.getGroupName().equals(this.groupName)) - || (target.getConsumeEnable() != EnableStatus.STATUS_UNDEFINE - && target.getConsumeEnable() != this.consumeEnable) - || (target.getFilterEnable() != EnableStatus.STATUS_UNDEFINE - && target.getFilterEnable() != this.filterEnable)) { - return false; - } - return true; + return (TStringUtils.isBlank(target.getTopicName()) + || target.getTopicName().equals(this.topicName)) + && (TStringUtils.isBlank(target.getGroupName()) + || target.getGroupName().equals(this.groupName)) + && (target.getConsumeEnable() == EnableStatus.STATUS_UNDEFINE + || target.getConsumeEnable() == this.consumeEnable) + && (target.getFilterEnable() == EnableStatus.STATUS_UNDEFINE + || target.getFilterEnable() == this.filterEnable); } /** diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java index a66fb74..456b024 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/GroupResCtrlEntity.java @@ -244,19 +244,16 @@ public class GroupResCtrlEntity extends BaseEntity implements Cloneable { if (!super.isMatched(target)) { return false; } - if ((target.getQryPriorityId() != TBaseConstants.META_VALUE_UNDEFINED - && target.getQryPriorityId() != this.qryPriorityId) - || (TStringUtils.isNotBlank(target.getGroupName()) - && !target.getGroupName().equals(this.groupName)) - || (target.getResCheckStatus() != EnableStatus.STATUS_UNDEFINE - && target.getResCheckStatus() != this.resCheckStatus) - || (target.getFlowCtrlStatus() != EnableStatus.STATUS_UNDEFINE - && target.getFlowCtrlStatus() != this.flowCtrlStatus) - || (target.getAllowedBrokerClientRate() != TBaseConstants.META_VALUE_UNDEFINED - && target.getAllowedBrokerClientRate() != this.allowedBrokerClientRate)) { - return false; - } - return true; + return (target.getQryPriorityId() == TBaseConstants.META_VALUE_UNDEFINED + || target.getQryPriorityId() == this.qryPriorityId) + && (TStringUtils.isBlank(target.getGroupName()) + || target.getGroupName().equals(this.groupName)) + && (target.getResCheckStatus() == EnableStatus.STATUS_UNDEFINE + || target.getResCheckStatus() == this.resCheckStatus) + && (target.getFlowCtrlStatus() == EnableStatus.STATUS_UNDEFINE + || target.getFlowCtrlStatus() == this.flowCtrlStatus) + && (target.getAllowedBrokerClientRate() == TBaseConstants.META_VALUE_UNDEFINED + || target.getAllowedBrokerClientRate() == this.allowedBrokerClientRate); } /** diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java index 13a01e8..de880c7 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicCtrlEntity.java @@ -188,17 +188,14 @@ public class TopicCtrlEntity extends BaseEntity implements Cloneable { if (!super.isMatched(target)) { return false; } - if ((target.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED - && target.getMaxMsgSizeInB() != this.maxMsgSizeInB) - || (TStringUtils.isNotBlank(target.getTopicName()) - && !target.getTopicName().equals(this.topicName)) - || (target.getAuthCtrlStatus() != EnableStatus.STATUS_UNDEFINE - && target.getAuthCtrlStatus() != this.authCtrlStatus) - || (target.getTopicId() != TBaseConstants.META_VALUE_UNDEFINED - && target.getTopicId() != this.topicNameId)) { - return false; - } - return true; + return (target.getMaxMsgSizeInB() == TBaseConstants.META_VALUE_UNDEFINED + || target.getMaxMsgSizeInB() == this.maxMsgSizeInB) + && (TStringUtils.isBlank(target.getTopicName()) + || target.getTopicName().equals(this.topicName)) + && (target.getAuthCtrlStatus() == EnableStatus.STATUS_UNDEFINE + || target.getAuthCtrlStatus() == this.authCtrlStatus) + && (target.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED + || target.getTopicId() == this.topicNameId); } /** diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java index a2f6bd9..c110c0a 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicDeployEntity.java @@ -257,20 +257,17 @@ public class TopicDeployEntity extends BaseEntity implements Cloneable { if (!super.isMatched(target)) { return false; } - if ((target.getBrokerId() != TBaseConstants.META_VALUE_UNDEFINED - && target.getBrokerId() != this.brokerId) - || (target.getTopicId() != TBaseConstants.META_VALUE_UNDEFINED - && target.getTopicId() != this.topicNameId) - || (TStringUtils.isNotBlank(target.getTopicName()) - && !target.getTopicName().equals(this.topicName)) - || (TStringUtils.isNotBlank(target.getBrokerIp()) - && !target.getBrokerIp().equals(this.brokerIp)) - || !topicProps.isMatched(target.topicProps) - || (target.getTopicStatus() != TopicStatus.STATUS_TOPIC_UNDEFINED - && target.getTopicStatus() != this.deployStatus)) { - return false; - } - return true; + return (target.getBrokerId() == TBaseConstants.META_VALUE_UNDEFINED + || target.getBrokerId() == this.brokerId) + && (target.getTopicId() == TBaseConstants.META_VALUE_UNDEFINED + || target.getTopicId() == this.topicNameId) + && (TStringUtils.isBlank(target.getTopicName()) + || target.getTopicName().equals(this.topicName)) + && (TStringUtils.isBlank(target.getBrokerIp()) + || target.getBrokerIp().equals(this.brokerIp)) + && topicProps.isMatched(target.topicProps) + && (target.getTopicStatus() == TopicStatus.STATUS_TOPIC_UNDEFINED + || target.getTopicStatus() == this.deployStatus); } /** diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java index 27db4f9..180ac8f 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/TopicPropGroup.java @@ -209,31 +209,28 @@ public class TopicPropGroup implements Serializable, Cloneable { if (target == null) { return true; } - if ((target.getNumTopicStores() != TBaseConstants.META_VALUE_UNDEFINED - && target.getNumTopicStores() != this.numTopicStores) - || (target.getNumPartitions() != TBaseConstants.META_VALUE_UNDEFINED - && target.getNumPartitions() != this.numPartitions) - || (target.getUnflushThreshold() != TBaseConstants.META_VALUE_UNDEFINED - && target.getUnflushThreshold() != this.unflushThreshold) - || (target.getUnflushInterval() != TBaseConstants.META_VALUE_UNDEFINED - && target.getUnflushInterval() != this.unflushInterval) - || (target.getUnflushDataHold() != TBaseConstants.META_VALUE_UNDEFINED - && target.getUnflushDataHold() != this.unflushDataHold) - || (target.getMemCacheMsgSizeInMB() != TBaseConstants.META_VALUE_UNDEFINED - && target.getMemCacheMsgSizeInMB() != this.memCacheMsgSizeInMB) - || (target.getMemCacheMsgCntInK() != TBaseConstants.META_VALUE_UNDEFINED - && target.getMemCacheMsgCntInK() != this.memCacheMsgCntInK) - || (target.getMemCacheFlushIntvl() != TBaseConstants.META_VALUE_UNDEFINED - && target.getMemCacheFlushIntvl() != this.memCacheFlushIntvl) - || (target.getAcceptPublish() != null - && target.getAcceptPublish() != this.acceptPublish) - || (target.getAcceptSubscribe() != null - && target.getAcceptSubscribe() != this.acceptSubscribe) - || (TStringUtils.isNotBlank(target.getDeletePolicy()) - && !target.getDeletePolicy().equals(this.deletePolicy))) { - return false; - } - return true; + return (target.getNumTopicStores() == TBaseConstants.META_VALUE_UNDEFINED + || target.getNumTopicStores() == this.numTopicStores) + && (target.getNumPartitions() == TBaseConstants.META_VALUE_UNDEFINED + || target.getNumPartitions() == this.numPartitions) + && (target.getUnflushThreshold() == TBaseConstants.META_VALUE_UNDEFINED + || target.getUnflushThreshold() == this.unflushThreshold) + && (target.getUnflushInterval() == TBaseConstants.META_VALUE_UNDEFINED + || target.getUnflushInterval() == this.unflushInterval) + && (target.getUnflushDataHold() == TBaseConstants.META_VALUE_UNDEFINED + || target.getUnflushDataHold() == this.unflushDataHold) + && (target.getMemCacheMsgSizeInMB() == TBaseConstants.META_VALUE_UNDEFINED + || target.getMemCacheMsgSizeInMB() == this.memCacheMsgSizeInMB) + && (target.getMemCacheMsgCntInK() == TBaseConstants.META_VALUE_UNDEFINED + || target.getMemCacheMsgCntInK() == this.memCacheMsgCntInK) + && (target.getMemCacheFlushIntvl() == TBaseConstants.META_VALUE_UNDEFINED + || target.getMemCacheFlushIntvl() == this.memCacheFlushIntvl) + && (target.getAcceptPublish() == null + || target.getAcceptPublish() == this.acceptPublish) + && (target.getAcceptSubscribe() == null + || target.getAcceptSubscribe() == this.acceptSubscribe) + && (TStringUtils.isBlank(target.getDeletePolicy()) + || target.getDeletePolicy().equals(this.deletePolicy)); } /** diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/online/BrokerRunData.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/online/BrokerRunData.java deleted file mode 100644 index acb5854..0000000 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/metamanage/online/BrokerRunData.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tubemq.server.master.metamanage.online; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.tubemq.corebase.TBaseConstants; -import org.apache.tubemq.corebase.utils.Tuple2; -import org.apache.tubemq.server.master.metamanage.metastore.dao.entity.BrokerConfEntity; - - - - -public class BrokerRunData { - - private final ConcurrentHashMap<Integer, String> brokersMap = - new ConcurrentHashMap<>(); - private final ConcurrentHashMap<Integer, String> brokersTLSMap = - new ConcurrentHashMap<>(); - private AtomicLong brokerInfoChkId = new AtomicLong(System.currentTimeMillis()); - private long lastBrokerUpdatedTime = System.currentTimeMillis(); - - public BrokerRunData() { - - } - - public Tuple2<Long, ConcurrentHashMap<Integer, String>> getBrokerCurRunData( - boolean isOverTLS) { - if (isOverTLS) { - return new Tuple2<>(brokerInfoChkId.get(), brokersTLSMap); - } else { - return new Tuple2<>(brokerInfoChkId.get(), brokersMap); - } - } - - public void updateBrokerRunData(List<BrokerConfEntity> entities) { - if (entities == null || entities.isEmpty()) { - return; - } - for (BrokerConfEntity entity : entities) { - updBrokerRunData(entity); - } - } - - public void updBrokerRunData(BrokerConfEntity entity) { - if (entity == null) { - return; - } - String brokerReg = - this.brokersMap.putIfAbsent(entity.getBrokerId(), - entity.getSimpleBrokerInfo()); - String brokerTLSReg = - this.brokersTLSMap.putIfAbsent(entity.getBrokerId(), - entity.getSimpleTLSBrokerInfo()); - if (brokerReg == null - || brokerTLSReg == null - || !brokerReg.equals(entity.getSimpleBrokerInfo()) - || !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) { - if (brokerReg != null - && !brokerReg.equals(entity.getSimpleBrokerInfo())) { - this.brokersMap.put(entity.getBrokerId(), entity.getSimpleBrokerInfo()); - } - if (brokerTLSReg != null - && !brokerTLSReg.equals(entity.getSimpleTLSBrokerInfo())) { - this.brokersTLSMap.put(entity.getBrokerId(), entity.getSimpleTLSBrokerInfo()); - } - this.lastBrokerUpdatedTime = System.currentTimeMillis(); - this.brokerInfoChkId.set(this.lastBrokerUpdatedTime); - } - } - - public void delBrokerRunData(int brokerId) { - if (brokerId == TBaseConstants.META_VALUE_UNDEFINED) { - return; - } - String brokerReg = this.brokersMap.remove(brokerId); - String brokerTLSReg = this.brokersTLSMap.remove(brokerId); - if (brokerReg != null || brokerTLSReg != null) { - this.lastBrokerUpdatedTime = System.currentTimeMillis(); - this.brokerInfoChkId.set(this.lastBrokerUpdatedTime); - } - } -} diff --git a/tubemq-server/src/test/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntityTest.java b/tubemq-server/src/test/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntityTest.java index b71f1cd..392db4d 100644 --- a/tubemq-server/src/test/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntityTest.java +++ b/tubemq-server/src/test/java/org/apache/tubemq/server/master/metamanage/metastore/dao/entity/BaseEntityTest.java @@ -19,7 +19,6 @@ package org.apache.tubemq.server.master.metamanage.metastore.dao.entity; import java.util.Date; import org.apache.tubemq.corebase.TBaseConstants; -import org.apache.tubemq.corebase.utils.ThreadUtils; import org.apache.tubemq.server.common.TServerConstants; import org.apache.tubemq.server.common.utils.WebParameterUtils; import org.junit.Assert; @@ -138,7 +137,6 @@ public class BaseEntityTest { // case 9 BaseEntity baseEntity9 = baseEntity6.clone(); Assert.assertEquals(baseEntity9, baseEntity6); - ThreadUtils.sleep(2000); baseEntity9.updSerialId(); baseEntity9.setDataVersionId(222223333); baseEntity9.setAttributes("aaaaabbbbccccddd");
