sijie commented on a change in pull request #4935: [Transaction][Buffer] handle command `EndTxnOnPartition` URL: https://github.com/apache/pulsar/pull/4935#discussion_r313771158
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ########## @@ -1358,6 +1360,57 @@ protected void handleGetSchema(CommandGetSchema commandGetSchema) { }); } + @Override + protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition commandEndTxnOnPartition) { + final long requestId = commandEndTxnOnPartition.getRequestId(); + final String topicName = commandEndTxnOnPartition.getTopic(); + final long mostBits = commandEndTxnOnPartition.getTxnidMostBits(); + final long leastBits = commandEndTxnOnPartition.getTxnidLeastBits(); + + TxnID txnID = new TxnID(mostBits, leastBits); + log.info("Received CommandEndTxnOnPartition from {}, [{}] transaction {} on the topic {}", remoteAddress, + commandEndTxnOnPartition.getTxnAction(), txnID, topicName); + service.getTopic(topicName, false).whenComplete((topic, throwable) -> { + if (throwable != null) { + ctx.writeAndFlush( + Commands.newEndTxnOnPartitionResponse(requestId, ServerError.UnknownError, throwable.getMessage())); + } else { + if (topic.isPresent()) { + final Topic partition = topic.get(); + // sequenceId ? 
+ TxnMarkerController markerController = new TxnMarkerController(topic.get(), txnID, -1); + if (commandEndTxnOnPartition.getTxnAction().equals(PulsarApi.TxnAction.COMMIT)) { + markerController.publishCommitMarker() + .thenApply(position -> (PositionImpl)position) + .thenCombine(partition.getTxnBuffer(false), (position, transactionBuffer) -> + transactionBuffer.commitTxn(txnID, position.getLedgerId(), position.getEntryId())) + .thenAccept(ignore -> + ctx.writeAndFlush( + Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()))) + .exceptionally(err -> { + log.error("Commit txn error :", err); + ctx.writeAndFlush( + Commands.newEndTxnOnPartitionResponse(requestId, ServerError.UnknownError, err.getMessage())); + return null; + }); + } else if (commandEndTxnOnPartition.getTxnAction().equals(PulsarApi.TxnAction.ABORT)) { + markerController.publishAbortMarker() Review comment: Why do you need to publish an abort marker? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services