eolivelli commented on code in PR #15015:
URL: https://github.com/apache/pulsar/pull/15015#discussion_r857148325
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java:
##########
@@ -18,69 +18,163 @@
*/
package org.apache.pulsar.broker.service;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
import
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.PulsarClient;
-import
org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
-import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+@Slf4j
public class SystemTopicBaseTxnBufferSnapshotService implements
TransactionBufferSnapshotService {
- private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>>
clients;
+ private final Map<NamespaceName,
SystemTopicClient<TransactionBufferSnapshot>> clients;
private final NamespaceEventsSystemTopicFactory
namespaceEventsSystemTopicFactory;
+ private final HashMap<NamespaceName, ReferenceCountedWriter> writerMap;
+ private final LinkedList<Writer<TransactionBufferSnapshot>>
pendingCloseWriterList;
+
+ // The class ReferenceCountedWriter will maintain the reference count,
+ // when the reference count decrement to 0, it will be removed from
writerFutureMap, the writer will be closed.
+ public static class ReferenceCountedWriter {
+
+ private final AtomicLong referenceCount;
+ private final NamespaceName namespaceName;
+ private final CompletableFuture<Writer<TransactionBufferSnapshot>>
future;
+
+ public ReferenceCountedWriter(NamespaceName namespaceName,
+
CompletableFuture<Writer<TransactionBufferSnapshot>> future,
+ HashMap<NamespaceName,
ReferenceCountedWriter> writerMap) {
+ this.referenceCount = new AtomicLong(1);
+ this.namespaceName = namespaceName;
+ this.future = future;
+ this.future.exceptionally(t -> {
+ log.error("[{}] Failed to create transaction buffer snapshot
writer.", namespaceName, t);
+ writerMap.remove(namespaceName, this);
+ return null;
+ });
+ }
+
+ public CompletableFuture<Writer<TransactionBufferSnapshot>>
getFuture() {
+ return future;
+ }
+
+ private void retain() {
+ operationValidate(true);
+ this.referenceCount.incrementAndGet();
+ }
+
+ private long release() {
+ operationValidate(false);
+ return this.referenceCount.decrementAndGet();
+ }
+
+ private void operationValidate(boolean isRetain) {
+ if (this.referenceCount.get() == 0) {
+ throw new RuntimeException(
+ "[" + namespaceName + "] The reference counted
transaction buffer snapshot writer couldn't "
+ + "be " + (isRetain ? "retained" : "released")
+ ", refCnt is 0.");
+ }
+ }
+
+ }
+
public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
this.namespaceEventsSystemTopicFactory = new
NamespaceEventsSystemTopicFactory(client);
this.clients = new ConcurrentHashMap<>();
+ this.writerMap = new HashMap<>();
+ this.pendingCloseWriterList = new LinkedList<>();
}
@Override
public CompletableFuture<Writer<TransactionBufferSnapshot>>
createWriter(TopicName topicName) {
- return
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
+ if (topicName == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidTopicNameException(
+ "Can't create
SystemTopicBaseTxnBufferSnapshotService, because the topicName is null!"));
+ }
+ return
getTransactionBufferSystemTopicClient(topicName.getNamespaceObject()).newWriterAsync();
}
- private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>>
getTransactionBufferSystemTopicClient(
- TopicName topicName) {
- TopicName systemTopicName = NamespaceEventsSystemTopicFactory
- .getSystemTopicName(topicName.getNamespaceObject(),
EventType.TRANSACTION_BUFFER_SNAPSHOT);
- if (systemTopicName == null) {
- return FutureUtil.failedFuture(
- new InvalidTopicNameException("Can't create
SystemTopicBaseTxnBufferSnapshotService, "
- + "because the topicName is null!"));
+ @Override
+ public synchronized ReferenceCountedWriter
getReferenceWriter(NamespaceName namespaceName) {
+ AtomicBoolean exitingFlag = new AtomicBoolean(false);
+ ReferenceCountedWriter referenceCountedWriter =
writerMap.compute(namespaceName, (k, v) -> {
+ if (v == null) {
+ return new ReferenceCountedWriter(namespaceName,
+
getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync(),
writerMap);
+ }
+ exitingFlag.set(true);
+ return v;
+ });
+ if (exitingFlag.get()) {
+ referenceCountedWriter.retain();
+ }
+ return referenceCountedWriter;
+ }
+
+ @Override
+ public synchronized void releaseReferenceWriter(ReferenceCountedWriter
referenceCountedWriter) {
+ if (referenceCountedWriter.release() == 0) {
+ writerMap.remove(referenceCountedWriter.namespaceName,
referenceCountedWriter);
+ referenceCountedWriter.future.thenAccept(writer -> {
+ pendingCloseWriterList.add(writer);
+ closePendingCloseWriter();
+ });
}
- return
CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
+ }
+
+ private SystemTopicClient<TransactionBufferSnapshot>
getTransactionBufferSystemTopicClient(
+ NamespaceName namespaceName) {
+ return clients.computeIfAbsent(namespaceName,
(v) -> namespaceEventsSystemTopicFactory
-
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(),
this)));
+
.createTransactionBufferSystemTopicClient(namespaceName, this));
}
@Override
public CompletableFuture<Reader<TransactionBufferSnapshot>>
createReader(TopicName topicName) {
- return
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
+ return
getTransactionBufferSystemTopicClient(topicName.getNamespaceObject()).newReaderAsync();
}
@Override
public void removeClient(TopicName topicName,
TransactionBufferSystemTopicClient
transactionBufferSystemTopicClient) {
if (transactionBufferSystemTopicClient.getReaders().size() == 0
&& transactionBufferSystemTopicClient.getWriters().size() ==
0) {
- clients.remove(topicName);
+ clients.remove(topicName.getNamespaceObject());
}
}
@Override
public void close() throws Exception {
- for (Map.Entry<TopicName,
SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) {
+ for (Map.Entry<NamespaceName,
SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) {
entry.getValue().close();
}
}
+
+ private void closePendingCloseWriter() {
+ Iterator<Writer<TransactionBufferSnapshot>> iterator =
pendingCloseWriterList.stream().iterator();
Review Comment:
Nit: why are you creating the stream?
We can iterate on the LinkedList directly, creating less garbage and also
making the code simpler
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]