This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new ab568221c9d [fix][schema]ledger handle leak when update schema (#17283)
ab568221c9d is described below

commit ab568221c9d573be7b265aa337292fe1c660e403
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 20 11:21:54 2022 +0800

    [fix][schema]ledger handle leak when update schema (#17283)
    
    During a schema update, a `ledgerHandle` is created and data is written to BK.
After that, the `ledgerHandle` is no longer useful and no other object holds
references to it. The `ledgerHandle` will be reclaimed by GC, but it
also holds external connections, which will be leaked.
    
    
https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L452-L456
    
    After the schema is updated, close the `ledgerHandle`, just as the
schema-read path does:
    
    
https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L519-L525
    (cherry picked from commit 26204503494871db3818b4d2f35071c6ee1b5b96)
---
 .../service/schema/BookkeeperSchemaStorage.java    | 13 +++++++----
 .../SchemaCompatibilityCheckTest.java              | 27 ++++++++++++++++++++++
 .../bookkeeper/client/PulsarMockLedgerHandle.java  |  6 +++--
 3 files changed, 39 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 9cac21da67d..53157c7b1b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -452,11 +452,14 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         byte[] data
     ) {
         SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, 
data);
-        return createLedger(schemaId).thenCompose(ledgerHandle ->
-            addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
-                Functions.newPositionInfo(ledgerHandle.getId(), entryId)
-            )
-        );
+        return createLedger(schemaId).thenCompose(ledgerHandle -> {
+            final long ledgerId = ledgerHandle.getId();
+            return addEntry(ledgerHandle, schemaEntry)
+                    .thenApply(entryId -> {
+                        ledgerHandle.closeAsync();
+                        return Functions.newPositionInfo(ledgerId, entryId);
+                    });
+        });
     }
 
     @NotNull
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 1b5e4d67232..96b82bafbb4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Consumer;
@@ -478,7 +479,33 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
 
         consumerOne.close();
         producerOne.close();
+    }
 
+    @Test
+    public void testSchemaLedgerAutoRelease() throws Exception {
+        String namespaceName = PUBLIC_TENANT + "/default";
+        String topicName = "persistent://" + namespaceName + "/tp";
+        admin.namespaces().createNamespace(namespaceName, 
Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        // Update schema 100 times.
+        for (int i = 0; i < 100; i++){
+            Schema schema = Schema.JSON(SchemaDefinition.builder()
+                    .withJsonDef(String.format("{\"type\": 
\"record\",\"name\": "
+                            + "\"Test_Pojo\",\"namespace\": 
\"org.apache.pulsar.schema.compatibility\","
+                            + "\"fields\": [{\"name\": \"prop_%s\",\"type\": "
+                            + "[\"null\", \"string\"],\"default\": null}]}", 
i))
+                    .build());
+            Producer producer = pulsarClient
+                    .newProducer(schema)
+                    .topic(topicName)
+                    .create();
+            producer.close();
+        }
+        // The other ledgers are about 5.
+        Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream()
+                .filter(ledger -> !ledger.isFenced())
+                .collect(Collectors.toList()).size() < 20);
+        admin.topics().delete(topicName, true);
     }
 
     @Test
diff --git 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 8a62e42e05a..3f9b17b312f 100644
--- 
a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ 
b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
 import io.netty.buffer.ByteBuf;
@@ -31,7 +32,7 @@ import java.util.Enumeration;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
-
+import lombok.Getter;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
@@ -44,7 +45,6 @@ import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
@@ -62,6 +62,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
     final byte[] passwd;
     final ReadHandle readHandle;
     long lastEntry = -1;
+    @VisibleForTesting
+    @Getter
     boolean fenced = false;
 
     public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,

Reply via email to