merlimat commented on code in PR #25821:
URL: https://github.com/apache/pulsar/pull/25821#discussion_r3280294532


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java:
##########
@@ -134,6 +149,85 @@ public static String ackIndexKey(String segment, String subscription) {
         return segmentKey(segment) + ":" + Codec.encode(subscription);
     }
 
+    /** Parent path for per-segment watermark records. Records are direct children of this. */
+    public static final String TXN_SEGMENT_WATERMARK_PREFIX = TXN_SEGMENT_STATE_PREFIX + "/watermark";
+
+    /** Parent path for per-aborted-txn records, flat across all segments. */
+    public static final String TXN_SEGMENT_ABORTED_PREFIX = TXN_SEGMENT_STATE_PREFIX + "/aborted";
+
+    /**
+     * @return {@code /txn/segment-state/watermark/<encoded-segment>} — durable watermark record
+     *     for {@code segment}. Direct child of {@link #TXN_SEGMENT_WATERMARK_PREFIX} so the
+     *     fallback {@code scanByIndex} path works on backends without a native secondary index.
+     */
+    public static String segmentWatermarkPath(String segment) {
+        return TXN_SEGMENT_WATERMARK_PREFIX + "/" + segmentKey(segment);
+    }
+
+    /**
+     * @return {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} — durable
+     *     per-aborted-txn record. Direct child of {@link #TXN_SEGMENT_ABORTED_PREFIX} (flat
+     *     across all segments) so the fallback scan finds it; {@code <encoded-segment>:<txnId>}
+     *     keeps the per-segment grouping addressable.
+     */
+    public static String segmentAbortedTxnPath(String segment, String txnId) {
+        return TXN_SEGMENT_ABORTED_PREFIX + "/" + segmentKey(segment) + ":" + txnId;
+    }
+
+    /**
+     * @return the {@link #IDX_TXN_ABORTED_BY_POSITION} index key for a per-segment aborted-txn
+     *     record. Format: {@code <encoded-segment>:padded(ledgerId):padded(entryId)} — the
+     *     encoded prefix scopes scans to one segment; the padded position is lexicographically
+     *     ordered so range scans by trim point work naturally.
+     */
+    public static String abortedByPositionIndexKey(String segment, long ledgerId, long entryId) {
+        return segmentKey(segment) + ":" + longKey(ledgerId) + ":" + longKey(entryId);
+    }
+
+    /** @return the lower bound of the per-segment range in {@link #IDX_TXN_ABORTED_BY_POSITION}. */
+    public static String abortedByPositionSegmentLowerBound(String segment) {
+        return segmentKey(segment) + ":" + longKey(0L) + ":" + longKey(0L);

Review Comment:
   Extracted into `MIN_POSITION_SUFFIX`.
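
For readers following along, here is a minimal sketch of what such an extraction could look like. The PR's actual definition of `MIN_POSITION_SUFFIX` is not shown in this message, so the constant's shape (including the leading `:`), the width of 19, and the `URLEncoder`-based `segmentKey` stand-in are all assumptions made for illustration.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class MinPositionSuffixSketch {
    // Assumed width (Long.MAX_VALUE has 19 decimal digits); the real LONG_KEY_WIDTH is not in the hunk.
    static final int LONG_KEY_WIDTH = 19;

    // Assumed shape of the extracted constant, including the leading ':' separator.
    static final String MIN_POSITION_SUFFIX = ":" + longKey(0L) + ":" + longKey(0L);

    // Stand-in for TxnPaths.segmentKey, assumed here to URL-encode the segment name.
    static String segmentKey(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8);
    }

    // Zero-padded fixed-width decimal; Locale.ROOT keeps the digits ASCII.
    static String longKey(long value) {
        return String.format(Locale.ROOT, "%0" + LONG_KEY_WIDTH + "d", value);
    }

    // Lower bound as written in the hunk under review: formats two keys per call.
    static String lowerBoundInline(String segment) {
        return segmentKey(segment) + ":" + longKey(0L) + ":" + longKey(0L);
    }

    // Lower bound after the extraction: the suffix is computed once at class init.
    static String lowerBoundWithConstant(String segment) {
        return segmentKey(segment) + MIN_POSITION_SUFFIX;
    }
}
```

Both variants return the same string; the extracted form only avoids re-formatting the two zero keys on every call.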



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java:
##########
@@ -134,6 +149,85 @@ public static String ackIndexKey(String segment, String subscription) {
         return segmentKey(segment) + ":" + Codec.encode(subscription);
     }
 
+    /** Parent path for per-segment watermark records. Records are direct children of this. */
+    public static final String TXN_SEGMENT_WATERMARK_PREFIX = TXN_SEGMENT_STATE_PREFIX + "/watermark";
+
+    /** Parent path for per-aborted-txn records, flat across all segments. */
+    public static final String TXN_SEGMENT_ABORTED_PREFIX = TXN_SEGMENT_STATE_PREFIX + "/aborted";
+
+    /**
+     * @return {@code /txn/segment-state/watermark/<encoded-segment>} — durable watermark record
+     *     for {@code segment}. Direct child of {@link #TXN_SEGMENT_WATERMARK_PREFIX} so the
+     *     fallback {@code scanByIndex} path works on backends without a native secondary index.
+     */
+    public static String segmentWatermarkPath(String segment) {
+        return TXN_SEGMENT_WATERMARK_PREFIX + "/" + segmentKey(segment);
+    }
+
+    /**
+     * @return {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} — durable
+     *     per-aborted-txn record. Direct child of {@link #TXN_SEGMENT_ABORTED_PREFIX} (flat
+     *     across all segments) so the fallback scan finds it; {@code <encoded-segment>:<txnId>}
+     *     keeps the per-segment grouping addressable.
+     */
+    public static String segmentAbortedTxnPath(String segment, String txnId) {
+        return TXN_SEGMENT_ABORTED_PREFIX + "/" + segmentKey(segment) + ":" + txnId;
+    }
+
+    /**
+     * @return the {@link #IDX_TXN_ABORTED_BY_POSITION} index key for a per-segment aborted-txn
+     *     record. Format: {@code <encoded-segment>:padded(ledgerId):padded(entryId)} — the
+     *     encoded prefix scopes scans to one segment; the padded position is lexicographically
+     *     ordered so range scans by trim point work naturally.
+     */
+    public static String abortedByPositionIndexKey(String segment, long ledgerId, long entryId) {
+        return segmentKey(segment) + ":" + longKey(ledgerId) + ":" + longKey(entryId);
+    }
+
+    /** @return the lower bound of the per-segment range in {@link #IDX_TXN_ABORTED_BY_POSITION}. */
+    public static String abortedByPositionSegmentLowerBound(String segment) {
+        return segmentKey(segment) + ":" + longKey(0L) + ":" + longKey(0L);
+    }
+
+    /** @return the upper bound of the per-segment range in {@link #IDX_TXN_ABORTED_BY_POSITION}. */
+    public static String abortedByPositionSegmentUpperBound(String segment) {
+        return segmentKey(segment) + ":" + MAX_LONG_KEY + ":" + MAX_LONG_KEY;

Review Comment:
   Extracted into `MAX_POSITION_SUFFIX`.
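
To illustrate why the two bounds bracket exactly one segment's index keys, the sketch below checks range membership with plain string comparison. It is not the PR's code: it assumes `MAX_LONG_KEY` is the padded form of `Long.MAX_VALUE`, a key width of 19, a `URLEncoder`-based `segmentKey` stand-in, and that the index compares keys the way `String.compareTo` does for ASCII. `inSegmentRange` is a hypothetical helper.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class PositionRangeSketch {
    static final int LONG_KEY_WIDTH = 19;                         // assumed width
    static final String MAX_LONG_KEY = longKey(Long.MAX_VALUE);   // assumed definition

    // Assumed shapes of the extracted constants, each with its leading ':' separator.
    static final String MIN_POSITION_SUFFIX = ":" + longKey(0L) + ":" + longKey(0L);
    static final String MAX_POSITION_SUFFIX = ":" + MAX_LONG_KEY + ":" + MAX_LONG_KEY;

    // Stand-in for TxnPaths.segmentKey, assumed to URL-encode (so the result has no ':').
    static String segmentKey(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8);
    }

    static String longKey(long value) {
        return String.format(Locale.ROOT, "%0" + LONG_KEY_WIDTH + "d", value);
    }

    static String indexKey(String segment, long ledgerId, long entryId) {
        return segmentKey(segment) + ":" + longKey(ledgerId) + ":" + longKey(entryId);
    }

    static String lowerBound(String segment) {
        return segmentKey(segment) + MIN_POSITION_SUFFIX;
    }

    static String upperBound(String segment) {
        return segmentKey(segment) + MAX_POSITION_SUFFIX;
    }

    // Hypothetical helper: true if indexKey sorts within [lowerBound, upperBound] of segment.
    static boolean inSegmentRange(String segment, String indexKey) {
        return indexKey.compareTo(lowerBound(segment)) >= 0
                && indexKey.compareTo(upperBound(segment)) <= 0;
    }
}
```

Under these assumptions, a key for segment `seg-a` falls inside the `seg-a` range, while keys for `seg-b` or `seg-ab` fall outside it, because the first character after the shared prefix differs from `:`.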



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java:
##########
@@ -134,6 +149,85 @@ public static String ackIndexKey(String segment, String subscription) {
         return segmentKey(segment) + ":" + Codec.encode(subscription);
     }
 
+    /** Parent path for per-segment watermark records. Records are direct children of this. */
+    public static final String TXN_SEGMENT_WATERMARK_PREFIX = TXN_SEGMENT_STATE_PREFIX + "/watermark";
+
+    /** Parent path for per-aborted-txn records, flat across all segments. */
+    public static final String TXN_SEGMENT_ABORTED_PREFIX = TXN_SEGMENT_STATE_PREFIX + "/aborted";
+
+    /**
+     * @return {@code /txn/segment-state/watermark/<encoded-segment>} — durable watermark record
+     *     for {@code segment}. Direct child of {@link #TXN_SEGMENT_WATERMARK_PREFIX} so the
+     *     fallback {@code scanByIndex} path works on backends without a native secondary index.
+     */
+    public static String segmentWatermarkPath(String segment) {
+        return TXN_SEGMENT_WATERMARK_PREFIX + "/" + segmentKey(segment);
+    }
+
+    /**
+     * @return {@code /txn/segment-state/aborted/<encoded-segment>:<txnId>} — durable
+     *     per-aborted-txn record. Direct child of {@link #TXN_SEGMENT_ABORTED_PREFIX} (flat
+     *     across all segments) so the fallback scan finds it; {@code <encoded-segment>:<txnId>}
+     *     keeps the per-segment grouping addressable.
+     */
+    public static String segmentAbortedTxnPath(String segment, String txnId) {
+        return TXN_SEGMENT_ABORTED_PREFIX + "/" + segmentKey(segment) + ":" + txnId;
+    }
+
+    /**
+     * @return the {@link #IDX_TXN_ABORTED_BY_POSITION} index key for a per-segment aborted-txn
+     *     record. Format: {@code <encoded-segment>:padded(ledgerId):padded(entryId)} — the
+     *     encoded prefix scopes scans to one segment; the padded position is lexicographically
+     *     ordered so range scans by trim point work naturally.
+     */
+    public static String abortedByPositionIndexKey(String segment, long ledgerId, long entryId) {
+        return segmentKey(segment) + ":" + longKey(ledgerId) + ":" + longKey(entryId);
+    }
+
+    /** @return the lower bound of the per-segment range in {@link #IDX_TXN_ABORTED_BY_POSITION}. */
+    public static String abortedByPositionSegmentLowerBound(String segment) {
+        return segmentKey(segment) + ":" + longKey(0L) + ":" + longKey(0L);
+    }
+
+    /** @return the upper bound of the per-segment range in {@link #IDX_TXN_ABORTED_BY_POSITION}. */
+    public static String abortedByPositionSegmentUpperBound(String segment) {
+        return segmentKey(segment) + ":" + MAX_LONG_KEY + ":" + MAX_LONG_KEY;
+    }
+
+    /**
+     * Extract the {@code txnId} part from a path returned by {@link #segmentAbortedTxnPath}. The
+     * path is {@code .../aborted/<encoded-segment>:<txnId>}; {@code encoded-segment} is URL-
+     * encoded (no {@code :}) so the first {@code :} in the trailing name is the segment / txn
+     * separator.
+     *
+     * @return the txnId key, or {@code null} if {@code abortedPath} doesn't match
+     */
+    public static String txnIdFromAbortedPath(String abortedPath) {
+        int lastSlash = abortedPath.lastIndexOf('/');
+        if (lastSlash < 0) {
+            return null;
+        }
+        String name = abortedPath.substring(lastSlash + 1);
+        int colon = name.indexOf(':');
+        if (colon < 0 || colon == name.length() - 1) {
+            return null;
+        }
+        return name.substring(colon + 1);
+    }
+
+    /**
+     * @return the {@code segmentKey} part from a path returned by {@link #segmentAbortedTxnPath}.
+     */
+    public static String segmentKeyFromAbortedPath(String abortedPath) {
+        int lastSlash = abortedPath.lastIndexOf('/');
+        if (lastSlash < 0) {
+            return null;
+        }
+        String name = abortedPath.substring(lastSlash + 1);
+        int colon = name.indexOf(':');
+        return colon < 0 ? null : name.substring(0, colon);
+    }
+
     /** @return {@code value} formatted as a zero-padded fixed-width decimal for use as a range-scan index key. */
     public static String longKey(long value) {
         return String.format("%0" + LONG_KEY_WIDTH + "d", value);

Review Comment:
   Done. Replaced `String.format` with a direct zero-padded char-buffer fill, and
   verified that it produces identical output across our value range (non-negative
   ledger/entry ids and epoch millis).
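
A char-buffer fill of this kind might look like the sketch below. This is an illustration rather than the code that landed in the PR: the width of 19 is an assumption (the real `LONG_KEY_WIDTH` is not shown here), rejecting negative input is a choice made for the sketch, and `longKeyViaFormat` is a hypothetical helper kept only for comparison.

```java
import java.util.Arrays;
import java.util.Locale;

final class LongKeySketch {
    // Assumed width: Long.MAX_VALUE has 19 decimal digits.
    static final int LONG_KEY_WIDTH = 19;

    // Writes the decimal digits right-to-left into a buffer pre-filled with '0'.
    static String longKey(long value) {
        if (value < 0) {
            // Sketch-only choice: the stated value range is non-negative.
            throw new IllegalArgumentException("negative value: " + value);
        }
        char[] buf = new char[LONG_KEY_WIDTH];
        Arrays.fill(buf, '0');
        int pos = LONG_KEY_WIDTH;
        long remaining = value;
        while (remaining != 0) {
            buf[--pos] = (char) ('0' + (int) (remaining % 10));
            remaining /= 10;
        }
        return new String(buf);
    }

    // Format-based version from the hunk above, kept for comparison (Locale.ROOT for ASCII digits).
    static String longKeyViaFormat(long value) {
        return String.format(Locale.ROOT, "%0" + LONG_KEY_WIDTH + "d", value);
    }
}
```

The buffer version skips format-string parsing and allocates only the char array and the resulting string, which is the usual motivation for this kind of change on a hot path.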



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
