codelipenghui commented on a change in pull request #8256: URL: https://github.com/apache/pulsar/pull/8256#discussion_r510573634
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; 
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. + // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. 
+ private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + checkArgument(positions.get(0) instanceof PositionImpl); + PositionImpl position = (PositionImpl) positions.get(0); + if (AckType.Cumulative == ackType) { + + if (positions.size() != 1) { + String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId + + " invalid cumulative ack received with multiple message ids."; + log.error(errorMsg); + completableFuture.completeExceptionally(new TransactionConflictException(errorMsg)); Review comment: This is not a TransactionConflictException, right? We must keep the exception handling clear. I think you can return a NotAllowedException. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. 
+ // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); Review comment: checkArgument will throw a RuntimeException, since the method returns `CompletableFuture<Void>`, it's better to complete the future with an exception. 
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java ########## @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public interface PendingAckHandle { + + /** + * Acknowledge message(s) for an ongoing transaction. + * <p> + * It can be of {@link PulsarApi.CommandAck.AckType#Individual} or {@link PulsarApi.CommandAck.AckType#Cumulative}. Single messages acked by ongoing + * transaction will be put in pending_ack state and only marked as deleted after transaction is committed. + * <p> + * Only one transaction is allowed to do cumulative ack on a subscription at a given time. 
+ * If a transaction do multiple cumulative ack, only the one with largest position determined by + * {@link PositionImpl#compareTo(PositionImpl)} will be kept as it cover all position smaller than it. + * <p> + * If an ongoing transaction cumulative acked a message and then try to ack single message which is + * smaller than that one it cumulative acked, it'll succeed. + * <p> + * If transaction is aborted all messages acked by it will be put back to pending state. + * + * @param txnId TransactionID of an ongoing transaction trying to sck message. + * @param positions {@link Position}(s) it try to ack. + * @param ackType {@link PulsarApi.CommandAck.AckType}. + * cumulative ack or try to single ack message already acked by any ongoing transaction. + * @return the future of this operation. + */ + CompletableFuture<Void> acknowledgeMessage(TxnID txnId, List<Position> positions, PulsarApi.CommandAck.AckType ackType); + + /** + * Commit a transaction. + * + * @param txnId {@link TxnID} to identify the transaction. + * @param properties Additional user-defined properties that can be associated with a particular cursor position. + * @return the future of this operation. + */ + CompletableFuture<Void> commitTxn(TxnID txnId, Map<String,Long> properties); + + /** + * Abort a transaction. + * + * @param txnId {@link TxnID} to identify the transaction. + * @param consumer {@link Consumer} which aborting transaction. + * @return the future of this operation. + */ + CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer); + + void setPersistentSubscription(PersistentSubscription persistentSubscription); Review comment: It is confusing to introduce this method in the PendingAckHandle interface. It's better to move it into the implementation.
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java ########## @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public interface PendingAckHandle { + + /** + * Acknowledge message(s) for an ongoing transaction. + * <p> + * It can be of {@link PulsarApi.CommandAck.AckType#Individual} or {@link PulsarApi.CommandAck.AckType#Cumulative}. Single messages acked by ongoing + * transaction will be put in pending_ack state and only marked as deleted after transaction is committed. + * <p> + * Only one transaction is allowed to do cumulative ack on a subscription at a given time. 
+ * If a transaction do multiple cumulative ack, only the one with largest position determined by + * {@link PositionImpl#compareTo(PositionImpl)} will be kept as it cover all position smaller than it. + * <p> + * If an ongoing transaction cumulative acked a message and then try to ack single message which is + * smaller than that one it cumulative acked, it'll succeed. + * <p> + * If transaction is aborted all messages acked by it will be put back to pending state. Review comment: Is it related to the acknowledgeMessage method? ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java ########## @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.pendingack; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public interface PendingAckHandle { + + /** + * Acknowledge message(s) for an ongoing transaction. + * <p> + * It can be of {@link PulsarApi.CommandAck.AckType#Individual} or {@link PulsarApi.CommandAck.AckType#Cumulative}. Single messages acked by ongoing + * transaction will be put in pending_ack state and only marked as deleted after transaction is committed. + * <p> + * Only one transaction is allowed to do cumulative ack on a subscription at a given time. + * If a transaction do multiple cumulative ack, only the one with largest position determined by + * {@link PositionImpl#compareTo(PositionImpl)} will be kept as it cover all position smaller than it. Review comment: Does this mean the one transaction only can allow one cumulative ack? I think we can allow a transaction to do cumulative ack multiple times, we only should ensure the new cumulative ack position is greater than the old one, right? ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. 
+ // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + checkArgument(positions.get(0) instanceof PositionImpl); + PositionImpl position = (PositionImpl) positions.get(0); + if (AckType.Cumulative == ackType) { + + if (positions.size() != 1) { + String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId + + " invalid cumulative ack received with 
multiple message ids."; + log.error(errorMsg); + completableFuture.completeExceptionally(new TransactionConflictException(errorMsg)); + return completableFuture; + } + + checkArgument(positions.get(0) instanceof PositionImpl); + if (position.compareTo((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition()) <= 0) { + String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId + + " try to cumulative ack position: " + position + " within range of cursor's " + + "markDeletePosition: " + persistentSubscription.getCursor().getMarkDeletedPosition(); + log.error(errorMsg); + completableFuture.completeExceptionally(new TransactionConflictException(errorMsg)); + return completableFuture; + } + + if (log.isDebugEnabled()) { + log.debug("[{}][{}] TxnID:[{}] Cumulative ack on {}.", topicName, subName, txnId.toString(), position); + } + if (this.pendingCumulativeAckTxnId == null) { + // Only set pendingCumulativeAckTxnId if no transaction is doing cumulative ack. + PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, txnId); + POSITION_UPDATER.set(this, position); Review comment: There is no atomic guarantee for these two operations. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. 
+ // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + checkArgument(positions.get(0) instanceof PositionImpl); + PositionImpl position = (PositionImpl) positions.get(0); + if (AckType.Cumulative == ackType) { + + if (positions.size() != 1) { + String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId + + " invalid cumulative ack received with 
multiple message ids."; + log.error(errorMsg); + completableFuture.completeExceptionally(new TransactionConflictException(errorMsg)); + return completableFuture; + } + + checkArgument(positions.get(0) instanceof PositionImpl); + if (position.compareTo((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition()) <= 0) { + String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId + + " try to cumulative ack position: " + position + " within range of cursor's " + + "markDeletePosition: " + persistentSubscription.getCursor().getMarkDeletedPosition(); + log.error(errorMsg); + completableFuture.completeExceptionally(new TransactionConflictException(errorMsg)); + return completableFuture; + } + + if (log.isDebugEnabled()) { + log.debug("[{}][{}] TxnID:[{}] Cumulative ack on {}.", topicName, subName, txnId.toString(), position); + } + if (this.pendingCumulativeAckTxnId == null) { + // Only set pendingCumulativeAckTxnId if no transaction is doing cumulative ack. + PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, txnId); + POSITION_UPDATER.set(this, position); + } else if (this.pendingCumulativeAckTxnId.equals(txnId) + && position.compareToWithAckSet((PositionImpl) pendingCumulativeAckMessage) > 0) { + // Only set pendingCumulativeAckTxnId if no transaction is doing cumulative ack. + PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, txnId); + POSITION_UPDATER.set(this, position); Review comment: Same as above comment. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. 
+ private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. + // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, Review comment: This method looks complicated; you can split it into 2 parts, acknowledgeMessageIndividual and acknowledgeMessageCumulative.
``` if (AckType.Cumulative == ackType) { acknowledgeMessageCumulative(); } else { acknowledgeMessageIndividual(); } ``` ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java ########## @@ -331,7 +332,8 @@ private PersistentSubscription createPersistentSubscription(String subscriptionN if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) { return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor); } else { - return new PersistentSubscription(this, subscriptionName, cursor, replicated); + return new PersistentSubscription(this, subscriptionName, cursor, + replicated, new PendingAckHandleImpl(topic, subscriptionName)); Review comment: The PendingAckHandle is an internal component of the subscription, so it seems better to instantiate it internally. ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java ########## @@ -121,6 +180,10 @@ public boolean equals(Object obj) { return false; } + public boolean isBatchPosition() { + return ackSet != null; + } Review comment: Do you mean `hasAckSet()`? ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. 
+ // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + checkArgument(positions.get(0) instanceof PositionImpl); + PositionImpl position = (PositionImpl) positions.get(0); + if (AckType.Cumulative == ackType) { + + if (positions.size() != 1) { + String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId + + " invalid cumulative ack received with 
multiple message ids."; + log.error(errorMsg); + completableFuture.completeExceptionally(new TransactionConflictException(errorMsg)); + return completableFuture; + } + + checkArgument(positions.get(0) instanceof PositionImpl); + if (position.compareTo((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition()) <= 0) { + String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId + + " try to cumulative ack position: " + position + " within range of cursor's " + + "markDeletePosition: " + persistentSubscription.getCursor().getMarkDeletedPosition(); + log.error(errorMsg); Review comment: It's easy to achieve by using log pattern log.error("[{}] [{}] Transaction:", topicName, subName). ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. + // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. 
+ // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + checkArgument(positions.get(0) instanceof PositionImpl); + PositionImpl position = (PositionImpl) positions.get(0); + if (AckType.Cumulative == ackType) { + + if (positions.size() != 1) { + String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId + + " invalid cumulative ack received with multiple message ids."; + log.error(errorMsg); + completableFuture.completeExceptionally(new TransactionConflictException(errorMsg)); + return completableFuture; + } + + checkArgument(positions.get(0) instanceof PositionImpl); Review comment: Please ensure the method doesn't throw any exception. Instead, complete the future with exception. 
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java ########## @@ -81,6 +82,64 @@ public long getEntryId() { return entryId; } + public int compareToWithAckSet(PositionImpl other) { + checkNotNull(other); + int result = ComparisonChain.start().compare(this.ledgerId, other.ledgerId).compare(this.entryId, other.entryId) + .result(); + if (result == 0) { + if (other.ackSet == null && ackSet == null) { + return result; + } + checkNotNull(other.ackSet); + checkNotNull(ackSet); + BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(other.ackSet); + BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(ackSet); + if (otherAckSet.equals(thisAckSet)) { + return result; + } + otherAckSet.and(thisAckSet); + boolean flag = otherAckSet.equals(thisAckSet); + thisAckSet.recycle(); + otherAckSet.recycle(); + return flag ? 1 : 0; + } + return result; + } + + public boolean isAckSetRepeated(PositionImpl other) { + if (ackSet == null || other.ackSet == null) { + return false; + } + checkNotNull(other); + checkNotNull(ackSet); + checkNotNull(other.ackSet); + BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(ackSet); + BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(other.ackSet); + if (otherAckSet.size() < thisAckSet.size()) { + otherAckSet.set(otherAckSet.size(), thisAckSet.size()); + } + thisAckSet.flip(0, thisAckSet.size()); + otherAckSet.flip(0, otherAckSet.size()); + thisAckSet.and(otherAckSet); + boolean isAckSetRepeated = !thisAckSet.isEmpty(); + thisAckSet.recycle(); + otherAckSet.recycle(); + return isAckSetRepeated; + } + + public void andAckSet(PositionImpl other) { + checkNotNull(other); + checkNotNull(ackSet); + checkNotNull(other.ackSet); + BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(ackSet); + BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(other.ackSet); + if (otherAckSet.size() < thisAckSet.size()) { + otherAckSet.set(otherAckSet.size(), thisAckSet.size()); + } + 
thisAckSet.and(otherAckSet); + this.ackSet = thisAckSet.toLongArray(); + } Review comment: We do not need to introduce complex checking logic in PositionImpl and it is only used in the transaction. I think it's better to expose a setAckSet method if you want to update the ackSet. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java ########## @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public interface PendingAckHandle { + + /** + * Acknowledge message(s) for an ongoing transaction. + * <p> + * It can be of {@link PulsarApi.CommandAck.AckType#Individual} or {@link PulsarApi.CommandAck.AckType#Cumulative}. 
Single messages acked by ongoing + * transaction will be put in pending_ack state and only marked as deleted after transaction is committed. + * <p> + * Only one transaction is allowed to do cumulative ack on a subscription at a given time. + * If a transaction do multiple cumulative ack, only the one with largest position determined by + * {@link PositionImpl#compareTo(PositionImpl)} will be kept as it cover all position smaller than it. + * <p> + * If an ongoing transaction cumulative acked a message and then try to ack single message which is + * smaller than that one it cumulative acked, it'll succeed. Review comment: Do you mean this is allowed within one transaction? I think we should throw the ConflictException here, since the transaction wants to ack messages that are already acked, right? ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java ########## @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pulsar.broker.transaction.pendingack; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public interface PendingAckHandle { + + /** + * Acknowledge message(s) for an ongoing transaction. + * <p> + * It can be of {@link PulsarApi.CommandAck.AckType#Individual} or {@link PulsarApi.CommandAck.AckType#Cumulative}. Single messages acked by ongoing + * transaction will be put in pending_ack state and only marked as deleted after transaction is committed. + * <p> + * Only one transaction is allowed to do cumulative ack on a subscription at a given time. + * If a transaction do multiple cumulative ack, only the one with largest position determined by + * {@link PositionImpl#compareTo(PositionImpl)} will be kept as it cover all position smaller than it. + * <p> + * If an ongoing transaction cumulative acked a message and then try to ack single message which is + * smaller than that one it cumulative acked, it'll succeed. + * <p> + * If transaction is aborted all messages acked by it will be put back to pending state. + * + * @param txnId TransactionID of an ongoing transaction trying to sck message. + * @param positions {@link Position}(s) it try to ack. + * @param ackType {@link PulsarApi.CommandAck.AckType}. + * cumulative ack or try to single ack message already acked by any ongoing transaction. Review comment: Does this make sense here? It looks like the comment does not relate to the `ackType`.
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java ########## @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public interface PendingAckHandle { + + /** + * Acknowledge message(s) for an ongoing transaction. + * <p> + * It can be of {@link PulsarApi.CommandAck.AckType#Individual} or {@link PulsarApi.CommandAck.AckType#Cumulative}. Single messages acked by ongoing + * transaction will be put in pending_ack state and only marked as deleted after transaction is committed. + * <p> + * Only one transaction is allowed to do cumulative ack on a subscription at a given time. 
+ * If a transaction do multiple cumulative ack, only the one with largest position determined by + * {@link PositionImpl#compareTo(PositionImpl)} will be kept as it cover all position smaller than it. + * <p> + * If an ongoing transaction cumulative acked a message and then try to ack single message which is + * smaller than that one it cumulative acked, it'll succeed. + * <p> + * If transaction is aborted all messages acked by it will be put back to pending state. + * + * @param txnId TransactionID of an ongoing transaction trying to sck message. + * @param positions {@link Position}(s) it try to ack. + * @param ackType {@link PulsarApi.CommandAck.AckType}. + * cumulative ack or try to single ack message already acked by any ongoing transaction. + * @return the future of this operation. + */ + CompletableFuture<Void> acknowledgeMessage(TxnID txnId, List<Position> positions, PulsarApi.CommandAck.AckType ackType); + + /** + * Commit a transaction. + * + * @param txnId {@link TxnID} to identify the transaction. + * @param properties Additional user-defined properties that can be associated with a particular cursor position. + * @return the future of this operation. + */ + CompletableFuture<Void> commitTxn(TxnID txnId, Map<String,Long> properties); + + /** + * Abort a transaction. + * + * @param txnId {@link TxnID} to identify the transaction. + * @param consumer {@link Consumer} which aborting transaction. + * @return the future of this operation. + */ + CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer); + + void setPersistentSubscription(PersistentSubscription persistentSubscription); + + void redeliverUnacknowledgedMessages(Consumer consumer); + + void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions); Review comment: Could you please also add the method comment for these two methods? 
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; 
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. + // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. 
+ private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + checkArgument(positions.get(0) instanceof PositionImpl); + PositionImpl position = (PositionImpl) positions.get(0); + if (AckType.Cumulative == ackType) { + + if (positions.size() != 1) { + String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId + + " invalid cumulative ack received with multiple message ids."; + log.error(errorMsg); + completableFuture.completeExceptionally(new TransactionConflictException(errorMsg)); Review comment: It's easy to achieve by using log pattern log.error("[{}] [{}] Transaction:", topicName, subName). ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. 
+ // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + checkArgument(positions.get(0) instanceof PositionImpl); Review comment: Same as above comment. 
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java ########## @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public interface PendingAckHandle { + + /** + * Acknowledge message(s) for an ongoing transaction. + * <p> + * It can be of {@link PulsarApi.CommandAck.AckType#Individual} or {@link PulsarApi.CommandAck.AckType#Cumulative}. Single messages acked by ongoing + * transaction will be put in pending_ack state and only marked as deleted after transaction is committed. + * <p> + * Only one transaction is allowed to do cumulative ack on a subscription at a given time. 
+ * If a transaction do multiple cumulative ack, only the one with largest position determined by + * {@link PositionImpl#compareTo(PositionImpl)} will be kept as it cover all position smaller than it. + * <p> + * If an ongoing transaction cumulative acked a message and then try to ack single message which is + * smaller than that one it cumulative acked, it'll succeed. + * <p> + * If transaction is aborted all messages acked by it will be put back to pending state. + * + * @param txnId TransactionID of an ongoing transaction trying to sck message. + * @param positions {@link Position}(s) it try to ack. + * @param ackType {@link PulsarApi.CommandAck.AckType}. + * cumulative ack or try to single ack message already acked by any ongoing transaction. + * @return the future of this operation. Review comment: It would be better to clarify which exceptions can be thrown. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. + // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. 
+ // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); Review comment: Can this be simplified by using a Pair<Txn, Position>? ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. + // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. 
+ // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + checkArgument(positions.get(0) instanceof PositionImpl); Review comment: And this will introduce a NPE here since the positions.get(0) may return null. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. 
+ // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. + private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { + this.topicName = topicName; + this.subName = subName; + } + + @Override + public synchronized CompletableFuture<Void> acknowledgeMessage(TxnID txnId, + List<Position> positions, AckType ackType) { + checkArgument(txnId != null, "TransactionID can not be null."); + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + checkArgument(positions.get(0) instanceof PositionImpl); + PositionImpl position = (PositionImpl) positions.get(0); Review comment: Same as the above comment. 
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java ########## @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; +import org.apache.pulsar.transaction.common.exception.TransactionConflictException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; 
+import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import static com.google.common.base.Preconditions.checkArgument; + +@Slf4j +public class PendingAckHandleImpl implements PendingAckHandle { + + // Map to keep track of message ack by each txn. + private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashMap<Position, Position>> pendingIndividualAckMessagesMap; + + // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up. + // Using hashset as a message should only be acked once by one transaction. + private ConcurrentOpenHashMap<Position, Position> pendingAckMessages; + + private ConcurrentOpenHashMap<Position, ConcurrentOpenHashSet<TxnID>> pendingAckBatchMessageMap; + + // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack. + // Only one transaction can cumulative ack. + // This parameter only keep the the largest Position it cumulative ack,as any Position smaller will also be covered. + private volatile Position pendingCumulativeAckMessage; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, Position> POSITION_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, Position.class, + "pendingCumulativeAckMessage"); + + // ID of transaction currently using cumulative ack. 
+ private volatile TxnID pendingCumulativeAckTxnId; + + private static final AtomicReferenceFieldUpdater<PendingAckHandleImpl, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleImpl.class, TxnID.class, + "pendingCumulativeAckTxnId"); + + private PersistentSubscription persistentSubscription; + + private final String topicName; + + private final String subName; + + public PendingAckHandleImpl(String topicName, String subName) { Review comment: You can instantiate with the PersistentSubscription and you can get topicName and subName from the PersistentSubscription ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
