congbobo184 commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r725422949



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -327,20 +367,25 @@ private void takeSnapshot() {
                 });
                 snapshot.setAborts(list);
             }
-            writer.writeAsync(snapshot).thenAccept((messageId) -> {
-                this.lastSnapshotTimestamps = System.currentTimeMillis();
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}]Transaction buffer take snapshot success! "
-                            + "messageId : {}", topic.getName(), messageId);
+
+            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
+            writer.writeAsync(snapshot).whenComplete((v, e) -> {
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    log.warn("[{}]Transaction buffer take snapshot fail! ", 
topic.getName(), e);
+                } else {
+                    this.lastSnapshotTimestamps = System.currentTimeMillis();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}]Transaction buffer take snapshot 
success! "
+                                + "messageId : {}", topic.getName(), v);
+                    }
+
+                    completableFuture.complete(null);
                 }
-            }).exceptionally(e -> {
-                log.warn("[{}]Transaction buffer take snapshot fail! ", 
topic.getName(), e);
-                return null;
             });
+            return completableFuture;
         });
-    }
-
-    private void clearAbortedTransactions() {
+    }    private void clearAbortedTransactions() {

Review comment:
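       Formatting error: the closing brace of `takeSnapshot()` and the declaration of `clearAbortedTransactions()` have ended up on the same line. Please keep them on separate lines with a blank line in between.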

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
##########
@@ -246,4 +254,53 @@ public void testSubscriptionRecreateTopic()
 
     }
 
+
+    public void testTakeSnapshotBeforeFirstTxnMessageSend() throws Exception{
+        String topic = "persistent://" + NAMESPACE1 + "/testSnapShot";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) 
getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(topic, false)
+                .get().get();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topic).enableBatching(true)
+                .create();
+        ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
+                .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
+                .startMessageId(MessageId.latest)
+                .topic(NAMESPACE1 + "/" + 
EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+        Reader<TransactionBufferSnapshot> reader= readerBuilder.create();
+
+        long waitSnapShotTime = 
getPulsarServiceList().get(0).getConfiguration()
+                .getTransactionBufferSnapshotMinTimeInMillis();
+
+        producer.newMessage(Schema.STRING).value("common message send").send();
+
+        Awaitility.await().atMost(waitSnapShotTime * 2, TimeUnit.MILLISECONDS)
+                .until(() -> {

Review comment:
       Please use `untilAsserted` here instead of `until`.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
##########
@@ -26,7 +26,7 @@
     /**
      * Topic transaction buffer recover complete.
      */
-    void recoverComplete();

Review comment:
       Please add a method named `noNeedToRecover`.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,21 +173,51 @@ public void recoverExceptionally(Exception e) {
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
         CompletableFuture<Position> completableFuture = new 
CompletableFuture<>();
-        topic.getManagedLedger().asyncAddEntry(buffer, new 
AsyncCallbacks.AddEntryCallback() {
-            @Override
-            public void addComplete(Position position, ByteBuf entryData, 
Object ctx) {
-                synchronized (TopicTransactionBuffer.this) {
-                    handleTransactionMessage(txnId, position);
+        if (checkIfUnused()){
+            takeSnapshot().thenAccept(ignore -> {
+                topic.getManagedLedger().asyncAddEntry(buffer, new 
AsyncCallbacks.AddEntryCallback() {
+                    @Override
+                    public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
+                        synchronized (TopicTransactionBuffer.this) {
+                            handleTransactionMessage(txnId, position);
+                            if (!changeToReadyStateAfterUsed()){
+                                log.error("Fail to change state when add 
message with transaction at the first time.");
+                            }
+                            timer.newTimeout(TopicTransactionBuffer.this,
+                                    takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);

Review comment:
       This can be moved to after `takeSnapshot` and before `asyncAddEntry`.

##########
File path: pulsar-io/jdbc/lombok.config
##########
@@ -1,22 +0,0 @@
-#

Review comment:
       Why was this file deleted?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to