This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 70c9f610 [Transaction] Transaction buffer take snapshot max read
position (#12219)
70c9f610 is described below
commit 70c9f610078be57bee4c1581e6df64f694bbb13a
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Oct 15 20:06:07 2021 +0800
[Transaction] Transaction buffer take snapshot max read position (#12219)
### Motivation
In the previous implementation of transactionBuffer, if no transaction
committed or aborted, TransactionBuffer will not take a snapshot. Even if it is
a timed task, it will judge whether there has been a transaction committed or
aborted. If no, the timer will be skipped.
**This will have a disadvantageļ¼**
* If there is no snapshot , maxReadPosition needs to be recovered from the
earliest when the broker restart.
### Modifications
Add a TransactionBuffer State: Unused. If no message with transaction was
sent, the state of transactionBuffer is Unused.
And if TransactionBuffer state is Unused, the TransactionBuffer is no need
to be recovered. When the first message with transaction was sent to buffer, it
will take a snapshot. And we have change the return value of takeSnapshot(),
make it can be control whether to execute synchronously
---
.../apache/pulsar/broker/service/ServerCnx.java | 124 ++++++++-------
.../org/apache/pulsar/broker/service/Topic.java | 8 +
.../service/nonpersistent/NonPersistentTopic.java | 5 +
.../broker/service/persistent/PersistentTopic.java | 123 +++++++-------
.../transaction/buffer/TransactionBuffer.java | 8 +
.../buffer/TransactionBufferProvider.java | 4 +-
.../buffer/impl/InMemTransactionBuffer.java | 7 +-
.../impl/InMemTransactionBufferProvider.java | 5 +-
.../buffer/impl/TopicTransactionBuffer.java | 176 ++++++++++++++-------
.../impl/TopicTransactionBufferProvider.java | 5 +-
.../TopicTransactionBufferRecoverCallBack.java | 6 +
.../buffer/impl/TopicTransactionBufferState.java | 18 ++-
.../buffer/impl/TransactionBufferDisable.java | 5 +
.../pulsar/broker/service/ServerCnxTest.java | 68 ++++----
.../TopicTransactionBufferRecoverTest.java | 27 ++--
.../pulsar/broker/transaction/TransactionTest.java | 49 ++++++
.../transaction/buffer/TransactionBufferTest.java | 2 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 2 +-
.../apache/pulsar/common/protocol/Commands.java | 11 +-
pulsar-common/src/main/proto/PulsarApi.proto | 2 +
20 files changed, 415 insertions(+), 240 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a5ed8f7..5be21cc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1124,6 +1124,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final ProducerAccessMode producerAccessMode =
cmdProducer.getProducerAccessMode();
final Optional<Long> topicEpoch = cmdProducer.hasTopicEpoch()
? Optional.of(cmdProducer.getTopicEpoch()) : Optional.empty();
+ final boolean isTxnEnabled = cmdProducer.isTxnEnabled();
TopicName topicName = validateTopicName(cmdProducer.getTopic(),
requestId, cmdProducer);
if (topicName == null) {
@@ -1235,67 +1236,25 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
schemaVersionFuture.thenAccept(schemaVersion -> {
- CompletableFuture<Void> producerQueuedFuture =
new CompletableFuture<>();
- Producer producer = new Producer(topic,
ServerCnx.this, producerId, producerName,
- getPrincipal(), isEncrypted, metadata,
schemaVersion, epoch,
- userProvidedProducerName,
producerAccessMode, topicEpoch);
-
- topic.addProducer(producer,
producerQueuedFuture).thenAccept(newTopicEpoch -> {
- if (isActive()) {
- if (producerFuture.complete(producer))
{
- log.info("[{}] Created new
producer: {}", remoteAddress, producer);
-
commandSender.sendProducerSuccessResponse(requestId, producerName,
-
producer.getLastSequenceId(), producer.getSchemaVersion(),
- newTopicEpoch, true /*
producer is ready now */);
- return;
- } else {
- // The producer's future was
completed before by
- // a close command
- producer.closeNow(true);
- log.info("[{}] Cleared producer
created after"
- + " timeout on
client side {}",
- remoteAddress, producer);
- }
- } else {
- producer.closeNow(true);
- log.info("[{}] Cleared producer
created after connection was closed: {}",
- remoteAddress, producer);
- producerFuture.completeExceptionally(
- new IllegalStateException(
- "Producer created
after connection was closed"));
- }
-
- producers.remove(producerId,
producerFuture);
- }).exceptionally(ex -> {
- log.error("[{}] Failed to add producer to
topic {}: producerId={}, {}",
- remoteAddress, topicName,
producerId, ex.getMessage());
-
- producer.closeNow(true);
- if
(producerFuture.completeExceptionally(ex)) {
-
commandSender.sendErrorResponse(requestId,
-
BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
- }
+
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future
-> {
+ buildProducerAndAddTopic(topic,
producerId, producerName, requestId, isEncrypted,
+ metadata, schemaVersion, epoch,
userProvidedProducerName, topicName,
+ producerAccessMode, topicEpoch,
producerFuture);
+ }).exceptionally(exception -> {
+ Throwable cause = exception.getCause();
+ log.error("producerId {}, requestId {} :
TransactionBuffer recover failed",
+ producerId, requestId, exception);
+ commandSender.sendErrorResponse(requestId,
+
ServiceUnitNotReadyException.getClientErrorCode(cause),
+ cause.getMessage());
return null;
});
-
- producerQueuedFuture.thenRun(() -> {
- // If the producer is queued waiting, we
will get an immediate notification
- // that we need to pass to client
- if (isActive()) {
- log.info("[{}] Producer is waiting in
queue: {}", remoteAddress, producer);
-
commandSender.sendProducerSuccessResponse(requestId, producerName,
- producer.getLastSequenceId(),
producer.getSchemaVersion(),
- Optional.empty(), false/*
producer is not ready now */);
- }
- });
});
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
-
if (cause instanceof NoSuchElementException) {
cause = new TopicNotFoundException("Topic Not
Found.");
}
-
if (!Exceptions.areExceptionsPresentInChain(cause,
ServiceUnitNotReadyException.class,
ManagedLedgerException.class)) {
// Do not print stack traces for expected
exceptions
@@ -1327,6 +1286,65 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
}
+ private void buildProducerAndAddTopic(Topic topic, long producerId, String
producerName, long requestId,
+ boolean isEncrypted, Map<String, String>
metadata, SchemaVersion schemaVersion, long epoch,
+ boolean userProvidedProducerName, TopicName
topicName,
+ ProducerAccessMode producerAccessMode,
+ Optional<Long> topicEpoch,
CompletableFuture<Producer> producerFuture){
+ CompletableFuture<Void> producerQueuedFuture = new
CompletableFuture<>();
+ Producer producer = new Producer(topic, ServerCnx.this, producerId,
producerName,
+ getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
+ userProvidedProducerName, producerAccessMode, topicEpoch);
+
+ topic.addProducer(producer,
producerQueuedFuture).thenAccept(newTopicEpoch -> {
+ if (isActive()) {
+ if (producerFuture.complete(producer)) {
+ log.info("[{}] Created new producer: {}", remoteAddress,
producer);
+ commandSender.sendProducerSuccessResponse(requestId,
producerName,
+ producer.getLastSequenceId(),
producer.getSchemaVersion(),
+ newTopicEpoch, true /* producer is ready now */);
+ return;
+ } else {
+ // The producer's future was completed before by
+ // a close command
+ producer.closeNow(true);
+ log.info("[{}] Cleared producer created after"
+ + " timeout on client side {}",
+ remoteAddress, producer);
+ }
+ } else {
+ producer.closeNow(true);
+ log.info("[{}] Cleared producer created after connection was
closed: {}",
+ remoteAddress, producer);
+ producerFuture.completeExceptionally(
+ new IllegalStateException(
+ "Producer created after connection was
closed"));
+ }
+
+ producers.remove(producerId, producerFuture);
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to add producer to topic {}: producerId={},
{}",
+ remoteAddress, topicName, producerId, ex.getMessage());
+
+ producer.closeNow(true);
+ if (producerFuture.completeExceptionally(ex)) {
+ commandSender.sendErrorResponse(requestId,
+ BrokerServiceException.getClientErrorCode(ex),
ex.getMessage());
+ }
+ return null;
+ });
+
+ producerQueuedFuture.thenRun(() -> {
+ // If the producer is queued waiting, we will get an immediate
notification
+ // that we need to pass to client
+ if (isActive()) {
+ log.info("[{}] Producer is waiting in queue: {}",
remoteAddress, producer);
+ commandSender.sendProducerSuccessResponse(requestId,
producerName,
+ producer.getLastSequenceId(),
producer.getSchemaVersion(),
+ Optional.empty(), false/* producer is not ready now
*/);
+ }
+ });
+ }
@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
checkArgument(state == State.Connected);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index d02b54a..4e5c698 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -117,6 +117,14 @@ public interface Topic {
void removeProducer(Producer producer);
/**
+ * Wait TransactionBuffer Recovers completely.
+ * Take snapshot after TB Recovers completely.
+ * @param isTxnEnabled
+ * @return a future which has completely if isTxn = false. Or a future
return by takeSnapshot.
+ */
+ CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean
isTxnEnabled);
+
+ /**
* record add-latency.
*/
void recordAddLatency(long latency, TimeUnit unit);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 472f8a9..c1687a3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -222,6 +222,11 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic {
}
@Override
+ public CompletableFuture<Void>
checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
public CompletableFuture<Consumer> subscribe(final TransportCnx cnx,
String subscriptionName, long consumerId,
SubType subType, int
priorityLevel, String consumerName,
boolean isDurable, MessageId
startMessageId,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d1f5a4e..64e4b7f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -215,8 +215,6 @@ public class PersistentTopic extends AbstractTopic
private ScheduledFuture<?> fencedTopicMonitoringTask = null;
- // this future is for publish txn message in order.
- private volatile CompletableFuture<Void> transactionCompletableFuture;
@Getter
protected final TransactionBuffer transactionBuffer;
@@ -261,7 +259,6 @@ public class PersistentTopic extends AbstractTopic
brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
- this.transactionCompletableFuture = new CompletableFuture<>();
initializeRateLimiterIfNeeded(Optional.empty());
registerTopicPolicyListener();
@@ -303,9 +300,8 @@ public class PersistentTopic extends AbstractTopic
if
(brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
&& !checkTopicIsEventsNames(topicName)) {
this.transactionBuffer = brokerService.getPulsar()
- .getTransactionBufferProvider().newTransactionBuffer(this,
transactionCompletableFuture);
+ .getTransactionBufferProvider().newTransactionBuffer(this);
} else {
- this.transactionCompletableFuture.complete(null);
this.transactionBuffer = new TransactionBufferDisable();
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl)
ledger.getLastConfirmedEntry());
@@ -357,12 +353,11 @@ public class PersistentTopic extends AbstractTopic
this.compactedTopic = new
CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
- this.transactionCompletableFuture = new CompletableFuture<>();
+
if
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
this.transactionBuffer = brokerService.getPulsar()
- .getTransactionBufferProvider().newTransactionBuffer(this,
transactionCompletableFuture);
+ .getTransactionBufferProvider().newTransactionBuffer(this);
} else {
- this.transactionCompletableFuture.complete(null);
this.transactionBuffer = new TransactionBufferDisable();
}
}
@@ -575,6 +570,11 @@ public class PersistentTopic extends AbstractTopic
}
@Override
+ public CompletableFuture<Void>
checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
+ return getTransactionBuffer().checkIfTBRecoverCompletely(isTxnEnabled);
+ }
+
+ @Override
protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long>
currentEpoch) {
long newEpoch = currentEpoch.orElse(-1L) + 1;
return setTopicEpoch(newEpoch);
@@ -2956,71 +2956,62 @@ public class PersistentTopic extends AbstractTopic
public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload,
PublishContext publishContext) {
pendingWriteOps.incrementAndGet();
// in order to avoid the opAddEntry retain
- headersAndPayload.retain();
+
// in order to promise the publish txn message orderly, we should
change the transactionCompletableFuture
- this.transactionCompletableFuture =
this.transactionCompletableFuture.thenAccept(v -> {
- try {
- if (isFenced) {
- publishContext.completed(new
TopicFencedException("fenced"), -1, -1);
- decrementPendingWriteOpsAndCheck();
- return;
- }
- if
(isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
- publishContext.completed(new NotAllowedException("Exceed
maximum message size")
- , -1, -1);
- decrementPendingWriteOpsAndCheck();
- return;
- }
- MessageDeduplication.MessageDupStatus status =
- messageDeduplication.isDuplicate(publishContext,
headersAndPayload);
- switch (status) {
- case NotDup:
- transactionBuffer.appendBufferToTxn(txnID,
publishContext.getSequenceId(), headersAndPayload)
- .thenAccept(position -> {
- // Message has been successfully persisted
-
messageDeduplication.recordMessagePersisted(publishContext,
- (PositionImpl) position);
- publishContext.completed(null,
((PositionImpl) position).getLedgerId(),
- ((PositionImpl)
position).getEntryId());
-
- decrementPendingWriteOpsAndCheck();
- })
- .exceptionally(throwable -> {
- addFailed((ManagedLedgerException)
throwable, publishContext);
- return null;
- });
- break;
- case Dup:
- // Immediately acknowledge duplicated message
- publishContext.completed(null, -1, -1);
- decrementPendingWriteOpsAndCheck();
- break;
- default:
- publishContext.completed(new
MessageDeduplication.MessageDupUnknownException(), -1, -1);
- decrementPendingWriteOpsAndCheck();
+ if (isFenced) {
+ publishContext.completed(new TopicFencedException("fenced"), -1,
-1);
+ decrementPendingWriteOpsAndCheck();
+ return;
+ }
+ if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+ publishContext.completed(new NotAllowedException("Exceed maximum
message size")
+ , -1, -1);
+ decrementPendingWriteOpsAndCheck();
+ return;
+ }
+
+ MessageDeduplication.MessageDupStatus status =
+ messageDeduplication.isDuplicate(publishContext,
headersAndPayload);
+ switch (status) {
+ case NotDup:
+ transactionBuffer.appendBufferToTxn(txnID,
publishContext.getSequenceId(), headersAndPayload)
+ .thenAccept(position -> {
+ // Message has been successfully persisted
+
messageDeduplication.recordMessagePersisted(publishContext,
+ (PositionImpl) position);
+ publishContext.completed(null, ((PositionImpl)
position).getLedgerId(),
+ ((PositionImpl) position).getEntryId());
+
+ decrementPendingWriteOpsAndCheck();
+ })
+ .exceptionally(throwable -> {
+ addFailed((ManagedLedgerException) throwable,
publishContext);
+ return null;
+ });
+ break;
+ case Dup:
+ // Immediately acknowledge duplicated message
+ publishContext.completed(null, -1, -1);
+ decrementPendingWriteOpsAndCheck();
+ break;
+ default:
+ publishContext.completed(new
MessageDeduplication.MessageDupUnknownException(), -1, -1);
+ decrementPendingWriteOpsAndCheck();
+
+ }
- }
- } finally {
- headersAndPayload.release();
- }
- }).exceptionally(e -> {
- headersAndPayload.release();
- return null;
- });
}
@Override
public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long
lowWaterMark) {
- return this.transactionCompletableFuture.thenCompose(future -> {
- if (TxnAction.COMMIT_VALUE == txnAction) {
- return transactionBuffer.commitTxn(txnID, lowWaterMark);
- } else if (TxnAction.ABORT_VALUE == txnAction) {
- return transactionBuffer.abortTxn(txnID, lowWaterMark);
- } else {
- return FutureUtil.failedFuture(new
NotAllowedException("Unsupported txnAction " + txnAction));
- }
- });
+ if (TxnAction.COMMIT_VALUE == txnAction) {
+ return transactionBuffer.commitTxn(txnID, lowWaterMark);
+ } else if (TxnAction.ABORT_VALUE == txnAction) {
+ return transactionBuffer.abortTxn(txnID, lowWaterMark);
+ } else {
+ return FutureUtil.failedFuture(new
NotAllowedException("Unsupported txnAction " + txnAction));
+ }
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 6ffc218..b0193fc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -168,4 +168,12 @@ public interface TransactionBuffer {
* @return the transaction stats in buffer.
*/
TransactionBufferStats getStats();
+
+ /**
+ * Wait TransactionBuffer Recovers completely.
+ * Take snapshot after TB Recovers completely.
+ * @param isTxn
+ * @return a future which has completely if isTxn = false. Or a future
return by takeSnapshot.
+ */
+ CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
index 4b00e39..f167d51 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.transaction.buffer;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.Beta;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.Topic;
/**
@@ -55,8 +54,7 @@ public interface TransactionBufferProvider {
* Open the persistent transaction buffer.
*
* @param originTopic
- * @param transactionBufferFuture the transaction buffer future
* @return
*/
- TransactionBuffer newTransactionBuffer(Topic originTopic,
CompletableFuture<Void> transactionBufferFuture);
+ TransactionBuffer newTransactionBuffer(Topic originTopic);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 213c7d0..2c32d0b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -210,10 +210,9 @@ class InMemTransactionBuffer implements TransactionBuffer {
final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
- public InMemTransactionBuffer(Topic topic, CompletableFuture<Void>
transactionBufferFuture) {
+ public InMemTransactionBuffer(Topic topic) {
this.buffers = new ConcurrentHashMap<>();
this.txnIndex = new HashMap<>();
- transactionBufferFuture.complete(null);
}
@Override
@@ -381,4 +380,8 @@ class InMemTransactionBuffer implements TransactionBuffer {
return null;
}
+ @Override
+ public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
+ return CompletableFuture.completedFuture(null);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
index 37da7c8..d14116b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
-import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
@@ -29,7 +28,7 @@ import
org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
public class InMemTransactionBufferProvider implements
TransactionBufferProvider {
@Override
- public TransactionBuffer newTransactionBuffer(Topic originTopic,
CompletableFuture<Void> transactionBufferFuture) {
- return new InMemTransactionBuffer(originTopic,
transactionBufferFuture);
+ public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+ return new InMemTransactionBuffer(originTopic);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index d3a2655..79f7f35 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -65,7 +65,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final PersistentTopic topic;
- private volatile PositionImpl maxReadPosition = PositionImpl.latest;
+ private volatile PositionImpl maxReadPosition;
/**
* Ongoing transaction, map for remove txn stable position, linked for
find max read position.
@@ -91,7 +91,9 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private volatile long lastSnapshotTimestamps;
- public TopicTransactionBuffer(PersistentTopic topic,
CompletableFuture<Void> transactionBufferFuture) {
+ private final CompletableFuture<Void> transactionBufferFuture = new
CompletableFuture<>();
+
+ public TopicTransactionBuffer(PersistentTopic topic) {
super(State.None);
this.topic = topic;
this.changeToInitializingState();
@@ -102,6 +104,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+ this.maxReadPosition = (PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry();
this.topic.getBrokerService().getPulsar().getTransactionReplayExecutor()
.execute(new TopicTransactionBufferRecover(new
TopicTransactionBufferRecoverCallBack() {
@Override
@@ -116,6 +119,15 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
@Override
+ public void noNeedToRecover() {
+ if (!changeToNoSnapshotState()) {
+ log.error("[{}]Transaction buffer recover fail",
topic.getName());
+ } else {
+ transactionBufferFuture.complete(null);
+ }
+ }
+
+ @Override
public void handleSnapshot(TransactionBufferSnapshot
snapshot) {
maxReadPosition =
PositionImpl.get(snapshot.getMaxReadPositionLedgerId(),
snapshot.getMaxReadPositionEntryId());
@@ -161,6 +173,38 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
@Override
+ public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean
isTxnEnabled) {
+ if (!isTxnEnabled) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
+ transactionBufferFuture.thenRun(() -> {
+ if (checkIfNoSnapshot()) {
+ takeSnapshot().thenRun(() -> {
+ if (changeToReadyStateFromNoSnapshot()) {
+ timer.newTimeout(TopicTransactionBuffer.this,
+ takeSnapshotIntervalTime,
TimeUnit.MILLISECONDS);
+ }
+ completableFuture.complete(null);
+ }).exceptionally(exception -> {
+ log.error("Topic {} failed to take snapshot",
this.topic.getName());
+ completableFuture.completeExceptionally(exception);
+ return null;
+ });
+ } else {
+ completableFuture.complete(null);
+ }
+ }).exceptionally(exception -> {
+ log.error("Topic {}: TransactionBuffer recover failed",
this.topic.getName(), exception);
+ completableFuture.completeExceptionally(exception);
+ return null;
+ });
+ return completableFuture;
+ }
+ }
+
+
+ @Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long
sequenceId, ByteBuf buffer) {
CompletableFuture<Position> completableFuture = new
CompletableFuture<>();
topic.getManagedLedger().asyncAddEntry(buffer, new
AsyncCallbacks.AddEntryCallback() {
@@ -202,32 +246,37 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
log.debug("Transaction {} commit on topic {}.", txnID.toString(),
topic.getName());
}
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-
- ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L,
txnID.getMostSigBits(),
- txnID.getLeastSigBits());
-
- try {
- topic.getManagedLedger().asyncAddEntry(commitMarker, new
AsyncCallbacks.AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
- synchronized (TopicTransactionBuffer.this) {
- updateMaxReadPosition(txnID);
- handleLowWaterMark(txnID, lowWaterMark);
- clearAbortedTransactions();
- takeSnapshotByChangeTimes();
+ //Wait TB recover completely.
+ transactionBufferFuture.thenRun(() -> {
+ ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L,
txnID.getMostSigBits(),
+ txnID.getLeastSigBits());
+ try {
+ topic.getManagedLedger().asyncAddEntry(commitMarker, new
AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf
entryData, Object ctx) {
+ synchronized (TopicTransactionBuffer.this) {
+ updateMaxReadPosition(txnID);
+ handleLowWaterMark(txnID, lowWaterMark);
+ clearAbortedTransactions();
+ takeSnapshotByChangeTimes();
+ }
+ completableFuture.complete(null);
}
- completableFuture.complete(null);
- }
- @Override
- public void addFailed(ManagedLedgerException exception, Object
ctx) {
- log.error("Failed to commit for txn {}", txnID, exception);
- completableFuture.completeExceptionally(new
PersistenceException(exception));
- }
- }, null);
- } finally {
- commitMarker.release();
- }
+ @Override
+ public void addFailed(ManagedLedgerException exception,
Object ctx) {
+ log.error("Failed to commit for txn {}", txnID,
exception);
+ completableFuture.completeExceptionally(new
PersistenceException(exception));
+ }
+ }, null);
+ } finally {
+ commitMarker.release();
+ }
+ }).exceptionally(exception -> {
+ log.error("Transaction {} commit on topic {}.", txnID.toString(),
topic.getName(), exception);
+ completableFuture.completeExceptionally(exception);
+ return null;
+ });
return completableFuture;
}
@@ -237,32 +286,43 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
log.debug("Transaction {} abort on topic {}.", txnID.toString(),
topic.getName());
}
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-
- ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
txnID.getMostSigBits(), txnID.getLeastSigBits());
- try {
- topic.getManagedLedger().asyncAddEntry(abortMarker, new
AsyncCallbacks.AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
- synchronized (TopicTransactionBuffer.this) {
- aborts.put(txnID, (PositionImpl) position);
- updateMaxReadPosition(txnID);
- handleLowWaterMark(txnID, lowWaterMark);
-
changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
- clearAbortedTransactions();
- takeSnapshotByChangeTimes();
+ //Wait TB recover completely.
+ transactionBufferFuture.thenRun(() -> {
+ //no message sent, need not to add abort mark by txn timeout.
+ if (!checkIfReady()) {
+ completableFuture.complete(null);
+ return;
+ }
+ ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
txnID.getMostSigBits(), txnID.getLeastSigBits());
+ try {
+ topic.getManagedLedger().asyncAddEntry(abortMarker, new
AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf
entryData, Object ctx) {
+ synchronized (TopicTransactionBuffer.this) {
+ aborts.put(txnID, (PositionImpl) position);
+ updateMaxReadPosition(txnID);
+ handleLowWaterMark(txnID, lowWaterMark);
+
changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+ clearAbortedTransactions();
+ takeSnapshotByChangeTimes();
+ }
+ completableFuture.complete(null);
}
- completableFuture.complete(null);
- }
- @Override
- public void addFailed(ManagedLedgerException exception, Object
ctx) {
- log.error("Failed to abort for txn {}", txnID, exception);
- completableFuture.completeExceptionally(new
PersistenceException(exception));
- }
- }, null);
- } finally {
- abortMarker.release();
- }
+ @Override
+ public void addFailed(ManagedLedgerException exception,
Object ctx) {
+ log.error("Failed to abort for txn {}", txnID,
exception);
+ completableFuture.completeExceptionally(new
PersistenceException(exception));
+ }
+ }, null);
+ } finally {
+ abortMarker.release();
+ }
+ }).exceptionally(exception -> {
+ log.error("Transaction {} abort on topic {}.", txnID.toString(),
topic.getName());
+ completableFuture.completeExceptionally(exception);
+ return null;
+ });
return completableFuture;
}
@@ -308,9 +368,9 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
}
- private void takeSnapshot() {
+ private CompletableFuture<Void> takeSnapshot() {
changeMaxReadPositionAndAddAbortTimes.set(0);
- takeSnapshotWriter.thenAccept(writer -> {
+ return takeSnapshotWriter.thenCompose(writer -> {
TransactionBufferSnapshot snapshot = new
TransactionBufferSnapshot();
synchronized (TopicTransactionBuffer.this) {
snapshot.setTopicName(topic.getName());
@@ -327,7 +387,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
});
snapshot.setAborts(list);
}
- writer.writeAsync(snapshot).thenAccept((messageId) -> {
+ return writer.writeAsync(snapshot).thenAccept(messageId-> {
this.lastSnapshotTimestamps = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("[{}]Transaction buffer take snapshot success! "
@@ -339,7 +399,6 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
});
});
}
-
private void clearAbortedTransactions() {
while (!aborts.isEmpty() && !((ManagedLedgerImpl)
topic.getManagedLedger())
.ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
@@ -398,13 +457,14 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
synchronized (TopicTransactionBuffer.this) {
if (ongoingTxns.isEmpty()) {
maxReadPosition = position;
+ changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
}
}
}
@Override
public PositionImpl getMaxReadPosition() {
- if (checkIfReady()) {
+ if (checkIfReady() || checkIfNoSnapshot()) {
return this.maxReadPosition;
} else {
return PositionImpl.earliest;
@@ -470,7 +530,9 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
try {
+ boolean hasSnapshot = false;
while (reader.hasMoreEvents()) {
+ hasSnapshot = true;
Message<TransactionBufferSnapshot> message =
reader.readNext();
TransactionBufferSnapshot transactionBufferSnapshot =
message.getValue();
if
(topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
@@ -480,6 +542,10 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
}
+ if (!hasSnapshot) {
+ callBack.noNeedToRecover();
+ return;
+ }
} catch (PulsarClientException pulsarClientException) {
log.error("[{}]Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(),
pulsarClientException);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
index 258a2ee..e5737f4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
-import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
@@ -30,7 +29,7 @@ import
org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
public class TopicTransactionBufferProvider implements
TransactionBufferProvider {
@Override
- public TransactionBuffer newTransactionBuffer(Topic originTopic,
CompletableFuture<Void> transactionBufferFuture) {
- return new TopicTransactionBuffer((PersistentTopic) originTopic,
transactionBufferFuture);
+ public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+ return new TopicTransactionBuffer((PersistentTopic) originTopic);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
index 3a4fc39..1640459 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
@@ -29,6 +29,12 @@ public interface TopicTransactionBufferRecoverCallBack {
void recoverComplete();
/**
+ * No message with transaction has ever been sent.
+ * Skip recovery procedure
+ */
+ void noNeedToRecover();
+
+ /**
* Handle transactionBufferSnapshot.
*
* @param snapshot the transaction buffer snapshot
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
index f9120bd..3b21398 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
@@ -32,7 +32,8 @@ public abstract class TopicTransactionBufferState {
None,
Initializing,
Ready,
- Close
+ Close,
+ NoSnapshot
}
private static final
AtomicReferenceFieldUpdater<TopicTransactionBufferState, State> STATE_UPDATER =
@@ -49,20 +50,33 @@ public abstract class TopicTransactionBufferState {
return (STATE_UPDATER.compareAndSet(this, State.Initializing,
State.Ready));
}
+ protected boolean changeToNoSnapshotState() {
+ return (STATE_UPDATER.compareAndSet(this, State.Initializing,
State.NoSnapshot));
+ }
+
protected boolean changeToInitializingState() {
return STATE_UPDATER.compareAndSet(this, State.None,
State.Initializing);
}
+ protected boolean changeToReadyStateFromNoSnapshot() {
+ return STATE_UPDATER.compareAndSet(this, State.NoSnapshot,
State.Ready);
+ }
+
protected boolean changeToCloseState() {
return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)
|| STATE_UPDATER.compareAndSet(this, State.None, State.Close)
- || STATE_UPDATER.compareAndSet(this, State.Initializing,
State.Close));
+ || STATE_UPDATER.compareAndSet(this, State.Initializing,
State.Close)
+ || STATE_UPDATER.compareAndSet(this, State.NoSnapshot,
State.Close));
}
public boolean checkIfReady() {
return STATE_UPDATER.get(this) == State.Ready;
}
+ public boolean checkIfNoSnapshot() {
+ return STATE_UPDATER.get(this) == State.NoSnapshot;
+ }
+
public State getState() {
return STATE_UPDATER.get(this);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index ff18924..f7e147d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -102,4 +102,9 @@ public class TransactionBufferDisable implements
TransactionBuffer {
public TransactionBufferStats getStats() {
return null;
}
+
+ @Override
+ public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
+ return CompletableFuture.completedFuture(null);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 0c2ff3d..2dc57b5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -406,7 +406,7 @@ public class ServerCnxTest {
// test PRODUCER success case
ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandProducerSuccess);
@@ -417,7 +417,7 @@ public class ServerCnxTest {
// test PRODUCER error case
clientCommand = Commands.newProducer(failTopicName, 2, 2,
- "prod-name-2", Collections.emptyMap());
+ "prod-name-2", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandError);
@@ -436,12 +436,12 @@ public class ServerCnxTest {
doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class));
// Create producer first time
ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
// Create producer second time
clientCommand = Commands.newProducer(successTopicName, 1 /* producer
id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -461,7 +461,7 @@ public class ServerCnxTest {
// test PRODUCER failure case
ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /*
producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -487,7 +487,7 @@ public class ServerCnxTest {
// test PRODUCER success case
ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertEquals(getResponse().getClass(), CommandProducerSuccess.class);
@@ -517,7 +517,7 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1
/* producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(newProducerCmd);
assertTrue(getResponse() instanceof CommandError);
channel.finish();
@@ -551,14 +551,14 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandProducerSuccess);
resetChannel();
setChannelConnected();
clientCommand = Commands.newProducer(topicWithNonLocalCluster, 1 /*
producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandError);
channel.finish();
@@ -579,7 +579,7 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1
/* producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(newProducerCmd);
assertTrue(getResponse() instanceof CommandProducerSuccess);
@@ -616,7 +616,7 @@ public class ServerCnxTest {
setChannelConnected();
ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- null, Collections.emptyMap());
+ null, Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandError);
@@ -629,7 +629,7 @@ public class ServerCnxTest {
setChannelConnected();
ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandProducerSuccess);
@@ -656,12 +656,12 @@ public class ServerCnxTest {
String producerName = "my-producer";
ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(clientCommand1);
assertTrue(getResponse() instanceof CommandProducerSuccess);
ByteBuf clientCommand2 = Commands.newProducer(successTopicName, 2 /*
producer id */, 2 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(clientCommand2);
assertTrue(getResponse() instanceof CommandError);
@@ -678,7 +678,7 @@ public class ServerCnxTest {
String producerName = "my-producer";
ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);
// Producer create succeeds
@@ -687,7 +687,7 @@ public class ServerCnxTest {
assertEquals(((CommandProducerSuccess) response).getRequestId(), 1);
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 2 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer2);
// 2nd producer create succeeds as well
@@ -788,14 +788,14 @@ public class ServerCnxTest {
String producerName = "my-producer";
ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);
ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */,
2 /* request id */ );
channel.writeInbound(closeProducer);
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 3 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer2);
// Complete the topic opening: It will make 2nd producer creation
successful
@@ -845,22 +845,22 @@ public class ServerCnxTest {
String producerName = "my-producer";
ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /*
producer id */, 1 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);
ByteBuf closeProducer1 = Commands.newCloseProducer(1 /* producer id
*/, 2 /* request id */ );
channel.writeInbound(closeProducer1);
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 3 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer2);
ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /*
producer id */, 4 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer3);
ByteBuf createProducer4 = Commands.newProducer(successTopicName, 1 /*
producer id */, 5 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer4);
// Close succeeds
@@ -924,14 +924,14 @@ public class ServerCnxTest {
String producerName = "my-producer";
ByteBuf createProducer1 = Commands.newProducer(failTopicName, 1 /*
producer id */, 1 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);
ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */,
2 /* request id */ );
channel.writeInbound(closeProducer);
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 3 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer2);
// Now the topic gets opened.. It will make 2nd producer creation
successful
@@ -950,7 +950,7 @@ public class ServerCnxTest {
// Wait till the failtopic timeout interval
Thread.sleep(500);
ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /*
producer id */, 4 /* request id */,
- producerName, Collections.emptyMap());
+ producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer3);
// 3rd producer succeeds because 2nd is already connected
@@ -1299,7 +1299,7 @@ public class ServerCnxTest {
// test success case: encrypted producer can connect
ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /*
request id */,
- "encrypted-producer", true, Collections.emptyMap());
+ "encrypted-producer", true, Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -1328,7 +1328,7 @@ public class ServerCnxTest {
// test failure case: unencrypted producer cannot connect
ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /*
request id */,
- "unencrypted-producer", false, Collections.emptyMap());
+ "unencrypted-producer", false, Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -1361,7 +1361,7 @@ public class ServerCnxTest {
// test failure case: unencrypted producer cannot connect
ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /*
request id */,
- "unencrypted-producer", false, Collections.emptyMap());
+ "unencrypted-producer", false, Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -1391,7 +1391,7 @@ public class ServerCnxTest {
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /*
request id */,
- "prod-name", true, Collections.emptyMap());
+ "prod-name", true, Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandProducerSuccess);
@@ -1428,7 +1428,7 @@ public class ServerCnxTest {
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /*
request id */,
- "prod-name", true, Collections.emptyMap());
+ "prod-name", true, Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandProducerSuccess);
@@ -1613,7 +1613,7 @@ public class ServerCnxTest {
setChannelConnected();
ByteBuf clientCommand = Commands.newProducer(invalidTopicName, 1 /*
producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
Object obj = getResponse();
assertEquals(obj.getClass(), CommandError.class);
@@ -1653,7 +1653,7 @@ public class ServerCnxTest {
// Create producer first time
int producerId = 1;
ByteBuf clientCommand = Commands.newProducer(successTopicName,
producerId /* producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
ByteBuf closeProducerCmd = Commands.newCloseProducer(producerId, 2);
@@ -1664,7 +1664,7 @@ public class ServerCnxTest {
doReturn(CompletableFuture.completedFuture(false)).when(topic).hasSchema();
clientCommand = Commands.newProducer(successTopicName, producerId /*
producer id */, 1 /* request id */,
- "prod-name", Collections.emptyMap());
+ "prod-name", Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -1705,7 +1705,7 @@ public class ServerCnxTest {
// Producer command when the service unit is not ready
ByteBuf clientCommand3 = Commands.newProducer(successTopicName, 1 /*
producer id */, 3 /* request id */,
- "p1" /* producer name */, Collections.emptyMap());
+ "p1" /* producer name */, Collections.emptyMap(), false);
channel.writeInbound(clientCommand3);
Object response3 = getResponse();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index b16dace..3607b45 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -282,30 +282,33 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
tnx1.commit().get();
// wait timeout take snapshot
- TransactionBufferSnapshot transactionBufferSnapshot =
reader.readNext().getValue();
- assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(),
((MessageIdImpl) messageId1).getEntryId() + 1);
- assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(),
((MessageIdImpl) messageId1).getLedgerId());
- assertFalse(reader.hasMessageAvailable());
+ Awaitility.await().untilAsserted(() -> {
+ TransactionBufferSnapshot transactionBufferSnapshot =
reader.readNext().getValue();
+
assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(), -1);
+
assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(),
((MessageIdImpl) messageId1).getLedgerId());
+ transactionBufferSnapshot = reader.readNext().getValue();
+
assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(),
((MessageIdImpl) messageId1).getEntryId() + 1);
+
assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(),
((MessageIdImpl) messageId1).getLedgerId());
+ assertFalse(reader.hasMessageAvailable());
+ });
// take snapshot by change times
MessageId messageId2 = producer.newMessage(tnx2).value("test").send();
tnx2.commit().get();
- MessageId messageId3 = producer.newMessage(tnx3).value("test").send();
- tnx3.commit().get();
TransactionBufferSnapshot snapshot = reader.readNext().getValue();
- assertEquals(snapshot.getMaxReadPositionEntryId(), ((MessageIdImpl)
messageId3).getEntryId() + 1);
- assertEquals(snapshot.getMaxReadPositionLedgerId(), ((MessageIdImpl)
messageId3).getLedgerId());
+ assertEquals(snapshot.getMaxReadPositionEntryId(), ((MessageIdImpl)
messageId2).getEntryId() + 1);
+ assertEquals(snapshot.getMaxReadPositionLedgerId(), ((MessageIdImpl)
messageId2).getLedgerId());
assertEquals(snapshot.getAborts().size(), 0);
assertFalse(reader.hasMessageAvailable());
- MessageId messageId4 =
producer.newMessage(abortTxn).value("test").send();
+ MessageId messageId3 =
producer.newMessage(abortTxn).value("test").send();
abortTxn.abort().get();
- transactionBufferSnapshot = reader.readNext().getValue();
- assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(),
((MessageIdImpl) messageId4).getEntryId() + 1);
- assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(),
((MessageIdImpl) messageId4).getLedgerId());
+ TransactionBufferSnapshot transactionBufferSnapshot =
reader.readNext().getValue();
+ assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(),
((MessageIdImpl) messageId3).getEntryId() + 1);
+ assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(),
((MessageIdImpl) messageId3).getLedgerId());
assertEquals(transactionBufferSnapshot.getAborts().size(), 1);
assertEquals(transactionBufferSnapshot.getAborts().get(0).getTxnIdLeastBits(),
((TransactionImpl) abortTxn).getTxnIdLeastBits());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index fb94638..971f8b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -35,15 +35,23 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@@ -246,4 +254,45 @@ public class TransactionTest extends TransactionTestBase {
}
+ @Test
+ public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception {
+ String topic = "persistent://" + NAMESPACE1 + "/testSnapShot";
+ admin.topics().createNonPartitionedTopic(topic);
+ PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0)
+ .getBrokerService().getTopic(topic, false)
+ .get().get();
+
+ ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
+ .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
+ .startMessageId(MessageId.earliest)
+ .topic(NAMESPACE1 + "/" +
EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+ Reader<TransactionBufferSnapshot> reader = readerBuilder.create();
+
+ long waitSnapShotTime =
getPulsarServiceList().get(0).getConfiguration()
+ .getTransactionBufferSnapshotMinTimeInMillis();
+ Awaitility.await().atMost(waitSnapShotTime * 2, TimeUnit.MILLISECONDS)
+ .untilAsserted(() ->
Assert.assertFalse(reader.hasMessageAvailable()));
+
+ //test take snapshot by build producer by the transactionEnable client
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS)
+ .topic(topic).enableBatching(true)
+ .create();
+
+ Awaitility.await().untilAsserted(() -> {
+ Message<TransactionBufferSnapshot> message1 = reader.readNext();
+ TransactionBufferSnapshot snapshot1 = message1.getValue();
+ Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), -1);
+ });
+
+ // test snapshot by publish normal messages.
+ producer.newMessage(Schema.STRING).value("common message send").send();
+ producer.newMessage(Schema.STRING).value("common message send").send();
+
+ Awaitility.await().untilAsserted(() -> {
+ Message<TransactionBufferSnapshot> message1 = reader.readNext();
+ TransactionBufferSnapshot snapshot1 = message1.getValue();
+ Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 1);
+ });
+ }
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
index be1face..2238490 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
@@ -72,7 +72,7 @@ public class TransactionBufferTest {
@BeforeMethod
public void setup() throws Exception {
PersistentTopic persistentTopic = mock(PersistentTopic.class);
- this.buffer = this.provider.newTransactionBuffer(persistentTopic, new
CompletableFuture<>());
+ this.buffer = this.provider.newTransactionBuffer(persistentTopic);
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5177451..0f27a63 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1385,7 +1385,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
cnx.sendRequestWithId(
Commands.newProducer(topic, producerId, requestId,
producerName, conf.isEncryptionEnabled(), metadata,
schemaInfo, connectionHandler.getEpoch(),
userProvidedProducerName,
- conf.getAccessMode(), topicEpoch),
+ conf.getAccessMode(), topicEpoch,
client.conf.isEnableTransaction()),
requestId).thenAccept(response -> {
String producerName = response.getProducerName();
long lastSequenceId = response.getLastSequenceId();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e2685cc..3c363e0 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -692,14 +692,14 @@ public class Commands {
@VisibleForTesting
public static ByteBuf newProducer(String topic, long producerId, long
requestId, String producerName,
- Map<String, String> metadata) {
- return newProducer(topic, producerId, requestId, producerName, false,
metadata);
+ Map<String, String> metadata, boolean isTxnEnabled) {
+ return newProducer(topic, producerId, requestId, producerName, false,
metadata, isTxnEnabled);
}
public static ByteBuf newProducer(String topic, long producerId, long
requestId, String producerName,
- boolean encrypted, Map<String, String> metadata) {
+ boolean encrypted, Map<String, String> metadata, boolean
isTxnEnabled) {
return newProducer(topic, producerId, requestId, producerName,
encrypted, metadata, null, 0, false,
- ProducerAccessMode.Shared, Optional.empty());
+ ProducerAccessMode.Shared, Optional.empty(), isTxnEnabled);
}
private static Schema.Type getSchemaType(SchemaType type) {
@@ -736,7 +736,7 @@ public class Commands {
public static ByteBuf newProducer(String topic, long producerId, long
requestId, String producerName,
boolean encrypted, Map<String, String> metadata, SchemaInfo
schemaInfo,
long epoch, boolean userProvidedProducerName,
- ProducerAccessMode accessMode, Optional<Long> topicEpoch) {
+ ProducerAccessMode accessMode, Optional<Long> topicEpoch, boolean
isTxnEnabled) {
BaseCommand cmd = localCmd(Type.PRODUCER);
CommandProducer producer = cmd.setProducer()
.setTopic(topic)
@@ -745,6 +745,7 @@ public class Commands {
.setEpoch(epoch)
.setUserProvidedProducerName(userProvidedProducerName)
.setEncrypted(encrypted)
+ .setTxnEnabled(isTxnEnabled)
.setProducerAccessMode(convertProducerAccessMode(accessMode));
if (producerName != null) {
producer.setProducerName(producerName);
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index b08ec46..34ad37b 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -483,6 +483,8 @@ message CommandProducer {
// leave it empty and then it will always carry the same epoch number on
// the subsequent reconnections.
optional uint64 topic_epoch = 11;
+
+ optional bool txn_enabled = 12 [default = false];
}
message CommandSend {