Nflrijal opened a new pull request, #27353:
URL: https://github.com/apache/flink/pull/27353

   ## What is the purpose of the change
   
   This pull request introduces `RowFieldExtractorSchema`, a new 
`SerializationSchema` implementation that extracts and serializes a specific 
field from a Row object. This addresses the common use case where users need to 
serialize different fields of a Row separately, particularly for Kafka 
producers where keys and values require independent serialization.
   
   Previously, users had to implement custom serialization schemas or use 
workarounds to extract individual Row fields. This change provides a reusable, 
type-safe solution that simplifies the common pattern of using one Row field as 
a Kafka key and another as the value.
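
The key/value pattern described above can be sketched in plain Python without any Flink dependency. This is only an illustration of the idea, not the code added by this pull request: `extract_field_bytes` is a hypothetical helper, the row is modeled as an ordinary tuple, and the choices to return empty bytes for a null field and to raise on an out-of-range index are assumptions rather than documented behavior of `RowFieldExtractorSchema`.

```python
from typing import Sequence


def extract_field_bytes(row: Sequence[object], field_index: int) -> bytes:
    """Serialize a single field of a row-like sequence to bytes.

    Hypothetical helper for illustration only; not part of Flink or PyFlink.
    """
    if field_index < 0:
        raise ValueError("field_index must be non-negative")
    if field_index >= len(row):
        # Assumption: an out-of-range index is treated as an error.
        raise IndexError(
            f"field_index {field_index} is out of range for a row of arity {len(row)}"
        )

    value = row[field_index]
    if value is None:
        # Assumption: a null field is serialized as an empty byte array.
        return b""
    if isinstance(value, (bytes, bytearray)):
        # Raw bytes are passed through unchanged.
        return bytes(value)
    # Everything else is rendered as text and encoded as UTF-8.
    return str(value).encode("utf-8")


# One row, two independent serializations: field 0 becomes the Kafka key,
# field 1 becomes the Kafka value.
record = ("user-42", "clicked", 3)
key_bytes = extract_field_bytes(record, 0)
value_bytes = extract_field_bytes(record, 1)
```

In a real job, each extraction would presumably be wrapped in its own schema instance, one configured for the key and one for the value, so that the Kafka sink can serialize both from the same record.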
   
   ## Brief change log
   
   - Add `RowFieldExtractorSchema` class in `flink-core` that implements 
`SerializationSchema<Row>`
   - Add `RowFieldExtractorSchemaTest` with comprehensive unit tests covering:
     - Basic field extraction and serialization
     - Invalid field index handling
     - Null value handling
     - Empty Row handling
     - Multiple field extraction scenarios
   - Add Python bindings in 
`pyflink.common.serialization.RowFieldExtractorSchema`
   - Add Python unit tests in `test_serialization_schemas.py`
   - Add Kafka integration test demonstrating real-world usage
   - Add Javadoc and Python docstrings with usage examples
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   **Java tests:**
   - Added `RowFieldExtractorSchemaTest` with 6 test cases covering normal 
operation, edge cases, and error handling
   - Run with: `mvn test -pl flink-core -Dtest=RowFieldExtractorSchemaTest`
   
   **Python tests:**
   - Added 13 unit tests in 
`test_serialization_schemas.py::SerializationSchemasTests::test_row_field_extractor_*`
   - Added integration test `kafka_test.py` demonstrating Kafka key/value 
serialization
   - Run with: `pytest pyflink/common/tests/test_serialization_schemas.py -k RowFieldExtractor -v`
   
   **Manual verification:**
   - Validated end-to-end against a real Kafka cluster
   - Verified key/value extraction with multiple Row fields
   - Confirmed UTF-8 encoding works correctly
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes** (new `@Public(Evolving)` class added)
     - The serializers: **yes** (adds new serialization schema)
     - The runtime per-record code paths (performance sensitive): **no** (only 
used when explicitly configured)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **yes**
     - If yes, how is the feature documented? **JavaDocs / Python docstrings 
with usage examples**
       - Javadoc includes detailed class description and usage example with 
Kafka
       - Python docstring includes parameters, return types, and example usage
       - Test cases serve as additional usage documentation
   

