codelipenghui commented on code in PR #15137:
URL: https://github.com/apache/pulsar/pull/15137#discussion_r857629714


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/LogIndexLagBackoff.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import lombok.Getter;
+
+public class LogIndexLagBackoff {
+
+    @Getter
+    private final long minLag;
+    @Getter
+    private final long maxLag;
+    @Getter
+    private final double exponent;
+
+    public LogIndexLagBackoff(long minLag, long maxLag, double exponent) {
+        checkArgument(minLag >= 0, "min lag must be >= 0");
+        checkArgument(maxLag >= minLag || maxLag == -1, "maxLag should be >= 
minLag or == -1");

Review Comment:
   And why the `minLag` can be 0, if `minLag` == 1 which means we will add 
index for each entry, what does `minLag` == 0 mean?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/LogIndexLagBackoff.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import lombok.Getter;
+
+public class LogIndexLagBackoff {
+
+    @Getter
+    private final long minLag;
+    @Getter
+    private final long maxLag;
+    @Getter
+    private final double exponent;
+
+    public LogIndexLagBackoff(long minLag, long maxLag, double exponent) {
+        checkArgument(minLag >= 0, "min lag must be >= 0");
+        checkArgument(maxLag >= minLag || maxLag == -1, "maxLag should be >= 
minLag or == -1");

Review Comment:
   If we don't want to limit the maxLag, we can just use the Long.MAX_VALUE, so 
that we don't need to handle any negative values, this will simplify the 
implementation



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -292,6 +249,69 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
         return completableFuture;
     }
 
+    private void handleMetadataEntry(PositionImpl logPosition, 
PendingAckMetadataEntry pendingAckMetadataEntry) {
+        // store the persistent position in to memory
+        // store the max position of this entry retain
+        if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+                && pendingAckMetadataEntry.getPendingAckOp() != 
PendingAckOp.COMMIT) {
+            Optional<PendingAckMetadata> optional = 
pendingAckMetadataEntry.getPendingAckMetadatasList()
+                    .stream().max((o1, o2) -> 
ComparisonChain.start().compare(o1.getLedgerId(),
+                            o2.getLedgerId()).compare(o1.getEntryId(), 
o2.getEntryId()).result());
+
+            optional.ifPresent(pendingAckMetadata -> {
+                PositionImpl nowPosition = 
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+                        pendingAckMetadata.getEntryId());
+
+                if (nowPosition.compareTo(maxAckPosition) > 0) {
+                    maxAckPosition = nowPosition;
+                }
+                if (logAppendTimes.get() > upperLimitOfLogAppendTimes) {

Review Comment:
   > And the execution here is synchronous, there is no race
   
   If the execution here is synchronous, why need to use `AtomicLong`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/util/LogIndexLagBackoff.java:
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import lombok.Getter;
+
+public class LogIndexLagBackoff {
+
+    @Getter
+    private final long minLag;
+    @Getter
+    private final long maxLag;
+    @Getter
+    private final double exponent;
+
+    public LogIndexLagBackoff(long minLag, long maxLag, double exponent) {
+        checkArgument(minLag >= 0, "min lag must be >= 0");
+        checkArgument(maxLag >= minLag || maxLag == -1, "maxLag should be >= 
minLag or == -1");
+        checkArgument(exponent >= 1, "exponent must be >= 1");
+        this.minLag = minLag;
+        this.maxLag = maxLag;
+        this.exponent = exponent;
+    }
+
+
+    public long next(int indexCount) {
+        if (indexCount <= 0 || minLag <= 0) {
+            return 0;
+        }
+        if (maxLag != -1) {
+            return (long) Math.min(this.maxLag, minLag * Math.pow(indexCount, 
exponent));
+        } else {
+            return (long) (minLag * Math.pow(indexCount, exponent));
+        }

Review Comment:
   If we can avoid any such `0`, `-1` number, we will get a simpler 
implementation here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -292,6 +249,69 @@ public void addFailed(ManagedLedgerException exception, 
Object ctx) {
         return completableFuture;
     }
 
+    private void handleMetadataEntry(PositionImpl logPosition, 
PendingAckMetadataEntry pendingAckMetadataEntry) {
+        // store the persistent position in to memory
+        // store the max position of this entry retain
+        if (pendingAckMetadataEntry.getPendingAckOp() != PendingAckOp.ABORT
+                && pendingAckMetadataEntry.getPendingAckOp() != 
PendingAckOp.COMMIT) {
+            Optional<PendingAckMetadata> optional = 
pendingAckMetadataEntry.getPendingAckMetadatasList()
+                    .stream().max((o1, o2) -> 
ComparisonChain.start().compare(o1.getLedgerId(),
+                            o2.getLedgerId()).compare(o1.getEntryId(), 
o2.getEntryId()).result());
+
+            optional.ifPresent(pendingAckMetadata -> {
+                PositionImpl nowPosition = 
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+                        pendingAckMetadata.getEntryId());
+
+                if (nowPosition.compareTo(maxAckPosition) > 0) {
+                    maxAckPosition = nowPosition;
+                }
+                if (logAppendTimes.get() > upperLimitOfLogAppendTimes) {

Review Comment:
   > Sorry, I can not understand you mind. In this proposal, we use the size of 
pendingAckLogIndex to get the next backoff.
   
   I mean we already introduced `Lag` concept here, why need to introduce 
another concept named `*AppendTimes`, from the current implementation, 
logAppendTimes == currentIndexLag, upperLimitOfLogAppendTimes == maxIndexLag 
right? just a different name, but the new name makes the code more cumbersome 
to read
   
   We use a LagBackoff to calculate the next maxIndexLag, more entries are 
added to the entry log, the currentIndexLog increases, if reach the 
maxIndexLag, to get the next maxIndexLag and reset the currentIndexLag. 
   
   Or we can avoid resetting the `currentIndexLag`, just keep it increasing and 
use a `nextIndexLag`. If reach the `currentIndexLag` == `nextIndexLag`, reset 
the `nextIndexLag`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to