This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b9855083d2bd6128897bfecb068d142f7c1a6e24 Author: Christophe Bornet <[email protected]> AuthorDate: Wed Nov 18 07:09:35 2020 +0100 Make ServerCnx, Producer and Consumer independent of Netty (#6720) This PR is a first step to implement [PIP 59: gRPC Protocol Handler](https://github.com/apache/pulsar/wiki/PIP-59%3A-gPRC-Protocol-Handler) It separates `ServerCnx` used by `Consumer` and `Producer` into an interface and an implementation. Alternate protocol handlers that use Pulsar's topics, consumers and producer will implement the interface with their own implementation. (cherry picked from commit 2a183ab0dfedfda9d13a8536c02d95c109056b63) --- .../pulsar/broker/service/BrokerService.java | 4 +- .../org/apache/pulsar/broker/service/Consumer.java | 120 +++------------------ .../org/apache/pulsar/broker/service/Producer.java | 52 ++++----- .../broker/service/PulsarChannelInitializer.java | 2 +- .../pulsar/broker/service/PulsarCommandSender.java | 16 ++- .../broker/service/PulsarCommandSenderImpl.java | 105 ++++++++++++++++++ .../apache/pulsar/broker/service/ServerCnx.java | 39 +++++-- .../org/apache/pulsar/broker/service/Topic.java | 2 +- .../apache/pulsar/broker/service/TransportCnx.java | 80 ++++++++++++++ .../service/nonpersistent/NonPersistentTopic.java | 6 +- .../broker/service/persistent/PersistentTopic.java | 10 +- .../service/MessagePublishBufferThrottleTest.java | 23 ++-- .../PersistentDispatcherFailoverConsumerTest.java | 4 + .../pulsar/broker/service/PersistentTopicTest.java | 2 + 14 files changed, 307 insertions(+), 158 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2dee374..b27dba2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2260,8 +2260,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } } - private void foreachCnx(Consumer<ServerCnx> consumer) { - Set<ServerCnx> cnxSet = new HashSet<>(); + private void foreachCnx(Consumer<TransportCnx> consumer) { + Set<TransportCnx> cnxSet = new HashSet<>(); topics.forEach((n, t) -> { Optional<Topic> topic = extractTopic(t); topic.ifPresent(value -> value.getProducers().values().forEach(producer -> cnxSet.add(producer.getCnx()))); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index c2dd900..968050c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -23,12 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; - import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; @@ -39,6 +33,8 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -53,10 +49,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosi import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; -import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ConsumerStats; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.SafeCollectionUtils; @@ -69,7 +63,7 @@ import org.slf4j.LoggerFactory; public class Consumer { private final Subscription subscription; private final SubType subType; - private final ServerCnx cnx; + private final TransportCnx cnx; private final String appId; private AuthenticationDataSource authenticationData; private final String topicName; @@ -131,7 +125,7 @@ public class Consumer { public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, - int maxUnackedMessages, ServerCnx cnx, String appId, + int maxUnackedMessages, TransportCnx cnx, String appId, Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, PulsarApi.KeySharedMeta keySharedMeta) throws BrokerServiceException { @@ -191,18 +185,11 @@ public class Consumer { } void notifyActiveConsumerChange(Consumer activeConsumer) { - if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) { - // if the client is older than `v12`, we don't need to send consumer group changes. - return; - } - if (log.isDebugEnabled()) { log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", consumerId, topicName, subscription.getName(), activeConsumer); } - cnx.ctx().writeAndFlush( - Commands.newActiveConsumerChange(consumerId, this == activeConsumer), - cnx.ctx().voidPromise()); + cnx.getCommandSender().sendActiveConsumerChange(consumerId, this == activeConsumer); } public boolean readCompacted() { @@ -215,23 +202,21 @@ public class Consumer { * * @return a SendMessageInfo object that contains the detail of what was sent to consumer */ - - public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, + public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker) { this.lastConsumedTimestamp = System.currentTimeMillis(); - final ChannelHandlerContext ctx = cnx.ctx(); - final ChannelPromise writePromise = ctx.newPromise(); if (entries.isEmpty() || totalMessages == 0) { if (log.isDebugEnabled()) { log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}", topicName, subscription, consumerId); } - writePromise.setSuccess(); batchSizes.recyle(); if (batchIndexesAcks != null) { batchIndexesAcks.recycle(); } + final Promise<Void> writePromise = cnx.newPromise(); + writePromise.setSuccess(null); return writePromise; } @@ -263,66 +248,8 @@ public class Consumer { bytesOutCounter.add(totalBytes); chuckedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); - ctx.channel().eventLoop().execute(() -> { - for (int i = 0; i < entries.size(); i++) { - Entry entry = entries.get(i); - if (entry == null) { - // Entry was filtered out - continue; - } - - int batchSize = batchSizes.getBatchSize(i); - - if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) { - log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}", - topicName, subscription, - consumerId, entry.getLedgerId(), entry.getEntryId()); - ctx.close(); - entry.release(); - continue; - } - - MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder(); - MessageIdData messageId = messageIdBuilder - .setLedgerId(entry.getLedgerId()) - .setEntryId(entry.getEntryId()) - .setPartition(partitionIdx) - .build(); - - ByteBuf metadataAndPayload = entry.getDataBuffer(); - // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release - metadataAndPayload.retain(); - // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification - if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) { - Commands.skipChecksumIfPresent(metadataAndPayload); - } - - if (log.isDebugEnabled()) { - log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", topicName, subscription, - consumerId, entry.getLedgerId(), entry.getEntryId()); - } - - int redeliveryCount = 0; - PositionImpl position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()); - if (redeliveryTracker.contains(position)) { - redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position); - } - ctx.write(cnx.newMessageAndIntercept(consumerId, messageId, redeliveryCount, metadataAndPayload, - batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise()); - messageId.recycle(); - messageIdBuilder.recycle(); - entry.release(); - } - - // Use an empty write here so that we can just tie the flush with the write promise for last entry - ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise); - batchSizes.recyle(); - if (batchIndexesAcks != null) { - batchIndexesAcks.recycle(); - } - }); - - return writePromise; + return cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx, + entries, batchSizes, batchIndexesAcks, redeliveryTracker); } private void incrementUnackedMessages(int ackedMessages) { @@ -337,10 +264,6 @@ public class Consumer { return cnx.isWritable(); } - public void sendError(ByteBuf error) { - cnx.ctx().writeAndFlush(error).addListener(ChannelFutureListener.CLOSE); - } - /** * Close the consumer if: a. the connection is dropped b. connection is open (graceful close) and there are no * pending message acks @@ -368,18 +291,15 @@ public class Consumer { } } - void doUnsubscribe(final long requestId) { - final ChannelHandlerContext ctx = cnx.ctx(); - + public void doUnsubscribe(final long requestId) { subscription.doUnsubscribe(this).thenAccept(v -> { log.info("Unsubscribed successfully from {}", subscription); cnx.removedConsumer(this); - ctx.writeAndFlush(Commands.newSuccess(requestId)); + cnx.getCommandSender().sendSuccess(requestId); }).exceptionally(exception -> { log.warn("Unsubscribe failed for {}", subscription, exception); - ctx.writeAndFlush( - Commands.newError(requestId, BrokerServiceException.getClientErrorCode(exception), - exception.getCause().getMessage())); + cnx.getCommandSender().sendError(requestId, BrokerServiceException.getClientErrorCode(exception), + exception.getCause().getMessage()); return null; }); } @@ -439,7 +359,7 @@ public class Consumer { } } - void flowPermits(int additionalNumberOfMessages) { + public void flowPermits(int additionalNumberOfMessages) { checkArgument(additionalNumberOfMessages > 0); // block shared consumer when unacked-messages reaches limit @@ -489,11 +409,7 @@ public class Consumer { } public void reachedEndOfTopic() { - // Only send notification if the client understand the command - if (cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v9_VALUE) { - log.info("[{}] Notifying consumer that end of topic has been reached", this); - cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId)); - } + cnx.getCommandSender().sendReachedEndOfTopic(consumerId); } /** @@ -546,10 +462,6 @@ public class Consumer { .add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString(); } - public ChannelHandlerContext ctx() { - return cnx.ctx(); - } - public void checkPermissions() { TopicName topicName = TopicName.get(subscription.getTopicName()); if (cnx.getBrokerService().getAuthorizationService() != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 2fd9c2e..3216d43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkNotNull; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; +import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER; import static org.apache.pulsar.common.protocol.Commands.hasChecksum; import static org.apache.pulsar.common.protocol.Commands.readChecksum; @@ -52,14 +53,13 @@ import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pulsar.broker.service.AbstractReplicator.REPL_PRODUCER_NAME_DELIMITER; /** * Represents a currently connected producer */ public class Producer { private final Topic topic; - private final ServerCnx cnx; + private final TransportCnx cnx; private final String producerName; private final long epoch; private final boolean userProvidedProducerName; @@ -88,7 +88,7 @@ public class Producer { private final SchemaVersion schemaVersion; - public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId, + public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch, boolean userProvidedProducerName) { this.topic = topic; @@ -140,41 +140,42 @@ public class Producer { public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked) { - beforePublish(producerId, sequenceId, headersAndPayload, batchSize); - publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked); + if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) { + publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked); + } } public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId, - ByteBuf headersAndPayload, long batchSize, boolean isChunked) { + ByteBuf headersAndPayload, long batchSize, boolean isChunked) { if (lowestSequenceId > highestSequenceId) { - cnx.ctx().channel().eventLoop().execute(() -> { + cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError, "Invalid lowest or highest sequence id"); cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes()); }); return; } - beforePublish(producerId, highestSequenceId, headersAndPayload, batchSize); - publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked); + if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) { + publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked); + } } - public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) { + public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) { if (isClosed) { - cnx.ctx().channel().eventLoop().execute(() -> { + cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.PersistenceError, "Producer is closed"); cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes()); }); - - return; + return false; } if (!verifyChecksum(headersAndPayload)) { - cnx.ctx().channel().eventLoop().execute(() -> { + cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker"); cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes()); }); - return; + return false; } if (topic.isEncryptionRequired()) { @@ -187,16 +188,17 @@ public class Producer { // Check whether the message is encrypted or not if (encryptionKeysCount < 1) { log.warn("[{}] Messages must be encrypted", getTopic().getName()); - cnx.ctx().channel().eventLoop().execute(() -> { + cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.MetadataError, "Messages must be encrypted"); cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes()); }); - return; + return false; } } startPublishOperation((int) batchSize, headersAndPayload.readableBytes()); + return true; } private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) { @@ -266,7 +268,7 @@ public class Producer { /** * Return the sequence id of - * @return + * @return the sequence id */ public long getLastSequenceId() { if (isNonPersistentTopic) { @@ -276,7 +278,7 @@ public class Producer { } } - public ServerCnx getCnx() { + public TransportCnx getCnx() { return this.cnx; } @@ -354,7 +356,7 @@ public class Producer { ServerError serverError = (exception instanceof TopicTerminatedException) ? ServerError.TopicTerminatedError : ServerError.PersistenceError; - producer.cnx.ctx().channel().eventLoop().execute(() -> { + producer.cnx.execute(() -> { if (!(exception instanceof TopicClosedException)) { // For TopicClosed exception there's no need to send explicit error, since the client was // already notified @@ -374,7 +376,7 @@ public class Producer { this.ledgerId = ledgerId; this.entryId = entryId; - producer.cnx.ctx().channel().eventLoop().execute(this); + producer.cnx.execute(this); } } @@ -417,7 +419,7 @@ public class Producer { } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs) { + int msgSize, long batchSize, boolean chunked, long startTimeNs) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; @@ -439,7 +441,7 @@ public class Producer { } private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>() { - protected MessagePublishContext newObject(Recycler.Handle<MessagePublishContext> handle) { + protected MessagePublishContext newObject(Handle<MessagePublishContext> handle) { return new MessagePublishContext(handle); } }; @@ -511,7 +513,7 @@ public class Producer { return closeFuture; } - void closeNow(boolean removeFromTopic) { + public void closeNow(boolean removeFromTopic) { if (removeFromTopic) { topic.removeProducer(this); } @@ -532,7 +534,7 @@ public class Producer { public CompletableFuture<Void> disconnect() { if (!closeFuture.isDone()) { log.info("Disconnecting producer: {}", this); - cnx.ctx().executor().execute(() -> { + cnx.execute(() -> { cnx.closeProducer(this); closeNow(true); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 2a2d3d5..7425ffb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -137,7 +137,7 @@ public class PulsarChannelInitializer extends ChannelInitializer<SocketChannel> try { cnx.refreshAuthenticationCredentials(); } catch (Throwable t) { - log.warn("[{}] Failed to refresh auth credentials", cnx.getRemoteAddress()); + log.warn("[{}] Failed to refresh auth credentials", cnx.clientAddress()); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java index 1773106..141c7c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.broker.service; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; +import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; @@ -26,7 +29,6 @@ import java.util.List; public interface PulsarCommandSender { - void sendPartitionMetadataResponse(PulsarApi.ServerError error, String errorMsg, long requestId); void sendPartitionMetadataResponse(int partitions, long requestId); @@ -61,4 +63,16 @@ public interface PulsarCommandSender { PulsarApi.CommandLookupTopicResponse.LookupType response, long requestId, boolean proxyThroughServiceUrl); void sendLookupResponse(PulsarApi.ServerError error, String errorMsg, long requestId); + + void sendActiveConsumerChange(long consumerId, boolean isActive); + + void sendSuccess(long requestId); + + void sendError(long requestId, PulsarApi.ServerError error, String message); + + void sendReachedEndOfTopic(long consumerId); + + Future<Void> sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription, + int partitionIdx, final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, + RedeliveryTracker redeliveryTracker); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index c9f1b61..c3d6a56 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -19,9 +19,16 @@ package org.apache.pulsar.broker.service; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; +import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; @@ -206,6 +213,104 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender { cnx.ctx().writeAndFlush(outBuf); } + @Override + public void sendActiveConsumerChange(long consumerId, boolean isActive) { + if (!Commands.peerSupportsActiveConsumerListener(cnx.getRemoteEndpointProtocolVersion())) { + // if the client is older than `v12`, we don't need to send consumer group changes. + return; + } + cnx.ctx().writeAndFlush( + Commands.newActiveConsumerChange(consumerId, isActive), + cnx.ctx().voidPromise()); + } + + @Override + public void sendSuccess(long requestId) { + cnx.ctx().writeAndFlush(Commands.newSuccess(requestId)); + } + + @Override + public void sendError(long requestId, PulsarApi.ServerError error, String message) { + cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message)); + } + + @Override + public void sendReachedEndOfTopic(long consumerId) { + // Only send notification if the client understand the command + if (cnx.getRemoteEndpointProtocolVersion() >= PulsarApi.ProtocolVersion.v9_VALUE) { + log.info("[{}] Notifying consumer that end of topic has been reached", this); + cnx.ctx().writeAndFlush(Commands.newReachedEndOfTopic(consumerId)); + } + } + + @Override + public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, Subscription subscription, + int partitionIdx, final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, + RedeliveryTracker redeliveryTracker) { + final ChannelHandlerContext ctx = cnx.ctx(); + final ChannelPromise writePromise = ctx.newPromise(); + ctx.channel().eventLoop().execute(() -> { + for (int i = 0; i < entries.size(); i++) { + Entry entry = entries.get(i); + if (entry == null) { + // Entry was filtered out + continue; + } + + int batchSize = batchSizes.getBatchSize(i); + + if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) { + log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}", + topicName, subscription, + consumerId, entry.getLedgerId(), entry.getEntryId()); + ctx.close(); + entry.release(); + continue; + } + + MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder(); + MessageIdData messageId = messageIdBuilder + .setLedgerId(entry.getLedgerId()) + .setEntryId(entry.getEntryId()) + .setPartition(partitionIdx) + .build(); + + ByteBuf metadataAndPayload = entry.getDataBuffer(); + // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release + metadataAndPayload.retain(); + // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification + if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) { + Commands.skipChecksumIfPresent(metadataAndPayload); + } + + if (log.isDebugEnabled()) { + log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", topicName, subscription, + consumerId, entry.getLedgerId(), entry.getEntryId()); + } + + int redeliveryCount = 0; + PositionImpl position = PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()); + if (redeliveryTracker.contains(position)) { + redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position); + } + ctx.write(cnx.newMessageAndIntercept(consumerId, messageId, redeliveryCount, metadataAndPayload, + batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise()); + messageId.recycle(); + messageIdBuilder.recycle(); + entry.release(); + } + + // Use an empty write here so that we can just tie the flush with the write promise for last entry + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise); + batchSizes.recyle(); + if (batchIndexesAcks != null) { + batchIndexesAcks.recycle(); + } + }); + + return writePromise; + } + private void safeIntercept(PulsarApi.BaseCommand command, ServerCnx cnx) { try { this.interceptor.onPulsarCommand(command, cnx); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3b0426e..6497e12 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -34,6 +34,7 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOption; import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Promise; import java.net.SocketAddress; @@ -134,7 +135,7 @@ import org.apache.pulsar.transaction.impl.common.TxnStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ServerCnx extends PulsarHandler { +public class ServerCnx extends PulsarHandler implements TransportCnx { private final BrokerService service; private final SchemaRegistryService schemaService; private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers; @@ -1771,6 +1772,7 @@ public class ServerCnx extends PulsarHandler { } + @Override public void closeConsumer(Consumer consumer) { // removes consumer-connection from map and send close command to consumer if (log.isDebugEnabled()) { @@ -1793,10 +1795,12 @@ public class ServerCnx extends PulsarHandler { ctx.close(); } + @Override public SocketAddress clientAddress() { return remoteAddress; } + @Override public void removedConsumer(Consumer consumer) { if (log.isDebugEnabled()) { log.debug("[{}] Removed consumer: {}", remoteAddress, consumer); @@ -1805,6 +1809,7 @@ public class ServerCnx extends PulsarHandler { consumers.remove(consumer.consumerId()); } + @Override public void removedProducer(Producer producer) { if (log.isDebugEnabled()) { log.debug("[{}] Removed producer: {}", remoteAddress, producer); @@ -1812,10 +1817,12 @@ public class ServerCnx extends PulsarHandler { producers.remove(producer.getProducerId()); } + @Override public boolean isActive() { return isActive; } + @Override public boolean isWritable() { return ctx.channel().isWritable(); } @@ -1848,6 +1855,7 @@ public class ServerCnx extends PulsarHandler { } } + @Override public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) { MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize); if (--pendingSendRequest == resumeReadsThreshold) { @@ -1861,6 +1869,7 @@ public class ServerCnx extends PulsarHandler { } } + @Override public void enableCnxAutoRead() { // we can add check (&& pendingSendRequest < MaxPendingSendRequests) here but then it requires // pendingSendRequest to be volatile and it can be expensive while writing. also this will be called on if @@ -1874,21 +1883,22 @@ public class ServerCnx extends PulsarHandler { } } + @Override public void disableCnxAutoRead() { if (ctx != null && ctx.channel().config().isAutoRead() ) { ctx.channel().config().setAutoRead(false); } } - @VisibleForTesting - void cancelPublishRateLimiting() { + @Override + public void cancelPublishRateLimiting() { if (autoReadDisabledRateLimiting) { autoReadDisabledRateLimiting = false; } } - @VisibleForTesting - void cancelPublishBufferLimiting() { + @Override + public void cancelPublishBufferLimiting() { if (autoReadDisabledPublishBufferLimiting) { autoReadDisabledPublishBufferLimiting = false; } @@ -1963,7 +1973,7 @@ public class ServerCnx extends PulsarHandler { /** * Helper method for testability * - * @return + * @return the connection state */ public State getState() { return state; @@ -1981,10 +1991,16 @@ public class ServerCnx extends PulsarHandler { return authRole; } + @Override + public Promise<Void> newPromise() { + return ctx.newPromise(); + } + boolean hasConsumer(long consumerId) { return consumers.containsKey(consumerId); } + @Override public boolean isBatchMessageCompatibleVersion() { return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber(); } @@ -1993,10 +2009,12 @@ public class ServerCnx extends PulsarHandler { return features != null && features.getSupportsAuthRefresh(); } + @Override public String getClientVersion() { return clientVersion; } + @Override public long getMessagePublishBufferSize() { return this.messagePublishBufferSize; } @@ -2011,6 +2029,7 @@ public class ServerCnx extends PulsarHandler { this.autoReadDisabledRateLimiting = isLimiting; } + @Override public boolean isPreciseDispatcherFlowControl() { return preciseDispatcherFlowControl; } @@ -2019,6 +2038,7 @@ public class ServerCnx extends PulsarHandler { return authState; } + @Override public AuthenticationDataSource getAuthenticationData() { return originalAuthData != null ? originalAuthData : authenticationData; } @@ -2031,6 +2051,7 @@ public class ServerCnx extends PulsarHandler { return authenticationProvider; } + @Override public String getAuthRole() { return authRole; } @@ -2047,7 +2068,13 @@ public class ServerCnx extends PulsarHandler { return producers; } + @Override public PulsarCommandSender getCommandSender() { return commandSender; } + + @Override + public void execute(Runnable runnable) { + ctx.channel().eventLoop().execute(runnable); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 4d14326..ff5e962 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -102,7 +102,7 @@ public interface Topic { */ void recordAddLatency(long latency, TimeUnit unit); - CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType, + CompletableFuture<Consumer> subscribe(TransportCnx cnx, String subscriptionName, long consumerId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java new file mode 100644 index 0000000..2b43eaa --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import io.netty.util.concurrent.Promise; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; + +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public interface TransportCnx { + + String getClientVersion(); + + SocketAddress clientAddress(); + + BrokerService getBrokerService(); + + PulsarCommandSender getCommandSender(); + + boolean isBatchMessageCompatibleVersion(); + + /** + * The security role for this connection + * @return the role + */ + String getAuthRole(); + + AuthenticationDataSource getAuthenticationData(); + + boolean isActive(); + + boolean isWritable(); + + void completedSendOperation(boolean isNonPersistentTopic, int msgSize); + + void removedProducer(Producer producer); + + void closeProducer(Producer producer); + + long getMessagePublishBufferSize(); + + void cancelPublishRateLimiting(); + + void cancelPublishBufferLimiting(); + + void disableCnxAutoRead(); + + void enableCnxAutoRead(); + + void execute(Runnable runnable); + + void removedConsumer(Consumer consumer); + + void closeConsumer(Consumer consumer); + + boolean isPreciseDispatcherFlowControl(); + + Promise<Void> newPromise(); + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index b770f87..dd729bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -56,7 +56,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersio import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Replicator; -import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.TransportCnx; import org.apache.pulsar.broker.service.StreamingStats; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @@ -237,7 +237,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { } @Override - public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId, + public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition, long resetStartMessageBackInSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) { @@ -291,7 +291,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { try { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, - cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta); + cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta); addConsumerToSubscription(subscription, consumer); if (!cnx.isActive()) { consumer.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 198195d..9241ce9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -84,7 +84,7 @@ import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Replicator; -import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.TransportCnx; import org.apache.pulsar.broker.service.StreamingStats; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @@ -514,7 +514,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } @Override - public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId, + public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicatedSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) { @@ -571,9 +571,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return future; } - if (cnx.getRemoteAddress() != null && cnx.getRemoteAddress().toString().contains(":")) { + if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) { SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier( - cnx.getRemoteAddress().toString().split(":")[0], consumerName, consumerId); + cnx.clientAddress().toString().split(":")[0], consumerName, consumerId); if (subscribeRateLimiter.isPresent() && !subscribeRateLimiter.get().subscribeAvailable(consumer) || !subscribeRateLimiter.get().tryAcquire(consumer)) { log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}", topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(), @@ -610,7 +610,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal subscriptionFuture.thenAccept(subscription -> { try { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, - maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta); + maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta); addConsumerToSubscription(subscription, consumer); checkBackloggedCursors(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java index 8aa9c9a..b230f8d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java @@ -56,7 +56,8 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase { .create(); Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get(); Assert.assertNotNull(topicRef); - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2); + TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx(); + ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2); Thread.sleep(20); Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold()); List<CompletableFuture<MessageId>> futures = new ArrayList<>(); @@ -86,14 +87,15 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase { .create(); Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get(); Assert.assertNotNull(topicRef); - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2); + TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx(); + ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2); Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold()); // The first message can publish success, but the second message should be blocked producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS); getPulsar().getBrokerService().checkMessagePublishBuffer(); Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold()); - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L); + ((ServerCnx) cnx).setMessagePublishBufferSize(0L); getPulsar().getBrokerService().checkMessagePublishBuffer(); Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold()); List<CompletableFuture<MessageId>> futures = new ArrayList<>(); @@ -122,7 +124,8 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase { .create(); Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get(); Assert.assertNotNull(topicRef); - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2); + TransportCnx cnx = ((AbstractTopic) topicRef).producers.get("producer-name").getCnx(); + ((ServerCnx) cnx).setMessagePublishBufferSize(Long.MAX_VALUE / 2); Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold()); producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS); @@ -131,15 +134,15 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase { Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold()); // Block by publish rate. - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L); + ((ServerCnx) cnx).setMessagePublishBufferSize(0L); getPulsar().getBrokerService().checkMessagePublishBuffer(); - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(true); - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().disableCnxAutoRead(); - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead(); + ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(true); + cnx.disableCnxAutoRead(); + cnx.enableCnxAutoRead(); // Resume message publish. - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(false); - ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead(); + ((ServerCnx) cnx).setAutoReadDisabledRateLimiting(false); + cnx.enableCnxAutoRead(); Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold()); List<CompletableFuture<MessageId>> futures = new ArrayList<>(); // Make sure the producer can publish succeed. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 29829be..98cce6e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -173,6 +173,8 @@ public class PersistentDispatcherFailoverConsumerTest { doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getNumber()); when(serverCnx.ctx()).thenReturn(channelCtx); + doReturn(new PulsarCommandSenderImpl(null, serverCnx)) + .when(serverCnx).getCommandSender(); serverCnxWithOldVersion = spy(new ServerCnx(pulsar)); doReturn(true).when(serverCnxWithOldVersion).isActive(); @@ -182,6 +184,8 @@ public class PersistentDispatcherFailoverConsumerTest { when(serverCnxWithOldVersion.getRemoteEndpointProtocolVersion()) .thenReturn(ProtocolVersion.v11.getNumber()); when(serverCnxWithOldVersion.ctx()).thenReturn(channelCtx); + doReturn(new PulsarCommandSenderImpl(null, serverCnxWithOldVersion)) + .when(serverCnxWithOldVersion).getCommandSender(); NamespaceService nsSvc = mock(NamespaceService.class); doReturn(nsSvc).when(pulsar).getNamespaceService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index bd56f35..ed49143 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -199,6 +199,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { doReturn(true).when(serverCnx).isActive(); doReturn(true).when(serverCnx).isWritable(); doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress(); + doReturn(new PulsarCommandSenderImpl(null, serverCnx)) + .when(serverCnx).getCommandSender(); NamespaceService nsSvc = mock(NamespaceService.class); doReturn(nsSvc).when(pulsar).getNamespaceService();
