This is an automated email from the ASF dual-hosted git repository. yuanbo pushed a commit to branch TUBEMQ-421 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit e8bd45a148ce617056b5559635351f3a3da0d00a Author: gosonzhang <[email protected]> AuthorDate: Thu Jan 7 11:02:29 2021 +0800 [TUBEMQ-495]Code implementation adjustment based on SpotBugs check --- .../tubemq/client/config/TubeClientConfig.java | 2 +- .../client/consumer/MessageFetchManager.java | 5 +++- .../tubemq/client/consumer/RmtDataCache.java | 4 +-- .../apache/tubemq/corebase/utils/AddressUtils.java | 12 ++++---- .../org/apache/tubemq/corebase/utils/Tuple2.java | 12 ++++++-- .../org/apache/tubemq/corebase/utils/Tuple3.java | 18 ++++++++++-- .../tubemq/corerpc/AbstractServiceInvoker.java | 2 +- .../apache/tubemq/corerpc/RpcServiceFactory.java | 4 +-- .../tubemq/example/MAMessageProducerExample.java | 6 ++-- .../tubemq/example/MessageProducerExample.java | 6 ++-- .../server/broker/offset/DefaultOffsetManager.java | 24 ++++++++-------- .../server/broker/stats/GroupCountService.java | 2 +- .../server/broker/utils/GroupOffsetInfo.java | 4 +-- .../server/broker/web/BrokerAdminServlet.java | 15 ++++------ .../tubemq/server/common/webbase/WebFieldType.java | 4 +-- .../org/apache/tubemq/server/master/TMaster.java | 32 +++++++++++----------- .../master/bdbstore/DefaultBdbStoreService.java | 4 +-- .../nodemanage/nodebroker/BrokerInfoHolder.java | 4 +-- .../tubemq/server/tools/cli/CliProducer.java | 6 ++-- 19 files changed, 91 insertions(+), 75 deletions(-) diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java index b1bd3b1..437b171 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/TubeClientConfig.java @@ -101,7 +101,7 @@ public class TubeClientConfig { throw new IllegalArgumentException("Illegal parameter: masterAddrInfo is null!"); } this.masterInfo = masterInfo.clone(); - String iPv4LocalAddress = AddressUtils.getIPV4LocalAddress(); + AddressUtils.getIPV4LocalAddress(); } @Deprecated diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java index 6afb8a9..80279ff 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageFetchManager.java @@ -216,7 +216,10 @@ public class MessageFetchManager { sBuilder.delete(0, sBuilder.length()); } fetchWorkerStatusMap.put(curThreadId, 2); - MessageFetchManager.this.pushConsumer.processRequest(partSelectResult, sBuilder); + if (partSelectResult != null) { + MessageFetchManager.this.pushConsumer.processRequest( + partSelectResult, sBuilder); + } } fetchWorkerStatusMap.remove(curThreadId); } diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java index 745f127..a222913 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java @@ -423,8 +423,8 @@ public class RmtDataCache implements Closeable { if (frozenTime == null) { if (waitDlt > 10) { TimeoutTask timeoutTask = new TimeoutTask(partitionKey); - timeouts.put(partitionKey, - timer.newTimeout(timeoutTask, waitDlt, TimeUnit.MILLISECONDS)); + timeouts.put(partitionKey, timer.newTimeout( + timeoutTask, waitDlt, TimeUnit.MILLISECONDS)); } else { releaseIdlePartition(partitionKey); } diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/AddressUtils.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/AddressUtils.java index 5a76af2..bc14e5f 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/AddressUtils.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/AddressUtils.java @@ -58,12 +58,12 @@ public class AddressUtils { try { Tuple2<Boolean, String> result = getValidIPV4Address(allInterface.nextElement(), currLocalHost); - if (result.f0) { + if (result.getF0()) { localIPAddress = currLocalHost; return true; } if (TStringUtils.isEmpty(fstV4IP)) { - fstV4IP = result.f1; + fstV4IP = result.getF1(); } } catch (Throwable e) { // @@ -153,8 +153,8 @@ public class AddressUtils { try { Tuple2<Boolean, String> result = getValidIPV4Address(enumeration.nextElement(), null); - if (result.f0) { - tmpAdress = result.f1; + if (result.getF0()) { + tmpAdress = result.getF1(); break; } } catch (Throwable e) { @@ -196,8 +196,8 @@ public class AddressUtils { try { Tuple2<Boolean, String> result = getValidIPV4Address(oneInterface, null); - if (result.f0) { - localIPAddress = result.f1; + if (result.getF0()) { + localIPAddress = result.getF1(); return localIPAddress; } } catch (Throwable e) { diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java index f5626f8..048452f 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple2.java @@ -20,9 +20,9 @@ package org.apache.tubemq.corebase.utils; public class Tuple2<T0, T1> { /** Field 0 of the tuple. */ - public T0 f0 = null; + private T0 f0 = null; /** Field 1 of the tuple. */ - public T1 f1 = null; + private T1 f1 = null; /** * Creates a new tuple where all fields are null. @@ -50,4 +50,12 @@ public class Tuple2<T0, T1> { this.f0 = value0; this.f1 = value1; } + + public T0 getF0() { + return f0; + } + + public T1 getF1() { + return f1; + } } diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java index a2d98c3..579b425 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/utils/Tuple3.java @@ -20,11 +20,11 @@ package org.apache.tubemq.corebase.utils; public class Tuple3<T0, T1, T2> { /** Field 0 of the tuple. */ - public T0 f0 = null; + private T0 f0 = null; /** Field 1 of the tuple. */ - public T1 f1 = null; + private T1 f1 = null; /** Field 2 of the tuple. */ - public T2 f2 = null; + private T2 f2 = null; /** * Creates a new tuple where all fields are null. @@ -45,4 +45,16 @@ public class Tuple3<T0, T1, T2> { this.f1 = value1; this.f2 = value2; } + + public T0 getF0() { + return f0; + } + + public T1 getF1() { + return f1; + } + + public T2 getF2() { + return f2; + } } diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/AbstractServiceInvoker.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/AbstractServiceInvoker.java index 8103387..1154f12 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/AbstractServiceInvoker.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/AbstractServiceInvoker.java @@ -64,7 +64,7 @@ public abstract class AbstractServiceInvoker implements InvocationHandler { // client.close(); } - private class RpcResponseCallback implements Callback { + private static class RpcResponseCallback implements Callback { private Callback chainedCallback; diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java index 8652c2e..cd16253 100644 --- a/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java +++ b/tubemq-core/src/main/java/org/apache/tubemq/corerpc/RpcServiceFactory.java @@ -470,7 +470,7 @@ public class RpcServiceFactory { .append(masterInfo.getMasterClusterStr()).toString(); } - private class ServiceHolder<T> implements Shutdownable { + private static class ServiceHolder<T> implements Shutdownable { private T service; private AbstractServiceInvoker invoker; @@ -489,7 +489,7 @@ public class RpcServiceFactory { } } - private class ConnectionNode { + private static class ConnectionNode { private Class clazzType; private NodeAddrInfo addressInfo; private RpcConfig config; diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java index ea546c9..814dbad 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MAMessageProducerExample.java @@ -191,12 +191,12 @@ public class MAMessageProducerExample { long millis = System.currentTimeMillis(); roundIndex = (int) (sentCount++ % targetCnt); Tuple2<String, String> target = topicSendRounds.get(roundIndex); - Message message = new Message(target.f0, sendData); + Message message = new Message(target.getF0(), sendData); message.setAttrKeyVal("index", String.valueOf(sentCount)); message.setAttrKeyVal("dataTime", String.valueOf(millis)); - if (target.f1 != null) { + if (target.getF1() != null) { filterMsgCount.incrementAndGet(); - message.putSystemHeader(target.f1, sdf.format(new Date(millis))); + message.putSystemHeader(target.getF1(), sdf.format(new Date(millis))); } try { // next line sends message synchronously, which is not recommended diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java index 7c29a63..2b806b5 100644 --- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java +++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageProducerExample.java @@ -104,12 +104,12 @@ public final class MessageProducerExample { while (msgCount < 0 || sentCount < msgCount) { roundIndex = (int) (sentCount++ % targetCnt); Tuple2<String, String> target = topicSendRounds.get(roundIndex); - Message message = new Message(target.f0, body.getBytes()); + Message message = new Message(target.getF0(), body.getBytes()); long currTimeMillis = System.currentTimeMillis(); message.setAttrKeyVal("index", String.valueOf(sentCount)); message.setAttrKeyVal("dataTime", String.valueOf(currTimeMillis)); - if (target.f1 != null) { - message.putSystemHeader(target.f1, sdf.format(new Date(currTimeMillis))); + if (target.getF1() != null) { + message.putSystemHeader(target.getF1(), sdf.format(new Date(currTimeMillis))); } try { // 1.1 next line sends message synchronously, which is not recommended diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java index f052375..0e91dc2 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/offset/DefaultOffsetManager.java @@ -439,10 +439,12 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse getOffsetCacheKey(entry.getKey(), partitionId); OffsetStorageInfo offsetInfo = topicPartOffsetMap.get(offsetCacheKey); Long tmpOffset = tmpPartOffsetMap.get(offsetCacheKey); + if (tmpOffset == null) { + tmpOffset = 0L; + } if (offsetInfo != null) { offsetMap.put(partitionId, - new Tuple2<>(offsetInfo.getOffset(), - (tmpOffset == null ? 0 : tmpOffset))); + new Tuple2<>(offsetInfo.getOffset(), tmpOffset)); } } if (!offsetMap.isEmpty()) { @@ -473,21 +475,21 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse for (String group : groups) { for (Tuple3<String, Integer, Long> tuple3 : topicPartOffsets) { if (tuple3 == null - || tuple3.f0 == null - || tuple3.f1 == null - || tuple3.f2 == null) { + || tuple3.getF0() == null + || tuple3.getF1() == null + || tuple3.getF2() == null) { continue; } // set offset value - offsetCacheKey = getOffsetCacheKey(tuple3.f0, tuple3.f1); + offsetCacheKey = getOffsetCacheKey(tuple3.getF0(), tuple3.getF1()); getAndResetTmpOffset(group, offsetCacheKey); OffsetStorageInfo regInfo = loadOrCreateOffset(group, - tuple3.f0, tuple3.f1, offsetCacheKey, 0); - oldOffset = regInfo.getAndSetOffset(tuple3.f2); + tuple3.getF0(), tuple3.getF1(), offsetCacheKey, 0); + oldOffset = regInfo.getAndSetOffset(tuple3.getF2()); changed = true; logger.info(strBuidler .append("[Offset Manager] Update offset by modifier=") - .append(modifier).append(",reset offset=").append(tuple3.f2) + .append(modifier).append(",reset offset=").append(tuple3.getF2()) .append(",old offset=").append(oldOffset) .append(",updated offset=").append(regInfo.getOffset()) .append(",group=").append(group) @@ -710,9 +712,5 @@ public class DefaultOffsetManager extends AbstractDaemonService implements Offse .append("-").append(partitionId).toString(); } - private String getOffsetCacheKey(String topic, String partitionId) { - return new StringBuilder(256).append(topic) - .append("-").append(partitionId).toString(); - } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java index b0de32e..7a7c056 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/stats/GroupCountService.java @@ -139,7 +139,7 @@ public class GroupCountService extends AbstractDaemonService implements CountSer countSet.refCnt.decrementAndGet(); } - private class CountSet { + private static class CountSet { public AtomicLong refCnt = new AtomicLong(0); public ConcurrentHashMap<String, CountItem> counterItem = new ConcurrentHashMap<>(); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java index a0c7215..fc0ebfa 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/utils/GroupOffsetInfo.java @@ -48,8 +48,8 @@ public class GroupOffsetInfo { public void setConsumeOffsetInfo(Tuple2<Long, Long> offsetInfo) { if (offsetInfo != null) { - this.curOffset = offsetInfo.f0; - this.flightOffset = offsetInfo.f1; + this.curOffset = offsetInfo.getF0(); + this.flightOffset = offsetInfo.getF1(); } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java index 30d7247..5b11be1 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java @@ -459,7 +459,7 @@ public class BrokerAdminServlet extends AbstractWebHandler { return; } Set<String> filterCondStrSet = (Set<String>) result.retData1; - sBuilder = broker.getBrokerServiceServer() + broker.getBrokerServiceServer() .getMessageSnapshot(topicName, partitionId, msgCount, filterCondStrSet, sBuilder); } @@ -811,8 +811,7 @@ public class BrokerAdminServlet extends AbstractWebHandler { // transfer offset format resetOffsets = buildOffsetResetInfo(topicSet); } - boolean changed = broker.getOffsetManager().modifyGroupOffset( - groupNameSet, resetOffsets, modifier); + broker.getOffsetManager().modifyGroupOffset(groupNameSet, resetOffsets, modifier); sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}"); } @@ -875,10 +874,8 @@ public class BrokerAdminServlet extends AbstractWebHandler { Map<String, Map<Integer, Tuple2<Long, Long>>> srcGroupOffsets = broker.getOffsetManager().queryGroupOffset(srcGroupName, topicPartMap); // transfer offset format - List<Tuple3<String, Integer, Long>> resetOffsets = - buildOffsetResetInfo(srcGroupOffsets); - boolean changed = broker.getOffsetManager().modifyGroupOffset( - tgtGroupNameSet, resetOffsets, modifier); + List<Tuple3<String, Integer, Long>> resetOffsets = buildOffsetResetInfo(srcGroupOffsets); + broker.getOffsetManager().modifyGroupOffset(tgtGroupNameSet, resetOffsets, modifier); // builder return result sBuilder.append("{\"result\":true,\"errCode\":0,\"errMsg\":\"OK\"}"); } @@ -965,8 +962,8 @@ public class BrokerAdminServlet extends AbstractWebHandler { long firstOffset = store.getIndexMinOffset(); long lastOffset = store.getIndexMaxOffset(); // adjust reset offset value - adjOffset = offsetTuple.f0 < firstOffset - ? firstOffset : Math.min(offsetTuple.f0, lastOffset); + adjOffset = offsetTuple.getF0() < firstOffset + ? firstOffset : Math.min(offsetTuple.getF0(), lastOffset); result.add(new Tuple3<>(entry.getKey(), entry1.getKey(), adjOffset)); } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java index 2f32cb1..a3e037f 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/common/webbase/WebFieldType.java @@ -32,8 +32,8 @@ public enum WebFieldType { JSONTYPE(8, "Json"); - public int value; - public String desc; + private int value; + private String desc; WebFieldType(int value, String desc) { this.value = value; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java index cc6f681..4299bc7 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java @@ -1956,12 +1956,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { Tuple2<String, ConsumerInfo> tupleInfo = consumerHolder.getConsumeTupleInfo(consumerId); if (tupleInfo == null - || tupleInfo.f0 == null - || tupleInfo.f1 == null) { + || tupleInfo.getF0() == null + || tupleInfo.getF1() == null) { continue; } List<String> blackTopicList = - this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0); + this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.getF0()); Map<String, List<Partition>> topicSubPartMap = entry.getValue(); List<SubscribeInfo> deletedSubInfoList = new ArrayList<>(); List<SubscribeInfo> addedSubInfoList = new ArrayList<>(); @@ -1979,7 +1979,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { currentPartMap = new HashMap<>(); } } - if (tupleInfo.f1.isOverTLS()) { + if (tupleInfo.getF1().isOverTLS()) { for (Partition currentPart : currentPartMap.values()) { if (!blackTopicList.contains(currentPart.getTopic())) { boolean found = false; @@ -1995,8 +1995,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } deletedSubInfoList - .add(new SubscribeInfo(consumerId, tupleInfo.f0, - tupleInfo.f1.isOverTLS(), currentPart)); + .add(new SubscribeInfo(consumerId, tupleInfo.getF0(), + tupleInfo.getF1().isOverTLS(), currentPart)); } for (Partition finalPart : finalPartList) { if (!blackTopicList.contains(finalPart.getTopic())) { @@ -2012,7 +2012,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { continue; } addedSubInfoList.add(new SubscribeInfo(consumerId, - tupleInfo.f0, true, finalPart)); + tupleInfo.getF0(), true, finalPart)); } } } else { @@ -2020,14 +2020,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if ((blackTopicList.contains(currentPart.getTopic())) || (!finalPartList.contains(currentPart))) { deletedSubInfoList.add(new SubscribeInfo(consumerId, - tupleInfo.f0, false, currentPart)); + tupleInfo.getF0(), false, currentPart)); } } for (Partition finalPart : finalPartList) { if ((currentPartMap.get(finalPart.getPartitionKey()) == null) && (!blackTopicList.contains(finalPart.getTopic()))) { addedSubInfoList.add(new SubscribeInfo(consumerId, - tupleInfo.f0, false, finalPart)); + tupleInfo.getF0(), false, finalPart)); } } } @@ -2090,13 +2090,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable { Tuple2<String, ConsumerInfo> tupleInfo = consumerHolder.getConsumeTupleInfo(consumerId); if (tupleInfo == null - || tupleInfo.f0 == null - || tupleInfo.f1 == null) { + || tupleInfo.getF0() == null + || tupleInfo.getF1() == null) { continue; } // allocate partitions to consumers List<String> blackTopicList = - this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0); + this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.getF0()); Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue(); List<SubscribeInfo> deletedSubInfoList = new ArrayList<>(); List<SubscribeInfo> addedSubInfoList = new ArrayList<>(); @@ -2120,15 +2120,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if ((blackTopicList.contains(currentPart.getTopic())) || (finalPartMap.get(currentPart.getPartitionKey()) == null)) { deletedSubInfoList - .add(new SubscribeInfo(consumerId, tupleInfo.f0, - tupleInfo.f1.isOverTLS(), currentPart)); + .add(new SubscribeInfo(consumerId, tupleInfo.getF0(), + tupleInfo.getF1().isOverTLS(), currentPart)); } } for (Partition finalPart : finalPartMap.values()) { if ((currentPartMap.get(finalPart.getPartitionKey()) == null) && (!blackTopicList.contains(finalPart.getTopic()))) { - addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.f0, - tupleInfo.f1.isOverTLS(), finalPart)); + addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.getF0(), + tupleInfo.getF1().isOverTLS(), finalPart)); } } } diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java index 7622c57..b201cde 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/bdbstore/DefaultBdbStoreService.java @@ -930,9 +930,7 @@ public class DefaultBdbStoreService implements BdbStoreService, Server { if (!isPrimaryNodeActive()) { if ((replicas4Transfer != null) && (!replicas4Transfer.isEmpty())) { logger.info("start transferMaster to replicas: " + replicas4Transfer); - if ((replicas4Transfer != null) && (!replicas4Transfer.isEmpty())) { - repEnv.transferMaster(replicas4Transfer, 5, TimeUnit.MINUTES); - } + repEnv.transferMaster(replicas4Transfer, 5, TimeUnit.MINUTES); logger.info("transferMaster end..."); } else { throw new Exception("The replicate nodes is empty!"); diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java index dec535b..c9fdaea 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodebroker/BrokerInfoHolder.java @@ -323,7 +323,7 @@ public class BrokerInfoHolder { } } - public class BrokerAbnInfo { + public static class BrokerAbnInfo { private int brokerId; private int abnStatus; // 0 normal , -100 read abnormal, -1 write abnormal, -101 r & w abnormal private long firstRepTime; @@ -364,7 +364,7 @@ public class BrokerInfoHolder { } } - public class BrokerFbdInfo { + public static class BrokerFbdInfo { private int brokerId; private int befStatus; private int newStatus; diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java index f544829..a6e7e38 100644 --- a/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java +++ b/tubemq-server/src/main/java/org/apache/tubemq/server/tools/cli/CliProducer.java @@ -268,10 +268,10 @@ public class CliProducer extends CliAbstractBase { try { long millis = System.currentTimeMillis(); Tuple2<String, String> target = topicSendRounds.get(roundIndex); - Message message = new Message(target.f0, sentData); - if (target.f1 != null) { + Message message = new Message(target.getF0(), sentData); + if (target.getF1() != null) { // if include filter, add filter item - message.putSystemHeader(target.f1, sdf.format(new Date(millis))); + message.putSystemHeader(target.getF1(), sdf.format(new Date(millis))); } // use sync or async process if (syncProduction) {
