This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 28a223c50009c99d040c0a9f1f4702c75d026f38 Author: Matteo Merli <[email protected]> AuthorDate: Thu Sep 16 08:28:36 2021 -0700 Improved logic for pausing replicated subscription snapshots when no traffic (#11922) * Improved logic for pausing replicated subscription snapshots when no traffic * Removed unused import * Fixed flaky test ReplicatorTest.testRemoveClusterFromNamespace * Fixed cast that was not available in tests --- .../org/apache/pulsar/broker/service/Producer.java | 35 +++++++++++------- .../apache/pulsar/broker/service/ServerCnx.java | 7 ++-- .../org/apache/pulsar/broker/service/Topic.java | 4 +++ .../broker/service/persistent/PersistentTopic.java | 13 +++++++ .../ReplicatedSubscriptionsController.java | 41 +++++++++------------- .../ReplicatedSubscriptionsSnapshotBuilder.java | 4 +++ .../apache/pulsar/common/protocol/Commands.java | 4 +++ pulsar-common/src/main/proto/PulsarApi.proto | 3 ++ 8 files changed, 71 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 12697f6..d13e2f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -157,14 +157,14 @@ public class Producer { } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, - boolean isChunked) { + boolean isChunked, boolean isMarker) { if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) { - publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked); + publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker); } } public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId, - ByteBuf headersAndPayload, long batchSize, boolean isChunked) { + ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { if (lowestSequenceId > highestSequenceId) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError, @@ -174,7 +174,8 @@ public class Producer { return; } if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) { - publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked); + publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked, + isMarker); } } @@ -219,19 +220,20 @@ public class Producer { return true; } - private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) { + private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, + boolean isMarker) { topic.publishMessage(headersAndPayload, MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime())); + isChunked, System.nanoTime(), isMarker)); } private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, - long batchSize, boolean isChunked) { + long batchSize, boolean isChunked, boolean isMarker) { topic.publishMessage(headersAndPayload, MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime())); + isChunked, System.nanoTime(), isMarker)); } private boolean verifyChecksum(ByteBuf headersAndPayload) { @@ -313,6 +315,7 @@ public class Producer { private int msgSize; private long batchSize; private boolean chunked; + private boolean isMarker; private long startTimeNs; @@ -437,7 +440,7 @@ public class Producer { } static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, - long batchSize, boolean chunked, long startTimeNs) { + long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; @@ -448,11 +451,12 @@ public class Producer { callback.originalProducerName = null; callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; + callback.isMarker = isMarker; return callback; } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs) { + int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; @@ -464,6 +468,7 @@ public class Producer { callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.chunked = chunked; + callback.isMarker = isMarker; return callback; } @@ -472,6 +477,11 @@ public class Producer { return batchSize; } + @Override + public boolean isMarkerMessage() { + return isMarker; + } + private final Handle<MessagePublishContext> recyclerHandle; private MessagePublishContext(Handle<MessagePublishContext> recyclerHandle) { @@ -497,6 +507,7 @@ public class Producer { batchSize = 0L; startTimeNs = -1L; chunked = false; + isMarker = false; recyclerHandle.recycle(this); } } @@ -652,11 +663,11 @@ public class Producer { } public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId, - ByteBuf headersAndPayload, long batchSize, boolean isChunked) { + ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize); topic.publishTxnMessage(txnID, headersAndPayload, MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime())); + headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker)); } public SchemaVersion getSchemaVersion() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 84bba9c..340685b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1361,17 +1361,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (send.hasTxnidMostBits() && send.hasTxnidLeastBits()) { TxnID txnID = new TxnID(send.getTxnidMostBits(), send.getTxnidLeastBits()); producer.publishTxnMessage(txnID, producer.getProducerId(), send.getSequenceId(), - send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk()); + send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), + send.isMarker()); return; } // Persist the message if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) { producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), - headersAndPayload, send.getNumMessages(), send.isIsChunk()); + headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker()); } else { producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, - send.getNumMessages(), send.isIsChunk()); + send.getNumMessages(), send.isIsChunk(), send.isMarker()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index eaed76c..2b92560 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -94,6 +94,10 @@ public interface Topic { default long getNumberOfMessages() { return 1L; } + + default boolean isMarkerMessage() { + return false; + } } void publishMessage(ByteBuf headersAndPayload, PublishContext callback); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e165916..371971a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -30,6 +30,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; +import java.time.Clock; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -225,6 +226,9 @@ public class PersistentTopic extends AbstractTopic private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder(); private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder(); + // Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic + private long lastDataMessagePublishedTimestamp = 0; + private static class TopicStatsHelper { public double averageMsgSize; public double aggMsgRateIn; @@ -491,6 +495,11 @@ public class PersistentTopic extends AbstractTopic // Message has been successfully persisted messageDeduplication.recordMessagePersisted(publishContext, position); + + if (!publishContext.isMarkerMessage()) { + lastDataMessagePublishedTimestamp = Clock.systemUTC().millis(); + } + publishContext.setMetadataFromEntryData(entryData); publishContext.completed(null, position.getLedgerId(), position.getEntryId()); // in order to sync the max position when cursor read entries @@ -3242,4 +3251,8 @@ public class PersistentTopic extends AbstractTopic } return subscription.getPendingAckManageLedger(); } + + public long getLastDataMessagePublishedTimestamp() { + return lastDataMessagePublishedTimestamp; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index dc03962..7beeaa0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -36,7 +36,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.mutable.MutableBoolean; -import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.common.api.proto.ClusterMessageId; import org.apache.pulsar.common.api.proto.CommandAck.AckType; @@ -56,9 +55,12 @@ import org.apache.pulsar.common.protocol.Markers; public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext { private final PersistentTopic topic; private final String localCluster; + + // The timestamp of when the last snapshot was initiated + private long lastCompletedSnapshotStartTime = 0; + private String lastCompletedSnapshotId; - private boolean skippedSnapshotForNoProducers = false; private volatile Position positionOfLastLocalMarker; private final ScheduledFuture<?> timer; @@ -192,28 +194,7 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P private void startNewSnapshot() { cleanupTimedOutSnapshots(); - boolean hasLocalProducer = false; - for (Producer p : topic.getProducers().values()) { - if (!p.isRemote()) { - hasLocalProducer = true; - break; - } - } - - if (!hasLocalProducer) { - if (!skippedSnapshotForNoProducers) { - skippedSnapshotForNoProducers = true; - if (log.isDebugEnabled()) { - log.debug("[{}] There are no local producers: Skipping 1 snapshot", topic.getName()); - } - - return; - } - } - - skippedSnapshotForNoProducers = false; - - if (topic.getLastPosition() != null && topic.getLastPosition().equals(positionOfLastLocalMarker)) { + if (topic.getLastDataMessagePublishedTimestamp() < lastCompletedSnapshotStartTime) { // There was no message written since the last snapshot, we can skip creating a new snapshot if (log.isDebugEnabled()) { log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName()); @@ -269,9 +250,13 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P } void snapshotCompleted(String snapshotId) { - pendingSnapshots.remove(snapshotId); + ReplicatedSubscriptionsSnapshotBuilder snapshot = pendingSnapshots.remove(snapshotId); pendingSnapshotsMetric.dec(); lastCompletedSnapshotId = snapshotId; + + if (snapshot != null) { + lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis(); + } } void writeMarker(ByteBuf marker) { @@ -305,6 +290,12 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P } @Override + public boolean isMarkerMessage() { + // Everything published by this controller will be a marker a message + return true; + } + + @Override public void close() { timer.cancel(true); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java index 42c6138..38fcf7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java @@ -131,4 +131,8 @@ public class ReplicatedSubscriptionsSnapshotBuilder { boolean isTimedOut() { return (startTimeMillis + timeoutMillis) < clock.millis(); } + + long getStartTimeMillis() { + return startTimeMillis; + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 4484ab9..241ae87 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -504,6 +504,10 @@ public class Commands { send.setIsChunk(true); } + if (messageData.hasMarkerType()) { + send.setMarker(true); + } + return serializeCommandSendWithSize(cmd, checksumType, messageData, payload); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index b26024f..cabd099 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -493,6 +493,9 @@ message CommandSend { /// Add highest sequence id to support batch message with external sequence id optional uint64 highest_sequence_id = 6 [default = 0]; optional bool is_chunk =7 [default = false]; + + // Specify if the message being published is a Pulsar marker or not + optional bool marker = 8 [default = false]; } message CommandSendReceipt {
