BewareMyPower opened a new pull request, #15622: URL: https://github.com/apache/pulsar/pull/15622
### Motivation

When I tried to consume a topic with a consumer that uses an Avro schema while the topic was produced by a producer without a schema, the consumption failed. The reason is that `MultiVersionSchemaInfoProvider#getSchemaByVersion` doesn't check whether `schemaVersion` is an empty byte array. If it is, a `BytesSchemaVersion` wrapping an empty array is passed to `cache.get` and then to `loadSchema`.

https://github.com/apache/pulsar/blob/f90ef9c6ad88c4f94ce1fcc682bbf3f3189cbf2a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java#L94-L98

However, `LookupService#getSchema` cannot accept an empty byte array as the version, so `loadSchema` fails.

The root cause is that the schema version was set unexpectedly when messages were sent by a producer without a schema. On the broker side, the returned schema version is never null; an empty array means the message doesn't have a schema. On the Java client side, however, the empty byte array is treated as an existing schema and the schema version field is set. When the consumer receives the message, it tries to load a schema whose version is an empty array.

### Modifications

- When a producer receives a response whose schema version is an empty byte array, just ignore it.
- Fix the existing tests.
- Add `testConsumeAvroMessagesWithoutSchema` to cover the case where messages without a schema are compatible with the consumer's schema.

This patch also modifies the existing behavior when `schemaValidationEnforced` is false and messages are produced by a producer without a schema and consumed by a consumer with a schema.

1. If the message is incompatible with the schema
   - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `UncheckedExecutionException`:
     > com.google.common.util.concurrent.UncheckedExecutionException: org.apache.commons.lang3.SerializationException: Failed at fetching schema info for EMPTY
   - After: `getSchemaVersion` returns `null` and `getValue` fails with `SchemaSerializationException`.
2. Otherwise (the message is compatible with the schema)
   - Before: `getSchemaVersion` returns an empty array and `getValue` fails with `UncheckedExecutionException`.
   - After: `getSchemaVersion` returns `null` and `getValue` returns the correctly decoded object.

### Documentation

Check the box below or label this PR directly.

Need to update docs?

- [ ] `doc-required` (Your PR needs to update docs and you will update later)
- [x] `no-need-doc` (Please explain why)
- [ ] `doc` (Your PR contains doc changes)
- [ ] `doc-added` (Docs have been already added)
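
For reviewers who want the producer-side rule from the Modifications section in concrete form, here is a minimal sketch. It is not the code changed by this PR: the class `SchemaVersionGuard` and the method `normalizeSchemaVersion` are hypothetical names used only for illustration, and the sketch assumes the schema version arrives from the broker as a raw `byte[]`.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the Pulsar client API.
public class SchemaVersionGuard {

    /**
     * Maps the broker's "no schema" marker (an empty array) to null so that
     * the schema version field is left unset on outgoing messages.
     * A non-empty version is returned as a defensive copy.
     */
    public static byte[] normalizeSchemaVersion(byte[] versionFromBroker) {
        if (versionFromBroker == null || versionFromBroker.length == 0) {
            return null; // the message has no schema; do not set a version
        }
        return Arrays.copyOf(versionFromBroker, versionFromBroker.length);
    }

    public static void main(String[] args) {
        // Empty array: treated as "no schema".
        System.out.println(normalizeSchemaVersion(new byte[0]) == null); // prints "true"

        // Non-empty array: kept as the schema version.
        byte[] version = {0, 0, 0, 1};
        System.out.println(Arrays.toString(normalizeSchemaVersion(version))); // prints "[0, 0, 0, 1]"
    }
}
```

With the version left unset, a consumer has nothing to look up for that message, which matches the "After" behavior listed above where `getSchemaVersion` returns `null`.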
