sijie commented on a change in pull request #4935: [Transaction][Buffer] handle 
command `EndTxnOnPartitiion`
URL: https://github.com/apache/pulsar/pull/4935#discussion_r313771158
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
 ##########
 @@ -1358,6 +1360,57 @@ protected void handleGetSchema(CommandGetSchema 
commandGetSchema) {
         });
     }
 
+    @Override
+    protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition 
commandEndTxnOnPartition) {
+        final long requestId = commandEndTxnOnPartition.getRequestId();
+        final String topicName = commandEndTxnOnPartition.getTopic();
+        final long mostBits = commandEndTxnOnPartition.getTxnidMostBits();
+        final long leastBits = commandEndTxnOnPartition.getTxnidLeastBits();
+
+        TxnID txnID = new TxnID(mostBits, leastBits);
+        log.info("Received CommandEndTxnOnPartition from {}, [{}] transaction 
{} on the topic {}", remoteAddress,
+                 commandEndTxnOnPartition.getTxnAction(), txnID, topicName);
+        service.getTopic(topicName, false).whenComplete((topic, throwable) -> {
+            if (throwable != null) {
+                ctx.writeAndFlush(
+                    Commands.newEndTxnOnPartitionResponse(requestId, 
ServerError.UnknownError, throwable.getMessage()));
+            } else {
+                if (topic.isPresent()) {
+                    final Topic partition = topic.get();
+                    // sequenceId ?
+                    TxnMarkerController markerController = new 
TxnMarkerController(topic.get(), txnID, -1);
+                    if 
(commandEndTxnOnPartition.getTxnAction().equals(PulsarApi.TxnAction.COMMIT)) {
+                        markerController.publishCommitMarker()
+                                        .thenApply(position -> 
(PositionImpl)position)
+                                        
.thenCombine(partition.getTxnBuffer(false), (position, transactionBuffer) ->
+                                            transactionBuffer.commitTxn(txnID, 
position.getLedgerId(), position.getEntryId()))
+                                        .thenAccept(ignore ->
+                                                        ctx.writeAndFlush(
+                                                            
Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), 
txnID.getMostSigBits())))
+                                        .exceptionally(err -> {
+                                            log.error("Commit txn error :", 
err);
+                                            ctx.writeAndFlush(
+                                                
Commands.newEndTxnOnPartitionResponse(requestId, ServerError.UnknownError, 
err.getMessage()));
+                                            return null;
+                                        });
+                    } else if 
(commandEndTxnOnPartition.getTxnAction().equals(PulsarApi.TxnAction.ABORT)) {
+                        markerController.publishAbortMarker()
 
 Review comment:
   Why do you need to publish an abort marker?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to