codelipenghui commented on code in PR #17283:
URL: https://github.com/apache/pulsar/pull/17283#discussion_r974131093


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java:
##########
@@ -437,11 +437,12 @@ private 
CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToS
         byte[] data
     ) {
         SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, 
data);
-        return createLedger(schemaId).thenCompose(ledgerHandle ->
-            addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
-                Functions.newPositionInfo(ledgerHandle.getId(), entryId)
-            )
-        );
+        return createLedger(schemaId).thenCompose(ledgerHandle -> {
+            final long ledgerId = ledgerHandle.getId();
+            return addEntry(ledgerHandle, schemaEntry)
+                    .thenCompose(entryId -> 
ledgerHandle.closeAsync().thenApply(__ -> entryId))

Review Comment:
   I think we only need to trigger the close? The client-side doesn't need to 
wait for the close operation done.



##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java:
##########
@@ -478,7 +479,52 @@ public void 
testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS
 
         consumerOne.close();
         producerOne.close();
+    }
+
+    @Test
+    public void testSchemaLedgerAutoRelease() throws Exception {
+        String namespaceName = PUBLIC_TENANT + "/default";
+        String topicName = "persistent://" + namespaceName + "/tp";
+        admin.namespaces().createNamespace(namespaceName, 
Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        // Update schema 100 times.
+        for (int i = 0; i < 100; i++){
+            Schema schema = Schema.JSON(SchemaDefinition.builder()
+                    .withJsonDef(String.format("""
+                            {
+                               "type": "record",
+                               "name": "Test_Pojo",
+                               "namespace": 
"org.apache.pulsar.schema.compatibility",
+                               "fields": [{
+                                       "name": "prop_%s",
+                                       "type": ["null", "string"],
+                                       "default": null
+                               }]
+                            }
+                            """, i))
+                    .build());
+            Producer producer = pulsarClient
+                    .newProducer(schema)
+                    .topic(topicName)
+                    .create();
+            producer.close();
+        }
+        // The other ledgers are about 5.
+        Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream()
+                .filter(ledger -> !ledger.isFenced())
+                .collect(Collectors.toList()).size() < 20);
+        admin.topics().delete(topicName, true);
+    }
+
+    private static class DynamicClassLoader extends ClassLoader{

Review Comment:
   We don't need this class anymore?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to