codelipenghui commented on code in PR #22762:
URL: https://github.com/apache/pulsar/pull/22762#discussion_r1616125013


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+public enum SubscriptionIsolationLevel {

Review Comment:
   This enum should be named `TransactionIsolationLevel`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -3079,6 +3083,14 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
         if (metadata.hasNullPartitionKey()) {
             responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey());
         }
+        if (metadata.hasTxnidMostBits() && metadata.hasTxnidLeastBits()) {
+            TxnID txnID = new TxnID(metadata.getTxnidMostBits(), metadata.getTxnidLeastBits());
+            boolean isTxnAborted = persistentTopic.isTxnAborted(txnID, (PositionImpl) entry.getPosition());
+            responseBuilder.header("X-Pulsar-txn-aborted", isTxnAborted);
+        }
+        boolean isTxnUncommitted = ((PositionImpl) entry.getPosition())
+                .compareTo(persistentTopic.getMaxReadPosition()) > 0;
+        responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);

Review Comment:
   I don't think this implementation is correct. The Admin REST API and the 
Admin CLI should behave consistently, so the filter should be applied on the 
server side rather than by the client.
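
To illustrate the server-side filtering suggested above, here is a minimal sketch. All types in it (`PeekIsolation`, `PeekedEntry`, `ServerSidePeekFilter`) are hypothetical, simplified stand-ins and not Pulsar classes; positions are modeled as plain `long` values instead of `PositionImpl`. It assumes an entry is hidden under `READ_COMMITTED` when it belongs to an aborted transaction or lies beyond the max read position, mirroring the two checks in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the isolation-level enum under review.
enum PeekIsolation { READ_COMMITTED, READ_UNCOMMITTED }

// Hypothetical, simplified entry: a numeric position plus an "aborted" flag.
final class PeekedEntry {
    final long position;       // stand-in for the entry's position in the topic
    final boolean txnAborted;  // true if the entry belongs to an aborted transaction

    PeekedEntry(long position, boolean txnAborted) {
        this.position = position;
        this.txnAborted = txnAborted;
    }
}

final class ServerSidePeekFilter {
    // Returns only the entries a caller may see at the given isolation level.
    static List<PeekedEntry> filter(List<PeekedEntry> entries,
                                    long maxReadPosition,
                                    PeekIsolation level) {
        if (level == PeekIsolation.READ_UNCOMMITTED) {
            // Everything is visible, including aborted and uncommitted entries.
            return new ArrayList<>(entries);
        }
        List<PeekedEntry> visible = new ArrayList<>();
        for (PeekedEntry e : entries) {
            // Same comparison as in the diff: beyond max read position means uncommitted.
            boolean uncommitted = e.position > maxReadPosition;
            if (!e.txnAborted && !uncommitted) {
                visible.add(e);
            }
        }
        return visible;
    }
}
```

With the filter applied before the response is built, the REST API and the CLI would receive the same set of messages without either having to interpret transaction headers.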



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionIsolationLevel.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+public enum SubscriptionIsolationLevel {

Review Comment:
   Could you please also add the following annotations to this enum?
   
   ```
   @InterfaceAudience.Public
   @InterfaceStability.Stable
   ```
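
Combined with the rename requested in the earlier comment, the annotated enum might look like the sketch below. To keep the example compilable on its own, `InterfaceAudience` and `InterfaceStability` are declared here as local stand-ins; in the actual change the existing Pulsar annotations would be imported instead. The two constants are the ones named in the Javadoc of this PR.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Local stand-ins so the sketch compiles without Pulsar on the classpath.
// The real change would import Pulsar's own annotation classes instead.
final class InterfaceAudience {
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @interface Public { }
}

final class InterfaceStability {
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @interface Stable { }
}

// The enum from this PR, renamed and annotated as requested in the review.
@InterfaceAudience.Public
@InterfaceStability.Stable
enum TransactionIsolationLevel {
    // Only messages of committed transactions are visible.
    READ_COMMITTED,
    // All messages are visible, including those of aborted transactions.
    READ_UNCOMMITTED
}
```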



##########
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java:
##########
@@ -896,26 +902,49 @@ public void failed(Throwable throwable) {
     @Override
     public List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages)
             throws PulsarAdminException {
-        return sync(() -> peekMessagesAsync(topic, subName, numMessages));
+        return sync(() -> peekMessagesAsync(topic, subName, numMessages,
+                false, SubscriptionIsolationLevel.READ_COMMITTED));
+    }

Review Comment:
   It would be better to remove this implementation and add a default 
implementation to the interface instead.



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java:
##########
@@ -1655,6 +1656,34 @@ void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds)
      */
     List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages) throws PulsarAdminException;
 
+    /**
+     * Peek messages from a topic subscription.
+     *
+     * @param topic
+     *            topic name
+     * @param subName
+     *            Subscription name
+     * @param numMessages
+     *            Number of messages
+     * @param showServerMarker
+     *            Enables the display of internal server write markers
+     * @param transactionIsolationLevel
+     *            Sets the isolation level for peeking messages within transactions.
+     *            - 'READ_COMMITTED' allows peeking only committed transactional messages.
+     *            - 'READ_UNCOMMITTED' allows peeking all messages,
+     *                                 even transactional messages which have been aborted.
+     * @return
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Topic or subscription does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages,
+                                       boolean showServerMarker, SubscriptionIsolationLevel transactionIsolationLevel)
+            throws PulsarAdminException;

Review Comment:
   It would be better to add a default implementation for the existing 
three-argument `peekMessages` method: 
https://github.com/apache/pulsar/pull/22762/files#diff-943b19d9674155ee923cf360b1a85ded4fcb2708edb9dc89be8cbf0ba3124deaR1657
   
   That way users can see the default values for `showServerMarker` and 
`transactionIsolationLevel`.
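
A minimal sketch of what such a default implementation could look like. `TopicsSketch` and `PeekIsolationLevel` are hypothetical, trimmed-down stand-ins for the real interface and enum, and the return type is simplified to `List<byte[]>`. The default values `false` and `READ_COMMITTED` are the ones currently hard-coded in `TopicsImpl` in this PR.

```java
import java.util.List;

// Hypothetical stand-in for the isolation-level enum under review.
enum PeekIsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

// Hypothetical, trimmed-down stand-in for the Topics admin interface.
interface TopicsSketch {

    // Existing three-argument overload, now a default method that delegates.
    // The defaults are visible right here in the interface:
    //   showServerMarker = false, isolation level = READ_COMMITTED.
    default List<byte[]> peekMessages(String topic, String subName, int numMessages) {
        return peekMessages(topic, subName, numMessages,
                false, PeekIsolationLevel.READ_COMMITTED);
    }

    // New five-argument overload; the only method implementations must provide.
    List<byte[]> peekMessages(String topic, String subName, int numMessages,
                              boolean showServerMarker,
                              PeekIsolationLevel transactionIsolationLevel);
}
```

With this shape, the implementing class no longer needs its own three-argument override, which also covers the earlier comment on `TopicsImpl`.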



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
