cbornet opened a new issue, #15902:
URL: https://github.com/apache/pulsar/issues/15902

   
   <!---
   Instructions for creating a PIP using this issue template:
   
    1. The author(s) of the proposal will create a GitHub issue ticket using 
this template.
       (Optionally, it can be helpful to send a note discussing the proposal to
       [email protected] mailing list before submitting this GitHub issue. 
This discussion can
       help developers gauge interest in the proposed changes before 
formalizing the proposal.)
    2. The author(s) will send a note to the [email protected] mailing list
       to start the discussion, using subject prefix `[PIP] xxx`. To determine 
the appropriate PIP
       number `xxx`, inspect the mailing list 
(https://lists.apache.org/[email protected])
       for the most recent PIP. Add 1 to that PIP"s number to get your PIP"s 
number.
    3. Based on the discussion and feedback, some changes might be applied by
       the author(s) to the text of the proposal.
    4. Once some consensus is reached, there will be a vote to formally approve
       the proposal. The vote will be held on the [email protected] 
mailing list. Everyone
       is welcome to vote on the proposal, though it will considered to be 
binding
       only the vote of PMC members. It will be required to have a lazy 
majority of
       at least 3 binding +1s votes. The vote should stay open for at least 48 
hours.
    5. When the vote is closed, if the outcome is positive, the state of the
       proposal is updated and the Pull Requests associated with this proposal 
can
       start to get merged into the master branch.
   
   -->
   
   ## Motivation
   
   Currently, when users want to modify the data in Pulsar, they need to write 
a Function.
   For a lot of use cases, it would be handy for them to be able to use a 
ready-made built-in Function that implements the most common basic 
transformations like the ones available in [Kafka Connect’s 
SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
   This removes users the burden of writing the Function themselves, having to 
understanding the perks of Pulsar Schemas, coding in a language that they may 
not master (probably Java if they want to do advanced stuff), and they benefit 
from battle-tested, maintained, performance-optimised code.
   
   ## Goal
   
   This PIP is about providing a `TransformFunction` that executes a sequence 
of basic transformations on the data.
   The `TransformFunction` shall be easy to configure, launchable as a built-in 
NAR.
   The `TransformFunction` shall be able to apply a sequence of common 
transformations in-memory so we don’t need to execute the `TransformFunction` 
multiple times and read/write to a topic each time.
   
   This PIP is not about appending such a Function to a Source or a Sink. 
   While this is the ultimate goal, so we can provide an experience similar to 
Kafka SMTs and avoid a read/write to a topic, this work will be done in a 
future PIP. 
   It is expected that the code written for this PIP will be reusable in this 
future work. 
   
   ## API Changes
   
   This PIP will introduce a new `transform` module in `pulsar-function` 
multi-module project.
 The produced artifact will be a NAR of the 
TransformFunction.
   
   ## Implementation
   
   When it processes a record, `TransformFunction` will :

   
   * Create a mutable structure `TransformContext` that contains
   
   ```java
   @Data
   public class TransformContext {
       private Context context;
       private Schema<?> keySchema;
       private Object keyObject;
       private boolean keyModified;
       private Schema<?> valueSchema;
       private Object valueObject;
       private boolean valueModified;
       private KeyValueEncodingType keyValueEncodingType;
       private String key;
       private Map<String, String> properties;
       private String outputTopic;
   ```
   
   If the record is a `KeyValue`, the key and value schemas and object are 
unpacked. Otherwise the `keySchema` and `keyObject` are null.
   
   * Call in sequence the process method of a series of `TransformStep` on this 
`TransformContext`
   
   ```java
   public interface TransformStep {
       void process(TransformContext transformContext) throws Exception;
   }
   ```
   
   Each `TransformStep` can then modify the `TransformContext` as needed.

   
   * Call the `send()` method of the `TransformContext` which will create the 
message to send to the outputTopic, repacking the KeyValue if needed.

   
   The `TransformFunction` will read its configuration as Json from 
`userConfig` in the format:
   
   ```json
   {
     "steps": [
       {
         "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
       },
       {
         "type": "merge-key-value"
       },
       {
         "type": "unwrap-key-value"
       },
       {
         "type": "cast", "schema-type": "STRING"
       }
     ]
   }
   ```
   
   Each step is defined by its `type` and uses its own arguments.
   
   This example config applied on a KeyValue<AVRO, AVRO> input record with 
value `{key={keyField1: key1, keyField2: key2, keyField3: key3}, 
value={valueField1: value1, valueField2: value2, valueField3: value3}}` will 
give after each step:
   ```
   {key={keyField1: key1, keyField2: key2, keyField3: key3}, 
value={valueField1: value1, valueField2: value2, valueField3: 
value3}}(KeyValue<AVRO, AVRO>)

              |
              | ”type": "drop-fields", "fields": "keyField1,keyField2”, "part": 
"key”
              |
   {key={keyField3: key3}, value={valueField1: value1, valueField2: value2, 
valueField3: value3}} (KeyValue<AVRO, AVRO>)
              |
              | "type": "merge-key-value"
              |

   {key={keyField3: key3}, value={keyField3: key3, valueField1: value1, 
valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
              |
              | "type": "unwrap-key-value"
              |
   {keyField3: key3, valueField1: value1, valueField2: value2, valueField3: 
value3} (AVRO)
              |
              | "type": "cast", "schema-type": "STRING"
              |
   {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", 
"valueField3": "value3"} (STRING)
   ```
   
   `TransformFunction` will be built as a NAR including a `pulsar-io.yaml` 
service file so it can be registered as a built-in function with name 
`transform`.
   
   ## Reject Alternatives
   
   None
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to