This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ca8976292cb Pulsar Functions: allow a Function<GenericObject,?> to 
access the original Schema of the Message and use it (#14847)
ca8976292cb is described below

commit ca8976292cbb32f0a8b3d01ab1b3c25aa02d454b
Author: Enrico Olivelli <[email protected]>
AuthorDate: Fri Apr 1 10:53:37 2022 +0200

    Pulsar Functions: allow a Function<GenericObject,?> to access the original 
Schema of the Message and use it (#14847)
    
    (cherry picked from commit 193f5b2f74e919c05d0df651981cef439f55472f)
---
 .../client/impl/schema/AutoConsumeSchema.java      |  12 ++
 .../pulsar/functions/source/PulsarSource.java      |  11 ++
 .../functions/GenericObjectFunction.java           |  64 ++++++++
 .../functions/RemoveAvroFieldFunction.java         | 159 ++++++++++++++++++++
 .../integration/functions/PulsarFunctionsTest.java | 163 ++++++++++++++++++---
 .../functions/PulsarFunctionsTestBase.java         |   6 +
 .../functions/java/PulsarFunctionsJavaTest.java    |  20 +++
 7 files changed, 416 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 0533ccdf86d..5e7568b8e18 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -298,6 +298,18 @@ public class AutoConsumeSchema implements 
Schema<GenericRecord> {
         return schemaMap.get(getSchemaVersion(schemaVersion));
     }
 
+    /**
+     * Get a specific schema version, fetching from the Registry if it is not 
loaded yet.
+     * This method is not intended to be used by applications.
+     * @param schemaVersion the version
+     * @return the Schema at the specific version
+     * @see #atSchemaVersion(byte[])
+     */
+    public Schema<?> unwrapInternalSchema(byte[] schemaVersion) {
+        fetchSchemaIfNeeded(BytesSchemaVersion.of(schemaVersion));
+        return getInternalSchema(schemaVersion);
+    }
+
     /**
      * It may happen that the schema is not loaded but we need it, for 
instance in order to call getSchemaInfo()
      * We cannot call this method in getSchemaInfo, because getSchemaInfo is 
called in many
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index da2facc5f94..05cb4531dc2 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.api.Record;
@@ -118,6 +119,16 @@ public abstract class PulsarSource<T> implements Source<T> 
{
             TopicMessageImpl impl = (TopicMessageImpl) message;
             schema = impl.getSchemaInternal();
         }
+
+        // we don't want the Function/Sink to see AutoConsumeSchema
+        if (schema instanceof AutoConsumeSchema) {
+            AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
+            // we cannot use atSchemaVersion, because atSchemaVersion is only
+            // able to decode data; here we want a Schema that
+            // is able to re-encode the payload when needed.
+            schema = (Schema<T>) autoConsumeSchema
+                    .unwrapInternalSchema(message.getSchemaVersion());
+        }
         return PulsarRecord.<T>builder()
                 .message(message)
                 .schema(schema)
diff --git 
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/GenericObjectFunction.java
 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/GenericObjectFunction.java
new file mode 100644
index 00000000000..3c5edaa48d0
--- /dev/null
+++ 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/GenericObjectFunction.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * This function processes any message with any schema,
+ * and outputs the message with the same schema to another topic.
+ */
+@Slf4j
+public class GenericObjectFunction implements Function<GenericObject, Void> {
+
+    @Override
+    public Void process(GenericObject genericObject, Context context) throws 
Exception {
+        Record<?> currentRecord = context.getCurrentRecord();
+        log.info("apply to {} {}", genericObject, 
genericObject.getNativeObject());
+        log.info("record with schema {} {}", currentRecord.getSchema(), 
currentRecord);
+        // detect whether the record schema is a struct type (AVRO, JSON, PROTOBUF_NATIVE)
+        final boolean isStruct;
+        switch (currentRecord.getSchema().getSchemaInfo().getType()) {
+            case AVRO:
+            case JSON:
+            case PROTOBUF_NATIVE:
+                isStruct = true;
+                break;
+            default:
+                isStruct = false;
+                break;
+        }
+        if (isStruct) {
+            // GenericRecord must stay wrapped
+            context.newOutputMessage(context.getOutputTopic(), (Schema) 
currentRecord.getSchema())
+                    .value(genericObject).send();
+        } else {
+            // primitives and KeyValue must be unwrapped
+            context.newOutputMessage(context.getOutputTopic(), (Schema) 
currentRecord.getSchema())
+                    .value(genericObject.getNativeObject()).send();
+        }
+        return null;
+    }
+}
+
diff --git 
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java
 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java
new file mode 100644
index 00000000000..cc2a81bde64
--- /dev/null
+++ 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/functions/RemoveAvroFieldFunction.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.functions;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+import java.io.ByteArrayOutputStream;
+import java.util.stream.Collectors;
+
+/**
+ * This function removes a "field" from an AVRO message.
+ */
+@Slf4j
+public class RemoveAvroFieldFunction implements Function<GenericObject, Void> {
+
+    private static final String FIELD_TO_REMOVE = "age";
+
+    @Override
+    public Void process(GenericObject genericObject, Context context) throws 
Exception {
+        Record<?> currentRecord = context.getCurrentRecord();
+        log.info("apply to {} {}", genericObject, 
genericObject.getNativeObject());
+        log.info("record with schema {} version {} {}", 
currentRecord.getSchema(),
+                currentRecord.getMessage().get().getSchemaVersion(),
+                currentRecord);
+        Object nativeObject = genericObject.getNativeObject();
+        Schema<?> schema = currentRecord.getSchema();
+
+        Schema outputSchema = schema;
+        Object outputObject = genericObject.getNativeObject();
+        boolean someThingDone = false;
+        if (schema instanceof KeyValueSchema && nativeObject instanceof 
KeyValue)  {
+            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+
+            Schema keySchema = kvSchema.getKeySchema();
+            Schema valueSchema = kvSchema.getValueSchema();
+            // remove a column "age" from the "valueSchema"
+            if (valueSchema.getSchemaInfo().getType() == SchemaType.AVRO) {
+
+                org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) 
valueSchema.getNativeSchema().get();
+                if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+                    org.apache.avro.Schema.Parser parser = new 
org.apache.avro.Schema.Parser();
+                    org.apache.avro.Schema originalAvroSchema = 
parser.parse(avroSchema.toString(false));
+                    org.apache.avro.Schema modified = 
org.apache.avro.Schema.createRecord(
+                            originalAvroSchema.getName(), 
originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), 
originalAvroSchema.isError(),
+                            originalAvroSchema.getFields().
+                                    stream()
+                                    
.filter(f->!f.name().equals(FIELD_TO_REMOVE))
+                                    .map(f-> new 
org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), 
f.order()))
+                                    .collect(Collectors.toList()));
+
+                    KeyValue originalObject = (KeyValue) nativeObject;
+
+                    GenericRecord value = (GenericRecord) 
originalObject.getValue();
+                    org.apache.avro.generic.GenericRecord genericRecord
+                            = (org.apache.avro.generic.GenericRecord) 
value.getNativeObject();
+
+                    org.apache.avro.generic.GenericRecord newRecord = new 
GenericData.Record(modified);
+                    for (org.apache.avro.Schema.Field field : 
modified.getFields()) {
+                        newRecord.put(field.name(), 
genericRecord.get(field.name()));
+                    }
+                    GenericDatumWriter writer = new 
GenericDatumWriter(modified);
+                    ByteArrayOutputStream oo = new ByteArrayOutputStream();
+                    BinaryEncoder encoder = 
EncoderFactory.get().directBinaryEncoder(oo, null);
+                    writer.write(newRecord, encoder);
+                    Object newValue = oo.toByteArray();
+
+                    Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+                    outputSchema = Schema.KeyValue(keySchema, newValueSchema, 
kvSchema.getKeyValueEncodingType());
+                    outputObject = new KeyValue(originalObject.getKey(), 
newValue);
+                    someThingDone = true;
+                }
+            }
+        } else if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
+            org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) 
schema.getNativeSchema().get();
+            if (avroSchema.getField(FIELD_TO_REMOVE) != null) {
+                org.apache.avro.Schema.Parser parser = new 
org.apache.avro.Schema.Parser();
+                org.apache.avro.Schema originalAvroSchema = 
parser.parse(avroSchema.toString(false));
+                org.apache.avro.Schema modified = 
org.apache.avro.Schema.createRecord(
+                        originalAvroSchema.getName(), 
originalAvroSchema.getDoc(), originalAvroSchema.getNamespace(), 
originalAvroSchema.isError(),
+                        originalAvroSchema.getFields().
+                                stream()
+                                .filter(f -> !f.name().equals(FIELD_TO_REMOVE))
+                                .map(f -> new 
org.apache.avro.Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), 
f.order()))
+                                .collect(Collectors.toList()));
+
+                org.apache.avro.generic.GenericRecord genericRecord
+                        = (org.apache.avro.generic.GenericRecord) nativeObject;
+                org.apache.avro.generic.GenericRecord newRecord = new 
GenericData.Record(modified);
+                for (org.apache.avro.Schema.Field field : 
modified.getFields()) {
+                    newRecord.put(field.name(), 
genericRecord.get(field.name()));
+                }
+                GenericDatumWriter writer = new GenericDatumWriter(modified);
+                ByteArrayOutputStream oo = new ByteArrayOutputStream();
+                BinaryEncoder encoder = 
EncoderFactory.get().directBinaryEncoder(oo, null);
+                writer.write(newRecord, encoder);
+
+                Schema newValueSchema = Schema.NATIVE_AVRO(modified);
+                outputSchema = newValueSchema;
+                outputObject = oo.toByteArray();
+                someThingDone = true;
+            }
+        }
+
+        if (!someThingDone) {
+            // nothing was rewritten: detect whether the schema is a struct type
+            final boolean isStruct;
+            switch (currentRecord.getSchema().getSchemaInfo().getType()) {
+                case AVRO:
+                case JSON:
+                case PROTOBUF_NATIVE:
+                    isStruct = true;
+                    break;
+                default:
+                    isStruct = false;
+                    break;
+            }
+            if (isStruct) {
+                // GenericRecord must stay wrapped
+                outputObject = currentRecord.getValue();
+            } else {
+                // primitives and KeyValue must be unwrapped
+                outputObject = nativeObject;
+            }
+        }
+        log.info("output {} schema {}", outputObject, outputSchema);
+        context.newOutputMessage(context.getOutputTopic(), outputSchema)
+                .value(outputObject).send();
+        return null;
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 460ea2220d0..f9b90b4d75b 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.functions;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -1590,6 +1591,125 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         client.close();
     }
 
+
+    protected void testGenericObjectFunction(String function, boolean 
removeAgeField, boolean keyValue) throws Exception {
+        log.info("start {} function test ...", function);
+
+        String ns = "public/ns-genericobject-" + randomName(8);
+        @Cleanup
+        PulsarAdmin pulsarAdmin = getPulsarAdmin();
+        pulsarAdmin.namespaces().createNamespace(ns);
+
+        @Cleanup
+        PulsarClient pulsarClient = getPulsarClient();
+
+        final int numMessages = 10;
+        final String inputTopic = ns + "/test-object-input-" + randomName(8);
+        final String outputTopic = ns + "/test-object-output" + randomName(8);
+        @Cleanup
+        Consumer<GenericRecord> consumer = pulsarClient
+                .newConsumer(Schema.AUTO_CONSUME())
+                .subscriptionName("test")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .topic(outputTopic)
+                .subscribe();
+
+        final String functionName = "test-generic-fn-" + randomName(8);
+        submitFunction(
+                Runtime.JAVA,
+                inputTopic,
+                outputTopic,
+                functionName,
+                null,
+                function,
+                Schema.AUTO_CONSUME(),
+                null,
+                null,
+                SchemaType.NONE.name(),
+                SubscriptionInitialPosition.Earliest);
+        try {
+            if (keyValue) {
+                @Cleanup
+                Producer<KeyValue<Users.UserV1, Users.UserV1>> producer = 
pulsarClient
+                        .newProducer(Schema.KeyValue(
+                                Schema.AVRO(Users.UserV1.class),
+                                Schema.AVRO(Users.UserV1.class), 
KeyValueEncodingType.SEPARATED))
+                        .topic(inputTopic)
+                        .create();
+                for (int i = 0; i < numMessages; i++) {
+                    producer.send(new KeyValue<>(new Users.UserV1("foo" + i, 
i),
+                            new Users.UserV1("bar" + i, i + 100)));
+                }
+            } else {
+                @Cleanup
+                Producer<Users.UserV1> producer = pulsarClient
+                        .newProducer(Schema.AVRO(Users.UserV1.class))
+                        .topic(inputTopic)
+                        .create();
+                for (int i = 0; i < numMessages; i++) {
+                    producer.send(new Users.UserV1("bar" + i, i + 100));
+                }
+            }
+
+            getFunctionInfoSuccess(functionName);
+
+            getFunctionStatus(functionName, numMessages, true);
+
+            int i = 0;
+            Message<GenericRecord> message;
+            do {
+                message = consumer.receive(30, TimeUnit.SECONDS);
+                if (message != null) {
+                    GenericRecord genericRecord = message.getValue();
+                    if (keyValue) {
+                        KeyValue<GenericRecord, GenericRecord> keyValueObject 
= (KeyValue<GenericRecord, GenericRecord>) genericRecord.getNativeObject();
+                        GenericRecord key = keyValueObject.getKey();
+                        GenericRecord value = keyValueObject.getValue();
+                        key.getFields().forEach(f-> {
+                            log.info("key field {} value {}", f.getName(), 
key.getField(f.getName()));
+                        });
+                        value.getFields().forEach(f-> {
+                            log.info("value field {} value {}", f.getName(), 
value.getField(f.getName()));
+                        });
+                        assertEquals(i, key.getField("age"));
+                        assertEquals("foo" + i, key.getField("name"));
+
+                        if (removeAgeField) {
+                            // field "age" is removed from the schema
+                            assertFalse(value.getFields().stream().anyMatch(f 
-> f.getName().equals("age")));
+                        } else {
+                            assertEquals(i + 100, value.getField("age"));
+                        }
+                        assertEquals("bar" + i, value.getField("name"));
+                    } else {
+                        GenericRecord value = genericRecord;
+                        log.info("received value {}", value);
+                        value.getFields().forEach(f-> {
+                            log.info("value field {} value {}", f.getName(), 
value.getField(f.getName()));
+                        });
+
+                        if (removeAgeField) {
+                            // field "age" is removed from the schema
+                            assertFalse(value.getFields().stream().anyMatch(f 
-> f.getName().equals("age")));
+                        } else {
+                            assertEquals(i + 100, value.getField("age"));
+                        }
+                        assertEquals("bar" + i, value.getField("name"));
+                    }
+
+                    consumer.acknowledge(message);
+                    i++;
+                }
+            } while (message != null);
+        } finally {
+            pulsarCluster.dumpFunctionLogs(functionName);
+        }
+
+        deleteFunction(functionName);
+
+        getFunctionInfoNotFound(functionName);
+    }
+
     protected void testMergeFunction() throws Exception {
         log.info("start merge function test ...");
 
@@ -1636,26 +1756,31 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
 
         getFunctionStatus(functionName, topicMsgCntMap.keySet().size() * 
messagePerTopic, true);
 
-        Message<GenericRecord> message;
-        do {
-            message = consumer.receive(30, TimeUnit.SECONDS);
-            if (message != null) {
-                String baseTopic = message.getProperty("baseTopic");
-                GenericRecord genericRecord = message.getValue();
-                log.info("receive msg baseTopic: {}, schemaType: {}, 
nativeClass: {}, nativeObject: {}",
-                        baseTopic,
-                        genericRecord.getSchemaType(),
-                        genericRecord.getNativeObject().getClass(),
-                        genericRecord.getNativeObject());
-                checkSchemaForAutoSchema(message, baseTopic);
-                topicMsgCntMap.get(baseTopic).decrementAndGet();
-                consumer.acknowledge(message);
-            }
-        } while (message != null);
+        try {
 
-        for (Map.Entry<String, AtomicInteger> entry : 
topicMsgCntMap.entrySet()) {
-            assertEquals(entry.getValue().get(), 0,
-                    "topic " + entry.getKey() + " left message cnt is not 0.");
+            Message<GenericRecord> message;
+            do {
+                message = consumer.receive(30, TimeUnit.SECONDS);
+                if (message != null) {
+                    String baseTopic = message.getProperty("baseTopic");
+                    GenericRecord genericRecord = message.getValue();
+                    log.info("receive msg baseTopic: {}, schemaType: {}, 
nativeClass: {}, nativeObject: {}",
+                            baseTopic,
+                            genericRecord.getSchemaType(),
+                            genericRecord.getNativeObject().getClass(),
+                            genericRecord.getNativeObject());
+                    checkSchemaForAutoSchema(message, baseTopic);
+                    topicMsgCntMap.get(baseTopic).decrementAndGet();
+                    consumer.acknowledge(message);
+                }
+            } while (message != null);
+
+            for (Map.Entry<String, AtomicInteger> entry : 
topicMsgCntMap.entrySet()) {
+                assertEquals(entry.getValue().get(), 0,
+                        "topic " + entry.getKey() + " left message cnt is not 
0.");
+            }
+        } finally {
+            pulsarCluster.dumpFunctionLogs(functionName);
         }
 
         deleteFunction(functionName);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index e594923ec06..cbcdf89612a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -42,6 +42,12 @@ public abstract class PulsarFunctionsTestBase extends 
PulsarTestSuite {
     public static final String EXCEPTION_JAVA_CLASS =
             "org.apache.pulsar.tests.integration.functions.ExceptionFunction";
 
+    public static final String GENERIC_OBJECT_FUNCTION_JAVA_CLASS =
+            
"org.apache.pulsar.tests.integration.functions.GenericObjectFunction";
+
+    public static final String REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS =
+            
"org.apache.pulsar.tests.integration.functions.RemoveAvroFieldFunction";
+
     public static final String SERDE_JAVA_CLASS =
             
"org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 2e35a700854..46cb15892a1 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -168,4 +168,24 @@ public class PulsarFunctionsJavaTest extends 
PulsarFunctionsTest {
            testMergeFunction();
    }
 
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectFunction() throws Exception {
+        testGenericObjectFunction(GENERIC_OBJECT_FUNCTION_JAVA_CLASS, false, 
false);
+    }
+
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectRemoveFiledFunction() throws Exception {
+        testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, 
false);
+    }
+
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectFunctionKeyValue() throws Exception {
+        testGenericObjectFunction(GENERIC_OBJECT_FUNCTION_JAVA_CLASS, false, 
true);
+    }
+
+    @Test(groups = {"java_function", "function"})
+    public void testGenericObjectRemoveFiledFunctionKeyValue() throws 
Exception {
+        testGenericObjectFunction(REMOVE_AVRO_FIELD_FUNCTION_JAVA_CLASS, true, 
true);
+    }
+
 }

Reply via email to