eolivelli commented on a change in pull request #8881:
URL: https://github.com/apache/pulsar/pull/8881#discussion_r557956335



##########
File path: pulsar-broker/generate_protobuf_docker.sh
##########
@@ -0,0 +1,42 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Fail script in case of errors
+set -e
+
+ROOT_DIR=$(git rev-parse --show-toplevel)
+COMMON_DIR=$ROOT_DIR/
+cd $COMMON_DIR
+
+BUILD_IMAGE_NAME="${BUILD_IMAGE_NAME:-apachepulsar/pulsar-build}"
+BUILD_IMAGE_VERSION="${BUILD_IMAGE_VERSION:-ubuntu-16.04}"
+
+IMAGE="$BUILD_IMAGE_NAME:$BUILD_IMAGE_VERSION"
+
+echo $IMAGE
+
+# Force to pull image in case it was updated
+docker pull $IMAGE
+
+WORKDIR=/workdir
+docker run -i \

Review comment:
       why do we need this file ?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
##########
@@ -161,6 +162,12 @@ void setReplicated(boolean replicated) {
 
     @Override
     public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
+        if (pendingAckHandle instanceof PendingAckHandleImpl) {
+            if (!((PendingAckHandleImpl) pendingAckHandle).checkIfReady()) {

Review comment:
       can we add `checkIfReady` to `PendingAckHandle`?
   Using `instanceof` is not a good practice; we should leverage polymorphism.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * In memory implementation of {@link PendingAckStore}.
+ */
+public class InMemoryPendingAckStore implements PendingAckStore {
+
+    @Override
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ScheduledExecutorService scheduledExecutorService) {
+        pendingAckHandle.changeToReadyState();

Review comment:
       is this expected to happen inside the `scheduledExecutorService` ?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
##########
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import 
org.apache.pulsar.broker.transaction.pendingack.exceptions.TransactionPendingAckStoreProviderException;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Provider is for MLPendingAckStore.
+ */
+@Slf4j
+public class MLPendingAckStoreProvider implements 
TransactionPendingAckStoreProvider {
+
+    private volatile Timer timer;
+
+    private static final long tickTimeMillis = 1L;
+
+    @Override
+    public CompletableFuture<PendingAckStore> 
newPendingAckStore(PersistentSubscription subscription) {
+        CompletableFuture<PendingAckStore> pendingAckStoreFuture = new 
CompletableFuture<>();
+
+        if (subscription == null) {
+            pendingAckStoreFuture.completeExceptionally(
+                    new TransactionPendingAckStoreProviderException("The 
subscription is null."));
+            return pendingAckStoreFuture;
+        }
+
+        PersistentTopic originPersistentTopic = (PersistentTopic) 
subscription.getTopic();
+        String pendingAckTopicName = MLPendingAckStore
+                
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), 
subscription.getName());
+
+        originPersistentTopic.getBrokerService().getManagedLedgerFactory()
+                
.asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(),
+                        new AsyncCallbacks.OpenLedgerCallback() {
+                            @Override
+                            public void openLedgerComplete(ManagedLedger 
ledger, Object ctx) {
+                                
ledger.asyncOpenCursor(MLPendingAckStore.getTransactionPendingAckStoreCursorName(),
+                                        InitialPosition.Earliest, new 
AsyncCallbacks.OpenCursorCallback() {
+                                            @Override
+                                            public void 
openCursorComplete(ManagedCursor cursor, Object ctx) {
+                                                if (timer == null) {

Review comment:
       what about creating a method in the enclosing class MLPendingAckStoreProvider?
   ```
   void ensureTimer() {
        if (timer == null) {
            synchronized(this) {
                initialize();  
          }
       }
   }
   ```
   IMHO code will be more readable

##########
File path: pulsar-broker/generate_protobuf.sh
##########
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+PROTOC=${PROTOC:-protoc}
+${PROTOC} --java_out=pulsar-broker/src/main/java 
pulsar-broker/src/main/proto/TransactionPendingAck.proto

Review comment:
       why do we need this file ?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTimerTask.java
##########
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckMetadata;
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckMetadataEntry;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pending ack timer task.
+ */
+@Slf4j
+public class MLPendingAckStoreTimerTask implements TimerTask {
+
+    private int intervalTime;
+
+    private final int maxIntervalTime;
+
+    private final int minIntervalTime;
+
+    private final ManagedCursorImpl subManagedCursor;
+
+    private final ManagedLedgerImpl storeManagedLedger;
+
+    private final ManagedCursorImpl managedCursor;
+
+    private final Timer timer;
+
+    private volatile PositionImpl markDeletePosition;
+
+    public MLPendingAckStoreTimerTask(ManagedCursor managedCursor, 
ManagedLedger storeManagedLedger,
+                                      int maxIntervalTime, int minIntervalTime,
+                                      ManagedCursor subManageCursor, Timer 
timer) {
+        this.intervalTime = minIntervalTime;
+        this.maxIntervalTime = maxIntervalTime;
+        this.minIntervalTime = minIntervalTime;
+        this.managedCursor = (ManagedCursorImpl) managedCursor;
+        this.subManagedCursor = (ManagedCursorImpl) subManageCursor;
+        this.storeManagedLedger = (ManagedLedgerImpl) storeManagedLedger;
+        this.markDeletePosition = (PositionImpl) 
managedCursor.getMarkDeletedPosition();
+        this.timer = timer;
+    }
+
+    @Override
+    public void run(Timeout timeout) {
+        if (this.managedCursor.getState().equals("Closed")) {
+            return;
+        }
+        try {
+            // when no transaction ack operation in this pending ack store, it 
will increase the interval time
+            if (markDeletePosition.compareTo((PositionImpl) 
storeManagedLedger.getLastConfirmedEntry()) == 0) {
+                int time = intervalTime + minIntervalTime;
+                if (time > maxIntervalTime) {
+                    intervalTime = maxIntervalTime;
+                } else {
+                    intervalTime = time;
+                }
+                managedCursor.markDelete(markDeletePosition);
+                timer.newTimeout(MLPendingAckStoreTimerTask.this, 
intervalTime, TimeUnit.SECONDS);
+                return;
+            } else {
+                int time = intervalTime - minIntervalTime;
+                if (time < minIntervalTime) {
+                    intervalTime = minIntervalTime;
+                } else {
+                    intervalTime = time;
+                }
+            }
+            // this while in order to find the last position witch can mark 
delete
+            while (true) {
+                PositionImpl nextPosition = 
storeManagedLedger.getNextValidPosition(markDeletePosition);
+                Entry entry = getEntry(nextPosition).get();
+                ByteBuf buffer = entry.getDataBuffer();
+                ByteBufCodedInputStream stream = 
ByteBufCodedInputStream.get(buffer);
+                PendingAckMetadataEntry.Builder pendingAckMetadataEntryBuilder 
=
+                        PendingAckMetadataEntry.newBuilder();
+                PendingAckMetadataEntry pendingAckMetadataEntry = null;
+                try {
+                    pendingAckMetadataEntry =
+                            pendingAckMetadataEntryBuilder.mergeFrom(stream, 
null).build();
+                    switch (pendingAckMetadataEntry.getPendingAckOp()) {
+                        case ACK:
+                            if (pendingAckMetadataEntry.getAckType() == 
AckType.Cumulative) {
+                                PendingAckMetadata pendingAckMetadata =
+                                        
pendingAckMetadataEntry.getPendingAckMetadata(0);
+                                
handleAckCommon(PositionImpl.get(pendingAckMetadata.getLedgerId(),
+                                        pendingAckMetadata.getEntryId()), 
nextPosition);
+                            } else {
+                                //this judge the pendingAckMetadataEntry is 
can delete
+                                PositionImpl largestPosition = null;
+                                List<PendingAckMetadata> metadataList =
+                                        
pendingAckMetadataEntry.getPendingAckMetadataList();
+                                for (int i = 0; i < metadataList.size(); i++) {
+                                    PendingAckMetadata pendingAckMetadata = 
metadataList.get(0);
+                                    if (largestPosition == null) {
+                                        largestPosition = 
PositionImpl.get(pendingAckMetadata.getLedgerId(),
+                                                
pendingAckMetadata.getEntryId());
+                                    } else {
+                                        PositionImpl comparePosition = 
PositionImpl
+                                                
.get(pendingAckMetadata.getLedgerId(),
+                                                        
pendingAckMetadata.getEntryId());
+                                        if 
(largestPosition.compareTo(comparePosition) <  0) {
+                                            largestPosition = comparePosition;
+                                        }
+
+                                    }
+                                }
+                                if (largestPosition != null) {
+                                    handleAckCommon(largestPosition, 
nextPosition);
+                                }
+                            }
+                            break;
+                        case ABORT:
+                        case COMMIT:
+                            markDeletePosition = nextPosition;
+                            break;
+                        default:
+                            log.error("PendingAck timer task read illegal 
metadata state! {}",
+                                    pendingAckMetadataEntry.getPendingAckOp());
+                    }
+                } finally {
+                    entry.release();
+                    if (pendingAckMetadataEntry != null) {
+                        pendingAckMetadataEntry.recycle();
+                    }
+                    pendingAckMetadataEntryBuilder.recycle();
+                    stream.recycle();
+                }
+                // when markDeletePosition is not nextPosition, before 
markDeletePosition can delete
+                if (markDeletePosition != nextPosition) {
+                    this.managedCursor.markDelete(markDeletePosition);
+                    break;
+                }
+            }
+            this.timer.newTimeout(this, intervalTime, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            log.error("PendingAck timer task error!", e);
+            if ("Cursor was already closed".equals(e.getCause().getMessage())) 
{

Review comment:
       this is very error prone,
   can we detect an instance of a specific class of exception ?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckMetadataEntry;
+
+/**
+ * Call back for pending ack reply.
+ */
+public interface PendingAckReplyCallBack {
+
+    /**
+     * Pending ack replay complete callback for pending ack store.
+     */
+    void replayComplete();
+
+    /**
+     * Handle metadata entry.

Review comment:
       how many times is this method supposed to be called ?
   
   

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
##########
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.util.Timer;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckReplyCallBack;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckMetadata;
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckMetadataEntry;
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckOp;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The implement of the pending ack store by manageLedger.
+ */
+public class MLPendingAckStore implements PendingAckStore {
+
+
+    private final ManagedLedger managedLedger;
+
+    private final ManagedCursor cursor;
+
+    private static final String PENDING_ACK_STORE_SUFFIX = 
"-transaction-pendingack";
+
+    private static final String PENDING_ACK_STORE_CURSOR_NAME = "pendingack";
+
+    private final SpscArrayQueue<Entry> entryQueue;
+
+    //this is for replay
+    private final PositionImpl lastConfirmedEntry;
+
+    private PositionImpl currentLoadPosition;
+
+    private final Timer timer;
+
+    private final MLPendingAckStoreTimerTask mlPendingAckStoreTimerTask;
+
+    private final int intervalTime;
+
+    public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor,
+                             Timer timer, ManagedCursor subManagedCursor, int 
maxIntervalTime, int minIntervalTime) {
+        this.managedLedger = managedLedger;
+        this.cursor = cursor;
+        this.currentLoadPosition = (PositionImpl) 
this.cursor.getMarkDeletedPosition();
+        this.entryQueue = new SpscArrayQueue<>(2000);
+        this.lastConfirmedEntry = (PositionImpl) 
managedLedger.getLastConfirmedEntry();
+        this.timer = timer;
+        this.intervalTime = minIntervalTime;
+        this.mlPendingAckStoreTimerTask = new 
MLPendingAckStoreTimerTask(cursor, managedLedger,
+                minIntervalTime, maxIntervalTime, subManagedCursor, 
this.timer);
+    }
+
+    @Override
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ScheduledExecutorService transactionReplayExecutor) {
+        transactionReplayExecutor
+                .execute(new PendingAckReplay(new 
MLPendingAckReplyCallBack(this, pendingAckHandle)));
+    }
+
+    //TODO can control the number of entry to read
+    private void readAsync(int numberOfEntriesToRead,
+                           AsyncCallbacks.ReadEntriesCallback 
readEntriesCallback) {
+        cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, 
System.nanoTime());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
+            @Override
+            public void closeComplete(Object ctx) {
+                try {
+                    managedLedger.close();
+                } catch (Exception e) {
+                    completableFuture.completeExceptionally(e);
+                }
+                completableFuture.complete(null);
+            }
+
+            @Override
+            public void closeFailed(ManagedLedgerException exception, Object 
ctx) {
+                completableFuture.completeExceptionally(exception);
+            }
+        }, null);
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> appendIndividualAck(TxnID txnID, 
List<MutablePair<PositionImpl, Integer>> positions) {
+        PendingAckMetadataEntry.Builder builder = 
PendingAckMetadataEntry.newBuilder();
+        builder.setPendingAckOp(PendingAckOp.ACK);
+        builder.setAckType(AckType.Individual);
+        positions.forEach(positionIntegerMutablePair -> {
+            PendingAckMetadata.Builder metadataBuilder = 
PendingAckMetadata.newBuilder();
+            PositionImpl position = positionIntegerMutablePair.getLeft();
+            int batchSize = positionIntegerMutablePair.getRight();
+            if (positionIntegerMutablePair.getLeft().getAckSet() != null) {
+                
metadataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(position.getAckSet()));
+                metadataBuilder.setBatchSize(batchSize);
+            }
+            metadataBuilder.setLedgerId(position.getLedgerId());
+            metadataBuilder.setEntryId(position.getEntryId());
+            PendingAckMetadata pendingAckMetadata = metadataBuilder.build();
+            metadataBuilder.recycle();
+            builder.addPendingAckMetadata(pendingAckMetadata);
+        });
+        return appendCommon(builder, txnID);
+    }
+
+    @Override
+    public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, 
PositionImpl position) {
+        PendingAckMetadataEntry.Builder builder = 
PendingAckMetadataEntry.newBuilder();
+        builder.setPendingAckOp(PendingAckOp.ACK);
+        builder.setAckType(AckType.Cumulative);
+        PendingAckMetadata.Builder metadataBuilder = 
PendingAckMetadata.newBuilder();
+        if (position.getAckSet() != null) {
+            
metadataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(position.getAckSet()));
+        }
+        metadataBuilder.setLedgerId(position.getLedgerId());
+        metadataBuilder.setEntryId(position.getEntryId());
+        PendingAckMetadata pendingAckMetadata = metadataBuilder.build();
+        metadataBuilder.recycle();
+        builder.addPendingAckMetadata(pendingAckMetadata);
+        return appendCommon(builder, txnID);
+    }
+
+    @Override
+    public CompletableFuture<Void> appendCommitMark(TxnID txnID, AckType 
ackType) {
+        PendingAckMetadataEntry.Builder builder = 
PendingAckMetadataEntry.newBuilder();
+        builder.setPendingAckOp(PendingAckOp.COMMIT);
+        builder.setAckType(ackType);
+        return appendCommon(builder, txnID);
+    }
+
+    @Override
+    public CompletableFuture<Void> appendAbortMark(TxnID txnID, AckType 
ackType) {
+        PendingAckMetadataEntry.Builder builder = 
PendingAckMetadataEntry.newBuilder();
+        builder.setPendingAckOp(PendingAckOp.ABORT);
+        builder.setAckType(ackType);
+        return appendCommon(builder, txnID);
+    }
+
+    private CompletableFuture<Void> 
appendCommon(PendingAckMetadataEntry.Builder builder, TxnID txnID) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        builder.setTxnidLeastBits(txnID.getLeastSigBits());
+        builder.setTxnidMostBits(txnID.getMostSigBits());
+        PendingAckMetadataEntry pendingAckMetadataEntry = builder.build();
+        int transactionMetadataEntrySize = 
pendingAckMetadataEntry.getSerializedSize();
+        ByteBuf buf = 
PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, 
transactionMetadataEntrySize);
+        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf);
+        try {
+            pendingAckMetadataEntry.writeTo(outStream);
+            managedLedger.asyncAddEntry(buf, new 
AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, Object ctx) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] MLPendingAckStore message append 
success at {}, operation : {}",
+                                managedLedger.getName(), ctx, position, 
builder.getPendingAckOp());
+                    }
+                    builder.recycle();
+                    pendingAckMetadataEntry.recycle();
+                    buf.release();
+                    completableFuture.complete(null);
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                    log.error("[{}][{}] MLPendingAckStore message append fail 
exception : {}, operation : {}",
+                            managedLedger.getName(), ctx, exception, 
builder.getPendingAckOp());
+                    builder.recycle();
+                    pendingAckMetadataEntry.recycle();
+                    buf.release();
+                    completableFuture.completeExceptionally(exception);
+                }
+            } , null);
+        } catch (Exception e) {
+            log.error("[{}] MLPendingAckStore message append fail exception : 
{}",
+                    managedLedger.getName(), e);
+            builder.recycle();
+            pendingAckMetadataEntry.recycle();
+            buf.release();
+            completableFuture.completeExceptionally(e);
+        } finally {
+            outStream.recycle();
+        }
+        return completableFuture;
+    }
+
+    class PendingAckReplay implements Runnable {
+
+        private final FillEntryQueueCallback fillEntryQueueCallback;
+        private final PendingAckReplyCallBack pendingAckReplyCallBack;
+
+        PendingAckReplay(PendingAckReplyCallBack pendingAckReplyCallBack) {
+            this.fillEntryQueueCallback = new FillEntryQueueCallback();
+            this.pendingAckReplyCallBack = pendingAckReplyCallBack;
+        }
+
+        @Override
+        public void run() {
+            while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
+                fillEntryQueueCallback.fillQueue();
+                Entry entry = entryQueue.poll();
+                if (entry != null) {
+                    ByteBuf buffer = entry.getDataBuffer();
+                    currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
+                    ByteBufCodedInputStream stream = 
ByteBufCodedInputStream.get(buffer);
+                    PendingAckMetadataEntry.Builder 
pendingAckMetadataEntryBuilder =
+                            PendingAckMetadataEntry.newBuilder();
+                    PendingAckMetadataEntry pendingAckMetadataEntry = null;
+                    try {
+                        pendingAckMetadataEntry =
+                                
pendingAckMetadataEntryBuilder.mergeFrom(stream, null).build();
+                        
pendingAckReplyCallBack.handleMetadataEntry(pendingAckMetadataEntry);
+                    } catch (Exception e) {
+                        if (pendingAckMetadataEntry != null) {
+                            log.error("TxnId : [{}:{}] MLPendingAckStore reply 
error!",
+                                    pendingAckMetadataEntry.getTxnidMostBits(),
+                                    
pendingAckMetadataEntry.getTxnidLeastBits(), e);
+                        } else {
+                            log.error("MLPendingAckStore reply error!", e);
+                        }
+                    }
+                    entry.release();
+                    if (pendingAckMetadataEntry != null) {
+                        pendingAckMetadataEntry.recycle();
+                    }
+                    pendingAckMetadataEntryBuilder.recycle();
+                    stream.recycle();
+                } else {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        //no-op

Review comment:
       this is not good,
   the contract is to exit the current activity in case of `InterruptedException`,
or at least restore the thread's interrupt flag (`Thread.currentThread().interrupt()`)

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckReplyCallBack.java
##########
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack;
+
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckMetadataEntry;
+
+/**
+ * Call back for pending ack reply.
+ */
+public interface PendingAckReplyCallBack {
+
+    /**
+     * Pending ack replay complete callback for pending ack store.
+     */
+    void replayComplete();

Review comment:
       is there any way to receive notification of a failure ?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
##########
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.util.Timer;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckReplyCallBack;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckMetadata;
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckMetadataEntry;
+import 
org.apache.pulsar.broker.transaction.proto.TransactionPendingAck.PendingAckOp;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.SafeCollectionUtils;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The implement of the pending ack store by manageLedger.
+ */
+public class MLPendingAckStore implements PendingAckStore {
+
+
+    private final ManagedLedger managedLedger;
+
+    private final ManagedCursor cursor;
+
+    private static final String PENDING_ACK_STORE_SUFFIX = 
"-transaction-pendingack";
+
+    private static final String PENDING_ACK_STORE_CURSOR_NAME = "pendingack";
+
+    private final SpscArrayQueue<Entry> entryQueue;
+
+    //this is for replay
+    private final PositionImpl lastConfirmedEntry;
+
+    private PositionImpl currentLoadPosition;
+
+    private final Timer timer;
+
+    private final MLPendingAckStoreTimerTask mlPendingAckStoreTimerTask;
+
+    private final int intervalTime;
+
+    public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor,
+                             Timer timer, ManagedCursor subManagedCursor, int 
maxIntervalTime, int minIntervalTime) {
+        this.managedLedger = managedLedger;
+        this.cursor = cursor;
+        this.currentLoadPosition = (PositionImpl) 
this.cursor.getMarkDeletedPosition();
+        this.entryQueue = new SpscArrayQueue<>(2000);
+        this.lastConfirmedEntry = (PositionImpl) 
managedLedger.getLastConfirmedEntry();
+        this.timer = timer;
+        this.intervalTime = minIntervalTime;
+        this.mlPendingAckStoreTimerTask = new 
MLPendingAckStoreTimerTask(cursor, managedLedger,
+                minIntervalTime, maxIntervalTime, subManagedCursor, 
this.timer);
+    }
+
+    @Override
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ScheduledExecutorService transactionReplayExecutor) {
+        transactionReplayExecutor
+                .execute(new PendingAckReplay(new 
MLPendingAckReplyCallBack(this, pendingAckHandle)));
+    }
+
+    //TODO can control the number of entry to read
+    private void readAsync(int numberOfEntriesToRead,
+                           AsyncCallbacks.ReadEntriesCallback 
readEntriesCallback) {
+        cursor.asyncReadEntries(numberOfEntriesToRead, readEntriesCallback, 
System.nanoTime());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
+            @Override
+            public void closeComplete(Object ctx) {
+                try {
+                    managedLedger.close();
+                } catch (Exception e) {
+                    completableFuture.completeExceptionally(e);
+                }
+                completableFuture.complete(null);
+            }
+
+            @Override
+            public void closeFailed(ManagedLedgerException exception, Object 
ctx) {
+                completableFuture.completeExceptionally(exception);
+            }
+        }, null);
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> appendIndividualAck(TxnID txnID, 
List<MutablePair<PositionImpl, Integer>> positions) {
+        PendingAckMetadataEntry.Builder builder = 
PendingAckMetadataEntry.newBuilder();
+        builder.setPendingAckOp(PendingAckOp.ACK);
+        builder.setAckType(AckType.Individual);
+        positions.forEach(positionIntegerMutablePair -> {
+            PendingAckMetadata.Builder metadataBuilder = 
PendingAckMetadata.newBuilder();
+            PositionImpl position = positionIntegerMutablePair.getLeft();
+            int batchSize = positionIntegerMutablePair.getRight();
+            if (positionIntegerMutablePair.getLeft().getAckSet() != null) {
+                
metadataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(position.getAckSet()));
+                metadataBuilder.setBatchSize(batchSize);
+            }
+            metadataBuilder.setLedgerId(position.getLedgerId());
+            metadataBuilder.setEntryId(position.getEntryId());
+            PendingAckMetadata pendingAckMetadata = metadataBuilder.build();
+            metadataBuilder.recycle();
+            builder.addPendingAckMetadata(pendingAckMetadata);
+        });
+        return appendCommon(builder, txnID);
+    }
+
+    @Override
+    public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, 
PositionImpl position) {
+        PendingAckMetadataEntry.Builder builder = 
PendingAckMetadataEntry.newBuilder();
+        builder.setPendingAckOp(PendingAckOp.ACK);
+        builder.setAckType(AckType.Cumulative);
+        PendingAckMetadata.Builder metadataBuilder = 
PendingAckMetadata.newBuilder();
+        if (position.getAckSet() != null) {
+            
metadataBuilder.addAllAckSet(SafeCollectionUtils.longArrayToList(position.getAckSet()));
+        }
+        metadataBuilder.setLedgerId(position.getLedgerId());
+        metadataBuilder.setEntryId(position.getEntryId());
+        PendingAckMetadata pendingAckMetadata = metadataBuilder.build();
+        metadataBuilder.recycle();
+        builder.addPendingAckMetadata(pendingAckMetadata);
+        return appendCommon(builder, txnID);
+    }
+
+    @Override
+    public CompletableFuture<Void> appendCommitMark(TxnID txnID, AckType 
ackType) {
+        PendingAckMetadataEntry.Builder builder = 
PendingAckMetadataEntry.newBuilder();
+        builder.setPendingAckOp(PendingAckOp.COMMIT);
+        builder.setAckType(ackType);
+        return appendCommon(builder, txnID);
+    }
+
+    @Override
+    public CompletableFuture<Void> appendAbortMark(TxnID txnID, AckType 
ackType) {
+        PendingAckMetadataEntry.Builder builder = 
PendingAckMetadataEntry.newBuilder();
+        builder.setPendingAckOp(PendingAckOp.ABORT);
+        builder.setAckType(ackType);
+        return appendCommon(builder, txnID);
+    }
+
+    private CompletableFuture<Void> 
appendCommon(PendingAckMetadataEntry.Builder builder, TxnID txnID) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        builder.setTxnidLeastBits(txnID.getLeastSigBits());
+        builder.setTxnidMostBits(txnID.getMostSigBits());
+        PendingAckMetadataEntry pendingAckMetadataEntry = builder.build();
+        int transactionMetadataEntrySize = 
pendingAckMetadataEntry.getSerializedSize();
+        ByteBuf buf = 
PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, 
transactionMetadataEntrySize);
+        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(buf);
+        try {
+            pendingAckMetadataEntry.writeTo(outStream);
+            managedLedger.asyncAddEntry(buf, new 
AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, Object ctx) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] MLPendingAckStore message append 
success at {}, operation : {}",
+                                managedLedger.getName(), ctx, position, 
builder.getPendingAckOp());
+                    }
+                    builder.recycle();
+                    pendingAckMetadataEntry.recycle();
+                    buf.release();
+                    completableFuture.complete(null);
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object 
ctx) {
+                    log.error("[{}][{}] MLPendingAckStore message append fail 
exception : {}, operation : {}",
+                            managedLedger.getName(), ctx, exception, 
builder.getPendingAckOp());
+                    builder.recycle();
+                    pendingAckMetadataEntry.recycle();
+                    buf.release();
+                    completableFuture.completeExceptionally(exception);
+                }
+            } , null);
+        } catch (Exception e) {
+            log.error("[{}] MLPendingAckStore message append fail exception : 
{}",
+                    managedLedger.getName(), e);
+            builder.recycle();
+            pendingAckMetadataEntry.recycle();
+            buf.release();
+            completableFuture.completeExceptionally(e);
+        } finally {
+            outStream.recycle();
+        }
+        return completableFuture;
+    }
+
+    class PendingAckReplay implements Runnable {
+
+        private final FillEntryQueueCallback fillEntryQueueCallback;
+        private final PendingAckReplyCallBack pendingAckReplyCallBack;
+
+        PendingAckReplay(PendingAckReplyCallBack pendingAckReplyCallBack) {
+            this.fillEntryQueueCallback = new FillEntryQueueCallback();
+            this.pendingAckReplyCallBack = pendingAckReplyCallBack;
+        }
+
+        @Override
+        public void run() {
+            while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
+                fillEntryQueueCallback.fillQueue();
+                Entry entry = entryQueue.poll();
+                if (entry != null) {
+                    ByteBuf buffer = entry.getDataBuffer();
+                    currentLoadPosition = 
PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
+                    ByteBufCodedInputStream stream = 
ByteBufCodedInputStream.get(buffer);
+                    PendingAckMetadataEntry.Builder 
pendingAckMetadataEntryBuilder =
+                            PendingAckMetadataEntry.newBuilder();
+                    PendingAckMetadataEntry pendingAckMetadataEntry = null;
+                    try {
+                        pendingAckMetadataEntry =
+                                
pendingAckMetadataEntryBuilder.mergeFrom(stream, null).build();
+                        
pendingAckReplyCallBack.handleMetadataEntry(pendingAckMetadataEntry);
+                    } catch (Exception e) {
+                        if (pendingAckMetadataEntry != null) {
+                            log.error("TxnId : [{}:{}] MLPendingAckStore reply 
error!",
+                                    pendingAckMetadataEntry.getTxnidMostBits(),
+                                    
pendingAckMetadataEntry.getTxnidLeastBits(), e);
+                        } else {
+                            log.error("MLPendingAckStore reply error!", e);
+                        }
+                    }
+                    entry.release();
+                    if (pendingAckMetadataEntry != null) {
+                        pendingAckMetadataEntry.recycle();
+                    }
+                    pendingAckMetadataEntryBuilder.recycle();
+                    stream.recycle();
+                } else {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        //no-op
+                    }
+                }
+            }
+            pendingAckReplyCallBack.replayComplete();
+        }
+    }
+
+    class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback 
{
+
+        private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
+
+        public FillEntryQueueCallback() {
+        }
+
+        void fillQueue() {
+            if (entryQueue.size() < entryQueue.capacity() && 
outstandingReadsRequests.get() == 0) {
+                if (cursor.hasMoreEntries()) {
+                    outstandingReadsRequests.incrementAndGet();
+                    readAsync(100, this);
+                }
+            }
+        }
+
+        @Override
+        public void readEntriesComplete(List<Entry> entries, Object ctx) {
+            entryQueue.fill(new MessagePassingQueue.Supplier<Entry>() {
+                private int i = 0;
+                @Override
+                public Entry get() {
+                    Entry entry = entries.get(i);
+                    i++;
+                    return entry;
+                }
+            }, entries.size());
+
+            outstandingReadsRequests.decrementAndGet();
+        }
+
+        @Override
+        public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
+            log.error("MLPendingAckStore stat reply fail!", exception);
+            outstandingReadsRequests.decrementAndGet();

Review comment:
       what is the recovery story for this error ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to