This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f35766f  Pulsar IO: allow Sinks to use native AVRO and JSON (#11322)
f35766f is described below

commit f35766f99fcc56493be9d12533f4b2a31c3c5884
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue Jul 20 05:11:26 2021 +0200

    Pulsar IO: allow Sinks to use native AVRO and JSON (#11322)
    
    The Sink should be able to handle the result of GenericRecord.getNativeObject().
    To do this, Apache Avro must be loaded by the same classloader that loads
    the Apache Pulsar runtime.
    
    The same problem applies to a JsonNode returned by getNativeObject().
    
    *Modifications*
    - Add Avro to the set of libraries (like slf4j) that are loaded by the Pulsar
      runtime classloader (this in turn pulls in Commons Compress and Jackson Databind).
    - Enhance the existing integration tests, which checked the schema definition
      but never actually tried to "use" the `org.apache.avro.generic.GenericRecord`
      object or the Jackson `JsonNode` object.
---
 pulsar-functions/runtime-all/pom.xml                     | 14 ++++++++++++++
 .../pulsar/functions/instance/JavaInstanceDepsTest.java  |  9 ++++++++-
 .../tests/integration/io/TestGenericObjectSink.java      | 16 +++++++++++++++-
 .../integration/io/PulsarGenericObjectSinkTest.java      |  1 +
 4 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/runtime-all/pom.xml 
b/pulsar-functions/runtime-all/pom.xml
index d3eaccf..9dacaa3 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -41,6 +41,7 @@
     5. log4j-slf4j-impl
     6. log4j-api
     7. log4j-core
+    8. AVRO
   -->
 
   <artifactId>pulsar-functions-runtime-all</artifactId>
@@ -65,6 +66,19 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- avro and its dependencies, with pinned version -->
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.databind.version}</version>
+    </dependency>
+
     <!-- logging -->
 
     <dependency>
diff --git 
a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
 
b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
index 3bdd23f..859be4e 100644
--- 
a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
+++ 
b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
@@ -44,6 +44,9 @@ import java.util.zip.ZipInputStream;
  *     5. log4j-slf4j-impl
  *     6. log4j-api
  *     7. log4j-core
+ *     8. Apache AVRO
+ *     9. Jackson Mapper and Databind (dependency of AVRO)
+ *     10. Apache Commons Compress (dependency of AVRO)
  */
 public class JavaInstanceDepsTest {
 
@@ -60,11 +63,15 @@ public class JavaInstanceDepsTest {
             if (e == null)
                 break;
             String name = e.getName();
-            if (name.endsWith(".class") && !name.startsWith("META-INF")) {
+            if (name.endsWith(".class") && !name.startsWith("META-INF") && 
!name.equals("module-info.class")) {
                 // The only classes in the java-instance.jar should be 
org.apache.pulsar, slf4j, and log4j classes
+                // (see the full list above)
                 // filter out those classes to see if there are any other 
classes that should not be allowed
                 if (!name.startsWith("org/apache/pulsar")
                         && !name.startsWith("org/slf4j")
+                        && !name.startsWith("org/apache/avro")
+                        && !name.startsWith("com/fasterxml/jackson")
+                        && !name.startsWith("org/apache/commons/compress")
                         && !name.startsWith("org/apache/logging/slf4j")
                         && !name.startsWith("org/apache/logging/log4j")) {
                     notAllowedClasses.add(name);
diff --git 
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
index d131e5b..fa8a3fe 100644
--- 
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
+++ 
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.tests.integration.io;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValue;
@@ -67,7 +69,7 @@ public class TestGenericObjectSink implements 
Sink<GenericObject> {
         }
         log.info("value {}", record.getValue());
         log.info("value schema type {}", record.getValue().getSchemaType());
-        log.info("value native object {}", 
record.getValue().getNativeObject());
+        log.info("value native object {} class {}", 
record.getValue().getNativeObject(), 
record.getValue().getNativeObject().getClass());
 
         String expectedSchemaDefinition = 
record.getProperties().getOrDefault("expectedSchemaDefinition", "");
         log.info("schemaDefinition {}", 
record.getSchema().getSchemaInfo().getSchemaDefinition());
@@ -79,6 +81,18 @@ public class TestGenericObjectSink implements 
Sink<GenericObject> {
             }
         }
 
+        // testing that actually the Sink is able to use Native AVRO
+        if (record.getSchema().getSchemaInfo().getType() == SchemaType.AVRO) {
+            GenericRecord nativeGenericRecord = (GenericRecord) 
record.getValue().getNativeObject();
+            log.info("Schema from AVRO generic object {}", 
nativeGenericRecord.getSchema());
+        }
+
+        // testing that actually the Sink is able to use Native JSON
+        if (record.getSchema().getSchemaInfo().getType() == SchemaType.JSON) {
+            JsonNode nativeGenericRecord = (JsonNode) 
record.getValue().getNativeObject();
+            log.info("NodeType from JsonNode generic object {}", 
nativeGenericRecord.getNodeType());
+        }
+
         record.ack();
     }
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
index a728f5b..fc266a5 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
@@ -108,6 +108,7 @@ public class PulsarGenericObjectSinkTest extends 
PulsarStandaloneTestSuite {
         List<SinkSpec> specs = Arrays.asList(
                 new SinkSpec("test-kv-sink-input-string-" + randomName(8), 
Schema.STRING, "foo"),
                 new SinkSpec("test-kv-sink-input-avro-" + randomName(8), 
Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
+                new SinkSpec("test-kv-sink-input-json-" + randomName(8), 
Schema.JSON(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
                 new SinkSpec("test-kv-sink-input-kv-string-int-" + 
randomName(8),
                         Schema.KeyValue(Schema.STRING, Schema.INT32), new 
KeyValue<>("foo", 123)),
                 new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" + 
randomName(8),

Reply via email to