This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f35766f Pulsar IO: allow Sinks to use native AVRO and JSON (#11322)
f35766f is described below
commit f35766f99fcc56493be9d12533f4b2a31c3c5884
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue Jul 20 05:11:26 2021 +0200
Pulsar IO: allow Sinks to use native AVRO and JSON (#11322)
A Sink should be able to work with the object returned by
GenericRecord.getNativeObject().
To do this, Apache AVRO must be loaded by the same classloader
that loads the Apache Pulsar runtime.
The same problem applies when getNativeObject() returns a
Jackson JsonNode.
*Modifications*
- Add AVRO to the list of libraries (like slf4j) that are loaded from the Pulsar
runtime (this in turn pulls in Commons Compress and Jackson Databind).
- Enhance the existing integration tests, which checked the Schema
definition but did not actually try to "use" the
`org.apache.avro.generic.GenericRecord` object or the Jackson `JsonNode` object.
---
pulsar-functions/runtime-all/pom.xml | 14 ++++++++++++++
.../pulsar/functions/instance/JavaInstanceDepsTest.java | 9 ++++++++-
.../tests/integration/io/TestGenericObjectSink.java | 16 +++++++++++++++-
.../integration/io/PulsarGenericObjectSinkTest.java | 1 +
4 files changed, 38 insertions(+), 2 deletions(-)
diff --git a/pulsar-functions/runtime-all/pom.xml
b/pulsar-functions/runtime-all/pom.xml
index d3eaccf..9dacaa3 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -41,6 +41,7 @@
5. log4j-slf4j-impl
6. log4j-api
7. log4j-core
+ 8. AVRO
-->
<artifactId>pulsar-functions-runtime-all</artifactId>
@@ -65,6 +66,19 @@
<version>${project.version}</version>
</dependency>
+ <!-- avro and its dependencies, with pinned version -->
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.databind.version}</version>
+ </dependency>
+
<!-- logging -->
<dependency>
diff --git
a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
index 3bdd23f..859be4e 100644
---
a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
+++
b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
@@ -44,6 +44,9 @@ import java.util.zip.ZipInputStream;
* 5. log4j-slf4j-impl
* 6. log4j-api
* 7. log4j-core
+ * 8. Apache AVRO
+ * 9. Jackson Mapper and Databind (dependency of AVRO)
+ * 10. Apache Commons Compress (dependency of AVRO)
*/
public class JavaInstanceDepsTest {
@@ -60,11 +63,15 @@ public class JavaInstanceDepsTest {
if (e == null)
break;
String name = e.getName();
- if (name.endsWith(".class") && !name.startsWith("META-INF")) {
+ if (name.endsWith(".class") && !name.startsWith("META-INF") &&
!name.equals("module-info.class")) {
// The only classes in the java-instance.jar should be
org.apache.pulsar, slf4j, and log4j classes
+ // (see the full list above)
// filter out those classes to see if there are any other
classes that should not be allowed
if (!name.startsWith("org/apache/pulsar")
&& !name.startsWith("org/slf4j")
+ && !name.startsWith("org/apache/avro")
+ && !name.startsWith("com/fasterxml/jackson")
+ && !name.startsWith("org/apache/commons/compress")
&& !name.startsWith("org/apache/logging/slf4j")
&& !name.startsWith("org/apache/logging/log4j")) {
notAllowedClasses.add(name);
diff --git
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
index d131e5b..fa8a3fe 100644
---
a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
+++
b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.tests.integration.io;
+import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
@@ -67,7 +69,7 @@ public class TestGenericObjectSink implements
Sink<GenericObject> {
}
log.info("value {}", record.getValue());
log.info("value schema type {}", record.getValue().getSchemaType());
- log.info("value native object {}",
record.getValue().getNativeObject());
+ log.info("value native object {} class {}",
record.getValue().getNativeObject(),
record.getValue().getNativeObject().getClass());
String expectedSchemaDefinition =
record.getProperties().getOrDefault("expectedSchemaDefinition", "");
log.info("schemaDefinition {}",
record.getSchema().getSchemaInfo().getSchemaDefinition());
@@ -79,6 +81,18 @@ public class TestGenericObjectSink implements
Sink<GenericObject> {
}
}
+ // testing that actually the Sink is able to use Native AVRO
+ if (record.getSchema().getSchemaInfo().getType() == SchemaType.AVRO) {
+ GenericRecord nativeGenericRecord = (GenericRecord)
record.getValue().getNativeObject();
+ log.info("Schema from AVRO generic object {}",
nativeGenericRecord.getSchema());
+ }
+
+ // testing that actually the Sink is able to use Native JSON
+ if (record.getSchema().getSchemaInfo().getType() == SchemaType.JSON) {
+ JsonNode nativeGenericRecord = (JsonNode)
record.getValue().getNativeObject();
+ log.info("NodeType from JsonNode generic object {}",
nativeGenericRecord.getNodeType());
+ }
+
record.ack();
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
index a728f5b..fc266a5 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java
@@ -108,6 +108,7 @@ public class PulsarGenericObjectSinkTest extends
PulsarStandaloneTestSuite {
List<SinkSpec> specs = Arrays.asList(
new SinkSpec("test-kv-sink-input-string-" + randomName(8),
Schema.STRING, "foo"),
new SinkSpec("test-kv-sink-input-avro-" + randomName(8),
Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
+ new SinkSpec("test-kv-sink-input-json-" + randomName(8),
Schema.JSON(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
new SinkSpec("test-kv-sink-input-kv-string-int-" +
randomName(8),
Schema.KeyValue(Schema.STRING, Schema.INT32), new
KeyValue<>("foo", 123)),
new SinkSpec("test-kv-sink-input-kv-avro-json-inl-" +
randomName(8),