This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 70c9f610 [Transaction] Transaction buffer take snapshot max read 
position (#12219)
70c9f610 is described below

commit 70c9f610078be57bee4c1581e6df64f694bbb13a
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri Oct 15 20:06:07 2021 +0800

    [Transaction] Transaction buffer take snapshot max read position (#12219)
    
    ### Motivation
    In the previous implementation of TransactionBuffer, if no transaction 
has been committed or aborted, TransactionBuffer will not take a snapshot. Even 
the timed snapshot task first checks whether any transaction has been committed 
or aborted; if not, that run of the timer is skipped.
    **This has a disadvantage:**
    * If there is no snapshot, maxReadPosition needs to be recovered from the 
earliest position when the broker restarts.
    
    ### Modifications
    Add a TransactionBuffer state: Unused. If no message with a transaction has 
been sent, the state of the TransactionBuffer is Unused.
    If the TransactionBuffer state is Unused, the TransactionBuffer does not 
need to be recovered. When the first message with a transaction is sent to the 
buffer, it will take a snapshot. We have also changed the return value of 
takeSnapshot() so that the caller can control whether it executes synchronously.
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 124 ++++++++-------
 .../org/apache/pulsar/broker/service/Topic.java    |   8 +
 .../service/nonpersistent/NonPersistentTopic.java  |   5 +
 .../broker/service/persistent/PersistentTopic.java | 123 +++++++-------
 .../transaction/buffer/TransactionBuffer.java      |   8 +
 .../buffer/TransactionBufferProvider.java          |   4 +-
 .../buffer/impl/InMemTransactionBuffer.java        |   7 +-
 .../impl/InMemTransactionBufferProvider.java       |   5 +-
 .../buffer/impl/TopicTransactionBuffer.java        | 176 ++++++++++++++-------
 .../impl/TopicTransactionBufferProvider.java       |   5 +-
 .../TopicTransactionBufferRecoverCallBack.java     |   6 +
 .../buffer/impl/TopicTransactionBufferState.java   |  18 ++-
 .../buffer/impl/TransactionBufferDisable.java      |   5 +
 .../pulsar/broker/service/ServerCnxTest.java       |  68 ++++----
 .../TopicTransactionBufferRecoverTest.java         |  27 ++--
 .../pulsar/broker/transaction/TransactionTest.java |  49 ++++++
 .../transaction/buffer/TransactionBufferTest.java  |   2 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |   2 +-
 .../apache/pulsar/common/protocol/Commands.java    |  11 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   2 +
 20 files changed, 415 insertions(+), 240 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a5ed8f7..5be21cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1124,6 +1124,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final ProducerAccessMode producerAccessMode = 
cmdProducer.getProducerAccessMode();
         final Optional<Long> topicEpoch = cmdProducer.hasTopicEpoch()
                 ? Optional.of(cmdProducer.getTopicEpoch()) : Optional.empty();
+        final boolean isTxnEnabled = cmdProducer.isTxnEnabled();
 
         TopicName topicName = validateTopicName(cmdProducer.getTopic(), 
requestId, cmdProducer);
         if (topicName == null) {
@@ -1235,67 +1236,25 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             });
 
                             schemaVersionFuture.thenAccept(schemaVersion -> {
-                                CompletableFuture<Void> producerQueuedFuture = 
new CompletableFuture<>();
-                                Producer producer = new Producer(topic, 
ServerCnx.this, producerId, producerName,
-                                        getPrincipal(), isEncrypted, metadata, 
schemaVersion, epoch,
-                                        userProvidedProducerName, 
producerAccessMode, topicEpoch);
-
-                                topic.addProducer(producer, 
producerQueuedFuture).thenAccept(newTopicEpoch -> {
-                                    if (isActive()) {
-                                        if (producerFuture.complete(producer)) 
{
-                                            log.info("[{}] Created new 
producer: {}", remoteAddress, producer);
-                                            
commandSender.sendProducerSuccessResponse(requestId, producerName,
-                                                    
producer.getLastSequenceId(), producer.getSchemaVersion(),
-                                                    newTopicEpoch, true /* 
producer is ready now */);
-                                            return;
-                                        } else {
-                                            // The producer's future was 
completed before by
-                                            // a close command
-                                            producer.closeNow(true);
-                                            log.info("[{}] Cleared producer 
created after"
-                                                            + " timeout on 
client side {}",
-                                                    remoteAddress, producer);
-                                        }
-                                        } else {
-                                            producer.closeNow(true);
-                                        log.info("[{}] Cleared producer 
created after connection was closed: {}",
-                                                remoteAddress, producer);
-                                        producerFuture.completeExceptionally(
-                                                new IllegalStateException(
-                                                        "Producer created 
after connection was closed"));
-                                        }
-
-                                        producers.remove(producerId, 
producerFuture);
-                                }).exceptionally(ex -> {
-                                    log.error("[{}] Failed to add producer to 
topic {}: producerId={}, {}",
-                                              remoteAddress, topicName, 
producerId, ex.getMessage());
-
-                                    producer.closeNow(true);
-                                    if 
(producerFuture.completeExceptionally(ex)) {
-                                        
commandSender.sendErrorResponse(requestId,
-                                                
BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
-                                    }
+                                
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future 
-> {
+                                    buildProducerAndAddTopic(topic, 
producerId, producerName, requestId, isEncrypted,
+                                            metadata, schemaVersion, epoch, 
userProvidedProducerName, topicName,
+                                            producerAccessMode, topicEpoch, 
producerFuture);
+                                }).exceptionally(exception -> {
+                                    Throwable cause = exception.getCause();
+                                    log.error("producerId {}, requestId {} : 
TransactionBuffer recover failed",
+                                            producerId, requestId, exception);
+                                    commandSender.sendErrorResponse(requestId,
+                                            
ServiceUnitNotReadyException.getClientErrorCode(cause),
+                                            cause.getMessage());
                                     return null;
                                 });
-
-                                producerQueuedFuture.thenRun(() -> {
-                                    // If the producer is queued waiting, we 
will get an immediate notification
-                                    // that we need to pass to client
-                                    if (isActive()) {
-                                        log.info("[{}] Producer is waiting in 
queue: {}", remoteAddress, producer);
-                                        
commandSender.sendProducerSuccessResponse(requestId, producerName,
-                                                producer.getLastSequenceId(), 
producer.getSchemaVersion(),
-                                                Optional.empty(), false/* 
producer is not ready now */);
-                                    }
-                                });
                             });
                         }).exceptionally(exception -> {
                             Throwable cause = exception.getCause();
-
                             if (cause instanceof NoSuchElementException) {
                                 cause = new TopicNotFoundException("Topic Not 
Found.");
                             }
-
                             if (!Exceptions.areExceptionsPresentInChain(cause,
                                     ServiceUnitNotReadyException.class, 
ManagedLedgerException.class)) {
                                 // Do not print stack traces for expected 
exceptions
@@ -1327,6 +1286,65 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         });
     }
 
+    private void buildProducerAndAddTopic(Topic topic, long producerId, String 
producerName, long requestId,
+                             boolean isEncrypted, Map<String, String> 
metadata, SchemaVersion schemaVersion, long epoch,
+                             boolean userProvidedProducerName, TopicName 
topicName,
+                             ProducerAccessMode producerAccessMode,
+                             Optional<Long> topicEpoch, 
CompletableFuture<Producer> producerFuture){
+        CompletableFuture<Void> producerQueuedFuture = new 
CompletableFuture<>();
+        Producer producer = new Producer(topic, ServerCnx.this, producerId, 
producerName,
+                getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
+                userProvidedProducerName, producerAccessMode, topicEpoch);
+
+        topic.addProducer(producer, 
producerQueuedFuture).thenAccept(newTopicEpoch -> {
+            if (isActive()) {
+                if (producerFuture.complete(producer)) {
+                    log.info("[{}] Created new producer: {}", remoteAddress, 
producer);
+                    commandSender.sendProducerSuccessResponse(requestId, 
producerName,
+                            producer.getLastSequenceId(), 
producer.getSchemaVersion(),
+                            newTopicEpoch, true /* producer is ready now */);
+                    return;
+                } else {
+                    // The producer's future was completed before by
+                    // a close command
+                    producer.closeNow(true);
+                    log.info("[{}] Cleared producer created after"
+                                    + " timeout on client side {}",
+                            remoteAddress, producer);
+                }
+            } else {
+                producer.closeNow(true);
+                log.info("[{}] Cleared producer created after connection was 
closed: {}",
+                        remoteAddress, producer);
+                producerFuture.completeExceptionally(
+                        new IllegalStateException(
+                                "Producer created after connection was 
closed"));
+            }
+
+            producers.remove(producerId, producerFuture);
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to add producer to topic {}: producerId={}, 
{}",
+                    remoteAddress, topicName, producerId, ex.getMessage());
+
+            producer.closeNow(true);
+            if (producerFuture.completeExceptionally(ex)) {
+                commandSender.sendErrorResponse(requestId,
+                        BrokerServiceException.getClientErrorCode(ex), 
ex.getMessage());
+            }
+            return null;
+        });
+
+        producerQueuedFuture.thenRun(() -> {
+            // If the producer is queued waiting, we will get an immediate 
notification
+            // that we need to pass to client
+            if (isActive()) {
+                log.info("[{}] Producer is waiting in queue: {}", 
remoteAddress, producer);
+                commandSender.sendProducerSuccessResponse(requestId, 
producerName,
+                        producer.getLastSequenceId(), 
producer.getSchemaVersion(),
+                        Optional.empty(), false/* producer is not ready now 
*/);
+            }
+        });
+    }
     @Override
     protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
         checkArgument(state == State.Connected);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index d02b54a..4e5c698 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -117,6 +117,14 @@ public interface Topic {
     void removeProducer(Producer producer);
 
     /**
+     * Wait TransactionBuffer Recovers completely.
+     * Take snapshot after TB Recovers completely.
+     * @param isTxnEnabled
+     * @return a future which has completely if isTxn = false. Or a future 
return by takeSnapshot.
+     */
+    CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean 
isTxnEnabled);
+
+    /**
      * record add-latency.
      */
     void recordAddLatency(long latency, TimeUnit unit);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 472f8a9..c1687a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -222,6 +222,11 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
     }
 
     @Override
+    public CompletableFuture<Void> 
checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
+        return  CompletableFuture.completedFuture(null);
+    }
+
+    @Override
     public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, 
String subscriptionName, long consumerId,
                                                  SubType subType, int 
priorityLevel, String consumerName,
                                                  boolean isDurable, MessageId 
startMessageId,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d1f5a4e..64e4b7f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -215,8 +215,6 @@ public class PersistentTopic extends AbstractTopic
 
     private ScheduledFuture<?> fencedTopicMonitoringTask = null;
 
-    // this future is for publish txn message in order.
-    private volatile CompletableFuture<Void> transactionCompletableFuture;
     @Getter
     protected final TransactionBuffer transactionBuffer;
 
@@ -261,7 +259,6 @@ public class PersistentTopic extends AbstractTopic
                 
brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
         this.backloggedCursorThresholdEntries =
                 
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
-        this.transactionCompletableFuture = new CompletableFuture<>();
         initializeRateLimiterIfNeeded(Optional.empty());
         registerTopicPolicyListener();
 
@@ -303,9 +300,8 @@ public class PersistentTopic extends AbstractTopic
         if 
(brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
                 && !checkTopicIsEventsNames(topicName)) {
             this.transactionBuffer = brokerService.getPulsar()
-                    .getTransactionBufferProvider().newTransactionBuffer(this, 
transactionCompletableFuture);
+                    .getTransactionBufferProvider().newTransactionBuffer(this);
         } else {
-            this.transactionCompletableFuture.complete(null);
             this.transactionBuffer = new TransactionBufferDisable();
         }
         transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) 
ledger.getLastConfirmedEntry());
@@ -357,12 +353,11 @@ public class PersistentTopic extends AbstractTopic
         this.compactedTopic = new 
CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
         this.backloggedCursorThresholdEntries =
                 
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
-        this.transactionCompletableFuture = new CompletableFuture<>();
+
         if 
(brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
             this.transactionBuffer = brokerService.getPulsar()
-                    .getTransactionBufferProvider().newTransactionBuffer(this, 
transactionCompletableFuture);
+                    .getTransactionBufferProvider().newTransactionBuffer(this);
         } else {
-            this.transactionCompletableFuture.complete(null);
             this.transactionBuffer = new TransactionBufferDisable();
         }
     }
@@ -575,6 +570,11 @@ public class PersistentTopic extends AbstractTopic
     }
 
     @Override
+    public CompletableFuture<Void> 
checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
+        return getTransactionBuffer().checkIfTBRecoverCompletely(isTxnEnabled);
+    }
+
+    @Override
     protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> 
currentEpoch) {
         long newEpoch = currentEpoch.orElse(-1L) + 1;
         return setTopicEpoch(newEpoch);
@@ -2956,71 +2956,62 @@ public class PersistentTopic extends AbstractTopic
     public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, 
PublishContext publishContext) {
         pendingWriteOps.incrementAndGet();
         // in order to avoid the opAddEntry retain
-        headersAndPayload.retain();
+
         // in order to promise the publish txn message orderly, we should 
change the transactionCompletableFuture
-        this.transactionCompletableFuture = 
this.transactionCompletableFuture.thenAccept(v -> {
-            try {
-                if (isFenced) {
-                    publishContext.completed(new 
TopicFencedException("fenced"), -1, -1);
-                    decrementPendingWriteOpsAndCheck();
-                    return;
-                }
-                if 
(isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
-                    publishContext.completed(new NotAllowedException("Exceed 
maximum message size")
-                            , -1, -1);
-                    decrementPendingWriteOpsAndCheck();
-                    return;
-                }
 
-                MessageDeduplication.MessageDupStatus status =
-                        messageDeduplication.isDuplicate(publishContext, 
headersAndPayload);
-                switch (status) {
-                    case NotDup:
-                        transactionBuffer.appendBufferToTxn(txnID, 
publishContext.getSequenceId(), headersAndPayload)
-                                .thenAccept(position -> {
-                                    // Message has been successfully persisted
-                                    
messageDeduplication.recordMessagePersisted(publishContext,
-                                            (PositionImpl) position);
-                                    publishContext.completed(null, 
((PositionImpl) position).getLedgerId(),
-                                            ((PositionImpl) 
position).getEntryId());
-
-                                    decrementPendingWriteOpsAndCheck();
-                                })
-                                .exceptionally(throwable -> {
-                                    addFailed((ManagedLedgerException) 
throwable, publishContext);
-                                    return null;
-                                });
-                        break;
-                    case Dup:
-                        // Immediately acknowledge duplicated message
-                        publishContext.completed(null, -1, -1);
-                        decrementPendingWriteOpsAndCheck();
-                        break;
-                    default:
-                        publishContext.completed(new 
MessageDeduplication.MessageDupUnknownException(), -1, -1);
-                        decrementPendingWriteOpsAndCheck();
+        if (isFenced) {
+            publishContext.completed(new TopicFencedException("fenced"), -1, 
-1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
+        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+            publishContext.completed(new NotAllowedException("Exceed maximum 
message size")
+                    , -1, -1);
+            decrementPendingWriteOpsAndCheck();
+            return;
+        }
+
+        MessageDeduplication.MessageDupStatus status =
+                messageDeduplication.isDuplicate(publishContext, 
headersAndPayload);
+        switch (status) {
+            case NotDup:
+                transactionBuffer.appendBufferToTxn(txnID, 
publishContext.getSequenceId(), headersAndPayload)
+                        .thenAccept(position -> {
+                            // Message has been successfully persisted
+                            
messageDeduplication.recordMessagePersisted(publishContext,
+                                    (PositionImpl) position);
+                            publishContext.completed(null, ((PositionImpl) 
position).getLedgerId(),
+                                    ((PositionImpl) position).getEntryId());
+
+                            decrementPendingWriteOpsAndCheck();
+                        })
+                        .exceptionally(throwable -> {
+                            addFailed((ManagedLedgerException) throwable, 
publishContext);
+                            return null;
+                        });
+                break;
+            case Dup:
+                // Immediately acknowledge duplicated message
+                publishContext.completed(null, -1, -1);
+                decrementPendingWriteOpsAndCheck();
+                break;
+            default:
+                publishContext.completed(new 
MessageDeduplication.MessageDupUnknownException(), -1, -1);
+                decrementPendingWriteOpsAndCheck();
+
+        }
 
-                }
-            } finally {
-                headersAndPayload.release();
-            }
-        }).exceptionally(e -> {
-            headersAndPayload.release();
-            return null;
-        });
     }
 
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long 
lowWaterMark) {
-        return this.transactionCompletableFuture.thenCompose(future -> {
-            if (TxnAction.COMMIT_VALUE == txnAction) {
-                return transactionBuffer.commitTxn(txnID, lowWaterMark);
-            } else if (TxnAction.ABORT_VALUE == txnAction) {
-                return transactionBuffer.abortTxn(txnID, lowWaterMark);
-            } else {
-                return FutureUtil.failedFuture(new 
NotAllowedException("Unsupported txnAction " + txnAction));
-            }
-        });
+        if (TxnAction.COMMIT_VALUE == txnAction) {
+            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+        } else if (TxnAction.ABORT_VALUE == txnAction) {
+            return transactionBuffer.abortTxn(txnID, lowWaterMark);
+        } else {
+            return FutureUtil.failedFuture(new 
NotAllowedException("Unsupported txnAction " + txnAction));
+        }
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index 6ffc218..b0193fc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -168,4 +168,12 @@ public interface TransactionBuffer {
      * @return the transaction stats in buffer.
      */
     TransactionBufferStats getStats();
+
+    /**
+     * Wait TransactionBuffer Recovers completely.
+     * Take snapshot after TB Recovers completely.
+     * @param isTxn
+     * @return a future which has completely if isTxn = false. Or a future 
return by takeSnapshot.
+     */
+    CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
index 4b00e39..f167d51 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferProvider.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.transaction.buffer;
 import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.Beta;
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.service.Topic;
 
 /**
@@ -55,8 +54,7 @@ public interface TransactionBufferProvider {
      * Open the persistent transaction buffer.
      *
      * @param originTopic
-     * @param transactionBufferFuture the transaction buffer future
      * @return
      */
-    TransactionBuffer newTransactionBuffer(Topic originTopic, 
CompletableFuture<Void> transactionBufferFuture);
+    TransactionBuffer newTransactionBuffer(Topic originTopic);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 213c7d0..2c32d0b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -210,10 +210,9 @@ class InMemTransactionBuffer implements TransactionBuffer {
 
     final ConcurrentMap<TxnID, TxnBuffer> buffers;
     final Map<Long, Set<TxnID>> txnIndex;
-    public InMemTransactionBuffer(Topic topic, CompletableFuture<Void> 
transactionBufferFuture) {
+    public InMemTransactionBuffer(Topic topic) {
         this.buffers = new ConcurrentHashMap<>();
         this.txnIndex = new HashMap<>();
-        transactionBufferFuture.complete(null);
     }
 
     @Override
@@ -381,4 +380,8 @@ class InMemTransactionBuffer implements TransactionBuffer {
         return null;
     }
 
+    @Override
+    public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
+        return CompletableFuture.completedFuture(null);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
index 37da7c8..d14116b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
-import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
@@ -29,7 +28,7 @@ import 
org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 public class InMemTransactionBufferProvider implements 
TransactionBufferProvider {
 
     @Override
-    public TransactionBuffer newTransactionBuffer(Topic originTopic, 
CompletableFuture<Void> transactionBufferFuture) {
-        return new InMemTransactionBuffer(originTopic, 
transactionBufferFuture);
+    public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+        return new InMemTransactionBuffer(originTopic);
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index d3a2655..79f7f35 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -65,7 +65,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     private final PersistentTopic topic;
 
-    private volatile PositionImpl maxReadPosition = PositionImpl.latest;
+    private volatile PositionImpl maxReadPosition;
 
     /**
      * Ongoing transaction, map for remove txn stable position, linked for 
find max read position.
@@ -91,7 +91,9 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     private volatile long lastSnapshotTimestamps;
 
-    public TopicTransactionBuffer(PersistentTopic topic, 
CompletableFuture<Void> transactionBufferFuture) {
+    private final CompletableFuture<Void> transactionBufferFuture = new 
CompletableFuture<>();
+
+    public TopicTransactionBuffer(PersistentTopic topic) {
         super(State.None);
         this.topic = topic;
         this.changeToInitializingState();
@@ -102,6 +104,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 
.getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
+        this.maxReadPosition = (PositionImpl) 
topic.getManagedLedger().getLastConfirmedEntry();
         
this.topic.getBrokerService().getPulsar().getTransactionReplayExecutor()
                 .execute(new TopicTransactionBufferRecover(new 
TopicTransactionBufferRecoverCallBack() {
                     @Override
@@ -116,6 +119,15 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                     }
 
                     @Override
+                    public void noNeedToRecover() {
+                        if (!changeToNoSnapshotState()) {
+                            log.error("[{}]Transaction buffer recover fail", 
topic.getName());
+                        } else {
+                            transactionBufferFuture.complete(null);
+                        }
+                    }
+
+                    @Override
                     public void handleSnapshot(TransactionBufferSnapshot 
snapshot) {
                         maxReadPosition = 
PositionImpl.get(snapshot.getMaxReadPositionLedgerId(),
                                 snapshot.getMaxReadPositionEntryId());
@@ -161,6 +173,38 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
     }
 
     @Override
+    public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean 
isTxnEnabled) {
+        // Only transactional callers wait; a NoSnapshot buffer takes its
+        // first snapshot and moves to Ready before the future completes.
+        if (!isTxnEnabled) {
+            return CompletableFuture.completedFuture(null);
+        }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        transactionBufferFuture.thenRun(() -> {
+            if (checkIfNoSnapshot()) {
+                takeSnapshot().thenRun(() -> {
+                    if (changeToReadyStateFromNoSnapshot()) {
+                        timer.newTimeout(TopicTransactionBuffer.this,
+                                takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);
+                    }
+                    completableFuture.complete(null);
+                }).exceptionally(exception -> {
+                    log.error("Topic {} failed to take snapshot", 
this.topic.getName(), exception);
+                    completableFuture.completeExceptionally(exception);
+                    return null;
+                });
+            } else {
+                completableFuture.complete(null);
+            }
+        }).exceptionally(exception -> {
+            log.error("Topic {}: TransactionBuffer recover failed", 
this.topic.getName(), exception);
+            completableFuture.completeExceptionally(exception);
+            return null;
+        });
+        return completableFuture;
+    }
+
+    @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new 
CompletableFuture<>();
         topic.getManagedLedger().asyncAddEntry(buffer, new 
AsyncCallbacks.AddEntryCallback() {
@@ -202,32 +246,37 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             log.debug("Transaction {} commit on topic {}.", txnID.toString(), 
topic.getName());
         }
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-
-        ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, 
txnID.getMostSigBits(),
-                txnID.getLeastSigBits());
-
-        try {
-            topic.getManagedLedger().asyncAddEntry(commitMarker, new 
AsyncCallbacks.AddEntryCallback() {
-                @Override
-                public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
-                    synchronized (TopicTransactionBuffer.this) {
-                        updateMaxReadPosition(txnID);
-                        handleLowWaterMark(txnID, lowWaterMark);
-                        clearAbortedTransactions();
-                        takeSnapshotByChangeTimes();
+        // Wait for the transaction buffer (TB) to finish recovering.
+        transactionBufferFuture.thenRun(() -> {
+            ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, 
txnID.getMostSigBits(),
+                    txnID.getLeastSigBits());
+            try {
+                topic.getManagedLedger().asyncAddEntry(commitMarker, new 
AsyncCallbacks.AddEntryCallback() {
+                    @Override
+                    public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
+                        synchronized (TopicTransactionBuffer.this) {
+                            updateMaxReadPosition(txnID);
+                            handleLowWaterMark(txnID, lowWaterMark);
+                            clearAbortedTransactions();
+                            takeSnapshotByChangeTimes();
+                        }
+                        completableFuture.complete(null);
                     }
-                    completableFuture.complete(null);
-                }
 
-                @Override
-                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
-                    log.error("Failed to commit for txn {}", txnID, exception);
-                    completableFuture.completeExceptionally(new 
PersistenceException(exception));
-                }
-            }, null);
-        } finally {
-            commitMarker.release();
-        }
+                    @Override
+                    public void addFailed(ManagedLedgerException exception, 
Object ctx) {
+                        log.error("Failed to commit for txn {}", txnID, 
exception);
+                        completableFuture.completeExceptionally(new 
PersistenceException(exception));
+                    }
+                }, null);
+            } finally {
+                commitMarker.release();
+            }
+        }).exceptionally(exception -> {
+            log.error("Transaction {} commit on topic {}.", txnID.toString(), 
topic.getName(), exception);
+            completableFuture.completeExceptionally(exception);
+            return null;
+        });
         return completableFuture;
     }
 
@@ -237,32 +286,43 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             log.debug("Transaction {} abort on topic {}.", txnID.toString(), 
topic.getName());
         }
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-
-        ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L, 
txnID.getMostSigBits(), txnID.getLeastSigBits());
-        try {
-            topic.getManagedLedger().asyncAddEntry(abortMarker, new 
AsyncCallbacks.AddEntryCallback() {
-                @Override
-                public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
-                    synchronized (TopicTransactionBuffer.this) {
-                        aborts.put(txnID, (PositionImpl) position);
-                        updateMaxReadPosition(txnID);
-                        handleLowWaterMark(txnID, lowWaterMark);
-                        
changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
-                        clearAbortedTransactions();
-                        takeSnapshotByChangeTimes();
+        // Wait for the transaction buffer (TB) to finish recovering.
+        transactionBufferFuture.thenRun(() -> {
+            // No message was sent, so a txn timeout needs no abort marker.
+            if (!checkIfReady()) {
+                completableFuture.complete(null);
+                return;
+            }
+            ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L, 
txnID.getMostSigBits(), txnID.getLeastSigBits());
+            try {
+                topic.getManagedLedger().asyncAddEntry(abortMarker, new 
AsyncCallbacks.AddEntryCallback() {
+                    @Override
+                    public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
+                        synchronized (TopicTransactionBuffer.this) {
+                            aborts.put(txnID, (PositionImpl) position);
+                            updateMaxReadPosition(txnID);
+                            handleLowWaterMark(txnID, lowWaterMark);
+                            
changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+                            clearAbortedTransactions();
+                            takeSnapshotByChangeTimes();
+                        }
+                        completableFuture.complete(null);
                     }
-                    completableFuture.complete(null);
-                }
 
-                @Override
-                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
-                    log.error("Failed to abort for txn {}", txnID, exception);
-                    completableFuture.completeExceptionally(new 
PersistenceException(exception));
-                }
-            }, null);
-        } finally {
-            abortMarker.release();
-        }
+                    @Override
+                    public void addFailed(ManagedLedgerException exception, 
Object ctx) {
+                        log.error("Failed to abort for txn {}", txnID, 
exception);
+                        completableFuture.completeExceptionally(new 
PersistenceException(exception));
+                    }
+                }, null);
+            } finally {
+                abortMarker.release();
+            }
+        }).exceptionally(exception -> {
+            log.error("Transaction {} abort on topic {}.", txnID.toString(), 
topic.getName(), exception);
+            completableFuture.completeExceptionally(exception);
+            return null; // value required by exceptionally(); unused here
+        });
         return completableFuture;
     }
 
@@ -308,9 +368,9 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    private void takeSnapshot() {
+    private CompletableFuture<Void> takeSnapshot() {
         changeMaxReadPositionAndAddAbortTimes.set(0);
-        takeSnapshotWriter.thenAccept(writer -> {
+        return takeSnapshotWriter.thenCompose(writer -> {
             TransactionBufferSnapshot snapshot = new 
TransactionBufferSnapshot();
             synchronized (TopicTransactionBuffer.this) {
                 snapshot.setTopicName(topic.getName());
@@ -327,7 +387,7 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                 });
                 snapshot.setAborts(list);
             }
-            writer.writeAsync(snapshot).thenAccept((messageId) -> {
+            return writer.writeAsync(snapshot).thenAccept(messageId-> {
                 this.lastSnapshotTimestamps = System.currentTimeMillis();
                 if (log.isDebugEnabled()) {
                     log.debug("[{}]Transaction buffer take snapshot success! "
@@ -339,7 +399,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             });
         });
     }
-
     private void clearAbortedTransactions() {
         while (!aborts.isEmpty() && !((ManagedLedgerImpl) 
topic.getManagedLedger())
                 .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) {
@@ -398,13 +457,14 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         synchronized (TopicTransactionBuffer.this) {
             if (ongoingTxns.isEmpty()) {
                 maxReadPosition = position;
+                changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
             }
         }
     }
 
     @Override
     public PositionImpl getMaxReadPosition() {
-        if (checkIfReady()) {
+        if (checkIfReady() || checkIfNoSnapshot()) {
             return this.maxReadPosition;
         } else {
             return PositionImpl.earliest;
@@ -470,7 +530,9 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
             
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
                     
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
                 try {
+                    boolean hasSnapshot = false;
                     while (reader.hasMoreEvents()) {
+                        hasSnapshot = true;
                         Message<TransactionBufferSnapshot> message = 
reader.readNext();
                         TransactionBufferSnapshot transactionBufferSnapshot = 
message.getValue();
                         if 
(topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
@@ -480,6 +542,10 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
                                     
transactionBufferSnapshot.getMaxReadPositionEntryId());
                         }
                     }
+                    if (!hasSnapshot) {
+                        callBack.noNeedToRecover();
+                        return;
+                    }
                 } catch (PulsarClientException pulsarClientException) {
                     log.error("[{}]Transaction buffer recover fail when read "
                             + "transactionBufferSnapshot!", topic.getName(), 
pulsarClientException);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
index 258a2ee..e5737f4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferProvider.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
-import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
@@ -30,7 +29,7 @@ import 
org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 public class TopicTransactionBufferProvider implements 
TransactionBufferProvider {
 
     @Override
-    public TransactionBuffer newTransactionBuffer(Topic originTopic, 
CompletableFuture<Void> transactionBufferFuture) {
-        return new TopicTransactionBuffer((PersistentTopic) originTopic, 
transactionBufferFuture);
+    public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+        return new TopicTransactionBuffer((PersistentTopic) originTopic);
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
index 3a4fc39..1640459 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
@@ -29,6 +29,12 @@ public interface TopicTransactionBufferRecoverCallBack {
     void recoverComplete();
 
     /**
+     * Called when no transactional message has ever been sent to the topic,
+     * so the recovery procedure can be skipped.
+     */
+    void noNeedToRecover();
+
+    /**
      * Handle transactionBufferSnapshot.
      *
      * @param snapshot the transaction buffer snapshot
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
index f9120bd..3b21398 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
@@ -32,7 +32,8 @@ public abstract class TopicTransactionBufferState {
         None,
         Initializing,
         Ready,
-        Close
+        Close,
+        NoSnapshot
     }
 
     private static final 
AtomicReferenceFieldUpdater<TopicTransactionBufferState, State> STATE_UPDATER =
@@ -49,20 +50,33 @@ public abstract class TopicTransactionBufferState {
         return (STATE_UPDATER.compareAndSet(this, State.Initializing, 
State.Ready));
     }
 
+    protected boolean changeToNoSnapshotState() {
+        return (STATE_UPDATER.compareAndSet(this, State.Initializing, 
State.NoSnapshot));
+    }
+
     protected boolean changeToInitializingState() {
         return STATE_UPDATER.compareAndSet(this, State.None, 
State.Initializing);
     }
 
+    protected boolean changeToReadyStateFromNoSnapshot() {
+        return STATE_UPDATER.compareAndSet(this, State.NoSnapshot, 
State.Ready);
+    }
+
     protected boolean changeToCloseState() {
         return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close)
                 || STATE_UPDATER.compareAndSet(this, State.None, State.Close)
-                || STATE_UPDATER.compareAndSet(this, State.Initializing, 
State.Close));
+                || STATE_UPDATER.compareAndSet(this, State.Initializing, 
State.Close)
+                || STATE_UPDATER.compareAndSet(this, State.NoSnapshot, 
State.Close));
     }
 
     public boolean checkIfReady() {
         return STATE_UPDATER.get(this) == State.Ready;
     }
 
+    public boolean checkIfNoSnapshot() {
+        return STATE_UPDATER.get(this) == State.NoSnapshot;
+    }
+
     public State getState() {
         return STATE_UPDATER.get(this);
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index ff18924..f7e147d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -102,4 +102,9 @@ public class TransactionBufferDisable implements 
TransactionBuffer {
     public TransactionBufferStats getStats() {
         return null;
     }
+
+    @Override
+    public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
+        return CompletableFuture.completedFuture(null);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 0c2ff3d..2dc57b5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -406,7 +406,7 @@ public class ServerCnxTest {
 
         // test PRODUCER success case
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -417,7 +417,7 @@ public class ServerCnxTest {
 
         // test PRODUCER error case
         clientCommand = Commands.newProducer(failTopicName, 2, 2,
-                "prod-name-2", Collections.emptyMap());
+                "prod-name-2", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
 
         assertTrue(getResponse() instanceof CommandError);
@@ -436,12 +436,12 @@ public class ServerCnxTest {
         
doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class));
         // Create producer first time
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
 
         // Create producer second time
         clientCommand = Commands.newProducer(successTopicName, 1 /* producer 
id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -461,7 +461,7 @@ public class ServerCnxTest {
 
         // test PRODUCER failure case
         ByteBuf clientCommand = Commands.newProducer(nonOwnedTopicName, 1 /* 
producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -487,7 +487,7 @@ public class ServerCnxTest {
 
         // test PRODUCER success case
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertEquals(getResponse().getClass(), CommandProducerSuccess.class);
 
@@ -517,7 +517,7 @@ public class ServerCnxTest {
         resetChannel();
         setChannelConnected();
         ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 
/* producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(newProducerCmd);
         assertTrue(getResponse() instanceof CommandError);
         channel.finish();
@@ -551,14 +551,14 @@ public class ServerCnxTest {
         resetChannel();
         setChannelConnected();
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
         resetChannel();
         setChannelConnected();
         clientCommand = Commands.newProducer(topicWithNonLocalCluster, 1 /* 
producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandError);
         channel.finish();
@@ -579,7 +579,7 @@ public class ServerCnxTest {
         resetChannel();
         setChannelConnected();
         ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 
/* producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(newProducerCmd);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -616,7 +616,7 @@ public class ServerCnxTest {
         setChannelConnected();
 
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                null, Collections.emptyMap());
+                null, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandError);
 
@@ -629,7 +629,7 @@ public class ServerCnxTest {
         setChannelConnected();
 
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -656,12 +656,12 @@ public class ServerCnxTest {
         String producerName = "my-producer";
 
         ByteBuf clientCommand1 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand1);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
         ByteBuf clientCommand2 = Commands.newProducer(successTopicName, 2 /* 
producer id */, 2 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand2);
         assertTrue(getResponse() instanceof CommandError);
 
@@ -678,7 +678,7 @@ public class ServerCnxTest {
         String producerName = "my-producer";
 
         ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer1);
 
         // Producer create succeeds
@@ -687,7 +687,7 @@ public class ServerCnxTest {
         assertEquals(((CommandProducerSuccess) response).getRequestId(), 1);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 2 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer2);
 
         // 2nd producer create succeeds as well
@@ -788,14 +788,14 @@ public class ServerCnxTest {
         String producerName = "my-producer";
 
         ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer1);
 
         ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 
2 /* request id */ );
         channel.writeInbound(closeProducer);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 3 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer2);
 
         // Complete the topic opening: It will make 2nd producer creation 
successful
@@ -845,22 +845,22 @@ public class ServerCnxTest {
         String producerName = "my-producer";
 
         ByteBuf createProducer1 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 1 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer1);
 
         ByteBuf closeProducer1 = Commands.newCloseProducer(1 /* producer id 
*/, 2 /* request id */ );
         channel.writeInbound(closeProducer1);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 3 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer2);
 
         ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 4 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer3);
 
         ByteBuf createProducer4 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 5 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer4);
 
         // Close succeeds
@@ -924,14 +924,14 @@ public class ServerCnxTest {
         String producerName = "my-producer";
 
         ByteBuf createProducer1 = Commands.newProducer(failTopicName, 1 /* 
producer id */, 1 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer1);
 
         ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 
2 /* request id */ );
         channel.writeInbound(closeProducer);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 3 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer2);
 
         // Now the topic gets opened.. It will make 2nd producer creation 
successful
@@ -950,7 +950,7 @@ public class ServerCnxTest {
         // Wait till the failtopic timeout interval
         Thread.sleep(500);
         ByteBuf createProducer3 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 4 /* request id */,
-                producerName, Collections.emptyMap());
+                producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer3);
 
         // 3rd producer succeeds because 2nd is already connected
@@ -1299,7 +1299,7 @@ public class ServerCnxTest {
 
         // test success case: encrypted producer can connect
         ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* 
request id */,
-                "encrypted-producer", true, Collections.emptyMap());
+                "encrypted-producer", true, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -1328,7 +1328,7 @@ public class ServerCnxTest {
 
         // test failure case: unencrypted producer cannot connect
         ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /* 
request id */,
-                "unencrypted-producer", false, Collections.emptyMap());
+                "unencrypted-producer", false, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -1361,7 +1361,7 @@ public class ServerCnxTest {
 
         // test failure case: unencrypted producer cannot connect
         ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /* 
request id */,
-                "unencrypted-producer", false, Collections.emptyMap());
+                "unencrypted-producer", false, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -1391,7 +1391,7 @@ public class ServerCnxTest {
                 
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
 
         ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* 
request id */,
-                "prod-name", true, Collections.emptyMap());
+                "prod-name", true, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -1428,7 +1428,7 @@ public class ServerCnxTest {
                 
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
 
         ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* 
request id */,
-                "prod-name", true, Collections.emptyMap());
+                "prod-name", true, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -1613,7 +1613,7 @@ public class ServerCnxTest {
         setChannelConnected();
 
         ByteBuf clientCommand = Commands.newProducer(invalidTopicName, 1 /* 
producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         Object obj = getResponse();
         assertEquals(obj.getClass(), CommandError.class);
@@ -1653,7 +1653,7 @@ public class ServerCnxTest {
         // Create producer first time
         int producerId = 1;
         ByteBuf clientCommand = Commands.newProducer(successTopicName, 
producerId /* producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
 
         ByteBuf closeProducerCmd = Commands.newCloseProducer(producerId, 2);
@@ -1664,7 +1664,7 @@ public class ServerCnxTest {
         
doReturn(CompletableFuture.completedFuture(false)).when(topic).hasSchema();
 
         clientCommand = Commands.newProducer(successTopicName, producerId /* 
producer id */, 1 /* request id */,
-                "prod-name", Collections.emptyMap());
+                "prod-name", Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -1705,7 +1705,7 @@ public class ServerCnxTest {
 
         // Producer command when the service unit is not ready
         ByteBuf clientCommand3 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 3 /* request id */,
-                "p1" /* producer name */, Collections.emptyMap());
+                "p1" /* producer name */, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand3);
 
         Object response3 = getResponse();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index b16dace..3607b45 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -282,30 +282,33 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         tnx1.commit().get();
         // wait timeout take snapshot
 
-        TransactionBufferSnapshot transactionBufferSnapshot = 
reader.readNext().getValue();
-        assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(), 
((MessageIdImpl) messageId1).getEntryId() + 1);
-        assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(), 
((MessageIdImpl) messageId1).getLedgerId());
-        assertFalse(reader.hasMessageAvailable());
+        Awaitility.await().untilAsserted(() -> {
+            TransactionBufferSnapshot transactionBufferSnapshot = 
reader.readNext().getValue();
+            
assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(), -1);
+            
assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(), 
((MessageIdImpl) messageId1).getLedgerId());
+            transactionBufferSnapshot = reader.readNext().getValue();
+            
assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(), 
((MessageIdImpl) messageId1).getEntryId() + 1);
+            
assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(), 
((MessageIdImpl) messageId1).getLedgerId());
+            assertFalse(reader.hasMessageAvailable());
+        });
 
         // take snapshot by change times
         MessageId messageId2 = producer.newMessage(tnx2).value("test").send();
         tnx2.commit().get();
 
-        MessageId messageId3 = producer.newMessage(tnx3).value("test").send();
-        tnx3.commit().get();
 
         TransactionBufferSnapshot snapshot = reader.readNext().getValue();
-        assertEquals(snapshot.getMaxReadPositionEntryId(), ((MessageIdImpl) 
messageId3).getEntryId() + 1);
-        assertEquals(snapshot.getMaxReadPositionLedgerId(), ((MessageIdImpl) 
messageId3).getLedgerId());
+        assertEquals(snapshot.getMaxReadPositionEntryId(), ((MessageIdImpl) 
messageId2).getEntryId() + 1);
+        assertEquals(snapshot.getMaxReadPositionLedgerId(), ((MessageIdImpl) 
messageId2).getLedgerId());
         assertEquals(snapshot.getAborts().size(), 0);
         assertFalse(reader.hasMessageAvailable());
 
-        MessageId messageId4 = 
producer.newMessage(abortTxn).value("test").send();
+        MessageId messageId3 = 
producer.newMessage(abortTxn).value("test").send();
         abortTxn.abort().get();
 
-        transactionBufferSnapshot = reader.readNext().getValue();
-        assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(), 
((MessageIdImpl) messageId4).getEntryId() + 1);
-        assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(), 
((MessageIdImpl) messageId4).getLedgerId());
+        TransactionBufferSnapshot transactionBufferSnapshot = 
reader.readNext().getValue();
+        assertEquals(transactionBufferSnapshot.getMaxReadPositionEntryId(), 
((MessageIdImpl) messageId3).getEntryId() + 1);
+        assertEquals(transactionBufferSnapshot.getMaxReadPositionLedgerId(), 
((MessageIdImpl) messageId3).getLedgerId());
         assertEquals(transactionBufferSnapshot.getAborts().size(), 1);
         
assertEquals(transactionBufferSnapshot.getAborts().get(0).getTxnIdLeastBits(),
                 ((TransactionImpl) abortTxn).getTxnIdLeastBits());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index fb94638..971f8b2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -35,15 +35,23 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -246,4 +254,45 @@ public class TransactionTest extends TransactionTestBase {
 
     }
 
+    @Test // Checks when snapshots show up on the TRANSACTION_BUFFER_SNAPSHOT events topic: none at first, one after a producer is built, another after normal publishes.
+    public void testTakeSnapshotBeforeBuildTxnProducer() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 + "/testSnapShot";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) 
getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(topic, false)
+                .get().get(); // NOTE(review): persistentTopic is never referenced again in this method; presumably the lookup only forces the topic to load - confirm
+
+        ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
+                .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
+                .startMessageId(MessageId.earliest)
+                .topic(NAMESPACE1 + "/" + 
EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+        Reader<TransactionBufferSnapshot> reader = readerBuilder.create();
+
+        long waitSnapShotTime = 
getPulsarServiceList().get(0).getConfiguration()
+                .getTransactionBufferSnapshotMinTimeInMillis();
+        Awaitility.await().atMost(waitSnapShotTime * 2, TimeUnit.MILLISECONDS) // no producer has been built yet, so the reader is expected to have nothing available
+                .untilAsserted(() -> 
Assert.assertFalse(reader.hasMessageAvailable())); // NOTE(review): untilAsserted returns on the first passing check, so this may not observe the whole window - confirm
+
+        // Building a producer with the transaction-enabled client is expected to yield a snapshot whose max read position entry id is -1.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topic).enableBatching(true)
+                .create();
+
+        Awaitility.await().untilAsserted(() -> {
+            Message<TransactionBufferSnapshot> message1 = reader.readNext(); // NOTE(review): each retry of this block calls readNext() again - confirm a retry cannot skip past the expected snapshot
+            TransactionBufferSnapshot snapshot1 = message1.getValue();
+            Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), -1);
+        });
+
+        // Publishing normal messages (sent without a transaction) is expected to yield a snapshot whose max read position entry id is 1.
+        producer.newMessage(Schema.STRING).value("common message send").send();
+        producer.newMessage(Schema.STRING).value("common message send").send();
+
+        Awaitility.await().untilAsserted(() -> {
+            Message<TransactionBufferSnapshot> message1 = reader.readNext();
+            TransactionBufferSnapshot snapshot1 = message1.getValue();
+            Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 1);
+        });
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
index be1face..2238490 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java
@@ -72,7 +72,7 @@ public class TransactionBufferTest {
     @BeforeMethod
     public void setup() throws Exception {
         PersistentTopic persistentTopic = mock(PersistentTopic.class);
-        this.buffer = this.provider.newTransactionBuffer(persistentTopic, new 
CompletableFuture<>());
+        this.buffer = this.provider.newTransactionBuffer(persistentTopic);
     }
 
     @AfterMethod(alwaysRun = true)
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5177451..0f27a63 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1385,7 +1385,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         cnx.sendRequestWithId(
                 Commands.newProducer(topic, producerId, requestId, 
producerName, conf.isEncryptionEnabled(), metadata,
                        schemaInfo, connectionHandler.getEpoch(), 
userProvidedProducerName,
-                       conf.getAccessMode(), topicEpoch),
+                       conf.getAccessMode(), topicEpoch, 
client.conf.isEnableTransaction()),
                 requestId).thenAccept(response -> {
                     String producerName = response.getProducerName();
                     long lastSequenceId = response.getLastSequenceId();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e2685cc..3c363e0 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -692,14 +692,14 @@ public class Commands {
 
     @VisibleForTesting
     public static ByteBuf newProducer(String topic, long producerId, long 
requestId, String producerName,
-                Map<String, String> metadata) {
-        return newProducer(topic, producerId, requestId, producerName, false, 
metadata);
+                Map<String, String> metadata, boolean isTxnEnabled) {
+        return newProducer(topic, producerId, requestId, producerName, false, 
metadata, isTxnEnabled);
     }
 
     public static ByteBuf newProducer(String topic, long producerId, long 
requestId, String producerName,
-                boolean encrypted, Map<String, String> metadata) {
+                boolean encrypted, Map<String, String> metadata, boolean 
isTxnEnabled) {
         return newProducer(topic, producerId, requestId, producerName, 
encrypted, metadata, null, 0, false,
-                ProducerAccessMode.Shared, Optional.empty());
+                ProducerAccessMode.Shared, Optional.empty(), isTxnEnabled);
     }
 
     private static Schema.Type getSchemaType(SchemaType type) {
@@ -736,7 +736,7 @@ public class Commands {
     public static ByteBuf newProducer(String topic, long producerId, long 
requestId, String producerName,
           boolean encrypted, Map<String, String> metadata, SchemaInfo 
schemaInfo,
           long epoch, boolean userProvidedProducerName,
-          ProducerAccessMode accessMode, Optional<Long> topicEpoch) {
+          ProducerAccessMode accessMode, Optional<Long> topicEpoch, boolean 
isTxnEnabled) {
         BaseCommand cmd = localCmd(Type.PRODUCER);
         CommandProducer producer = cmd.setProducer()
                 .setTopic(topic)
@@ -745,6 +745,7 @@ public class Commands {
                 .setEpoch(epoch)
                 .setUserProvidedProducerName(userProvidedProducerName)
                 .setEncrypted(encrypted)
+                .setTxnEnabled(isTxnEnabled)
                 .setProducerAccessMode(convertProducerAccessMode(accessMode));
         if (producerName != null) {
             producer.setProducerName(producerName);
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index b08ec46..34ad37b 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -483,6 +483,8 @@ message CommandProducer {
     // leave it empty and then it will always carry the same epoch number on
     // the subsequent reconnections.
     optional uint64 topic_epoch = 11;
+
+    optional bool txn_enabled = 12 [default = false];
 }
 
 message CommandSend {

Reply via email to