This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 7b7fc132f83e881509adba16e5e6fc0f893ef683 Author: Goson Zhang <[email protected]> AuthorDate: Tue Nov 8 11:00:43 2022 +0800 [INLONG-6423][TubeMQ] Consumer registration failed due to BDB error (#6450) --- .../inlong/tubemq/server/broker/offset/OffsetRecordService.java | 2 +- .../main/java/org/apache/inlong/tubemq/server/master/TMaster.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java index 2f55d4f14..16901911f 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java @@ -79,7 +79,7 @@ public class OffsetRecordService extends AbstractDaemonService { // check topic writable status TopicMetadata topicMetadata = storeManager.getMetadataManager() .getTopicMetadata(TServerConstants.OFFSET_HISTORY_NAME); - if (!topicMetadata.isAcceptPublish()) { + if (topicMetadata == null || !topicMetadata.isAcceptPublish()) { return; } // get group offset information diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java index e581a5105..6a5b76b90 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java @@ -575,6 +575,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } final String groupName = (String) paramCheckResult.checkData; + checkNodeStatus(consumerId, strBuffer); if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(), request.getTopicListList(), result, strBuffer)) { builder.setErrCode(result.getErrCode()); @@ -626,7 +627,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } ConsumerInfo inConsumerInfo2 = (ConsumerInfo) paramCheckResult.checkData; - checkNodeStatus(consumerId, strBuffer); CertifiedResult authorizeResult = serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName, groupName, reqTopicSet, reqTopicConditions, rmtAddress); @@ -1258,6 +1258,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } final String groupName = (String) paramCheckResult.checkData; + // check master current status + checkNodeStatus(consumerId, sBuffer); if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(), request.getTopicListList(), result, sBuffer)) { builder.setErrCode(result.getErrCode()); @@ -1283,8 +1285,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (request.hasOpsTaskInfo()) { opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo()); } - // check master current status - checkNodeStatus(consumerId, sBuffer); ClientSyncInfo clientSyncInfo = new ClientSyncInfo(); if (request.hasSubRepInfo()) { clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo());
