This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new a0a9eb57416 [fix] [ml] Fix the incorrect total size if use ML interceptor (#19404)
a0a9eb57416 is described below

commit a0a9eb574163e4367bd6cd67e9d446e7eab2f461
Author: fengyubiao <[email protected]>
AuthorDate: Fri Feb 3 17:12:40 2023 +0800

    [fix] [ml] Fix the incorrect total size if use ML interceptor (#19404)
    
    (cherry picked from commit 33f40f6154ad20ffadd954d2fbb9fd1ee4268efb)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  4 +-
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 10 ++-
 .../impl/MangedLedgerInterceptorImplTest2.java     | 85 ++++++++++++++++++++++
 .../intercept/MangedLedgerInterceptorImplTest.java | 45 +++++++++++-
 4 files changed, 139 insertions(+), 5 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index be79048c7a7..5fa8f04e6a2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -211,9 +211,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final CallbackMutex offloadMutex = new CallbackMutex();
     private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE 
= CompletableFuture
             .completedFuture(PositionImpl.latest);
-    private volatile LedgerHandle currentLedger;
+    protected volatile LedgerHandle currentLedger;
     private long currentLedgerEntries = 0;
-    private long currentLedgerSize = 0;
+    protected long currentLedgerSize = 0;
     private long lastLedgerCreatedTimestamp = 0;
     private long lastLedgerCreationFailureTimestamp = 0;
     private long lastLedgerCreationInitiationTimestamp = 0;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index e390621c7c3..46069ea8f45 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -122,16 +122,22 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
 
     public void initiate() {
         if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, 
State.INITIATED)) {
-
             ByteBuf duplicateBuffer = data.retainedDuplicate();
 
             // internally asyncAddEntry() will take the ownership of the 
buffer and release it at the end
             addOpCount = 
ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
             lastInitTime = System.nanoTime();
             if (ml.getManagedLedgerInterceptor() != null) {
-                payloadProcessorHandle = 
ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this, 
duplicateBuffer);
+                long originalDataLen = data.readableBytes();
+                payloadProcessorHandle = 
ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this,
+                        duplicateBuffer);
                 if (payloadProcessorHandle != null) {
                     duplicateBuffer = 
payloadProcessorHandle.getProcessedPayload();
+                    // If data len of entry changes, correct "dataLength" and 
"currentLedgerSize".
+                    if (originalDataLen != duplicateBuffer.readableBytes()) {
+                        this.dataLength = duplicateBuffer.readableBytes();
+                        this.ml.currentLedgerSize += (dataLength - 
originalDataLen);
+                    }
                 }
             }
             ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java
new file mode 100644
index 00000000000..f8c81005c8d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import static org.testng.Assert.assertEquals;
+import static 
org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest.TestPayloadProcessor;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
+import org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest;
+import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Unlike {@link MangedLedgerInterceptorImplTest}, this test lives in the same package as
+ * {@link ManagedLedgerImpl}, so it can use that class's non-public (default or protected access) members.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class MangedLedgerInterceptorImplTest2 extends MockedBookKeeperTestCase 
{
+
+    private void switchLedgerManually(ManagedLedgerImpl ledger){
+        LedgerHandle originalLedgerHandle = ledger.currentLedger;
+        ledger.ledgerClosed(ledger.currentLedger);
+        ledger.createLedgerAfterClosed();
+        Awaitility.await().until(() -> {
+            return ledger.state == ManagedLedgerImpl.State.LedgerOpened && 
ledger.currentLedger != originalLedgerHandle;
+        });
+    }
+
+    @Test
+    public void testCurrentLedgerSizeCorrectIfHasInterceptor() throws 
Exception {
+        final String mlName = "ml1";
+        final String cursorName = "cursor1";
+
+        // Registry interceptor.
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        Set<ManagedLedgerPayloadProcessor> processors = new HashSet();
+        processors.add(new TestPayloadProcessor());
+        ManagedLedgerInterceptor interceptor = new 
ManagedLedgerInterceptorImpl(new HashSet(), processors);
+        config.setManagedLedgerInterceptor(interceptor);
+        config.setMaxEntriesPerLedger(100);
+
+        // Add one entry.
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, 
config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+        ledger.addEntry(new byte[1]);
+
+        // Mark "currentLedgerSize" and switch ledger.
+        long currentLedgerSize = ledger.getCurrentLedgerSize();
+        switchLedgerManually(ledger);
+
+        // verify.
+        assertEquals(currentLedgerSize, 
MangedLedgerInterceptorImplTest.calculatePreciseSize(ledger));
+
+        // cleanup.
+        cursor.close();
+        ledger.close();
+        factory.getEntryCacheManager().clear();
+        factory.shutdown();
+        config.setManagedLedgerInterceptor(null);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
index 830e1894832..22562602e62 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.intercept;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -31,6 +32,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
@@ -59,7 +61,7 @@ import static org.testng.Assert.assertNotNull;
 public class MangedLedgerInterceptorImplTest  extends MockedBookKeeperTestCase 
{
     private static final Logger log = 
LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class);
 
-    public class TestPayloadProcessor implements ManagedLedgerPayloadProcessor 
{
+    public static class TestPayloadProcessor implements 
ManagedLedgerPayloadProcessor {
         @Override
         public Processor inputProcessor() {
             return new Processor() {
@@ -157,6 +159,47 @@ public class MangedLedgerInterceptorImplTest  extends MockedBookKeeperTestCase {
         config.setManagedLedgerInterceptor(null);
     }
 
+    @Test
+    public void testTotalSizeCorrectIfHasInterceptor() throws Exception {
+        final String mlName = "ml1";
+        final String cursorName = "cursor1";
+
+        // Registry interceptor.
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        Set<ManagedLedgerPayloadProcessor> processors = new HashSet();
+        processors.add(new TestPayloadProcessor());
+        ManagedLedgerInterceptor interceptor = new 
ManagedLedgerInterceptorImpl(new HashSet(), processors);
+        config.setManagedLedgerInterceptor(interceptor);
+        config.setMaxEntriesPerLedger(2);
+
+        // Add many entries and consume.
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, 
config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+        for (int i = 0; i < 5; i++){
+            cursor.delete(ledger.addEntry(new byte[1]));
+        }
+
+        // Trim ledgers.
+        CompletableFuture<Void> trimLedgerFuture = new CompletableFuture<>();
+        ledger.trimConsumedLedgersInBackground(trimLedgerFuture);
+        trimLedgerFuture.join();
+
+        // verify.
+        assertEquals(ledger.getTotalSize(), calculatePreciseSize(ledger));
+
+        // cleanup.
+        cursor.close();
+        ledger.close();
+        factory.getEntryCacheManager().clear();
+        factory.shutdown();
+        config.setManagedLedgerInterceptor(null);
+    }
+
+    public static long calculatePreciseSize(ManagedLedgerImpl ledger){
+        return ledger.getLedgersInfo().values().stream()
+                .map(info -> info.getSize()).reduce((l1,l2) -> l1 + 
l2).orElse(0L) + ledger.getCurrentLedgerSize();
+    }
+
     @Test(timeOut = 20000)
     public void testRecoveryIndex() throws Exception {
         final int MOCK_BATCH_SIZE = 2;

Reply via email to