eolivelli commented on pull request #14847:
URL: https://github.com/apache/pulsar/pull/14847#issuecomment-1077539317


   This is another kind of Function that is unblocked by this patch.
   The function reads any Object and in case it is a KeyValue<?, AVRO> it 
removes a field from the AVRO struct and then writes the KeyValue downstream
   (this is only an example, we should cache the Schema instances and organise 
the code better)
   
   
   ```
   @Slf4j
   public class MyFunctionRemoveFieldTransform implements 
Function<GenericObject, Void> {
   
       private static final String FIELD_TO_REMOVE = "foo";
   
       @Override
       public Void process(GenericObject genericObject, Context context) throws 
Exception {
           Record<?> currentRecord = context.getCurrentRecord();
           log.info("apply to {} {}", genericObject, 
genericObject.getNativeObject());
           log.info("record with schema {} {}", currentRecord.getSchema(), 
currentRecord);
           Object nativeObject = genericObject.getNativeObject();
           Schema<?> schema = currentRecord.getSchema();
   
           Schema outputSchema = schema;
           Object outputObject = genericObject.getNativeObject();
   
           if (schema instanceof KeyValueSchema && nativeObject instanceof 
KeyValue)  {
               KeyValueSchema kvSchema = (KeyValueSchema) schema;
   
               Schema keySchema = kvSchema.getKeySchema();
               Schema valueSchema = kvSchema.getValueSchema();
               // remove a column "foo" from the "valueSchema"
               if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {
   
                   org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) 
valueSchema.getNativeSchema().get();
                   if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
                       org.apache.avro.Schema.Parser parser = new 
org.apache.avro.Schema.Parser();
                       org.apache.avro.Schema originalAvroSchema = 
parser.parse(avroSchema.toString(false));
                       org.apache.avro.Schema modified = 
org.apache.avro.Schema.createRecord(
                               originalAvroSchema.getName(), 
originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), 
originalAvroSchema.isError(),
                               originalAvroSchema.getFields().
                                   stream()
                                   .filter(f->!f.name().equals(FIELD_TO_REMOVE))
                                   .map(f-> new 
org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), 
f.order()))
                                   .collect(Collectors.toList()));
   
                       Schema newValueSchema = Schema.NATIVE_AVRO(modified);
   
                       outputSchema = Schema.KeyValue(keySchema, 
newValueSchema, kvSchema.getKeyValueEncodingType());
                       KeyValue originalObject = (KeyValue) nativeObject;
   
                       GenericRecord value = (GenericRecord) 
originalObject.getValue();
                       org.apache.avro.generic.GenericRecord genericRecord
                               = (org.apache.avro.generic.GenericRecord) 
value.getNativeObject();
   
                       org.apache.avro.generic.GenericRecord newRecord = new 
GenericData.Record(modified);
                       for (org.apache.avro.Schema.Field field : 
modified.getFields()) {
                           newRecord.put(field.name(), 
genericRecord.get(field.name()));
                       }
                       GenericDatumWriter writer = new 
GenericDatumWriter(modified);
                       ByteArrayOutputStream oo = new ByteArrayOutputStream();
                       BinaryEncoder encoder = 
EncoderFactory.get().directBinaryEncoder(oo, null);
                       writer.write(newRecord, encoder);
                       Object newValue = oo.toByteArray();
   
                       outputObject = new KeyValue(originalObject.getKey(), 
newValue);
                   }
   
   
               }
           }
           log.info("output {} schema {}", outputObject, outputSchema);
           context.newOutputMessage(context.getOutputTopic(), outputSchema)
                   .value(outputObject).send();
           return null;
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to