This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bf89f0807446f6e65d9db4cff24cce974e34eb01
Author: sinan liu <liusinan1...@gmail.com>
AuthorDate: Tue Apr 16 21:19:44 2024 +0800

    [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)
    
    (cherry picked from commit ffdfc0c4e0881c682132e79c3cbf9768b1ab4f89)
---
 .../client/api/SimpleProducerConsumerTest.java     | 66 +++++++++++++---------
 1 file changed, 38 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 7552b84a1c5..691f501777e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4329,6 +4329,10 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
     public void testAccessAvroSchemaMetadata(Schema<MyBean> schema) throws 
Exception {
         log.info("-- Starting {} test --", methodName);
 
+        if (pulsarClient == null) {
+            pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+        }
+
         final String topic = "persistent://my-property/my-ns/accessSchema";
         Consumer<GenericRecord> consumer = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                 .topic(topic)
@@ -4344,37 +4348,43 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         producer.send(payload);
         producer.close();
 
-        GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS).getValue();
-        consumer.close();
-        assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());
-        org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
-        JsonNode nativeJsonRecord = null;
-        if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
-            nativeAvroRecord = (org.apache.avro.generic.GenericRecord) 
res.getNativeObject();
-            assertNotNull(nativeAvroRecord);
-        } else {
-            nativeJsonRecord = (JsonNode) res.getNativeObject();
-            assertNotNull(nativeJsonRecord);
-        }
-        for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
-            log.info("field {} {}", f.getName(), res.getField(f));
-            assertEquals("field", f.getName());
-            assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
-
-            if (nativeAvroRecord != null) {
-                // test that the native schema is accessible
-                org.apache.avro.Schema.Field fieldDetails = 
nativeAvroRecord.getSchema().getField(f.getName());
-                // a nullable string is an UNION
-                assertEquals(org.apache.avro.Schema.Type.UNION, 
fieldDetails.schema().getType());
-                
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.STRING));
-                
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.NULL));
+        try {
+            GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS).getValue();
+            consumer.close();
+            assertEquals(schema.getSchemaInfo().getType(), 
res.getSchemaType());
+            org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
+            JsonNode nativeJsonRecord = null;
+            if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
+                nativeAvroRecord = (org.apache.avro.generic.GenericRecord) 
res.getNativeObject();
+                assertNotNull(nativeAvroRecord);
             } else {
-                assertEquals(JsonNodeType.STRING, 
nativeJsonRecord.get("field").getNodeType());
+                nativeJsonRecord = (JsonNode) res.getNativeObject();
+                assertNotNull(nativeJsonRecord);
+            }
+            for (org.apache.pulsar.client.api.schema.Field f : 
res.getFields()) {
+                log.info("field {} {}", f.getName(), res.getField(f));
+                assertEquals("field", f.getName());
+                assertEquals("aaaaaaaaaaaaaaaaaaaaaaaaa", res.getField(f));
+
+                if (nativeAvroRecord != null) {
+                    // test that the native schema is accessible
+                    org.apache.avro.Schema.Field fieldDetails = 
nativeAvroRecord.getSchema().getField(f.getName());
+                    // a nullable string is an UNION
+                    assertEquals(org.apache.avro.Schema.Type.UNION, 
fieldDetails.schema().getType());
+                    
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.STRING));
+                    
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.NULL));
+                } else {
+                    assertEquals(JsonNodeType.STRING, 
nativeJsonRecord.get("field").getNodeType());
+                }
             }
+            assertEquals(1, res.getFields().size());
+        } catch (Exception e) {
+            fail();
+        } finally {
+            pulsarClient.shutdown();
+            pulsarClient = null;
+            admin.schemas().deleteSchema(topic);
         }
-        assertEquals(1, res.getFields().size());
-
-        admin.schemas().deleteSchema(topic);
     }
 
     @Test(timeOut = 100000)

Reply via email to