congbobo184 commented on a change in pull request #12219:
URL: https://github.com/apache/pulsar/pull/12219#discussion_r725422949
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -327,20 +367,25 @@ private void takeSnapshot() {
});
snapshot.setAborts(list);
}
- writer.writeAsync(snapshot).thenAccept((messageId) -> {
- this.lastSnapshotTimestamps = System.currentTimeMillis();
- if (log.isDebugEnabled()) {
- log.debug("[{}]Transaction buffer take snapshot success! "
- + "messageId : {}", topic.getName(), messageId);
+
+ CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
+ writer.writeAsync(snapshot).whenComplete((v, e) -> {
+ if (e != null) {
+ completableFuture.completeExceptionally(e);
+ log.warn("[{}]Transaction buffer take snapshot fail! ",
topic.getName(), e);
+ } else {
+ this.lastSnapshotTimestamps = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}]Transaction buffer take snapshot
success! "
+ + "messageId : {}", topic.getName(), v);
+ }
+
+ completableFuture.complete(null);
}
- }).exceptionally(e -> {
- log.warn("[{}]Transaction buffer take snapshot fail! ",
topic.getName(), e);
- return null;
});
+ return completableFuture;
});
- }
-
- private void clearAbortedTransactions() {
+ } private void clearAbortedTransactions() {
Review comment:
format error
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
##########
@@ -246,4 +254,53 @@ public void testSubscriptionRecreateTopic()
}
+
+ public void testTakeSnapshotBeforeFirstTxnMessageSend() throws Exception{
+ String topic = "persistent://" + NAMESPACE1 + "/testSnapShot";
+ admin.topics().createNonPartitionedTopic(topic);
+ PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0)
+ .getBrokerService().getTopic(topic, false)
+ .get().get();
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .producerName("testSnapshot").sendTimeout(0, TimeUnit.SECONDS)
+ .topic(topic).enableBatching(true)
+ .create();
+ ReaderBuilder<TransactionBufferSnapshot> readerBuilder = pulsarClient
+ .newReader(Schema.AVRO(TransactionBufferSnapshot.class))
+ .startMessageId(MessageId.latest)
+ .topic(NAMESPACE1 + "/" +
EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+ Reader<TransactionBufferSnapshot> reader= readerBuilder.create();
+
+ long waitSnapShotTime =
getPulsarServiceList().get(0).getConfiguration()
+ .getTransactionBufferSnapshotMinTimeInMillis();
+
+ producer.newMessage(Schema.STRING).value("common message send").send();
+
+ Awaitility.await().atMost(waitSnapShotTime * 2, TimeUnit.MILLISECONDS)
+ .until(() -> {
Review comment:
use untilAsserted
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
##########
@@ -26,7 +26,7 @@
/**
* Topic transaction buffer recover complete.
*/
- void recoverComplete();
Review comment:
add a method named noNeedToRecover
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -163,21 +173,51 @@ public void recoverExceptionally(Exception e) {
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long
sequenceId, ByteBuf buffer) {
CompletableFuture<Position> completableFuture = new
CompletableFuture<>();
- topic.getManagedLedger().asyncAddEntry(buffer, new
AsyncCallbacks.AddEntryCallback() {
- @Override
- public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
- synchronized (TopicTransactionBuffer.this) {
- handleTransactionMessage(txnId, position);
+ if (checkIfUnused()){
+ takeSnapshot().thenAccept(ignore -> {
+ topic.getManagedLedger().asyncAddEntry(buffer, new
AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf
entryData, Object ctx) {
+ synchronized (TopicTransactionBuffer.this) {
+ handleTransactionMessage(txnId, position);
+ if (!changeToReadyStateAfterUsed()){
+ log.error("Fail to change state when add
message with transaction at the first time.");
+ }
+ timer.newTimeout(TopicTransactionBuffer.this,
+ takeSnapshotIntervalTime,
TimeUnit.MILLISECONDS);
Review comment:
can move to after takeSnapshot and before asyncAddEntry
##########
File path: pulsar-io/jdbc/lombok.config
##########
@@ -1,22 +0,0 @@
-#
Review comment:
why delete this file?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]