codelipenghui commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r996567311


##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java:
##########
@@ -37,6 +37,15 @@ public class SystemTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = 
"__transaction_buffer_snapshot";
 
+    /**
+     * Local topic name for the transaction buffer snapshot segment.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENT = 
"__transaction_buffer_snapshot_segment";

Review Comment:
   ```suggestion
       public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENT = 
"__transaction_buffer_snapshot_segments";
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/v2/TransactionBufferSnapshotIndexes.java:
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.matadata.v2;

Review Comment:
   ```suggestion
   package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
   ```
   
   The `matadata` spelling looks like a pre-existing typo.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java:
##########
@@ -45,8 +45,8 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl 
factory, BookKeeper bo
         super(factory, bookKeeper, store, config, scheduledExecutor, name);
     }
 
-    CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl 
startPosition) {
-        CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
+    CompletableFuture<ReadOnlyManagedLedgerImpl> initialize() {
+        CompletableFuture<ReadOnlyManagedLedgerImpl> future = new 
CompletableFuture<>();

Review Comment:
   Whoever calls `ReadOnlyManagedLedgerImpl.initialize()` already holds the 
`ReadOnlyManagedLedgerImpl` instance. Can we change the return type to 
`CompletableFuture<Void>` here?
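
To illustrate the suggestion, here is a minimal sketch. `ReadOnlyLedgerSketch` and `isInitialized()` are hypothetical stand-ins, not the real `ReadOnlyManagedLedgerImpl`, and the asynchronous metadata loading is replaced by an immediate completion.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ReadOnlyManagedLedgerImpl (assumption, not the real class).
class ReadOnlyLedgerSketch {
    private volatile boolean initialized;

    // Completes with no value: the caller already holds `this`,
    // so there is nothing useful to hand back through the future.
    CompletableFuture<Void> initialize() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        // A real implementation would load ledger metadata asynchronously here.
        initialized = true;
        future.complete(null);
        return future;
    }

    boolean isInitialized() {
        return initialized;
    }
}
```

A caller that still needs the instance downstream can chain it explicitly, for example `ledger.initialize().thenApply(ignored -> ledger)`.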



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/v2/TransactionBufferSnapshotIndexes.java:
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.matadata.v2;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+@Setter
+@Builder
+public class TransactionBufferSnapshotIndexes {
+    private String topicName;
+
+    private List<TransactionBufferSnapshotIndex> indexList;
+
+    private TransactionBufferSnapshot snapshot;
+
+    @Builder
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class TransactionBufferSnapshotIndex {
+        public long sequenceID;
+        public long maxReadPositionLedgerID;
+        public long maxReadPositionEntryID;
+        public long persistentPositionLedgerID;
+        public long persistentPositionEntryID;
+    }
+
+    @Data
+    @AllArgsConstructor
+    @NoArgsConstructor
+    public static class TransactionBufferSnapshot {
+        private String topicName;
+        private long sequenceId;
+        private long maxReadPositionLedgerId;
+        private long maxReadPositionEntryId;
+        private List<TxnID> aborts;
+    }

Review Comment:
   It's better to define it as a separate top-level class rather than a nested 
class. Otherwise, callers will have to refer to it as 
`TransactionBufferSnapshotIndexes.TransactionBufferSnapshot`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##########
@@ -23,64 +23,63 @@
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import 
org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class SystemTopicBaseTxnBufferSnapshotService implements 
TransactionBufferSnapshotService {
+public class SystemTopicTxnBufferSnapshotService<T> {
 
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> 
clients;
+    protected final Map<TopicName, SystemTopicClient<T>> clients;
+    protected final NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
 
-    private final NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
+    protected final Class<T> schemaType;
+    protected final EventType systemTopicType;
 
-    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType 
systemTopicType,
+                                               Class<T> schemaType) {
         this.namespaceEventsSystemTopicFactory = new 
NamespaceEventsSystemTopicFactory(client);
+        this.systemTopicType = systemTopicType;
+        this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
     }
 
-    @Override
-    public CompletableFuture<Writer<TransactionBufferSnapshot>> 
createWriter(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Writer<T>> 
createWriter(TopicName topicName) {
         return 
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
     }
 
-    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> 
getTransactionBufferSystemTopicClient(
-            TopicName topicName) {
-        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), 
EventType.TRANSACTION_BUFFER_SNAPSHOT);
-        if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new InvalidTopicNameException("Can't create 
SystemTopicBaseTxnBufferSnapshotService, "
-                            + "because the topicName is null!"));
-        }
-        return 
CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
-                (v) -> namespaceEventsSystemTopicFactory
-                        
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), 
this)));
-    }
-
-    @Override
-    public CompletableFuture<Reader<TransactionBufferSnapshot>> 
createReader(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Reader<T>> 
createReader(TopicName topicName) {
         return 
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
     }
 
-    @Override
-    public void removeClient(TopicName topicName,
-                                          TransactionBufferSystemTopicClient 
transactionBufferSystemTopicClient) {
+    public void removeClient(TopicName topicName, SystemTopicClientBase<T> 
transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 
0) {
             clients.remove(topicName);

Review Comment:
   I think you are talking about the system topic client leak issue, right?
   Can we resolve it in a separate bug-fix PR? That will help users find the PR 
that fixed the bug.
   Please don't hide it in a feature PR.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -117,6 +117,17 @@ ReadOnlyCursor openReadOnlyCursor(String 
managedLedgerName, Position startPositi
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position 
startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed 
ledger
+     * @param callback
+     * @param config the managed ledger configuration.
+     * @param ctx opaque context
+     */
+    void asyncOpenReadOnlyManagedLedger(String managedLedgerName,

Review Comment:
   We have 
   
   ```
   @InterfaceAudience.LimitedPrivate
   @InterfaceStability.Stable
   ```
   on this interface.
   Is the new method required?
   I only see tests calling this method. If it's not required, can we 
avoid adding it to the public API?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java:
##########
@@ -93,22 +93,22 @@
          * @return message id
          * @throws PulsarClientException exception while write event cause
          */
-        MessageId write(T t) throws PulsarClientException;
+        MessageId write(T t, String key) throws PulsarClientException;

Review Comment:
   +1. Why was this marked as resolved? I don't see any conclusion here.
   Also, it's better to use `MessageId write(String key, T value)`.
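
A minimal sketch of the key-first signature, under simplifying assumptions: `KeyedWriterSketch` and `InMemoryWriterSketch` are hypothetical names, and a `long` sequence number stands in for `MessageId` so the example stays self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified writer interface: key first, then value,
// mirroring the usual (key, value) ordering of map-like APIs.
interface KeyedWriterSketch<T> {
    long write(String key, T value);
}

// In-memory implementation used only for illustration (not a Pulsar class).
class InMemoryWriterSketch<T> implements KeyedWriterSketch<T> {
    private final Map<String, T> latestByKey = new LinkedHashMap<>();
    private long nextId;

    @Override
    public long write(String key, T value) {
        // Keep only the latest value per key, similar in spirit to a compacted topic.
        latestByKey.put(key, value);
        return nextId++;
    }

    T latest(String key) {
        return latestByKey.get(key);
    }
}
```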



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java:
##########
@@ -39,14 +43,14 @@ public class TxnID implements Serializable {
      *
      * @serial
      */
-    private final long mostSigBits;
+    private long mostSigBits;

Review Comment:
   I think the issue is that `TransactionBufferSnapshotIndexes` uses this class, 
right? It's better to add a new broker-side type, such as `TxnIdData`.
   
   This changes a public API for users. It's better to keep it simple 
and understandable. I don't think users need the new `TxnID` 
constructor.
   
   Allowing users to update `mostSigBits` and `leastSigBits` is also not 
good. We should avoid it.
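
A rough sketch of that split, assuming a simplified stand-in (`TxnIdSketch`) for the public client type. Only the name `TxnIdData` comes from the comment above; its fields and the `fromTxnId`/`toTxnId` helpers are hypothetical.

```java
// Simplified stand-in for the public client type; it stays immutable.
final class TxnIdSketch {
    private final long mostSigBits;
    private final long leastSigBits;

    TxnIdSketch(long mostSigBits, long leastSigBits) {
        this.mostSigBits = mostSigBits;
        this.leastSigBits = leastSigBits;
    }

    long getMostSigBits() { return mostSigBits; }
    long getLeastSigBits() { return leastSigBits; }
}

// Hypothetical broker-side DTO: mutable, with a no-arg constructor,
// so a schema or serialization layer can populate it without touching the public type.
class TxnIdData {
    private long mostSigBits;
    private long leastSigBits;

    TxnIdData() { }

    TxnIdData(long mostSigBits, long leastSigBits) {
        this.mostSigBits = mostSigBits;
        this.leastSigBits = leastSigBits;
    }

    long getMostSigBits() { return mostSigBits; }
    void setMostSigBits(long mostSigBits) { this.mostSigBits = mostSigBits; }
    long getLeastSigBits() { return leastSigBits; }
    void setLeastSigBits(long leastSigBits) { this.leastSigBits = leastSigBits; }

    // Convert at the broker boundary instead of loosening the public class.
    static TxnIdData fromTxnId(TxnIdSketch id) {
        return new TxnIdData(id.getMostSigBits(), id.getLeastSigBits());
    }

    TxnIdSketch toTxnId() {
        return new TxnIdSketch(mostSigBits, leastSigBits);
    }
}
```

With this shape, the snapshot classes would hold `List<TxnIdData>` and convert to the client type only where needed.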



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,30 @@ public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                              AsyncCallbacks.OpenReadOnlyManagedLedgerCallback 
callback,
+                              ManagedLedgerConfig config, Object ctx) {
+        CompletableFuture<ReadOnlyManagedLedgerImpl> future = new 
CompletableFuture<>();

Review Comment:
   Looks like we will not use this one.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java:
##########
@@ -85,42 +85,48 @@ private TopicPolicyWriter(Producer<PulsarEvent> producer, 
SystemTopicClient<Puls
         }
 
         @Override
-        public MessageId write(PulsarEvent event) throws PulsarClientException 
{
-            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(getEventKey(event)).value(event);
+        public MessageId write(PulsarEvent event, String key) throws 
PulsarClientException {
+            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
         @Override
-        public CompletableFuture<MessageId> writeAsync(PulsarEvent event) {
-            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(getEventKey(event)).value(event);
+        public CompletableFuture<MessageId> writeAsync(PulsarEvent event, 
String key) {
+            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.sendAsync();
         }
 
         @Override
-        public MessageId delete(PulsarEvent event) throws 
PulsarClientException {
-            validateActionType(event);
-            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(getEventKey(event)).value(null);
+        public MessageId delete(PulsarEvent event, String key) throws 
PulsarClientException {
+            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(key).value(null);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
         @Override
-        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) {
+        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event, 
String key) {
             validateActionType(event);
-            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(getEventKey(event)).value(null);
+            TypedMessageBuilder<PulsarEvent> builder = 
producer.newMessage().key(key).value(null);
             setReplicateCluster(event, builder);
             return builder.sendAsync();
         }
 
-        private String getEventKey(PulsarEvent event) {
+        public static String getEventKey(PulsarEvent event) {
             return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
                 event.getTopicPoliciesEvent().getTenant(),
                 event.getTopicPoliciesEvent().getNamespace(),
                 event.getTopicPoliciesEvent().getTopic()).toString();
         }
 
+        public static String getEventKey(TopicName topicName) {
+            return TopicName.get(topicName.getDomain().toString(),
+                    topicName.getTenant(),
+                    topicName.getNamespace(),
+                    
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
+        }

Review Comment:
   I think the caller should provide the `getEventKey` implementation, not 
`TopicPoliciesSystemTopicClient`. Please keep the responsibilities clear.
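
One way to read this, as a sketch only: the writer takes whatever key it is handed, and the key derivation lives with the caller. All three types below (`PolicyEventSketch`, `RecordingWriterSketch`, `TopicPoliciesKeys`) are hypothetical and simplified; the key is built as `domain://tenant/namespace/topic` to mimic the usual topic name format.

```java
// Hypothetical, simplified event standing in for the topic policies event.
final class PolicyEventSketch {
    final String domain;
    final String tenant;
    final String namespace;
    final String topic;

    PolicyEventSketch(String domain, String tenant, String namespace, String topic) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }
}

// Hypothetical generic writer: it records the key it receives
// and has no knowledge of how the key was derived.
class RecordingWriterSketch<T> {
    String lastKey;
    T lastValue;

    void write(String key, T value) {
        this.lastKey = key;
        this.lastValue = value;
    }
}

// Caller-side helper that owns the key derivation (assumed name).
final class TopicPoliciesKeys {
    private TopicPoliciesKeys() { }

    static String eventKey(PolicyEventSketch event) {
        return event.domain + "://" + event.tenant + "/" + event.namespace + "/" + event.topic;
    }
}
```

The caller would then write `writer.write(TopicPoliciesKeys.eventKey(event), event)`, and the system topic client no longer needs to expose a public static key helper.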


