arvindKandpal-ksolves opened a new pull request, #124: URL: https://github.com/apache/flink-connector-pulsar/pull/124
## Purpose of the change

This PR fixes [FLINK-36235](https://issues.apache.org/jira/browse/FLINK-36235). Currently, when a message fails to deserialize and returns `null`, the `PulsarDeserializationSchemaWrapper` emits this `null` value downstream. This violates Flink's `DeserializationSchema` contract (where a `null` return value means "drop this record") and can cause `NullPointerException`s in downstream operators. This PR adds a simple null-check guard to safely drop these corrupted or null records.

## Brief change log

- Added an `if (instance != null)` check before emitting the record in `PulsarDeserializationSchemaWrapper#deserialize`.
- Added a regression test `wrapperDropsNullDeserializedRecord` and a `CountingCollector` helper class in `PulsarDeserializationSchemaTest` to verify that null records are correctly dropped without errors.

## Verifying this change

This change added tests and can be verified as follows:

- Added unit tests in `PulsarDeserializationSchemaTest.java` to explicitly test and verify the filtering behavior when the inner deserializer returns `null`.
- Verified locally using `mvn -pl flink-connector-pulsar test`.

## Significant changes

- [ ] Dependencies have been added or upgraded
- [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
- [ ] Serializers have been changed
- [ ] New feature has been introduced
  - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
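For reviewers who want to see the guard in isolation, here is a minimal, dependency-free sketch of the behavior described in the change log. It is not the connector code: `Collector`, `CountingCollector`, and `Wrapper` below are simplified, hypothetical stand-ins for the Flink and connector types, and the inner deserializer is modeled as a plain `Function<byte[], T>` that returns `null` for a record it cannot decode.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class NullGuardSketch {

    /** Hypothetical stand-in for a record collector; not the real Flink interface. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Illustrative collector that remembers what it received (assumed shape, for this sketch only). */
    static final class CountingCollector<T> implements Collector<T> {
        private final List<T> records = new ArrayList<>();

        @Override
        public void collect(T record) {
            records.add(record);
        }

        int count() {
            return records.size();
        }
    }

    /** Simplified wrapper: delegates to an inner deserializer and drops null results. */
    static final class Wrapper<T> {
        private final Function<byte[], T> inner;

        Wrapper(Function<byte[], T> inner) {
            this.inner = inner;
        }

        void deserialize(byte[] payload, Collector<T> out) {
            T instance = inner.apply(payload);
            // A null result means "drop this record", so nothing is emitted.
            if (instance != null) {
                out.collect(instance);
            }
        }
    }

    public static void main(String[] args) {
        // Assumed inner deserializer: an empty payload is treated as undecodable.
        Function<byte[], String> inner =
                bytes -> bytes.length == 0 ? null : new String(bytes, StandardCharsets.UTF_8);

        Wrapper<String> wrapper = new Wrapper<>(inner);
        CountingCollector<String> out = new CountingCollector<>();

        wrapper.deserialize("a".getBytes(StandardCharsets.UTF_8), out);
        wrapper.deserialize(new byte[0], out); // dropped by the guard
        wrapper.deserialize("b".getBytes(StandardCharsets.UTF_8), out);

        System.out.println("emitted=" + out.count()); // prints "emitted=2"
    }
}
```

Without the `if (instance != null)` check, the second call would pass `null` to the collector, which is the downstream hazard this PR removes.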
