tisonkun commented on code in PR #5680:
URL: https://github.com/apache/pulsar/pull/5680#discussion_r982683198


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java:
##########
@@ -18,42 +18,209 @@
  */
 package org.apache.pulsar.client.impl.transaction;
 
-import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorClientStateException;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
+import org.apache.pulsar.client.util.MathUtils;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /**
- * The implementation of {@link TransactionCoordinatorClient}.
+ * Transaction coordinator client based topic assigned.
  */
 public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorClient {
 
-    private final PulsarClientImpl client;
+    private static final Logger LOG = 
LoggerFactory.getLogger(TransactionCoordinatorClientImpl.class);
+
+    private final PulsarClientImpl pulsarClient;
+    private TransactionMetaStoreHandler[] handlers;
+    private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap = 
new ConcurrentLongHashMap<>(16, 1);
+    private final AtomicLong epoch = new AtomicLong(0);
+
+    private static final 
AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> 
STATE_UPDATER =
+            
AtomicReferenceFieldUpdater.newUpdater(TransactionCoordinatorClientImpl.class, 
State.class, "state");
+    private volatile State state = State.NONE;
+
+    public TransactionCoordinatorClientImpl(PulsarClient pulsarClient) {
+        this.pulsarClient = (PulsarClientImpl) pulsarClient;
+    }
+
+    @Override
+    public void start() throws TransactionCoordinatorClientException {
+        try {
+            startAsync().get();
+        } catch (Exception e) {
+            throw TransactionCoordinatorClientException.unwrap(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> startAsync() {
+        if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
+            return 
pulsarClient.getLookup().getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN)
+                .thenAccept(partitionMeta -> {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Transaction meta store assign partition is 
{}.", partitionMeta.partitions);
+                    }
+                    if (partitionMeta.partitions > 0) {
+                        handlers = new 
TransactionMetaStoreHandler[partitionMeta.partitions];
+                        for (int i = 0; i < partitionMeta.partitions; i++) {
+                            TransactionMetaStoreHandler handler = new 
TransactionMetaStoreHandler(i, pulsarClient,
+                                    
TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString() + 
TopicName.PARTITIONED_TOPIC_SUFFIX + i);
+                            handlers[i] = handler;
+                            handlerMap.put(i, handler);
+                        }
+                    } else {
+                        handlers = new TransactionMetaStoreHandler[1];
+                        TransactionMetaStoreHandler handler = new 
TransactionMetaStoreHandler(0, pulsarClient,
+                                
TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+                        handlers[0] = handler;
+                        handlerMap.put(0, handler);
+                    }
+
+                    STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, 
State.READY);
+
+                });
+        } else {
+            return FutureUtil.failedFuture(new 
CoordinatorClientStateException("Can not start while current state is " + 
state));
+        }
+    }
+
+    @Override
+    public void close() throws TransactionCoordinatorClientException {
+        try {
+            closeAsync().get();
+        } catch (Exception e) {
+            throw TransactionCoordinatorClientException.unwrap(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        if (getState() == State.CLOSING || getState() == State.CLOSED) {

Review Comment:
   @codelipenghui It seems the state can only go through the following transitions:
   
   * FROM `NONE` to `STARTING` in L73.
   * FROM `STARTING` to `READY` in L95.
   
   ... so the state is never set to `CLOSING` or `CLOSED`. This is still the case 
in the latest master. Any thoughts here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to