codelipenghui commented on code in PR #17283: URL: https://github.com/apache/pulsar/pull/17283#discussion_r974131093
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java: ########## @@ -437,11 +437,12 @@ private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToS byte[] data ) { SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data); - return createLedger(schemaId).thenCompose(ledgerHandle -> - addEntry(ledgerHandle, schemaEntry).thenApply(entryId -> - Functions.newPositionInfo(ledgerHandle.getId(), entryId) - ) - ); + return createLedger(schemaId).thenCompose(ledgerHandle -> { + final long ledgerId = ledgerHandle.getId(); + return addEntry(ledgerHandle, schemaEntry) + .thenCompose(entryId -> ledgerHandle.closeAsync().thenApply(__ -> entryId)) Review Comment: I think we only need to trigger the close? The client-side doesn't need to wait for the close operation done. ########## pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java: ########## @@ -478,7 +479,52 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS consumerOne.close(); producerOne.close(); + } + + @Test + public void testSchemaLedgerAutoRelease() throws Exception { + String namespaceName = PUBLIC_TENANT + "/default"; + String topicName = "persistent://" + namespaceName + "/tp"; + admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME)); + admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + // Update schema 100 times. + for (int i = 0; i < 100; i++){ + Schema schema = Schema.JSON(SchemaDefinition.builder() + .withJsonDef(String.format(""" + { + "type": "record", + "name": "Test_Pojo", + "namespace": "org.apache.pulsar.schema.compatibility", + "fields": [{ + "name": "prop_%s", + "type": ["null", "string"], + "default": null + }] + } + """, i)) + .build()); + Producer producer = pulsarClient + .newProducer(schema) + .topic(topicName) + .create(); + producer.close(); + } + // The other ledgers are about 5. + Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream() + .filter(ledger -> !ledger.isFenced()) + .collect(Collectors.toList()).size() < 20); + admin.topics().delete(topicName, true); + } + + private static class DynamicClassLoader extends ClassLoader{ Review Comment: We don't need this class anymore? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org