congbobo184 commented on a change in pull request #12449:
URL: https://github.com/apache/pulsar/pull/12449#discussion_r733577816



##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+public interface OffloadFilter {
+
+    /**
+     * Check whether this Entry needs offload.Exclude the aborted message and transaction mark
+     * @return
+     */
+    boolean checkIfNeedOffload(LedgerEntry LedgerEntry);
+
+    /**
+     * The largest stable position that can be exposed to the consumer
+     * @return
+     */
+    PositionImpl getMaxReadPosition();

Review comment:
       Naming this `checkLedgerIdCanOffload` would be better.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2289,6 +2302,12 @@ private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
                     if (alreadyOffloaded) {
                         alreadyOffloadedSize += size;
                     } else if (sizeSummed > threshold) {
+                        //If the state of TB is noSnapshot, this ledger will not contain transaction messages
+                        if(config.isTransactionEnable()

Review comment:
       In this logic, checking whether the ledger ID can be offloaded should be
enough, right? We don't need to check `isTransactionEnable` or the offloadFilter state.

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java
##########
@@ -200,7 +204,7 @@ public String getOffloadDriverName() {
 
                     try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
                         readHandle, startEntry, blockSize)) {
-
+                        ((BlockAwareSegmentInputStreamImpl)blockStream).setOffloadFilter(offloadFilter);

Review comment:
       Add an interface method for this to `BlockAwareSegmentInputStream`, so the cast is not needed.
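
       A minimal sketch of that suggestion, using hypothetical stand-in types
rather than the real Pulsar classes. The entry argument is simplified to a
`long`, and `getOffloadFilter` exists only so the sketch can be checked; neither
is taken from the PR.

```java
// Stand-in for the PR's OffloadFilter; the real method takes a LedgerEntry.
interface OffloadFilter {
    boolean checkIfNeedOffload(long entryId);
}

// Stand-in for the stream interface, with the setter declared on it.
interface BlockAwareSegmentInputStream extends AutoCloseable {
    void setOffloadFilter(OffloadFilter filter);

    // Hypothetical getter, present only to make the sketch verifiable.
    OffloadFilter getOffloadFilter();

    @Override
    void close();
}

class BlockAwareSegmentInputStreamImpl implements BlockAwareSegmentInputStream {
    private OffloadFilter filter;

    @Override
    public void setOffloadFilter(OffloadFilter filter) {
        this.filter = filter;
    }

    @Override
    public OffloadFilter getOffloadFilter() {
        return filter;
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}

class InterfaceSetterSketch {
    // The caller keeps the interface type and never casts to the implementation.
    static boolean configure(OffloadFilter filter) {
        try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl()) {
            blockStream.setOffloadFilter(filter);
            return blockStream.getOffloadFilter() == filter;
        }
    }
}
```

       With the setter on the interface, the offloader depends only on
`BlockAwareSegmentInputStream` and other implementations can accept a filter too.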

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/OffloadFilterImp.java
##########
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.mledger.OffloadFilter;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
+
+public class OffloadFilterImp implements OffloadFilter {
+    PersistentTopic persistentTopic;
+    public OffloadFilterImp(PersistentTopic persistentTopic) {
+        this.persistentTopic = persistentTopic;
+    }
+
+    @Override
+    public boolean checkIfNeedOffload(LedgerEntry ledgerEntry) {
+        MessageMetadata messageMetadata = Commands.parseMessageMetadata(ledgerEntry.getEntryBuffer());
+
+        if (messageMetadata.hasTxnidLeastBits() && messageMetadata.hasTxnidMostBits()){
+            if (persistentTopic.isTxnAborted(new TxnID(messageMetadata.getTxnidMostBits(),
+                    messageMetadata.getTxnidLeastBits()))){

Review comment:
       Also add `|| Markers.isTxnMarker(messageMetadata)` to this condition.
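
       Roughly, the combined check could look like the sketch below. It uses
hypothetical stand-ins (`Meta` for the metadata fields, a set of strings for
the aborted transactions) and assumes the branch returns false, meaning the
entry is skipped, which the quoted hunk does not show.

```java
import java.util.Set;

class TxnOffloadCheckSketch {
    // Hypothetical stand-in for the fields read from MessageMetadata.
    static final class Meta {
        final boolean hasTxnId;
        final long mostBits;
        final long leastBits;
        final boolean txnMarker;

        Meta(boolean hasTxnId, long mostBits, long leastBits, boolean txnMarker) {
            this.hasTxnId = hasTxnId;
            this.mostBits = mostBits;
            this.leastBits = leastBits;
            this.txnMarker = txnMarker;
        }
    }

    // Returns true when the entry should be offloaded.
    static boolean needsOffload(Meta meta, Set<String> abortedTxns) {
        if (meta.hasTxnId) {
            // Stand-in for persistentTopic.isTxnAborted(new TxnID(most, least)).
            boolean aborted = abortedTxns.contains(meta.mostBits + ":" + meta.leastBits);
            // Stand-in for Markers.isTxnMarker(messageMetadata).
            if (aborted || meta.txnMarker) {
                return false; // assumption: aborted messages and markers are skipped
            }
        }
        return true;
    }
}
```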

##########
File path: pulsar-broker/pom.xml
##########
@@ -367,6 +386,24 @@
       <artifactId>pulsar-package-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>tiered-storage-file-system</artifactId>
+      <version>2.9.0-SNAPSHOT</version>

Review comment:
       Use `${project.version}` here.

##########
File path: pulsar-broker/pom.xml
##########
@@ -367,6 +386,24 @@
       <artifactId>pulsar-package-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>tiered-storage-file-system</artifactId>
+      <version>2.9.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>tiered-storage-jcloud</artifactId>
+      <version>2.9.0-SNAPSHOT</version>

Review comment:
       Same as above: use `${project.version}`.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
##########
@@ -2249,6 +2256,12 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
                 && config.getLedgerOffloader().getOffloadPolicies() != null
                 && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() != null
                 && config.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes() >= 0) {
+            //This means that the topic has not been created yet or the TB has not been started completely.
+            if(config.isTransactionEnable()){

Review comment:
       Why do we need to add `isTransactionEnable`? Checking
`offloadFilter == null` is enough.

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+public interface OffloadFilter {
+
+    /**
+     * Check whether this Entry needs offload.Exclude the aborted message and transaction mark
+     * @return
+     */
+    boolean checkIfNeedOffload(LedgerEntry LedgerEntry);
+
+    /**
+     * The largest stable position that can be exposed to the consumer
+     * @return
+     */
+    PositionImpl getMaxReadPosition();
+
+    /**
+     * Check whether the status of TransactionBuffer is Initializing.
+     * @return
+     */
+    boolean isTransactionBufferInitializing();

Review comment:
       Don't name this `isTransactionBufferInitializing`. I think
`checkFilterIsReady` is better, since this filter may not be used only by transactions.

##########
File path: pulsar-broker/pom.xml
##########
@@ -367,6 +386,24 @@
       <artifactId>pulsar-package-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>tiered-storage-file-system</artifactId>
+      <version>2.9.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>tiered-storage-jcloud</artifactId>
+      <version>2.9.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jclouds</groupId>
+      <artifactId>jclouds-blobstore</artifactId>
+      <version>2.3.0</version>

Review comment:
       Use `${jclouds.version}` here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1353,6 +1360,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
 
 
                     ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+                    managedLedgerConfig.setTransactionEnable(serviceConfig.isTransactionCoordinatorEnabled());

Review comment:
       In my opinion, this seems unnecessary.

##########
File path: tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
##########
@@ -129,14 +131,26 @@ public LedgerMetadata getLedgerMetadata() {
                         log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
                         throw new BKException.BKUnexpectedConditionException();
                     }
-                    int length = dataStream.readInt();
-                    if (length < 0) { // hit padding or new block
-                        inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
-                        continue;
+                    long entryId;
+                    int length;
+                    try {

Review comment:
       Don't add a try/catch here; this code is already inside the try/catch that starts at line 110.

##########
File path: tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java
##########
@@ -105,30 +105,22 @@ public LedgerMetadata getLedgerMetadata() {
                 promise.completeExceptionally(new BKException.BKIncorrectParameterException());
                 return;
             }
-            long entriesToRead = (lastEntry - firstEntry) + 1;
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
-            long nextExpectedId = firstEntry;
             LongWritable key = new LongWritable();
             BytesWritable value = new BytesWritable();
             try {
-                key.set(nextExpectedId - 1);
+                key.set(firstEntry - 1);
                 reader.seek(key);
-                while (entriesToRead > 0) {
-                    reader.next(key, value);
+                reader.next(key, value);

Review comment:
       Why not use a while loop? If `reader.next(key, value)` returns false,
`value.getLength()` will throw an NPE.
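
       A minimal sketch of the loop shape I have in mind, with a hypothetical
`FakeReader` standing in for the real key/value reader. The point is only that
the result of `next()` is checked before the current entry is used; the range
handling is an assumption, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

class ReadLoopSketch {
    // Hypothetical stand-in for a reader whose next() returns false at end of data.
    static final class FakeReader {
        private final long[] ids;
        private int pos = 0;
        long currentId;

        FakeReader(long... ids) {
            this.ids = ids;
        }

        boolean next() {
            if (pos >= ids.length) {
                return false;
            }
            currentId = ids[pos++];
            return true;
        }
    }

    // Collects entry ids in [firstEntry, lastEntry], tolerating gaps and early end of data.
    static List<Long> readRange(FakeReader reader, long firstEntry, long lastEntry) {
        List<Long> entries = new ArrayList<>();
        long entriesToRead = (lastEntry - firstEntry) + 1;
        // Only touch the current entry after next() has returned true.
        while (entriesToRead > 0 && reader.next()) {
            long id = reader.currentId;
            if (id < firstEntry) {
                continue; // before the requested range
            }
            if (id > lastEntry) {
                break; // past the requested range
            }
            entries.add(id);
            entriesToRead--;
        }
        return entries;
    }
}
```

       If some entries were filtered out at offload time, the loop simply ends
when the reader runs out or passes `lastEntry`, instead of reading a stale value.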

##########
File path: managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OffloadFilter.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+
+public interface OffloadFilter {
+
+    /**
+     * Check whether this Entry needs offload.Exclude the aborted message and transaction mark
+     * @return
+     */
+    boolean checkIfNeedOffload(LedgerEntry LedgerEntry);
+
+    /**
+     * The largest stable position that can be exposed to the consumer
+     * @return
+     */
+    PositionImpl getMaxReadPosition();
+
+    /**
+     * Check whether the status of TransactionBuffer is Initializing.
+     * @return
+     */
+    boolean isTransactionBufferInitializing();
+
+    /**
+     * Check whether the status of TransactionBuffer is NoSnapshot.
+     * @return
+     */
+    boolean isTransactionBufferNoSnapshot();

Review comment:
       I don't think we need this API. Is there a use for it that I haven't
thought of?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.