congbobo184 commented on a change in pull request #12449: URL: https://github.com/apache/pulsar/pull/12449#discussion_r733577816
########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; + +public interface OffloadFilter { + + /** + * Check whether this Entry needs offload.Exclude the aborted message and transaction mark + * @return + */ + boolean checkIfNeedOffload(LedgerEntry LedgerEntry); + + /** + * The largest stable position that can be exposed to the consumer + * @return + */ + PositionImpl getMaxReadPosition(); Review comment: Naming it checkLedgerIdCanOffload would be better. ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java ########## @@ -2289,6 +2302,12 @@ private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) { if (alreadyOffloaded) { alreadyOffloadedSize += size; } else if (sizeSummed > threshold) { + //If the state of TB is noSnapshot, this ledger will not contain transaction messages + if(config.isTransactionEnable() Review comment: In this logic, checking only whether the ledgerId can be offloaded is 
enough, right? We don't need to check `isTransactionEnable` or the offloadFilter state. ########## File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java ########## @@ -200,7 +204,7 @@ public String getOffloadDriverName() { try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( readHandle, startEntry, blockSize)) { - + ((BlockAwareSegmentInputStreamImpl)blockStream).setOffloadFilter(offloadFilter); Review comment: Add an interface method to BlockAwareSegmentInputStream so that the cast to BlockAwareSegmentInputStreamImpl is not needed. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OffloadFilterImp.java ########## @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service; + +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.OffloadFilter; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Markers; + +public class OffloadFilterImp implements OffloadFilter { + PersistentTopic persistentTopic; + public OffloadFilterImp(PersistentTopic persistentTopic) { + this.persistentTopic = persistentTopic; + } + + @Override + public boolean checkIfNeedOffload(LedgerEntry ledgerEntry) { + MessageMetadata messageMetadata = Commands.parseMessageMetadata(ledgerEntry.getEntryBuffer()); + + if (messageMetadata.hasTxnidLeastBits() && messageMetadata.hasTxnidMostBits()){ + if (persistentTopic.isTxnAborted(new TxnID(messageMetadata.getTxnidMostBits(), + messageMetadata.getTxnidLeastBits()))){ Review comment: || Markers.isTxnMarker(messageMetadata) ########## File path: pulsar-broker/pom.xml ########## @@ -367,6 +386,24 @@ <artifactId>pulsar-package-core</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>tiered-storage-file-system</artifactId> + <version>2.9.0-SNAPSHOT</version> Review comment: use ${project.version} ########## File path: pulsar-broker/pom.xml ########## @@ -367,6 +386,24 @@ <artifactId>pulsar-package-core</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>tiered-storage-file-system</artifactId> + <version>2.9.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>tiered-storage-jcloud</artifactId> + <version>2.9.0-SNAPSHOT</version> Review 
comment: same as above ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java ########## @@ -2249,6 +2256,12 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) { && config.getLedgerOffloader().getOffloadPolicies() != null && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) { + //This means that the topic has not been created yet or the TB has not been started completely. + if(config.isTransactionEnable()){ Review comment: Why do we need to add isTransactionEnable? Checking offloadFilter == null is enough. ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; + +public interface OffloadFilter { + + /** + * Check whether this Entry needs offload.Exclude the aborted message and transaction mark + * @return + */ + boolean checkIfNeedOffload(LedgerEntry LedgerEntry); + + /** + * The largest stable position that can be exposed to the consumer + * @return + */ + PositionImpl getMaxReadPosition(); + + /** + * Check whether the status of TransactionBuffer is Initializing. + * @return + */ + boolean isTransactionBufferInitializing(); Review comment: Don't name it isTransactionBufferInitializing; I think checkFilterIsReady is better, because this filter may not be used only by transactions. ########## File path: pulsar-broker/pom.xml ########## @@ -367,6 +386,24 @@ <artifactId>pulsar-package-core</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>tiered-storage-file-system</artifactId> + <version>2.9.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>tiered-storage-jcloud</artifactId> + <version>2.9.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.jclouds</groupId> + <artifactId>jclouds-blobstore</artifactId> + <version>2.3.0</version> Review comment: use ${jclouds.version} ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java ########## @@ -1353,6 +1360,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setTransactionEnable(serviceConfig.isTransactionCoordinatorEnabled()); Review comment: In my opinion, this is unnecessary. ########## File path: 
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java ########## @@ -129,14 +131,26 @@ public LedgerMetadata getLedgerMetadata() { log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry); throw new BKException.BKUnexpectedConditionException(); } - int length = dataStream.readInt(); - if (length < 0) { // hit padding or new block - inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset()); - continue; + long entryId; + int length; + try { Review comment: Don't add a try/catch here; this code is already inside the try/catch that starts at line 110. ########## File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java ########## @@ -105,30 +105,22 @@ public LedgerMetadata getLedgerMetadata() { promise.completeExceptionally(new BKException.BKIncorrectParameterException()); return; } - long entriesToRead = (lastEntry - firstEntry) + 1; List<LedgerEntry> entries = new ArrayList<LedgerEntry>(); - long nextExpectedId = firstEntry; LongWritable key = new LongWritable(); BytesWritable value = new BytesWritable(); try { - key.set(nextExpectedId - 1); + key.set(firstEntry - 1); reader.seek(key); - while (entriesToRead > 0) { - reader.next(key, value); + reader.next(key, value); Review comment: Why not use a do-while loop? If reader.next(key, value) returns false, `value.getLength()` will throw an NPE. ########## File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java ########## @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; + +public interface OffloadFilter { + + /** + * Check whether this Entry needs offload.Exclude the aborted message and transaction mark + * @return + */ + boolean checkIfNeedOffload(LedgerEntry LedgerEntry); + + /** + * The largest stable position that can be exposed to the consumer + * @return + */ + PositionImpl getMaxReadPosition(); + + /** + * Check whether the status of TransactionBuffer is Initializing. + * @return + */ + boolean isTransactionBufferInitializing(); + + /** + * Check whether the status of TransactionBuffer is NoSnapshot. + * @return + */ + boolean isTransactionBufferNoSnapshot(); Review comment: I don't think we need this API. Is there a use for it that I haven't thought of? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org