gaoran10 commented on a change in pull request #12449: URL: https://github.com/apache/pulsar/pull/12449#discussion_r751131605
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java ########## @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; + +public interface OffloadFilter { + + /** + * Check whether this Entry needs offload.Exclude the aborted message and transaction mark + * @return + */ + boolean checkIfNeedOffload(LedgerEntry LedgerEntry); Review comment: ```suggestion boolean checkEntry(LedgerEntry LedgerEntry); ``` ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java ########## @@ -222,6 +223,8 @@ private ManagedLedgerInterceptor managedLedgerInterceptor; + private OffloadFilter offloadFilter; Review comment: Maybe we could use a list here? It seems that the offload filter may have different implementations. 
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java ########## @@ -2247,6 +2254,10 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) { && config.getLedgerOffloader().getOffloadPolicies() != null && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) { + //This means that the topic has not been created yet or the TB has not been started completely. + if(offloadFilter == null || !offloadFilter.checkFilterIsReady()){ Review comment: We could use the `OffloadFilterDisabled` as the initialization value of the `offloadFilter`, then we could omit the null check for `offloadFilter`. ########## File path: pulsar-broker/pom.xml ########## @@ -335,6 +356,12 @@ <type>pom</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.jclouds</groupId> + <artifactId>jclouds-allblobstore</artifactId> + <version>${jclouds.version}</version> + <scope>test</scope> + </dependency> Review comment: Same as above. ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java ########## @@ -1848,8 +1851,10 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob } private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) { - - if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0) { + //entries that has been offloaded do not need to be restricted by TransactionBuffer`maxReadPosition + LedgerInfo info = ledgers.get(ledger.getId()); + if (opReadEntry.readPosition.compareTo(opReadEntry.maxPosition) > 0 Review comment: Why is whether the messages are aborted or not related to offloading?
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java ########## @@ -2287,6 +2298,9 @@ private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) { if (alreadyOffloaded) { alreadyOffloadedSize += size; } else if (sizeSummed > threshold) { + if(offloadFilter != null && !offloadFilter.checkIfLedgerIdCanOffload(e.getValue().getLedgerId())){ Review comment: We could omit the null check for offloadFilter. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OffloadFilterImp.java ########## @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service; + +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.OffloadFilter; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Markers; + +public class OffloadFilterImp implements OffloadFilter { + PersistentTopic persistentTopic; + public OffloadFilterImp(PersistentTopic persistentTopic) { + this.persistentTopic = persistentTopic; + } + + @Override + public boolean checkIfNeedOffload(LedgerEntry ledgerEntry) { + int index = ledgerEntry.getEntryBuffer().readerIndex(); + MessageMetadata messageMetadata = Commands.parseMessageMetadata(ledgerEntry.getEntryBuffer()); + ledgerEntry.getEntryBuffer().readerIndex(index); + + if (messageMetadata.hasTxnidLeastBits() && messageMetadata.hasTxnidMostBits()){ + return !persistentTopic.isTxnAborted(new TxnID(messageMetadata.getTxnidMostBits(), + messageMetadata.getTxnidLeastBits())) && !Markers.isTxnMarker(messageMetadata); + } + return true; + } + + @Override + public boolean checkIfLedgerIdCanOffload(long ledgerId) { + return ledgerId < persistentTopic.getMaxReadPosition().getLedgerId(); + } + + @Override + public boolean checkFilterIsReady() { + return "Ready".equals(persistentTopic.getTransactionBufferStats().state) Review comment: Please compare with the enum value instead of the string literal "Ready".
########## File path: pulsar-broker/pom.xml ########## @@ -320,6 +320,27 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hdfs-offload-version3}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.curator</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </exclusion> + </exclusions> + </dependency> Review comment: Do we need to add these dependencies? ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java ########## @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; + +public interface OffloadFilter { + + /** + * Check whether this Entry needs offload.Exclude the aborted message and transaction mark Review comment: ```suggestion * Whether the entry could be offloaded.
``` This is an interface declaration; the comment `Exclude the aborted message and transaction mark.` applies only to the transaction offload filter. ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java ########## @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; + +public interface OffloadFilter { + + /** + * Check whether this Entry needs offload.Exclude the aborted message and transaction mark + * @return + */ + boolean checkIfNeedOffload(LedgerEntry LedgerEntry); + + /** + * The largest stable position that can be exposed to the consumer + * @return + */ + boolean checkIfLedgerIdCanOffload(long LedgerId); Review comment: ```suggestion boolean checkLedger(long LedgerId); ``` ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java ########## @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; + +public interface OffloadFilter { + + /** + * Check whether this Entry needs offload.Exclude the aborted message and transaction mark + * @return + */ + boolean checkIfNeedOffload(LedgerEntry LedgerEntry); + + /** + * The largest stable position that can be exposed to the consumer Review comment: ```suggestion * Whether the ledger could be offloaded. ``` ########## File path: pulsar-broker/pom.xml ########## @@ -367,6 +394,18 @@ <artifactId>pulsar-package-core</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tiered-storage-file-system</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tiered-storage-jcloud</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> Review comment: Same as above. 
########## File path: pom.xml ########## @@ -2043,6 +2043,10 @@ flexible messaging model and an intuitive client API.</description> <module>pulsar-client-auth-sasl</module> <module>pulsar-config-validation</module> + <!-- tiered-storage related modules --> + <module>tiered-storage</module> + <module>jclouds-shaded</module> + Review comment: Do we need to add this? ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java ########## @@ -117,6 +117,12 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e Entry entry = entries.get(i); if (entry == null) { continue; + } else if (entry.getLength() == 0) { + subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual, Review comment: Please add a description here. ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java ########## @@ -1868,7 +1873,9 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry) // can read max position entryId if (ledger.getId() == opReadEntry.maxPosition.getLedgerId()) { Review comment: Why do we need this check? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
