codelipenghui commented on code in PR #16931:
URL: https://github.com/apache/pulsar/pull/16931#discussion_r996567311
##########
pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java:
##########
@@ -37,6 +37,15 @@ public class SystemTopicNames {
*/
public static final String TRANSACTION_BUFFER_SNAPSHOT =
"__transaction_buffer_snapshot";
+ /**
+ * Local topic name for the transaction buffer snapshot segment.
+ */
+ public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENT =
"__transaction_buffer_snapshot_segment";
Review Comment:
```suggestion
public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENT =
"__transaction_buffer_snapshot_segments";
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/v2/TransactionBufferSnapshotIndexes.java:
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.matadata.v2;
Review Comment:
```suggestion
package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
```
The existing `matadata` package name looks like an earlier typo; it should be `metadata`.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java:
##########
@@ -45,8 +45,8 @@ public ReadOnlyManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bo
super(factory, bookKeeper, store, config, scheduledExecutor, name);
}
- CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
- CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
+ CompletableFuture<ReadOnlyManagedLedgerImpl> initialize() {
+ CompletableFuture<ReadOnlyManagedLedgerImpl> future = new CompletableFuture<>();
Review Comment:
If we can call `ReadOnlyManagedLedgerImpl.initialize()`, we already hold the
`ReadOnlyManagedLedgerImpl` instance, so the future does not need to return it.
Can we change the return type to `CompletableFuture<Void>` here?
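A minimal sketch of the suggested shape, assuming a simplified stand-in class (`ReadOnlyLedgerSketch` and its `initialized` flag are hypothetical, not the real `ReadOnlyManagedLedgerImpl`). The future only signals completion or failure, and the caller re-attaches the instance it already holds:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for ReadOnlyManagedLedgerImpl; not the real class.
class ReadOnlyLedgerSketch {
    private boolean initialized;

    // The caller already holds `this`, so the future carries no value:
    // it only reports that initialization finished or failed.
    CompletableFuture<Void> initialize() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        try {
            initialized = true; // placeholder for the real metadata loading
            future.complete(null);
        } catch (RuntimeException e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    boolean isInitialized() {
        return initialized;
    }
}
```

A caller that still needs the instance downstream can write `ledger.initialize().thenApply(v -> ledger)`.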
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/v2/TransactionBufferSnapshotIndexes.java:
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.matadata.v2;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+@Setter
+@Builder
+public class TransactionBufferSnapshotIndexes {
+ private String topicName;
+
+ private List<TransactionBufferSnapshotIndex> indexList;
+
+ private TransactionBufferSnapshot snapshot;
+
+ @Builder
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class TransactionBufferSnapshotIndex {
+ public long sequenceID;
+ public long maxReadPositionLedgerID;
+ public long maxReadPositionEntryID;
+ public long persistentPositionLedgerID;
+ public long persistentPositionEntryID;
+ }
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class TransactionBufferSnapshot {
+ private String topicName;
+ private long sequenceId;
+ private long maxReadPositionLedgerId;
+ private long maxReadPositionEntryId;
+ private List<TxnID> aborts;
+ }
Review Comment:
It would be better to define this as a separate top-level class rather than a nested class.
Otherwise, callers have to refer to it as
`TransactionBufferSnapshotIndexes.TransactionBufferSnapshot`.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java:
##########
@@ -23,64 +23,63 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
import org.apache.pulsar.client.api.PulsarClient;
-import
org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
-public class SystemTopicBaseTxnBufferSnapshotService implements
TransactionBufferSnapshotService {
+public class SystemTopicTxnBufferSnapshotService<T> {
- private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>>
clients;
+ protected final Map<TopicName, SystemTopicClient<T>> clients;
+ protected final NamespaceEventsSystemTopicFactory
namespaceEventsSystemTopicFactory;
- private final NamespaceEventsSystemTopicFactory
namespaceEventsSystemTopicFactory;
+ protected final Class<T> schemaType;
+ protected final EventType systemTopicType;
- public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+ public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType
systemTopicType,
+ Class<T> schemaType) {
this.namespaceEventsSystemTopicFactory = new
NamespaceEventsSystemTopicFactory(client);
+ this.systemTopicType = systemTopicType;
+ this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
}
- @Override
- public CompletableFuture<Writer<TransactionBufferSnapshot>>
createWriter(TopicName topicName) {
+ public CompletableFuture<SystemTopicClient.Writer<T>>
createWriter(TopicName topicName) {
return
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
}
- private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>>
getTransactionBufferSystemTopicClient(
- TopicName topicName) {
- TopicName systemTopicName = NamespaceEventsSystemTopicFactory
- .getSystemTopicName(topicName.getNamespaceObject(),
EventType.TRANSACTION_BUFFER_SNAPSHOT);
- if (systemTopicName == null) {
- return FutureUtil.failedFuture(
- new InvalidTopicNameException("Can't create
SystemTopicBaseTxnBufferSnapshotService, "
- + "because the topicName is null!"));
- }
- return
CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
- (v) -> namespaceEventsSystemTopicFactory
-
.createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(),
this)));
- }
-
- @Override
- public CompletableFuture<Reader<TransactionBufferSnapshot>>
createReader(TopicName topicName) {
+ public CompletableFuture<SystemTopicClient.Reader<T>>
createReader(TopicName topicName) {
return
getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
}
- @Override
- public void removeClient(TopicName topicName,
- TransactionBufferSystemTopicClient
transactionBufferSystemTopicClient) {
+ public void removeClient(TopicName topicName, SystemTopicClientBase<T>
transactionBufferSystemTopicClient) {
if (transactionBufferSystemTopicClient.getReaders().size() == 0
&& transactionBufferSystemTopicClient.getWriters().size() ==
0) {
clients.remove(topicName);
Review Comment:
I think you are referring to the system topic client leak issue, right?
Can we fix it in a separate bug-fix PR? That makes it easier for users to find the PR
that fixed the bug.
Please don't hide the fix in a feature PR.
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java:
##########
@@ -117,6 +117,17 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
OpenReadOnlyCursorCallback callback, Object ctx);
+ /**
+ * Asynchronous open a Read-only managedLedger.
+ * @param managedLedgerName the unique name that identifies the managed ledger
+ * @param callback
+ * @param config the managed ledger configuration.
+ * @param ctx opaque context
+ */
+ void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
Review Comment:
We have
```
@InterfaceAudience.LimitedPrivate
@InterfaceStability.Stable
```
on this class.
Is the new method required?
I only see tests calling this method. If it's not required, can we
avoid adding a new method to the public API?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java:
##########
@@ -93,22 +93,22 @@
* @return message id
* @throws PulsarClientException exception while write event cause
*/
- MessageId write(T t) throws PulsarClientException;
+ MessageId write(T t, String key) throws PulsarClientException;
Review Comment:
+1. Why was this marked as resolved? I don't see any conclusion for it.
Also, it would be better to use `MessageId write(String key, T value)`.
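A rough sketch of the key-first ordering, assuming a simplified contract (`KeyedWriterSketch` and `InMemoryWriterSketch` are hypothetical names; the real `SystemTopicClient.Writer` returns `MessageId` and throws `PulsarClientException`, both omitted here):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified writer contract; not the real SystemTopicClient.Writer.
interface KeyedWriterSketch<T> {
    // Key first, value second, in the same order as Map.put(key, value).
    void write(String key, T value);
}

// In-memory stand-in used only to exercise the signature.
class InMemoryWriterSketch<T> implements KeyedWriterSketch<T> {
    private final Map<String, T> lastValueByKey = new LinkedHashMap<>();

    @Override
    public void write(String key, T value) {
        lastValueByKey.put(key, value);
    }

    // Returns the most recent value written under the given key, or null.
    T latest(String key) {
        return lastValueByKey.get(key);
    }
}
```

Putting the key first keeps the call site reading like a map update, for example `writer.write(topicName, snapshot)`.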
##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java:
##########
@@ -39,14 +43,14 @@ public class TxnID implements Serializable {
*
* @serial
*/
- private final long mostSigBits;
+ private long mostSigBits;
Review Comment:
I think the issue is that `TxnID` is used by `TransactionBufferSnapshotIndexes`,
right? It would be better to add a new broker-side type such as `TxnIdData`.
This change modifies a public API for users. It's better to keep that API simple
and understandable. I don't think users will need the new constructor of
`TxnID`.
Allowing users to update `mostSigBits` and `leastSigBits` is also not
good. We should avoid it.
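A minimal sketch of that split, under stated assumptions: `TxnIdSketch` is a simplified stand-in for the public `TxnID`, and the fields, no-arg constructor, and conversion helpers on `TxnIdData` are hypothetical choices for illustration only.

```java
// Simplified stand-in for the public, immutable client-side TxnID.
final class TxnIdSketch {
    private final long mostSigBits;
    private final long leastSigBits;

    TxnIdSketch(long mostSigBits, long leastSigBits) {
        this.mostSigBits = mostSigBits;
        this.leastSigBits = leastSigBits;
    }

    long getMostSigBits() { return mostSigBits; }
    long getLeastSigBits() { return leastSigBits; }
}

// Hypothetical broker-side data holder: mutable and default-constructible so a
// schema or deserializer can populate it, without touching the public type.
class TxnIdData {
    private long mostSigBits;
    private long leastSigBits;

    TxnIdData() { }

    long getMostSigBits() { return mostSigBits; }
    void setMostSigBits(long mostSigBits) { this.mostSigBits = mostSigBits; }
    long getLeastSigBits() { return leastSigBits; }
    void setLeastSigBits(long leastSigBits) { this.leastSigBits = leastSigBits; }

    // Copies the immutable ID into the broker-side holder.
    static TxnIdData from(TxnIdSketch id) {
        TxnIdData data = new TxnIdData();
        data.setMostSigBits(id.getMostSigBits());
        data.setLeastSigBits(id.getLeastSigBits());
        return data;
    }

    // Rebuilds an immutable ID when broker code hands it back to client types.
    TxnIdSketch toTxnId() {
        return new TxnIdSketch(mostSigBits, leastSigBits);
    }
}
```

With this shape, the snapshot classes would hold `TxnIdData`, and the fields on the public type could stay `final`.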
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java:
##########
@@ -420,7 +420,30 @@ public void closeFailed(ManagedLedgerException exception,
Object ctx) {
});
}
+ @Override
+ public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+ AsyncCallbacks.OpenReadOnlyManagedLedgerCallback
callback,
+ ManagedLedgerConfig config, Object ctx) {
+ CompletableFuture<ReadOnlyManagedLedgerImpl> future = new
CompletableFuture<>();
Review Comment:
It looks like this `future` is never used.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java:
##########
@@ -85,42 +85,48 @@ private TopicPolicyWriter(Producer<PulsarEvent> producer,
SystemTopicClient<Puls
}
@Override
- public MessageId write(PulsarEvent event) throws PulsarClientException
{
- TypedMessageBuilder<PulsarEvent> builder =
producer.newMessage().key(getEventKey(event)).value(event);
+ public MessageId write(PulsarEvent event, String key) throws
PulsarClientException {
+ TypedMessageBuilder<PulsarEvent> builder =
producer.newMessage().key(key).value(event);
setReplicateCluster(event, builder);
return builder.send();
}
@Override
- public CompletableFuture<MessageId> writeAsync(PulsarEvent event) {
- TypedMessageBuilder<PulsarEvent> builder =
producer.newMessage().key(getEventKey(event)).value(event);
+ public CompletableFuture<MessageId> writeAsync(PulsarEvent event,
String key) {
+ TypedMessageBuilder<PulsarEvent> builder =
producer.newMessage().key(key).value(event);
setReplicateCluster(event, builder);
return builder.sendAsync();
}
@Override
- public MessageId delete(PulsarEvent event) throws
PulsarClientException {
- validateActionType(event);
- TypedMessageBuilder<PulsarEvent> builder =
producer.newMessage().key(getEventKey(event)).value(null);
+ public MessageId delete(PulsarEvent event, String key) throws
PulsarClientException {
+ TypedMessageBuilder<PulsarEvent> builder =
producer.newMessage().key(key).value(null);
setReplicateCluster(event, builder);
return builder.send();
}
@Override
- public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) {
+ public CompletableFuture<MessageId> deleteAsync(PulsarEvent event,
String key) {
validateActionType(event);
- TypedMessageBuilder<PulsarEvent> builder =
producer.newMessage().key(getEventKey(event)).value(null);
+ TypedMessageBuilder<PulsarEvent> builder =
producer.newMessage().key(key).value(null);
setReplicateCluster(event, builder);
return builder.sendAsync();
}
- private String getEventKey(PulsarEvent event) {
+ public static String getEventKey(PulsarEvent event) {
return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
event.getTopicPoliciesEvent().getTenant(),
event.getTopicPoliciesEvent().getNamespace(),
event.getTopicPoliciesEvent().getTopic()).toString();
}
+ public static String getEventKey(TopicName topicName) {
+ return TopicName.get(topicName.getDomain().toString(),
+ topicName.getTenant(),
+ topicName.getNamespace(),
+
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
+ }
Review Comment:
I think the caller should provide the `getEventKey` implementation, not
`TopicPoliciesSystemTopicClient`. Please keep the responsibilities clear.
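A sketch of that separation, assuming hypothetical names throughout (`KeyAgnosticClientSketch`, `TopicPoliciesCallerSketch`, and the string-based `eventKey` helper are illustrations, not existing Pulsar classes). The client forwards whatever key it receives, and the caller owns the key derivation, here by stripping a `-partition-N` suffix:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical key-agnostic client: it only forwards the key it is given.
class KeyAgnosticClientSketch {
    final List<String> sentKeys = new ArrayList<>();

    void write(String key, String event) {
        sentKeys.add(key); // placeholder for producer.newMessage().key(key)...
    }
}

// Hypothetical caller that owns the key derivation for topic policies.
class TopicPoliciesCallerSketch {
    private final KeyAgnosticClientSketch client;

    TopicPoliciesCallerSketch(KeyAgnosticClientSketch client) {
        this.client = client;
    }

    // Assumption for this sketch: the key is the topic name without any
    // trailing "-partition-N" suffix, so all partitions share one key.
    static String eventKey(String topicName) {
        return topicName.replaceFirst("-partition-\\d+$", "");
    }

    void updatePolicies(String topicName, String event) {
        client.write(eventKey(topicName), event);
    }
}
```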