eolivelli commented on pull request #14847:
URL: https://github.com/apache/pulsar/pull/14847#issuecomment-1077539317
This is another kind of Function that is unblocked by this patch.
The function reads any Object and in case it is a KeyValue<?, AVRO> it
removes a field from the AVRO struct and then writes the KeyValue downstream
(this is only an example, we should cache the Schema instances and organise
the code better)
```
/**
 * Example function that forwards every incoming message to the output topic and, when the
 * message is a {@code KeyValue} whose value schema is AVRO and declares the field
 * {@link #FIELD_TO_REMOVE}, drops that field from both the value schema and the value.
 *
 * <p>Anything else (non-KeyValue payloads, non-AVRO value schemas, AVRO schemas without the
 * field) is forwarded unchanged with its original schema.
 *
 * <p>This is only an example: the reduced schema is rebuilt for every message; real code
 * should cache the derived schema instances.
 */
@Slf4j
public class MyFunctionRemoveFieldTransform implements Function<GenericObject, Void> {

    /** Name of the AVRO field stripped from the value part of a KeyValue. */
    private static final String FIELD_TO_REMOVE = "foo";

    /**
     * Publishes the current record to {@code context.getOutputTopic()}, removing
     * {@link #FIELD_TO_REMOVE} from the AVRO value of a KeyValue when it is present.
     *
     * @param genericObject the incoming payload; its native object is inspected
     * @param context the function context, used to read the current record and to publish
     * @return always {@code null}; the result is sent explicitly through {@code context}
     * @throws IllegalStateException if the value schema reports type AVRO but exposes no
     *     native schema
     * @throws Exception if serialization or publishing fails
     */
    @Override
    // The output schema/value pair is only known at runtime, so the send call has to go
    // through the raw Schema type.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Void process(GenericObject genericObject, Context context) throws Exception {
        Record<?> currentRecord = context.getCurrentRecord();
        Object nativeObject = genericObject.getNativeObject();
        log.info("apply to {} {}", genericObject, nativeObject);
        log.info("record with schema {} {}", currentRecord.getSchema(), currentRecord);

        Schema<?> schema = currentRecord.getSchema();
        // Default: pass the message through untouched.
        Schema outputSchema = schema;
        Object outputObject = nativeObject;

        if (schema instanceof KeyValueSchema && nativeObject instanceof KeyValue) {
            KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) schema;
            Schema<?> keySchema = kvSchema.getKeySchema();
            Schema<?> valueSchema = kvSchema.getValueSchema();

            if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {
                org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) valueSchema
                        .getNativeSchema()
                        .orElseThrow(() -> new IllegalStateException(
                                "AVRO value schema does not expose a native schema"));

                if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
                    org.apache.avro.Schema modified = withoutField(avroSchema, FIELD_TO_REMOVE);
                    Schema<?> newValueSchema = Schema.NATIVE_AVRO(modified);
                    outputSchema = Schema.KeyValue(
                            keySchema, newValueSchema, kvSchema.getKeyValueEncodingType());

                    KeyValue<?, ?> originalObject = (KeyValue<?, ?>) nativeObject;
                    GenericRecord value = (GenericRecord) originalObject.getValue();
                    // A KeyValue may carry a null value; keep it null instead of
                    // dereferencing it, and still publish with the reduced schema.
                    byte[] newValue = null;
                    if (value != null) {
                        org.apache.avro.generic.GenericRecord genericRecord =
                                (org.apache.avro.generic.GenericRecord) value.getNativeObject();
                        newValue = projectAndSerialize(genericRecord, modified);
                    }
                    outputObject = new KeyValue<Object, Object>(originalObject.getKey(), newValue);
                }
            }
        }

        log.info("output {} schema {}", outputObject, outputSchema);
        context.newOutputMessage(context.getOutputTopic(), outputSchema)
                .value(outputObject)
                .send();
        return null;
    }

    /**
     * Builds a record schema with the same name, doc, namespace and error flag as
     * {@code avroSchema} but without the field called {@code fieldName}.
     */
    private static org.apache.avro.Schema withoutField(
            org.apache.avro.Schema avroSchema, String fieldName) {
        // Work on a copy obtained by re-parsing the schema's JSON form.
        // NOTE(review): presumably a defensive copy; confirm whether it is still required
        // given that every kept field is re-created below.
        org.apache.avro.Schema originalAvroSchema =
                new org.apache.avro.Schema.Parser().parse(avroSchema.toString(false));
        return org.apache.avro.Schema.createRecord(
                originalAvroSchema.getName(),
                originalAvroSchema.getDoc(),
                originalAvroSchema.getNamespace(),
                originalAvroSchema.isError(),
                originalAvroSchema.getFields().stream()
                        .filter(f -> !f.name().equals(fieldName))
                        // Fields are re-created rather than reused for the new record.
                        .map(f -> new org.apache.avro.Schema.Field(
                                f.name(), f.schema(), f.doc(), f.defaultVal(), f.order()))
                        .collect(Collectors.toList()));
    }

    /**
     * Copies the fields declared by {@code target} from {@code source} into a new record and
     * returns that record in AVRO binary encoding.
     */
    private static byte[] projectAndSerialize(
            org.apache.avro.generic.GenericRecord source, org.apache.avro.Schema target)
            throws java.io.IOException {
        org.apache.avro.generic.GenericRecord newRecord = new GenericData.Record(target);
        for (org.apache.avro.Schema.Field field : target.getFields()) {
            newRecord.put(field.name(), source.get(field.name()));
        }
        GenericDatumWriter<org.apache.avro.generic.GenericRecord> writer =
                new GenericDatumWriter<>(target);
        ByteArrayOutputStream oo = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(oo, null);
        writer.write(newRecord, encoder);
        // Flush before reading the bytes so the result does not depend on the encoder
        // implementation chosen above.
        encoder.flush();
        return oo.toByteArray();
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]