This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0333becfbfebc1ae4a435d56deefe8dc22a68634 Author: fengyubiao <[email protected]> AuthorDate: Tue Sep 20 11:21:54 2022 +0800 [fix][schema]ledger handle leak when update schema (#17283) ### Motivation in the schema update, will create a `ledgerHandle` and write data to BK, after that `ledgerHandle` is no longer useful and no other object holds references to it. `ledgerHandle` will be recycled with GC, but `ledgerHandle` also hold external connections, which will cause leakage. https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L452-L456 ### Modifications after the schema is updated, close the `ledgerHandle`, just like schema-read: https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L519-L525 --- .../service/schema/BookkeeperSchemaStorage.java | 13 ++++---- .../SchemaCompatibilityCheckTest.java | 35 ++++++++++++++++++++++ .../bookkeeper/client/PulsarMockLedgerHandle.java | 4 +++ 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 88a94198f4f..4451cab7c92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -449,11 +449,14 @@ public class BookkeeperSchemaStorage implements SchemaStorage { byte[] data ) { SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data); - return createLedger(schemaId).thenCompose(ledgerHandle -> - addEntry(ledgerHandle, schemaEntry).thenApply(entryId -> - Functions.newPositionInfo(ledgerHandle.getId(), entryId) - ) - ); + return createLedger(schemaId).thenCompose(ledgerHandle -> { + final long ledgerId = ledgerHandle.getId(); + return addEntry(ledgerHandle, schemaEntry) + .thenApply(entryId -> { + ledgerHandle.closeAsync(); + return Functions.newPositionInfo(ledgerId, entryId); + }); + }); } @NotNull diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 1b5e4d67232..1167eff2ab0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Consumer; @@ -478,7 +479,41 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest { consumerOne.close(); producerOne.close(); + } + @Test + public void testSchemaLedgerAutoRelease() throws Exception { + String namespaceName = PUBLIC_TENANT + "/default"; + String topicName = "persistent://" + namespaceName + "/tp"; + admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME)); + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + // Update schema 100 times. + for (int i = 0; i < 100; i++){ + Schema schema = Schema.JSON(SchemaDefinition.builder() + .withJsonDef(String.format(""" + { + "type": "record", + "name": "Test_Pojo", + "namespace": "org.apache.pulsar.schema.compatibility", + "fields": [{ + "name": "prop_%s", + "type": ["null", "string"], + "default": null + }] + } + """, i)) + .build()); + Producer producer = pulsarClient + .newProducer(schema) + .topic(topicName) + .create(); + producer.close(); + } + // The other ledgers are about 5. + Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream() + .filter(ledger -> !ledger.isFenced()) + .collect(Collectors.toList()).size() < 20); + admin.topics().delete(topicName, true); } @Test diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 7378a6f9106..7842959ee25 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.client; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -29,6 +30,7 @@ import java.util.Enumeration; import java.util.List; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import lombok.Getter; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; @@ -58,6 +60,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle { final byte[] passwd; final ReadHandle readHandle; long lastEntry = -1; + @VisibleForTesting + @Getter boolean fenced = false; public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
