Denovo1998 commented on PR #22469:
URL: https://github.com/apache/pulsar/pull/22469#issuecomment-2057198666

   @rdhabalia @lhotari @Technoboy- 
   This PR may cause some problems; please look at part (3) of the test below.
   For example, suppose we want to evolve the schema from PersonTwo to PersonOne.
   Data written with schema PersonTwo can no longer be consumed, and the only
   schema left is PersonOne, as version 1.
   
   ```java
       /**
        * This test validates that consumers/producers can recover on a topic whose
        * schema ledgers cannot be opened due to a non-recoverable error.
        * 
        * @throws Exception
        */
       @Test
       public void testDeletedSchemaLedgerRecovery() throws Exception {
           final String tenant = PUBLIC_TENANT;
           final String namespace = "test-namespace-" + randomName(16);
           final String topicOne = "test-multi-version-schema-one";
           final String subName = "test";
           final String topicName =
                   TopicName.get(TopicDomain.persistent.value(), tenant, namespace, topicOne).toString();
   
           admin.namespaces().createNamespace(tenant + "/" + namespace, Sets.newHashSet(CLUSTER_NAME));
   
           // (1) create schema
           Producer<Schemas.PersonTwo> producer = pulsarClient
                   .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder()
                           .withAlwaysAllowNull(false)
                           .withSupportSchemaVersioning(true)
                           .withPojo(Schemas.PersonTwo.class).build()))
                   .topic(topicName).create();
   
           Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
           personTwo.setId(1);
           personTwo.setName("Pulsar");
   
           Consumer<Schemas.PersonTwo> consumer = pulsarClient
                   .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder()
                           .withAlwaysAllowNull(false)
                           .withSupportSchemaVersioning(true)
                           .withPojo(Schemas.PersonTwo.class).build()))
                   .subscriptionName(subName)
                   .subscriptionType(SubscriptionType.Shared)
                   .topic(topicName)
                   .subscribe();
   
           producer.send(personTwo);
           // producer.close();
           // consumer.close();
   
           // (2) Delete schema ledger
           MetadataCache<SchemaStorageFormat.SchemaLocator> locatorEntryCache = pulsar.getLocalMetadataStore()
                   .getMetadataCache(new MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
                       @Override
                       public byte[] serialize(String path, SchemaStorageFormat.SchemaLocator value) {
                           return value.toByteArray();
                       }
   
                       @Override
                       public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content, Stat stat)
                               throws IOException {
                           return SchemaStorageFormat.SchemaLocator.parseFrom(content);
                       }
                   });
           String path = "/schemas/public/" + namespace + "/test-multi-version-schema-one";
           SchemaLocator schema = locatorEntryCache.get(path).get().get();
           long ledgerId = schema.getInfo().getPosition().getLedgerId();
           pulsar.getBookKeeperClient().deleteLedger(ledgerId);
   
           // (3) Topic should recover from the deleted schema and should allow creating
           // a consumer and producer?
           Consumer<Schemas.PersonOne> consumer1 = pulsarClient
                   .newConsumer(Schema.AVRO(SchemaDefinition.<Schemas.PersonOne>builder()
                           .withAlwaysAllowNull(false)
                           .withSupportSchemaVersioning(true)
                           .withPojo(Schemas.PersonOne.class).build()))
                   .subscriptionName(subName)
                   .subscriptionType(SubscriptionType.Shared)
                   .topic(topicName)
                   .subscribe();
   
           Producer<Schemas.PersonOne> producer1 = pulsarClient
                   .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonOne>builder()
                           .withAlwaysAllowNull(false)
                           .withSupportSchemaVersioning(true)
                           .withPojo(Schemas.PersonOne.class).build()))
                   .topic(topicName).create();
           assertNotNull(consumer1);
           assertNotNull(producer1);
   
           Schemas.PersonOne personOne = new Schemas.PersonOne();
           personOne.setId(1);
   
           producer1.send(personOne);
           Assert.assertEquals(personOne, consumer1.receive().getValue());
   
           // The consumer on the old schema "version" (PersonTwo) can no longer consume.
           producer.send(personTwo);
           Assert.assertEquals(personTwo, consumer.receive().getValue());
   
           consumer.close();
           producer.close();
           consumer1.close();
           producer1.close();
       }
   ```
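
For readers without the test sources at hand, here is a rough sketch of how the two POJOs might differ. The field shapes are an assumption inferred only from the setters the test calls (`setId` on both, `setName` on PersonTwo only); the classes below are hypothetical stand-ins, not the real `Schemas.PersonOne` / `Schemas.PersonTwo`, and the `SchemaShapes` helper is made up for illustration. It only lists which fields a PersonTwo record carries that a PersonOne reader does not declare; it does not run Avro's or Pulsar's compatibility checks.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

public class SchemaShapes {
    // Hypothetical stand-in: assumed to carry only an id.
    static class PersonOne {
        int id;
    }

    // Hypothetical stand-in: assumed to carry an id and a name.
    static class PersonTwo {
        int id;
        String name;
    }

    // Collects the names of the instance fields declared directly on a class.
    static Set<String> fieldNames(Class<?> type) {
        Set<String> names = new TreeSet<>();
        for (Field f : type.getDeclaredFields()) {
            if (!f.isSynthetic() && !Modifier.isStatic(f.getModifiers())) {
                names.add(f.getName());
            }
        }
        return names;
    }

    // Fields present in the writer's class but absent from the reader's class.
    static Set<String> missingIn(Class<?> writer, Class<?> reader) {
        Set<String> diff = fieldNames(writer);
        diff.removeAll(fieldNames(reader));
        return diff;
    }

    public static void main(String[] args) {
        // Fields a PersonTwo record has that a PersonOne reader does not declare.
        System.out.println(missingIn(PersonTwo.class, PersonOne.class));
    }
}
```

Under these assumed shapes, `name` is the only field that separates the two schemas in this scenario.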

