This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 054f18b Implementation of Transaction Buffer Client. (#6544)
054f18b is described below
commit 054f18b35c3da81878f331689d1a85e29adbf592
Author: lipenghui <[email protected]>
AuthorDate: Mon Jul 27 21:12:48 2020 +0800
Implementation of Transaction Buffer Client. (#6544)
* Implementation of Transaction Buffer Client.
* Fix check style.
* add license header
* fix tests
---
.../apache/pulsar/broker/service/ServerCnx.java | 10 +
.../buffer/impl/TransactionBufferClientImpl.java | 68 +++++
.../buffer/impl/TransactionBufferHandlerImpl.java | 302 +++++++++++++++++++++
.../buffer/TransactionBufferClientTest.java | 113 ++++++++
.../TransactionCoordinatorClientTest.java | 6 +
.../TransactionMetaStoreAssignmentTest.java | 6 +
.../coordinator/TransactionMetaStoreTestBase.java | 8 +-
.../api}/transaction/TransactionBufferClient.java | 17 +-
.../TransactionBufferClientException.java | 74 +++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 30 ++
.../transaction/TransactionBufferClientImpl.java | 59 ----
.../impl/transaction/TransactionBufferHandler.java | 67 +++++
.../apache/pulsar/common/protocol/Commands.java | 11 +
13 files changed, 700 insertions(+), 71 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 72f86f7..6dc384f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1705,6 +1705,16 @@ public class ServerCnx extends PulsarHandler {
});
}
+ @Override
+ protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition
command) {
+
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(command.getRequestId(),
command.getTxnidLeastBits(), command.getTxnidMostBits()));
+ }
+
+ @Override
+ protected void
handleEndTxnOnSubscription(PulsarApi.CommandEndTxnOnSubscription command) {
+
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(command.getRequestId(),
command.getTxnidLeastBits(), command.getTxnidMostBits()));
+ }
+
private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic,
SchemaData schema) {
if (schema != null) {
return topic.addSchema(schema);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
new file mode 100644
index 0000000..8294e29
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferClientImpl.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The implementation of {@link TransactionBufferClient}.
+ */
+@Slf4j
+public class TransactionBufferClientImpl implements TransactionBufferClient {
+
+ private final TransactionBufferHandler tbHandler;
+
+ private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
+ this.tbHandler = tbHandler;
+ }
+
+ public static TransactionBufferClient create(NamespaceService
namespaceService, ConnectionPool connectionPool) {
+ TransactionBufferHandler handler = new
TransactionBufferHandlerImpl(connectionPool, namespaceService);
+ return new TransactionBufferClientImpl(handler);
+ }
+
+ @Override
+ public CompletableFuture<TxnID> commitTxnOnTopic(String topic, long
txnIdMostBits, long txnIdLeastBits) {
+ return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits,
PulsarApi.TxnAction.COMMIT);
+ }
+
+ @Override
+ public CompletableFuture<TxnID> abortTxnOnTopic(String topic, long
txnIdMostBits, long txnIdLeastBits) {
+ return tbHandler.endTxnOnTopic(topic, txnIdMostBits, txnIdLeastBits,
PulsarApi.TxnAction.ABORT);
+ }
+
+ @Override
+ public CompletableFuture<TxnID> commitTxnOnSubscription(String topic,
String subscription, long txnIdMostBits, long txnIdLeastBits) {
+ return tbHandler.endTxnOnSubscription(topic, subscription,
txnIdMostBits, txnIdLeastBits, PulsarApi.TxnAction.COMMIT);
+ }
+
+ @Override
+ public CompletableFuture<TxnID> abortTxnOnSubscription(String topic,
String subscription, long txnIdMostBits, long txnIdLeastBits) {
+ return tbHandler.endTxnOnSubscription(topic, subscription,
txnIdMostBits, txnIdLeastBits, PulsarApi.TxnAction.ABORT);
+ }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
new file mode 100644
index 0000000..62c815f
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import
org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Slf4j
+public class TransactionBufferHandlerImpl implements TransactionBufferHandler,
TimerTask {
+
+ private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests;
+ private final ConnectionPool connectionPool;
+ private final NamespaceService namespaceService;
+ private final AtomicLong requestIdGenerator = new AtomicLong();
+ private long operationTimeoutInMills;
+ private Timeout requestTimeout;
+ private HashedWheelTimer timer;
+ private final Semaphore semaphore;
+ private final boolean blockIfReachMaxPendingOps;
+
+ public TransactionBufferHandlerImpl(ConnectionPool connectionPool,
NamespaceService namespaceService) {
+ this.connectionPool = connectionPool;
+ this.pendingRequests = new ConcurrentSkipListMap<>();
+ this.namespaceService = namespaceService;
+ this.operationTimeoutInMills = 3000L;
+ this.semaphore = new Semaphore(10000);
+ this.blockIfReachMaxPendingOps = true;
+ this.timer = new HashedWheelTimer(new
DefaultThreadFactory("pulsar-transaction-buffer-client-timer"));
+ this.requestTimeout = timer.newTimeout(this, operationTimeoutInMills,
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public CompletableFuture<TxnID> endTxnOnTopic(String topic, long
txnIdMostBits, long txnIdLeastBits, PulsarApi.TxnAction action) {
+ CompletableFuture<TxnID> cb = new CompletableFuture<>();
+ if (!canSendRequest(cb)) {
+ return cb;
+ }
+ long requestId = requestIdGenerator.getAndIncrement();
+ ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits,
txnIdMostBits, topic, action);
+ OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
+ pendingRequests.put(requestId, op);
+ cmd.retain();
+ cnx(topic).whenComplete((clientCnx, throwable) -> {
+ if (throwable == null) {
+ try {
+ clientCnx.ctx().writeAndFlush(cmd,
clientCnx.ctx().voidPromise());
+ } catch (Exception e) {
+ cb.completeExceptionally(e);
+ pendingRequests.remove(requestId);
+ op.recycle();
+ }
+ } else {
+ cb.completeExceptionally(throwable);
+ pendingRequests.remove(requestId);
+ op.recycle();
+ }
+ });
+ return cb;
+ }
+
+ @Override
+ public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String
subscription, long txnIdMostBits, long txnIdLeastBits, PulsarApi.TxnAction
action) {
+ CompletableFuture<TxnID> cb = new CompletableFuture<>();
+ if (!canSendRequest(cb)) {
+ return cb;
+ }
+ long requestId = requestIdGenerator.getAndIncrement();
+ ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId,
txnIdLeastBits, txnIdMostBits, topic, subscription, action);
+ OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
+ pendingRequests.put(requestId, op);
+ cmd.retain();
+ cnx(topic).whenComplete((clientCnx, throwable) -> {
+ if (throwable == null) {
+ try {
+ clientCnx.ctx().writeAndFlush(cmd,
clientCnx.ctx().voidPromise());
+ } catch (Exception e) {
+ cb.completeExceptionally(e);
+ pendingRequests.remove(requestId);
+ op.recycle();
+ }
+ } else {
+ cb.completeExceptionally(throwable);
+ pendingRequests.remove(requestId);
+ op.recycle();
+ }
+ });
+ return cb;
+ }
+
+ @Override
+ public void handleEndTxnOnTopicResponse(long requestId,
PulsarApi.CommandEndTxnOnPartitionResponse response) {
+ OpRequestSend op = pendingRequests.remove(requestId);
+ if (op == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Got end txn on topic response for timeout {} - {}",
response.getTxnidMostBits(),
+ response.getTxnidLeastBits());
+ }
+ return;
+ }
+
+ if (!response.hasError()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Got end txn on topic response for for request
{}", op.topic, response.getRequestId());
+ }
+ op.cb.complete(new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits()));
+ } else {
+ log.error("[{}] Got end txn on topic response for request {} error
{}", op.topic, response.getRequestId(), response.getError());
+ op.cb.completeExceptionally(getException(response.getError(),
response.getMessage()));
+ }
+ op.recycle();
+ }
+
+ @Override
+ public void handleEndTxnOnSubscriptionResponse(long requestId,
PulsarApi.CommandEndTxnOnSubscriptionResponse response) {
+ OpRequestSend op = pendingRequests.remove(requestId);
+ if (op == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Got end txn on subscription response for timeout {}
- {}", response.getTxnidMostBits(),
+ response.getTxnidLeastBits());
+ }
+ return;
+ }
+
+ if (!response.hasError()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Got end txn on subscription response for for
request {}", op.topic, response.getRequestId());
+ }
+ op.cb.complete(new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits()));
+ } else {
+ log.error("[{}] Got end txn on subscription response for request
{} error {}", op.topic, response.getRequestId(), response.getError());
+ op.cb.completeExceptionally(getException(response.getError(),
response.getMessage()));
+ }
+ op.recycle();
+ }
+
+ private CompletableFuture<ClientCnx> cnx(String topic) {
+ return getServiceUrl(topic).thenCompose(serviceUrl -> {
+ try {
+ if (serviceUrl == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ URI uri = new URI(serviceUrl);
+ return
connectionPool.getConnection(InetSocketAddress.createUnresolved(uri.getHost(),
uri.getPort())).thenCompose(clientCnx -> {
+
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
+ return CompletableFuture.completedFuture(clientCnx);
+ });
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(e);
+ }
+ });
+ }
+
+ private CompletableFuture<String> getServiceUrl(String topic) {
+ TopicName topicName = TopicName.get(topic);
+ return namespaceService.getBundleAsync(topicName)
+ .thenCompose(namespaceService::getOwnerAsync)
+ .thenCompose(ned -> {
+ String serviceUrl = null;
+ if (ned.isPresent()) {
+ serviceUrl = ned.get().getNativeUrl();
+ }
+ return CompletableFuture.completedFuture(serviceUrl);
+ });
+ }
+
+ private TransactionBufferClientException
getException(PulsarApi.ServerError serverError, String msg) {
+ return new TransactionBufferClientException(msg);
+ }
+
+ private boolean canSendRequest(CompletableFuture<?> callback) {
+ try {
+ if (blockIfReachMaxPendingOps) {
+ semaphore.acquire();
+ } else {
+ if (!semaphore.tryAcquire()) {
+ callback.completeExceptionally(new
TransactionBufferClientException("Reach max pending ops."));
+ return false;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
callback.completeExceptionally(TransactionBufferClientException.unwrap(e));
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (timeout.isCancelled()) {
+ return;
+ }
+ long timeToWaitMs;
+ OpRequestSend peeked = null;
+ Map.Entry<Long, OpRequestSend> firstEntry =
pendingRequests.firstEntry();
+ peeked = firstEntry == null ? null : firstEntry.getValue();
+ while (peeked != null && peeked.createdAt + operationTimeoutInMills -
System.currentTimeMillis() <= 0) {
+ if (!peeked.cb.isDone()) {
+ peeked.cb.completeExceptionally(new
TransactionBufferClientException.RequestTimeoutException());
+ onResponse(peeked);
+ } else {
+ break;
+ }
+ firstEntry = pendingRequests.firstEntry();
+ peeked = firstEntry == null ? null : firstEntry.getValue();
+ }
+ if (peeked == null) {
+ timeToWaitMs = operationTimeoutInMills;
+ } else {
+ long diff = (peeked.createdAt + operationTimeoutInMills) -
System.currentTimeMillis();
+ if (diff <= 0) {
+ timeToWaitMs = operationTimeoutInMills;
+ } else {
+ timeToWaitMs = diff;
+ }
+ }
+ requestTimeout = timer.newTimeout(this, timeToWaitMs,
TimeUnit.MILLISECONDS);
+ }
+
+ void onResponse(OpRequestSend op) {
+ ReferenceCountUtil.safeRelease(op.byteBuf);
+ op.recycle();
+ semaphore.release();
+ }
+
+ private static final class OpRequestSend {
+
+ long requestId;
+ String topic;
+ ByteBuf byteBuf;
+ CompletableFuture<TxnID> cb;
+ long createdAt;
+
+ static OpRequestSend create(long requestId, String topic, ByteBuf
byteBuf, CompletableFuture<TxnID> cb) {
+ OpRequestSend op = RECYCLER.get();
+ op.requestId = requestId;
+ op.topic = topic;
+ op.byteBuf = byteBuf;
+ op.cb = cb;
+ op.createdAt = System.nanoTime();
+ return op;
+ }
+
+ void recycle() {
+ recyclerHandle.recycle(this);
+ }
+
+ private OpRequestSend(Recycler.Handle<OpRequestSend> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ private final Recycler.Handle<OpRequestSend> recyclerHandle;
+
+ private static final Recycler<OpRequestSend> RECYCLER = new
Recycler<OpRequestSend>() {
+ @Override
+ protected OpRequestSend newObject(Handle<OpRequestSend> handle) {
+ return new OpRequestSend(handle);
+ }
+ };
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
new file mode 100644
index 0000000..932270a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer;
+
+import com.google.common.collect.Sets;
+import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
+import
org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
+import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
+
+ private static final Logger log =
LoggerFactory.getLogger(TransactionBufferClientTest.class);
+ private TransactionBufferClient tbClient;
+ TopicName partitionedTopicName = TopicName.get("persistent", "public",
"test", "tb-client");
+ int partitions = 10;
+
+ @BeforeClass
+ void init() throws Exception {
+ super.setup();
+ pulsarAdmins[0].clusters().createCluster("my-cluster", new
ClusterData(pulsarServices[0].getWebServiceAddress()));
+ pulsarAdmins[0].tenants().createTenant("public", new
TenantInfo(Sets.newHashSet(), Sets.newHashSet("my-cluster")));
+ pulsarAdmins[0].namespaces().createNamespace("public/test", 10);
+
pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(),
partitions);
+ pulsarClient.newConsumer()
+ .topic(partitionedTopicName.getPartitionedTopicName())
+ .subscriptionName("test").subscribe();
+ tbClient =
TransactionBufferClientImpl.create(pulsarServices[0].getNamespaceService(),
+ ((PulsarClientImpl) pulsarClient).getCnxPool());
+ }
+
+ @Test
+ public void testCommitOnTopic() throws ExecutionException,
InterruptedException {
+ List<CompletableFuture<TxnID>> futures = new ArrayList<>();
+ for (int i = 0; i < partitions; i++) {
+ String topic = partitionedTopicName.getPartition(i).toString();
+ futures.add(tbClient.commitTxnOnTopic(topic, 1L, i));
+ }
+ for (int i = 0; i < futures.size(); i++) {
+ Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+ Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+ }
+ }
+
+ @Test
+ public void testAbortOnTopic() throws ExecutionException,
InterruptedException {
+ List<CompletableFuture<TxnID>> futures = new ArrayList<>();
+ for (int i = 0; i < partitions; i++) {
+ String topic = partitionedTopicName.getPartition(i).toString();
+ futures.add(tbClient.abortTxnOnTopic(topic, 1L, i));
+ }
+ for (int i = 0; i < futures.size(); i++) {
+ Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+ Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+ }
+ }
+
+ @Test
+ public void testCommitOnSubscription() throws ExecutionException,
InterruptedException {
+ List<CompletableFuture<TxnID>> futures = new ArrayList<>();
+ for (int i = 0; i < partitions; i++) {
+ String topic = partitionedTopicName.getPartition(i).toString();
+ futures.add(tbClient.commitTxnOnSubscription(topic, "test", 1L,
i));
+ }
+ for (int i = 0; i < futures.size(); i++) {
+ Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+ Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+ }
+ }
+
+ @Test
+ public void testAbortOnSubscription() throws ExecutionException,
InterruptedException {
+ List<CompletableFuture<TxnID>> futures = new ArrayList<>();
+ for (int i = 0; i < partitions; i++) {
+ String topic = partitionedTopicName.getPartition(i).toString();
+ futures.add(tbClient.abortTxnOnSubscription(topic, "test", 1L, i));
+ }
+ for (int i = 0; i < futures.size(); i++) {
+ Assert.assertEquals(futures.get(i).get().getMostSigBits(), 1L);
+ Assert.assertEquals(futures.get(i).get().getLeastSigBits(), i);
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index 9880c46..b751461 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -24,10 +24,16 @@ import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.Sta
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TransactionCoordinatorClientTest extends
TransactionMetaStoreTestBase {
+ @BeforeClass
+ public void init() throws Exception {
+ super.setup();
+ }
+
@Test
public void testClientStart() throws PulsarClientException,
TransactionCoordinatorClientException, InterruptedException {
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index 8e1ee21..362043f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.transaction.coordinator;
import org.apache.pulsar.broker.PulsarService;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.IOException;
@@ -28,6 +29,11 @@ import java.util.List;
public class TransactionMetaStoreAssignmentTest extends
TransactionMetaStoreTestBase {
+ @BeforeClass
+ public void init() throws Exception {
+ super.setup();
+ }
+
@Test
public void testTransactionMetaStoreAssignAndFailover() throws
IOException, InterruptedException {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index 325bf6c..79b383b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -38,14 +38,14 @@ public class TransactionMetaStoreTestBase {
LocalBookkeeperEnsemble bkEnsemble;
protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
+ protected PulsarClient pulsarClient;
protected static final int BROKER_COUNT = 5;
protected ServiceConfiguration[] configurations = new
ServiceConfiguration[BROKER_COUNT];
protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
protected TransactionCoordinatorClient transactionCoordinatorClient;
- @BeforeClass
- void setup() throws Exception {
+ protected void setup() throws Exception {
log.info("---- Initializing SLAMonitoringTest -----");
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
@@ -80,10 +80,10 @@ public class TransactionMetaStoreTestBase {
Thread.sleep(100);
- PulsarClient client = PulsarClient.builder().
+ pulsarClient = PulsarClient.builder().
serviceUrl(pulsarServices[0].getBrokerServiceUrl())
.build();
- transactionCoordinatorClient = new
TransactionCoordinatorClientImpl(client);
+ transactionCoordinatorClient = new
TransactionCoordinatorClientImpl(pulsarClient);
transactionCoordinatorClient.start();
Thread.sleep(3000);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClient.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClient.java
similarity index 83%
rename from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClient.java
rename to
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClient.java
index cd1a675..523773a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClient.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClient.java
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl.transaction;
+package org.apache.pulsar.client.api.transaction;
import java.util.concurrent.CompletableFuture;
/**
- * The transaction buffer client to commit and abort transactions on topics.
+ * The transaction buffer client to commit and abort transactions on topics or
subscription.
+ * The transaction buffer client is used by transaction coordinator to end
transactions.
*/
public interface TransactionBufferClient {
@@ -33,9 +34,9 @@ public interface TransactionBufferClient {
* @param txnIdLeastBits the least bits of txn id
* @return the future represents the commit result
*/
- CompletableFuture<Void> commitTxnOnTopic(String topic,
- long txnIdMostBits,
- long txnIdLeastBits);
+ CompletableFuture<TxnID> commitTxnOnTopic(String topic,
+ long txnIdMostBits,
+ long txnIdLeastBits);
/**
* Abort the transaction associated with the topic.
@@ -45,7 +46,7 @@ public interface TransactionBufferClient {
* @param txnIdLeastBits the least bits of txn id
* @return the future represents the abort result
*/
- CompletableFuture<Void> abortTxnOnTopic(String topic,
+ CompletableFuture<TxnID> abortTxnOnTopic(String topic,
long txnIdMostBits,
long txnIdLeastBits);
@@ -58,7 +59,7 @@ public interface TransactionBufferClient {
* @param txnIdLeastBits the least bits of txn id
* @return the future represents the commit result
*/
- CompletableFuture<Void> commitTxnOnSubscription(String topic,
+ CompletableFuture<TxnID> commitTxnOnSubscription(String topic,
String subscription,
long txnIdMostBits,
long txnIdLeastBits);
@@ -72,7 +73,7 @@ public interface TransactionBufferClient {
* @param txnIdLeastBits the least bits of txn id
* @return the future represents the abort result
*/
- CompletableFuture<Void> abortTxnOnSubscription(String topic,
+ CompletableFuture<TxnID> abortTxnOnSubscription(String topic,
String subscription,
long txnIdMostBits,
long txnIdLeastBits);
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClientException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClientException.java
new file mode 100644
index 0000000..40510da
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionBufferClientException.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.transaction;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Exceptions for transaction buffer client.
+ */
+public class TransactionBufferClientException extends IOException {
+
+ public TransactionBufferClientException(Throwable t) {
+ super(t);
+ }
+
+ public TransactionBufferClientException(String message) {
+ super(message);
+ }
+
+ /**
+ * Thrown when operation timeout.
+ */
+ public static class RequestTimeoutException extends
TransactionBufferClientException {
+
+ public RequestTimeoutException() {
+ super("Transaction buffer request timeout.");
+ }
+
+ public RequestTimeoutException(String message) {
+ super(message);
+ }
+ }
+
+ public static TransactionBufferClientException unwrap(Throwable t) {
+ if (t instanceof TransactionBufferClientException) {
+ return (TransactionBufferClientException) t;
+ } else if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ return new TransactionBufferClientException(t);
+ } else if (!(t instanceof ExecutionException)) {
+ // Generic exception
+ return new TransactionBufferClientException(t);
+ }
+
+ Throwable cause = t.getCause();
+ String msg = cause.getMessage();
+
+ if (cause instanceof
TransactionBufferClientException.RequestTimeoutException) {
+ return new
TransactionBufferClientException.RequestTimeoutException(msg);
+ } else {
+ return new TransactionBufferClientException(t);
+ }
+
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index ea1624b..28c880f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -89,6 +89,7 @@ import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,6 +148,7 @@ public class ClientCnx extends PulsarHandler {
// Added for mutual authentication.
protected AuthenticationDataProvider authenticationDataProvider;
+ private TransactionBufferHandler transactionBufferHandler;
enum State {
None, SentConnectFrame, Ready, Failed, Connecting
@@ -866,6 +868,22 @@ public class ClientCnx extends PulsarHandler {
}
@Override
+ protected void
handleEndTxnOnPartitionResponse(PulsarApi.CommandEndTxnOnPartitionResponse
command) {
+ TransactionBufferHandler handler =
checkAndGetTransactionBufferHandler();
+ if (handler != null) {
+ handler.handleEndTxnOnTopicResponse(command.getRequestId(),
command);
+ }
+ }
+
+ @Override
+ protected void
handleEndTxnOnSubscriptionResponse(PulsarApi.CommandEndTxnOnSubscriptionResponse
command) {
+ TransactionBufferHandler handler =
checkAndGetTransactionBufferHandler();
+ if (handler != null) {
+ handler.handleEndTxnOnSubscriptionResponse(command.getRequestId(),
command);
+ }
+ }
+
+ @Override
protected void handleEndTxnResponse(PulsarApi.CommandEndTxnResponse
command) {
TransactionMetaStoreHandler handler =
checkAndGetTransactionMetaStoreHandler(command.getTxnidMostBits());
if (handler != null) {
@@ -882,6 +900,14 @@ public class ClientCnx extends PulsarHandler {
return handler;
}
+ private TransactionBufferHandler checkAndGetTransactionBufferHandler() {
+ if (transactionBufferHandler == null) {
+ channel().close();
+ log.warn("Close the channel since can't get the transaction buffer
handler.");
+ }
+ return transactionBufferHandler;
+ }
+
/**
* check serverError and take appropriate action
* <ul>
@@ -956,6 +982,10 @@ public class ClientCnx extends PulsarHandler {
transactionMetaStoreHandlers.put(transactionMetaStoreId, handler);
}
+ public void registerTransactionBufferHandler(final
TransactionBufferHandler handler) {
+ transactionBufferHandler = handler;
+ }
+
void removeProducer(final long producerId) {
producers.remove(producerId);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClientImpl.java
deleted file mode 100644
index fa0941f..0000000
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferClientImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.transaction;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.util.FutureUtil;
-
-/**
- * The implementation of {@link TransactionBufferClient}.
- */
-public class TransactionBufferClientImpl implements TransactionBufferClient {
-
- private final PulsarClientImpl client;
-
- public TransactionBufferClientImpl(PulsarClientImpl client) {
- this.client = client;
- }
-
- @Override
- public CompletableFuture<Void> commitTxnOnTopic(String topic, long
txnIdMostBits, long txnIdLeastBits) {
- return FutureUtil.failedFuture(
- new UnsupportedOperationException("Not Implemented Yet"));
- }
-
- @Override
- public CompletableFuture<Void> abortTxnOnTopic(String topic, long
txnIdMostBits, long txnIdLeastBits) {
- return FutureUtil.failedFuture(
- new UnsupportedOperationException("Not Implemented Yet"));
- }
-
- @Override
- public CompletableFuture<Void> commitTxnOnSubscription(String topic,
String subscription, long txnIdMostBits, long txnIdLeastBits) {
- return FutureUtil.failedFuture(
- new UnsupportedOperationException("Not Implemented Yet"));
- }
-
- @Override
- public CompletableFuture<Void> abortTxnOnSubscription(String topic, String
subscription, long txnIdMostBits, long txnIdLeastBits) {
- return FutureUtil.failedFuture(
- new UnsupportedOperationException("Not Implemented Yet"));
- }
-}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferHandler.java
new file mode 100644
index 0000000..75c1ff1
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBufferHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.transaction;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface of transaction buffer handler.
+ */
+public interface TransactionBufferHandler {
+
+ /**
+ * End transaction on topic.
+ * @param topic topic name
+ * @param txnIdMostBits txnIdMostBits
+ * @param txnIdLeastBits txnIdLeastBits
+ * @param action transaction action type
+ * @return TxnId
+ */
+ CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits,
long txnIdLeastBits,
+ PulsarApi.TxnAction action);
+
+ /**
+ * End transaction on subscription.
+ * @param topic topic name
+ * @param subscription subscription name
+ * @param txnIdMostBits txnIdMostBits
+ * @param txnIdLeastBits txnIdLeastBits
+ * @param action transaction action type
+ * @return TxnId
+ */
+ CompletableFuture<TxnID> endTxnOnSubscription(String topic, String
subscription, long txnIdMostBits,
+ long txnIdLeastBits, PulsarApi.TxnAction action);
+
+ /**
+ * Handle response of end transaction on topic.
+ * @param requestId request ID
+ * @param response response
+ */
+ void handleEndTxnOnTopicResponse(long requestId,
PulsarApi.CommandEndTxnOnPartitionResponse response);
+
+ /**
+ * Handle response of tend transaction on subscription
+ * @param requestId request ID
+ * @param response response
+ */
+ void handleEndTxnOnSubscriptionResponse(long requestId,
PulsarApi.CommandEndTxnOnSubscriptionResponse response);
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d867139..16da307 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1431,6 +1431,16 @@ public class Commands {
return res;
}
+ public static ByteBuf newEndTxnOnSubscription(long requestId, long
txnIdLeastBits, long txnIdMostBits, String topic,
+ String subscription,
TxnAction txnAction) {
+ Subscription.Builder builder = Subscription.newBuilder();
+ builder.setTopic(topic);
+ builder.setSubscription(subscription);
+ Subscription sub = builder.build();
+ builder.recycle();
+ return newEndTxnOnSubscription(requestId, txnIdLeastBits,
txnIdMostBits, sub, txnAction);
+ }
+
public static ByteBuf newEndTxnOnSubscription(long requestId, long
txnIdLeastBits, long txnIdMostBits,
Subscription subscription,
TxnAction txnAction) {
CommandEndTxnOnSubscription commandEndTxnOnSubscription =
@@ -1443,6 +1453,7 @@ public class Commands {
.build();
ByteBuf res =
serializeWithSize(BaseCommand.newBuilder().setType(Type.END_TXN_ON_SUBSCRIPTION)
.setEndTxnOnSubscription(commandEndTxnOnSubscription));
+ subscription.recycle();
commandEndTxnOnSubscription.recycle();
return res;
}