This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 054f18b  Implementation of Transaction Buffer Client. (#6544)
054f18b is described below

commit 054f18b35c3da81878f331689d1a85e29adbf592
Author: lipenghui <[email protected]>
AuthorDate: Mon Jul 27 21:12:48 2020 +0800

    Implementation of Transaction Buffer Client. (#6544)
    
    * Implementation of Transaction Buffer Client.
    
    * Fix check style.
    
    * add license header
    
    * fix  tests
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  10 +
 .../buffer/impl/TransactionBufferClientImpl.java   |  68 +++++
 .../buffer/impl/TransactionBufferHandlerImpl.java  | 302 +++++++++++++++++++++
 .../buffer/TransactionBufferClientTest.java        | 113 ++++++++
 .../TransactionCoordinatorClientTest.java          |   6 +
 .../TransactionMetaStoreAssignmentTest.java        |   6 +
 .../coordinator/TransactionMetaStoreTestBase.java  |   8 +-
 .../api}/transaction/TransactionBufferClient.java  |  17 +-
 .../TransactionBufferClientException.java          |  74 +++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  30 ++
 .../transaction/TransactionBufferClientImpl.java   |  59 ----
 .../impl/transaction/TransactionBufferHandler.java |  67 +++++
 .../apache/pulsar/common/protocol/Commands.java    |  11 +
 13 files changed, 700 insertions(+), 71 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 72f86f7..6dc384f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1705,6 +1705,16 @@ public class ServerCnx extends PulsarHandler {
             });
     }
 
+    @Override
+    protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition 
command) {
+        
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(command.getRequestId(), 
command.getTxnidLeastBits(), command.getTxnidMostBits()));
+    }
+
+    @Override
+    protected void 
handleEndTxnOnSubscription(PulsarApi.CommandEndTxnOnSubscription command) {
+        
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(command.getRequestId(),
 command.getTxnidLeastBits(), command.getTxnidMostBits()));
+    }
+
     private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, 
SchemaData schema) {
         if (schema != null) {
             return topic.addSchema(schema);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
new file mode 100644
index 0000000..8294e29
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The implementation of {@link TransactionBufferClient}.
+ */
+@Slf4j
+public class TransactionBufferClientImpl implements TransactionBufferClient {
+
+    private final TransactionBufferHandler tbHandler;
+
+    private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
+        this.tbHandler = tbHandler;
+    }
+
+    public static TransactionBufferClient create(NamespaceService 
namespaceService, ConnectionPool connectionPool) {
+        TransactionBufferHandler handler = new 
TransactionBufferHandlerImpl(connectionPool, namespaceService);
+        return new TransactionBufferClientImpl(handler);
+    }
+
+    @Override
+    public CompletableFuture<TxnID> commitTxnOnTopic(String topic, long 
txnIdMostBits, long txnIdLeastBits) {
+        return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, 
PulsarApi.TxnAction.COMMIT);
+    }
+
+    @Override
+    public CompletableFuture<TxnID> abortTxnOnTopic(String topic, long 
txnIdMostBits, long txnIdLeastBits) {
+        return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits, 
PulsarApi.TxnAction.ABORT);
+    }
+
+    @Override
+    public CompletableFuture<TxnID> commitTxnOnSubscription(String topic, 
String subscription, long txnIdMostBits, long txnIdLeastBits) {
+        return tbHandler.endTxnOnSubscription(topic, subscription, 
txnIdMostBits, txnIdLeastBits, PulsarApi.TxnAction.COMMIT);
+    }
+
+    @Override
+    public CompletableFuture<TxnID> abortTxnOnSubscription(String topic, 
String subscription, long txnIdMostBits, long txnIdLeastBits) {
+        return tbHandler.endTxnOnSubscription(topic, subscription, 
txnIdMostBits, txnIdLeastBits, PulsarApi.TxnAction.ABORT);
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
new file mode 100644
index 0000000..62c815f
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import 
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Slf4j
+public class TransactionBufferHandlerImpl implements TransactionBufferHandler, 
TimerTask {
+
+    private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests;
+    private final ConnectionPool connectionPool;
+    private final NamespaceService namespaceService;
+    private final AtomicLong requestIdGenerator = new AtomicLong();
+    private long operationTimeoutInMills;
+    private Timeout requestTimeout;
+    private HashedWheelTimer timer;
+    private final Semaphore semaphore;
+    private final boolean blockIfReachMaxPendingOps;
+
+    public TransactionBufferHandlerImpl(ConnectionPool connectionPool, 
NamespaceService namespaceService) {
+        this.connectionPool = connectionPool;
+        this.pendingRequests = new ConcurrentSkipListMap<>();
+        this.namespaceService = namespaceService;
+        this.operationTimeoutInMills = 3000L;
+        this.semaphore = new Semaphore(10000);
+        this.blockIfReachMaxPendingOps = true;
+        this.timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-transaction-buffer-client-timer"));
+        this.requestTimeout = timer.newTimeout(this, operationTimeoutInMills, 
TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public CompletableFuture<TxnID> endTxnOnTopic(String topic, long 
txnIdMostBits, long txnIdLeastBits, PulsarApi.TxnAction action) {
+        CompletableFuture<TxnID> cb = new CompletableFuture<>();
+        if (!canSendRequest(cb)) {
+            return cb;
+        }
+        long requestId = requestIdGenerator.getAndIncrement();
+        ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, 
txnIdMostBits, topic, action);
+        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
+        pendingRequests.put(requestId, op);
+        cmd.retain();
+        cnx(topic).whenComplete((clientCnx, throwable) -> {
+            if (throwable == null) {
+                try {
+                    clientCnx.ctx().writeAndFlush(cmd, 
clientCnx.ctx().voidPromise());
+                } catch (Exception e) {
+                    cb.completeExceptionally(e);
+                    pendingRequests.remove(requestId);
+                    op.recycle();
+                }
+            } else {
+                cb.completeExceptionally(throwable);
+                pendingRequests.remove(requestId);
+                op.recycle();
+            }
+        });
+        return cb;
+    }
+
+    @Override
+    public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String 
subscription, long txnIdMostBits, long txnIdLeastBits, PulsarApi.TxnAction 
action) {
+        CompletableFuture<TxnID> cb = new CompletableFuture<>();
+        if (!canSendRequest(cb)) {
+            return cb;
+        }
+        long requestId = requestIdGenerator.getAndIncrement();
+        ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, 
txnIdLeastBits, txnIdMostBits, topic, subscription, action);
+        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
+        pendingRequests.put(requestId, op);
+        cmd.retain();
+        cnx(topic).whenComplete((clientCnx, throwable) -> {
+            if (throwable == null) {
+                try {
+                    clientCnx.ctx().writeAndFlush(cmd, 
clientCnx.ctx().voidPromise());
+                } catch (Exception e) {
+                    cb.completeExceptionally(e);
+                    pendingRequests.remove(requestId);
+                    op.recycle();
+                }
+            } else {
+                cb.completeExceptionally(throwable);
+                pendingRequests.remove(requestId);
+                op.recycle();
+            }
+        });
+        return cb;
+    }
+
+    @Override
+    public void handleEndTxnOnTopicResponse(long requestId, 
PulsarApi.CommandEndTxnOnPartitionResponse response) {
+        OpRequestSend op = pendingRequests.remove(requestId);
+        if (op == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Got end txn on topic response for timeout {} - {}", 
response.getTxnidMostBits(),
+                        response.getTxnidLeastBits());
+            }
+            return;
+        }
+
+        if (!response.hasError()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Got end txn on topic response for for request 
{}", op.topic, response.getRequestId());
+            }
+            op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
+        } else {
+            log.error("[{}] Got end txn on topic response for request {} error 
{}", op.topic, response.getRequestId(), response.getError());
+            op.cb.completeExceptionally(getException(response.getError(), 
response.getMessage()));
+        }
+        op.recycle();
+    }
+
+    @Override
+    public void handleEndTxnOnSubscriptionResponse(long requestId, 
PulsarApi.CommandEndTxnOnSubscriptionResponse response) {
+        OpRequestSend op = pendingRequests.remove(requestId);
+        if (op == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Got end txn on subscription response for timeout {} 
- {}", response.getTxnidMostBits(),
+                        response.getTxnidLeastBits());
+            }
+            return;
+        }
+
+        if (!response.hasError()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Got end txn on subscription response for for 
request {}", op.topic, response.getRequestId());
+            }
+            op.cb.complete(new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits()));
+        } else {
+            log.error("[{}] Got end txn on subscription response for request 
{} error {}", op.topic, response.getRequestId(), response.getError());
+            op.cb.completeExceptionally(getException(response.getError(), 
response.getMessage()));
+        }
+        op.recycle();
+    }
+
+    private CompletableFuture<ClientCnx> cnx(String topic) {
+        return getServiceUrl(topic).thenCompose(serviceUrl -> {
+            try {
+                if (serviceUrl == null) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                URI uri = new URI(serviceUrl);
+                return 
connectionPool.getConnection(InetSocketAddress.createUnresolved(uri.getHost(), 
uri.getPort())).thenCompose(clientCnx -> {
+                    
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
+                    return CompletableFuture.completedFuture(clientCnx);
+                });
+            } catch (Exception e) {
+                return FutureUtil.failedFuture(e);
+            }
+        });
+    }
+
+    private CompletableFuture<String> getServiceUrl(String topic) {
+        TopicName topicName = TopicName.get(topic);
+        return namespaceService.getBundleAsync(topicName)
+                .thenCompose(namespaceService::getOwnerAsync)
+                .thenCompose(ned -> {
+                    String serviceUrl = null;
+                    if (ned.isPresent()) {
+                        serviceUrl = ned.get().getNativeUrl();
+                    }
+                   return CompletableFuture.completedFuture(serviceUrl);
+                });
+    }
+
+    private TransactionBufferClientException 
getException(PulsarApi.ServerError serverError, String msg) {
+        return new TransactionBufferClientException(msg);
+    }
+
+    private boolean canSendRequest(CompletableFuture<?> callback) {
+        try {
+            if (blockIfReachMaxPendingOps) {
+                semaphore.acquire();
+            } else {
+                if (!semaphore.tryAcquire()) {
+                    callback.completeExceptionally(new 
TransactionBufferClientException("Reach max pending ops."));
+                    return false;
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            
callback.completeExceptionally(TransactionBufferClientException.unwrap(e));
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        if (timeout.isCancelled()) {
+            return;
+        }
+        long timeToWaitMs;
+        OpRequestSend peeked = null;
+        Map.Entry<Long, OpRequestSend> firstEntry = 
pendingRequests.firstEntry();
+        peeked = firstEntry == null ? null : firstEntry.getValue();
+        while (peeked != null && peeked.createdAt + operationTimeoutInMills - 
System.currentTimeMillis() <= 0) {
+            if (!peeked.cb.isDone()) {
+                peeked.cb.completeExceptionally(new 
TransactionBufferClientException.RequestTimeoutException());
+                onResponse(peeked);
+            } else {
+                break;
+            }
+            firstEntry = pendingRequests.firstEntry();
+            peeked = firstEntry == null ? null : firstEntry.getValue();
+        }
+        if (peeked == null) {
+            timeToWaitMs = operationTimeoutInMills;
+        } else {
+            long diff = (peeked.createdAt + operationTimeoutInMills) - 
System.currentTimeMillis();
+            if (diff <= 0) {
+                timeToWaitMs = operationTimeoutInMills;
+            } else {
+                timeToWaitMs = diff;
+            }
+        }
+        requestTimeout = timer.newTimeout(this, timeToWaitMs, 
TimeUnit.MILLISECONDS);
+    }
+
+    void onResponse(OpRequestSend op) {
+        ReferenceCountUtil.safeRelease(op.byteBuf);
+        op.recycle();
+        semaphore.release();
+    }
+
+    private static final class OpRequestSend {
+
+        long requestId;
+        String topic;
+        ByteBuf byteBuf;
+        CompletableFuture<TxnID> cb;
+        long createdAt;
+
+        static OpRequestSend create(long requestId, String topic, ByteBuf 
byteBuf, CompletableFuture<TxnID> cb) {
+            OpRequestSend op = RECYCLER.get();
+            op.requestId = requestId;
+            op.topic = topic;
+            op.byteBuf = byteBuf;
+            op.cb = cb;
+            op.createdAt = System.nanoTime();
+            return op;
+        }
+
+        void recycle() {
+            recyclerHandle.recycle(this);
+        }
+
+        private OpRequestSend(Recycler.Handle<OpRequestSend> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private final Recycler.Handle<OpRequestSend> recyclerHandle;
+
+        private static final Recycler<OpRequestSend> RECYCLER = new 
Recycler<OpRequestSend>() {
+            @Override
+            protected OpRequestSend newObject(Handle<OpRequestSend> handle) {
+                return new OpRequestSend(handle);
+            }
+        };
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
new file mode 100644
index 0000000..932270a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import com.google.common.collect.Sets;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
+import 
org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
+import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
+
+    private static final Logger log = 
LoggerFactory.getLogger(TransactionBufferClientTest.class);
+    private TransactionBufferClient tbClient;
+    TopicName partitionedTopicName = TopicName.get("persistent", "public", 
"test", "tb-client");
+    int partitions = 10;
+
+    @BeforeClass
+    void init() throws Exception {
+        super.setup();
+        pulsarAdmins[0].clusters().createCluster("my-cluster", new 
ClusterData(pulsarServices[0].getWebServiceAddress()));
+        pulsarAdmins[0].tenants().createTenant("public", new 
TenantInfo(Sets.newHashSet(), Sets.newHashSet("my-cluster")));
+        pulsarAdmins[0].namespaces().createNamespace("public/test", 10);
+        
pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(),
 partitions);
+        pulsarClient.newConsumer()
+                .topic(partitionedTopicName.getPartitionedTopicName())
+                .subscriptionName("test").subscribe();
+        tbClient = 
TransactionBufferClientImpl.create(pulsarServices[0].getNamespaceService(),
+                ((PulsarClientImpl) pulsarClient).getCnxPool());
+    }
+
+    @Test
+    public void testCommitOnTopic() throws ExecutionException, 
InterruptedException {
+        List<CompletableFuture<TxnID>> futures = new ArrayList<>();
+        for (int i = 0; i < partitions; i++) {
+            String topic = partitionedTopicName.getPartition(i).toString();
+            futures.add(tbClient.commitTxnOnTopic(topic, 1L, i));
+        }
+        for (int i = 0; i < futures.size(); i++) {
+            Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+            Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+        }
+    }
+
+    @Test
+    public void testAbortOnTopic() throws ExecutionException, 
InterruptedException {
+        List<CompletableFuture<TxnID>> futures = new ArrayList<>();
+        for (int i = 0; i < partitions; i++) {
+            String topic = partitionedTopicName.getPartition(i).toString();
+            futures.add(tbClient.abortTxnOnTopic(topic, 1L, i));
+        }
+        for (int i = 0; i < futures.size(); i++) {
+            Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+            Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+        }
+    }
+
+    @Test
+    public void testCommitOnSubscription() throws ExecutionException, 
InterruptedException {
+        List<CompletableFuture<TxnID>> futures = new ArrayList<>();
+        for (int i = 0; i < partitions; i++) {
+            String topic = partitionedTopicName.getPartition(i).toString();
+            futures.add(tbClient.commitTxnOnSubscription(topic, "test", 1L, 
i));
+        }
+        for (int i = 0; i < futures.size(); i++) {
+            Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+            Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+        }
+    }
+
+    @Test
+    public void testAbortOnSubscription() throws ExecutionException, 
InterruptedException {
+        List<CompletableFuture<TxnID>> futures = new ArrayList<>();
+        for (int i = 0; i < partitions; i++) {
+            String topic = partitionedTopicName.getPartition(i).toString();
+            futures.add(tbClient.abortTxnOnSubscription(topic, "test", 1L, i));
+        }
+        for (int i = 0; i < futures.size(); i++) {
+            Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+            Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index 9880c46..b751461 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -24,10 +24,16 @@ import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.Sta
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TransactionCoordinatorClientTest extends 
TransactionMetaStoreTestBase {
 
+    @BeforeClass
+    public void init() throws Exception {
+        super.setup();
+    }
+
     @Test
     public void testClientStart() throws PulsarClientException, 
TransactionCoordinatorClientException, InterruptedException {
         try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index 8e1ee21..362043f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.transaction.coordinator;
 
 import org.apache.pulsar.broker.PulsarService;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -28,6 +29,11 @@ import java.util.List;
 
 public class TransactionMetaStoreAssignmentTest extends 
TransactionMetaStoreTestBase {
 
+    @BeforeClass
+    public void init() throws Exception {
+        super.setup();
+    }
+
     @Test
     public void testTransactionMetaStoreAssignAndFailover() throws 
IOException, InterruptedException {
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 325bf6c..79b383b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -38,14 +38,14 @@ public class TransactionMetaStoreTestBase {
 
     LocalBookkeeperEnsemble bkEnsemble;
     protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
+    protected PulsarClient pulsarClient;
     protected static final int BROKER_COUNT = 5;
     protected ServiceConfiguration[] configurations = new 
ServiceConfiguration[BROKER_COUNT];
     protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
 
     protected TransactionCoordinatorClient transactionCoordinatorClient;
 
-    @BeforeClass
-    void setup() throws Exception {
+    protected void setup() throws Exception {
         log.info("---- Initializing SLAMonitoringTest -----");
         // Start local bookkeeper ensemble
         bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
@@ -80,10 +80,10 @@ public class TransactionMetaStoreTestBase {
 
         Thread.sleep(100);
 
-        PulsarClient client = PulsarClient.builder().
+        pulsarClient = PulsarClient.builder().
             serviceUrl(pulsarServices[0].getBrokerServiceUrl())
             .build();
-        transactionCoordinatorClient = new 
TransactionCoordinatorClientImpl(client);
+        transactionCoordinatorClient = new 
TransactionCoordinatorClientImpl(pulsarClient);
         transactionCoordinatorClient.start();
 
         Thread.sleep(3000);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClient.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClient.java
similarity index 83%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClient.java
rename to 
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClient.java
index cd1a675..523773a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClient.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClient.java
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.transaction;
+package org.apache.pulsar.client.api.transaction;
 
 import java.util.concurrent.CompletableFuture;
 
 /**
- * The transaction buffer client to commit and abort transactions on topics.
+ * The transaction buffer client to commit and abort transactions on topics or 
subscription.
+ * The transaction buffer client is used by transaction coordinator to end 
transactions.
  */
 public interface TransactionBufferClient {
 
@@ -33,9 +34,9 @@ public interface TransactionBufferClient {
      * @param txnIdLeastBits the least bits of txn id
      * @return the future represents the commit result
      */
-    CompletableFuture<Void> commitTxnOnTopic(String topic,
-                                             long txnIdMostBits,
-                                             long txnIdLeastBits);
+    CompletableFuture<TxnID> commitTxnOnTopic(String topic,
+                                              long txnIdMostBits,
+                                              long txnIdLeastBits);
 
     /**
      * Abort the transaction associated with the topic.
@@ -45,7 +46,7 @@ public interface TransactionBufferClient {
      * @param txnIdLeastBits the least bits of txn id
      * @return the future represents the abort result
      */
-    CompletableFuture<Void> abortTxnOnTopic(String topic,
+    CompletableFuture<TxnID> abortTxnOnTopic(String topic,
                                             long txnIdMostBits,
                                             long txnIdLeastBits);
 
@@ -58,7 +59,7 @@ public interface TransactionBufferClient {
      * @param txnIdLeastBits the least bits of txn id
      * @return the future represents the commit result
      */
-    CompletableFuture<Void> commitTxnOnSubscription(String topic,
+    CompletableFuture<TxnID> commitTxnOnSubscription(String topic,
                                                     String subscription,
                                                     long txnIdMostBits,
                                                     long txnIdLeastBits);
@@ -72,7 +73,7 @@ public interface TransactionBufferClient {
      * @param txnIdLeastBits the least bits of txn id
      * @return the future represents the abort result
      */
-    CompletableFuture<Void> abortTxnOnSubscription(String topic,
+    CompletableFuture<TxnID> abortTxnOnSubscription(String topic,
                                                    String subscription,
                                                    long txnIdMostBits,
                                                    long txnIdLeastBits);
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClientException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClientException.java
new file mode 100644
index 0000000..40510da
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClientException.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.transaction;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Exceptions for transaction buffer client.
+ */
+public class TransactionBufferClientException extends IOException {
+
+    public TransactionBufferClientException(Throwable t) {
+        super(t);
+    }
+
+    public TransactionBufferClientException(String message) {
+        super(message);
+    }
+
+    /**
+     * Thrown when operation timeout.
+     */
+    public static class RequestTimeoutException extends 
TransactionBufferClientException {
+
+        public RequestTimeoutException() {
+            super("Transaction buffer request timeout.");
+        }
+
+        public RequestTimeoutException(String message) {
+            super(message);
+        }
+    }
+
+    public static TransactionBufferClientException unwrap(Throwable t) {
+        if (t instanceof TransactionBufferClientException) {
+            return (TransactionBufferClientException) t;
+        } else if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        } else if (t instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+            return new TransactionBufferClientException(t);
+        }  else if (!(t instanceof ExecutionException)) {
+            // Generic exception
+            return new TransactionBufferClientException(t);
+        }
+
+        Throwable cause = t.getCause();
+        String msg = cause.getMessage();
+
+        if (cause instanceof 
TransactionBufferClientException.RequestTimeoutException) {
+            return new 
TransactionBufferClientException.RequestTimeoutException(msg);
+        } else {
+            return new TransactionBufferClientException(t);
+        }
+
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index ea1624b..28c880f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -89,6 +89,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -147,6 +148,7 @@ public class ClientCnx extends PulsarHandler {
 
     // Added for mutual authentication.
     protected AuthenticationDataProvider authenticationDataProvider;
+    private TransactionBufferHandler transactionBufferHandler;
 
     enum State {
         None, SentConnectFrame, Ready, Failed, Connecting
@@ -866,6 +868,22 @@ public class ClientCnx extends PulsarHandler {
     }
 
     @Override
+    protected void 
handleEndTxnOnPartitionResponse(PulsarApi.CommandEndTxnOnPartitionResponse 
command) {
+        TransactionBufferHandler handler = 
checkAndGetTransactionBufferHandler();
+        if (handler != null) {
+            handler.handleEndTxnOnTopicResponse(command.getRequestId(), 
command);
+        }
+    }
+
+    @Override
+    protected void 
handleEndTxnOnSubscriptionResponse(PulsarApi.CommandEndTxnOnSubscriptionResponse
 command) {
+        TransactionBufferHandler handler = 
checkAndGetTransactionBufferHandler();
+        if (handler != null) {
+            handler.handleEndTxnOnSubscriptionResponse(command.getRequestId(), 
command);
+        }
+    }
+
+    @Override
     protected void handleEndTxnResponse(PulsarApi.CommandEndTxnResponse 
command) {
         TransactionMetaStoreHandler handler = 
checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits());
         if (handler != null) {
@@ -882,6 +900,14 @@ public class ClientCnx extends PulsarHandler {
         return handler;
     }
 
+    private TransactionBufferHandler checkAndGetTransactionBufferHandler() {
+        if (transactionBufferHandler == null) {
+            channel().close();
+            log.warn("Close the channel since can't get the transaction buffer 
handler.");
+        }
+        return transactionBufferHandler;
+    }
+
     /**
      * check serverError and take appropriate action
      * <ul>
@@ -956,6 +982,10 @@ public class ClientCnx extends PulsarHandler {
         transactionMetaStoreHandlers.put(transactionMetaStoreId, handler);
     }
 
+    public void registerTransactionBufferHandler(final 
TransactionBufferHandler handler) {
+        transactionBufferHandler = handler;
+    }
+
     void removeProducer(final long producerId) {
         producers.remove(producerId);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClientImpl.java
deleted file mode 100644
index fa0941f..0000000
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClientImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.transaction;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.util.FutureUtil;
-
-/**
- * The implementation of {@link TransactionBufferClient}.
- */
-public class TransactionBufferClientImpl implements TransactionBufferClient {
-
-    private final PulsarClientImpl client;
-
-    public TransactionBufferClientImpl(PulsarClientImpl client) {
-        this.client = client;
-    }
-
-    @Override
-    public CompletableFuture<Void> commitTxnOnTopic(String topic, long 
txnIdMostBits, long txnIdLeastBits) {
-        return FutureUtil.failedFuture(
-            new UnsupportedOperationException("Not Implemented Yet"));
-    }
-
-    @Override
-    public CompletableFuture<Void> abortTxnOnTopic(String topic, long 
txnIdMostBits, long txnIdLeastBits) {
-        return FutureUtil.failedFuture(
-            new UnsupportedOperationException("Not Implemented Yet"));
-    }
-
-    @Override
-    public CompletableFuture<Void> commitTxnOnSubscription(String topic, 
String subscription, long txnIdMostBits, long txnIdLeastBits) {
-        return FutureUtil.failedFuture(
-            new UnsupportedOperationException("Not Implemented Yet"));
-    }
-
-    @Override
-    public CompletableFuture<Void> abortTxnOnSubscription(String topic, String 
subscription, long txnIdMostBits, long txnIdLeastBits) {
-        return FutureUtil.failedFuture(
-            new UnsupportedOperationException("Not Implemented Yet"));
-    }
-}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferHandler.java
new file mode 100644
index 0000000..75c1ff1
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.transaction;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface of transaction buffer handler.
+ */
+public interface TransactionBufferHandler {
+
+    /**
+     * End transaction on topic.
+     * @param topic topic name
+     * @param txnIdMostBits txnIdMostBits
+     * @param txnIdLeastBits txnIdLeastBits
+     * @param action transaction action type
+     * @return TxnId
+     */
+    CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, 
long txnIdLeastBits,
+                                           PulsarApi.TxnAction action);
+
+    /**
+     * End transaction on subscription.
+     * @param topic topic name
+     * @param subscription subscription name
+     * @param txnIdMostBits txnIdMostBits
+     * @param txnIdLeastBits txnIdLeastBits
+     * @param action transaction action type
+     * @return TxnId
+     */
+    CompletableFuture<TxnID> endTxnOnSubscription(String topic, String 
subscription, long txnIdMostBits,
+        long txnIdLeastBits, PulsarApi.TxnAction action);
+
+    /**
+     * Handle response of end transaction on topic.
+     * @param requestId request ID
+     * @param response response
+     */
+    void handleEndTxnOnTopicResponse(long requestId, 
PulsarApi.CommandEndTxnOnPartitionResponse response);
+
+    /**
+     * Handle response of tend transaction on subscription
+     * @param requestId request ID
+     * @param response response
+     */
+    void handleEndTxnOnSubscriptionResponse(long requestId, 
PulsarApi.CommandEndTxnOnSubscriptionResponse response);
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d867139..16da307 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1431,6 +1431,16 @@ public class Commands {
         return res;
     }
 
+    public static ByteBuf newEndTxnOnSubscription(long requestId, long 
txnIdLeastBits, long txnIdMostBits, String topic,
+                                                  String subscription, 
TxnAction txnAction) {
+        Subscription.Builder builder = Subscription.newBuilder();
+        builder.setTopic(topic);
+        builder.setSubscription(subscription);
+        Subscription sub = builder.build();
+        builder.recycle();
+        return newEndTxnOnSubscription(requestId, txnIdLeastBits, 
txnIdMostBits, sub, txnAction);
+    }
+
     public static ByteBuf newEndTxnOnSubscription(long requestId, long 
txnIdLeastBits, long txnIdMostBits,
                                                   Subscription subscription, 
TxnAction txnAction) {
         CommandEndTxnOnSubscription commandEndTxnOnSubscription =
@@ -1443,6 +1453,7 @@ public class Commands {
                                        .build();
         ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.END_TXN_ON_SUBSCRIPTION)
                                                    
.setEndTxnOnSubscription(commandEndTxnOnSubscription));
+        subscription.recycle();
         commandEndTxnOnSubscription.recycle();
         return res;
     }

Reply via email to