liangyepianzhou commented on code in PR #20559:
URL: https://github.com/apache/pulsar/pull/20559#discussion_r1317031590
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -250,7 +231,54 @@ public long getCommittedTxnCount() {
     }
     @Override
+    /**
+     * If it's the first time using the transaction buffer, the method records the max read position of the topic
+     * to metadata of the topic at the time of the first write. If the transaction buffer has been used before,
+     * it writes the message after recovering the transaction buffer.
+     */
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+        if (checkIfReady()) {
+            // Transaction buffer is ready, directly append the buffer
+            return internalAppendBufferToTxn(txnId, sequenceId, buffer);
+        } else if (changeToInitializingState()) {
+            // Transition to initializing state and record max read position if it's the first usage
+            Position firstMaxReadPosition = topic.getManagedLedger().getLastConfirmedEntry();
+            if (managedLedger.getProperties().get(MAX_READ_POSITION) == null) {
+                HashMap<String, String> hashMap = new HashMap<>();
+                hashMap.put(MAX_READ_POSITION, firstMaxReadPosition.toString());
+                managedLedger.asyncSetProperties(hashMap, new AsyncCallbacks.UpdatePropertiesCallback() {
+                    @Override
+                    public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                        changeToReadyState();
+                        transactionBufferFuture.complete(null);
+                    }
+
+                    @Override
+                    public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                        log.error("Failed to set first max read position to topic {}", topic.getName(), exception);
+                        changeToCloseState();
+                        transactionBufferFuture.completeExceptionally(exception);
+                    }
+                }, null);
+            }
+        }
+        // Return a CompletableFuture that will complete after recovering the transaction buffer,
+        // then append the buffer
+        return transactionBufferFuture.thenCompose(ignore -> internalAppendBufferToTxn(txnId, sequenceId, buffer));
+    }
+
+    private PositionImpl getPositionFromString(String positionStr) {
Review Comment:
I removed the newly added constructor of TransactionBuffer, so this method is needed
in the TransactionBuffer.
We cannot make this method public in PersistentTopic.
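
For readers following the hunk above, here is a minimal, self-contained sketch of the gating pattern that `appendBufferToTxn` appears to use: the first caller wins a state transition, starts the one-time setup, and every append is chained onto a shared future. All names here (`LazyInitBufferSketch`, `persistFirstPosition`, `internalAppend`, the `State` enum) are hypothetical stand-ins rather than Pulsar APIs, and the sketch assumes the setup step always completes the future, which the real code leaves to recovery when the property already exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class LazyInitBufferSketch {
    // Hypothetical states, loosely mirroring the transitions named in the diff.
    enum State { NONE, INITIALIZING, READY, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.NONE);
    private final CompletableFuture<Void> initFuture = new CompletableFuture<>();
    private final List<String> entries = new ArrayList<>();

    // Stand-in for the asynchronous metadata write (an assumption, not a Pulsar call).
    CompletableFuture<Void> persistFirstPosition() {
        return CompletableFuture.completedFuture(null);
    }

    public CompletableFuture<Integer> append(String payload) {
        if (state.get() == State.READY) {
            // Fast path: setup already finished, append directly.
            return internalAppend(payload);
        }
        // Only the caller that wins this compare-and-set starts the setup.
        if (state.compareAndSet(State.NONE, State.INITIALIZING)) {
            persistFirstPosition().whenComplete((ignore, error) -> {
                if (error == null) {
                    state.set(State.READY);
                    initFuture.complete(null);
                } else {
                    state.set(State.CLOSED);
                    initFuture.completeExceptionally(error);
                }
            });
        }
        // Every other caller waits on the shared future, then appends.
        return initFuture.thenCompose(ignore -> internalAppend(payload));
    }

    // Stores the payload and returns its index in the in-memory list.
    private synchronized CompletableFuture<Integer> internalAppend(String payload) {
        entries.add(payload);
        return CompletableFuture.completedFuture(entries.size() - 1);
    }

    public State currentState() {
        return state.get();
    }
}
```

In this sketch a failed setup moves the state to `CLOSED` and fails every pending append through the shared future, which matches the intent of the `updatePropertiesFailed` branch in the diff.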
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.