BewareMyPower opened a new pull request, #15622:
URL: https://github.com/apache/pulsar/pull/15622

   ### Motivation
   
   When I tried to consume a topic via a consumer with an Avro schema while
   the topic was produced by a producer without a schema, the consumption
   failed. The cause is that `MultiVersionSchemaInfoProvider#getSchemaByVersion`
   doesn't check whether `schemaVersion` is an empty byte array. When it is, a
   `BytesSchemaVersion` wrapping the empty array is passed to `cache.get` and
   then to `loadSchema`.
   
   
https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98
   
   However, `LookupService#getSchema` cannot accept an empty byte array as
   the version, so `loadSchema` failed.
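
The failure path above can be illustrated with a minimal, self-contained sketch. It does not use the Pulsar classes: `SchemaVersionLookupSketch`, `lookup`, and the `loader` function are hypothetical stand-ins for the provider's cache and `loadSchema`, and the guard shown is only one possible way to keep an empty version away from the loader, not the change made in this PR.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a version-keyed schema cache; not the Pulsar implementation.
final class SchemaVersionLookupSketch {
    private final Map<ByteBuffer, String> cache = new ConcurrentHashMap<>();
    // Assumed to behave like a remote lookup that rejects an empty version.
    private final Function<byte[], String> loader;

    SchemaVersionLookupSketch(Function<byte[], String> loader) {
        this.loader = loader;
    }

    // Returns the schema for the given version, or empty when no version is present.
    Optional<String> lookup(byte[] schemaVersion) {
        // Without this guard, an empty array would become a cache key and reach the loader.
        if (schemaVersion == null || schemaVersion.length == 0) {
            return Optional.empty();
        }
        // Copy the bytes so that later mutation of the caller's array cannot corrupt the key.
        ByteBuffer key = ByteBuffer.wrap(schemaVersion.clone());
        return Optional.ofNullable(cache.computeIfAbsent(key, k -> loader.apply(schemaVersion)));
    }
}
```

With a loader that throws on an empty array, `lookup(new byte[0])` returns `Optional.empty()` instead of propagating the loader's exception.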
   
   The root cause is that the schema version was set unexpectedly when
   messages were sent by a producer without a schema. On the broker side, the
   returned schema version is never null; an empty array means the message
   has no schema. On the Java client side, however, the empty byte array is
   treated as the version of an existing schema and the schema version field
   is set on the message. When a consumer receives the message, it tries to
   load the schema whose version is an empty array.
   
   ### Modifications
   
   - When a producer receives a response whose schema version is an empty
     byte array, just ignore it.
   - Fix the existing tests.
   - Add `testConsumeAvroMessagesWithoutSchema` to cover the case that
     messages without schema are compatible with the schema.
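
The first modification can be sketched as a small normalization step. This is an illustration under assumptions, not the code in this PR: `ProducerSchemaVersionSketch` and `normalize` are hypothetical names, and the only behavior taken from the description is that an empty schema version in the broker's response means "no schema" and should be ignored.

```java
import java.util.Optional;

// Hypothetical helper; the names are illustrative and not taken from the Pulsar code base.
final class ProducerSchemaVersionSketch {
    private ProducerSchemaVersionSketch() {
    }

    // Maps the broker's "no schema" marker (an empty array) to an absent value,
    // so that callers never attach an empty version to outgoing messages.
    static Optional<byte[]> normalize(byte[] brokerSchemaVersion) {
        if (brokerSchemaVersion == null || brokerSchemaVersion.length == 0) {
            return Optional.empty();
        }
        return Optional.of(brokerSchemaVersion.clone());
    }
}
```

A producer following this sketch would set the schema version field only when `normalize(...)` returns a present value.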
   
   This patch also modifies the existing behavior when
   `schemaValidationEnforced` is false and messages are produced by a
   producer without schema and consumed by a consumer with schema.
   
   1. If the message is incompatible with the schema
      - Before: `getSchemaVersion` returns an empty array and `getValue`
        fails with `UncheckedExecutionException`:
   
        > com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY
   
      - After: `getSchemaVersion` returns `null` and `getValue` fails with
        `SchemaSerializationException`.
   
   2. Otherwise (the message is compatible with the schema)
      - Before: `getSchemaVersion` returns an empty array and `getValue`
        fails with `UncheckedExecutionException`.
      - After: `getSchemaVersion` returns `null` and `getValue` returns the
        correctly decoded object.
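
One way to read the "After" behavior is that a `null` schema version makes the consumer decode with its own schema instead of fetching one by version. The sketch below models that reading only; `VersionAwareDecoderSketch` and its two decoder functions are hypothetical and do not mirror the actual client classes.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of version-aware decoding; not the Pulsar implementation.
final class VersionAwareDecoderSketch<T> {
    // Decodes the payload with the consumer's own (reader) schema.
    private final Function<byte[], T> readerSchemaDecoder;
    // Decodes the payload with the schema looked up for the given version.
    private final BiFunction<byte[], byte[], T> versionedDecoder;

    VersionAwareDecoderSketch(Function<byte[], T> readerSchemaDecoder,
                              BiFunction<byte[], byte[], T> versionedDecoder) {
        this.readerSchemaDecoder = readerSchemaDecoder;
        this.versionedDecoder = versionedDecoder;
    }

    T decode(byte[] payload, byte[] schemaVersion) {
        // A null version means the message carries no schema: no lookup is attempted,
        // and any decoding failure comes from the reader schema itself.
        if (schemaVersion == null) {
            return readerSchemaDecoder.apply(payload);
        }
        return versionedDecoder.apply(payload, schemaVersion);
    }
}
```

Under this model, a compatible payload decodes successfully, while an incompatible one fails inside the reader-schema decoder rather than in the schema lookup.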
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `no-need-doc` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-added`
   (Docs have been already added)

