This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new a46875c  Pulsar IO: Make Source topic Schema information available to 
downstream Sinks (#8854)
a46875c is described below

commit a46875c80a8869f56f2c1d0654a296f285fd643d
Author: Enrico Olivelli <[email protected]>
AuthorDate: Fri Dec 11 03:14:37 2020 +0100

    Pulsar IO: Make Source topic Schema information available to downstream 
Sinks (#8854)
    
    ## Changes:
    - extract original Schema from the message read from the topic
    - expose the schema into the PulsarRecord implementation
    - add unit test
    - in PulsarSink, detect when the source record comes directly from a 
PulsarSource and use the destination topic schema instead of the record's 
schema (there was already a check that the source record is a PulsarRecord, 
so this patch does not add a new short-circuit)
    - small refactor in PulsarSink to make it clear that we are 
working with SinkRecord (the casts were already present in the code, and it 
would not have worked without SinkRecord, so we are only tidying it up)
    
    ### Motivation
    Pulsar sinks cannot access the original Schema of the Message they are 
consuming.
    This is because PulsarSource currently does not publish the Schema 
information to downstream processing.
    
    ### Modifications
    Try to retrieve Schema information from the Message and push it down to the 
Record.
    
    ### Verifying this change
    - [X] Make sure that the change passes the CI checks.
    - [X] This change added tests and can be verified as follows:
    - added unit test
    - tested manually with Pulsar Standalone
    - there are existing unit and integration tests that are touching this 
change (in fact I had to iterate a few times)
    
    ### Does this pull request potentially affect one of the following parts: 
yes
    With this change, Pulsar Functions users, and especially Pulsar I/O Sinks, 
will be able to inspect the original schema when the processing starts from a 
Pulsar topic (PulsarSource).
    
    
    (cherry picked from commit 0e6ad1ced441aef0611a5683aa5ff0799a501866)
---
 .../worker/PulsarFunctionLocalRunTest.java         |  3 +-
 .../apache/pulsar/functions/sink/PulsarSink.java   | 53 +++++++++++++---------
 .../pulsar/functions/source/PulsarRecord.java      |  7 +++
 .../pulsar/functions/source/PulsarSource.java      |  8 +++-
 .../pulsar/functions/sink/PulsarSinkTest.java      |  1 +
 .../pulsar/functions/source/PulsarSourceTest.java  | 26 ++++++++++-
 6 files changed, 74 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 0fe2958..1002352 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -219,7 +219,7 @@ public class PulsarFunctionLocalRunTest {
         propAdmin.getAdminRoles().add("superUser");
         
propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList(CLUSTER)));
         admin.tenants().updateTenant(tenant, propAdmin);
-        
+
         // setting up simple web sever to test submitting function via URL
         fileServer = HttpServer.create(new InetSocketAddress(0), 0);
         fileServer.createContext("/pulsar-io-data-generator.nar", he -> {
@@ -503,6 +503,7 @@ public class PulsarFunctionLocalRunTest {
                 .withPojo(AvroTestObject.class).build());
         //use AVRO schema
         admin.schemas().createSchema(sourceTopic, schema.getSchemaInfo());
+        // please note that in this test the sink topic schema is different 
from the schema of the source topic
 
         //produce message to sourceTopic
         Producer<AvroTestObject> producer = 
pulsarClient.newProducer(schema).topic(sourceTopic).create();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 9aa8bc1..d94db28 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -71,9 +71,9 @@ public class PulsarSink<T> implements Sink<T> {
 
     private interface PulsarSinkProcessor<T> {
 
-        TypedMessageBuilder<T> newMessage(Record<T> record);
+        TypedMessageBuilder<T> newMessage(SinkRecord<T> record);
 
-        void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record);
+        void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> 
record);
 
         void close() throws Exception;
     }
@@ -140,11 +140,10 @@ public class PulsarSink<T> implements Sink<T> {
             }
         }
 
-        public Function<Throwable, Void> getPublishErrorHandler(Record<T> 
record, boolean failSource) {
+        public Function<Throwable, Void> getPublishErrorHandler(SinkRecord<T> 
record, boolean failSource) {
 
             return throwable -> {
-                SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
-                Record<T> srcRecord = sinkRecord.getSourceRecord();
+                Record<T> srcRecord = record.getSourceRecord();
                 if (failSource) {
                     srcRecord.fail();
                 }
@@ -179,22 +178,29 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public TypedMessageBuilder<T> newMessage(Record<T> record) {
-            if (record.getSchema() != null) {
+        public TypedMessageBuilder<T> newMessage(SinkRecord<T> record) {
+            Schema<T> schemaToWrite = record.getSchema();
+            if (record.getSourceRecord() instanceof PulsarRecord) {
+                // we are receiving data directly from another Pulsar topic
+                // we must use the destination topic schema
+                schemaToWrite = schema;
+            }
+
+            if (schemaToWrite != null) {
                 return getProducer(record
                         .getDestinationTopic()
-                        .orElse(pulsarSinkConfig.getTopic()), 
record.getSchema())
-                        .newMessage(record.getSchema());
+                        .orElse(pulsarSinkConfig.getTopic()), schemaToWrite)
+                        .newMessage(schemaToWrite);
             } else {
                 return getProducer(record
                         .getDestinationTopic()
-                        .orElse(pulsarSinkConfig.getTopic()), 
record.getSchema())
+                        .orElse(pulsarSinkConfig.getTopic()), null)
                         .newMessage();
             }
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, 
SinkRecord<T> record) {
             msg.sendAsync().thenAccept(messageId -> {
                 //no op
             }).exceptionally(getPublishErrorHandler(record, false));
@@ -208,7 +214,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, 
SinkRecord<T> record) {
             msg.sendAsync()
                     .thenAccept(messageId -> record.ack())
                     .exceptionally(getPublishErrorHandler(record, true));
@@ -223,26 +229,31 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public TypedMessageBuilder<T> newMessage(Record<T> record) {
+        public TypedMessageBuilder<T> newMessage(SinkRecord<T> record) {
             if (!record.getPartitionId().isPresent()) {
                 throw new RuntimeException("PartitionId needs to be specified 
for every record while in Effectively-once mode");
             }
-
+            Schema<T> schemaToWrite = record.getSchema();
+            if (record.getSourceRecord() instanceof PulsarRecord) {
+                // we are receiving data directly from another Pulsar topic
+                // we must use the destination topic schema
+                schemaToWrite = schema;
+            }
             Producer<T> producer = getProducer(
                     
String.format("%s-%s",record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()),
 record.getPartitionId().get()),
                     record.getPartitionId().get(),
                     
record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()),
-                    record.getSchema()
+                    schemaToWrite
             );
-            if (record.getSchema() != null) {
-                return producer.newMessage(record.getSchema());
+            if (schemaToWrite != null) {
+                return producer.newMessage(schemaToWrite);
             } else {
                 return producer.newMessage();
             }
         }
 
         @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> 
record) {
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, 
SinkRecord<T> record) {
 
             if (!record.getRecordSequence().isPresent()) {
                 throw new RuntimeException("RecordSequence needs to be 
specified for every record while in Effectively-once mode");
@@ -292,7 +303,8 @@ public class PulsarSink<T> implements Sink<T> {
 
     @Override
     public void write(Record<T> record) {
-        TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(record);
+        SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
+        TypedMessageBuilder<T> msg = 
pulsarSinkProcessor.newMessage(sinkRecord);
 
         if (record.getKey().isPresent() && !(record.getSchema() instanceof 
KeyValueSchema &&
                 ((KeyValueSchema) 
record.getSchema()).getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED)) {
@@ -305,7 +317,6 @@ public class PulsarSink<T> implements Sink<T> {
             msg.properties(record.getProperties());
         }
 
-        SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
         if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
             PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) 
sinkRecord.getSourceRecord();
             // forward user properties to sink-topic
@@ -318,7 +329,7 @@ public class PulsarSink<T> implements Sink<T> {
             eventTime.ifPresent(msg::eventTime);
         }
 
-        pulsarSinkProcessor.sendOutputMessage(msg, record);
+        pulsarSinkProcessor.sendOutputMessage(msg, sinkRecord);
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 3af9a41..46a58a8 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -28,6 +28,7 @@ import lombok.ToString;
 
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 
@@ -41,6 +42,7 @@ public class PulsarRecord<T> implements 
RecordWithEncryptionContext<T> {
     private final int partition;
 
     private final Message<T> message;
+    private final Schema<T> schema;
 
     private final Runnable failFunction;
     private final Runnable ackFunction;
@@ -75,6 +77,11 @@ public class PulsarRecord<T> implements 
RecordWithEncryptionContext<T> {
     }
 
     @Override
+    public Schema<T> getSchema() {
+        return schema;
+    }
+
+    @Override
     public Optional<Long> getEventTime() {
         if (message.getEventTime() != 0) {
             return Optional.of(message.getEventTime());
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 67f7b67..310e28c 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -31,6 +31,7 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -116,9 +117,14 @@ public class PulsarSource<T> extends PushSource<T> 
implements MessageListener<T>
 
     @Override
     public void received(Consumer<T> consumer, Message<T> message) {
-
+        Schema<T> schema = null;
+        if (message instanceof MessageImpl) {
+            MessageImpl impl = (MessageImpl) message;
+            schema = impl.getSchema();
+        }
         Record<T> record = PulsarRecord.<T>builder()
                 .message(message)
+                .schema(schema)
                 .topicName(message.getTopicName())
                 .ackFunction(() -> {
                     if (pulsarSourceConfig
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index ed64f80..a61df12 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -118,6 +118,7 @@ public class PulsarSinkTest {
         Producer producer = mock(Producer.class);
         doReturn(producer).when(producerBuilder).create();
         doReturn(typedMessageBuilder).when(producer).newMessage();
+        
doReturn(typedMessageBuilder).when(producer).newMessage(any(Schema.class));
 
         doReturn(producerBuilder).when(pulsarClient).newProducer();
         doReturn(producerBuilder).when(pulsarClient).newProducer(any());
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index c2e556c..be39508 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.source;
 
-
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -41,15 +40,21 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.io.core.SourceContext;
 import org.mockito.ArgumentMatcher;
+import static org.testng.Assert.assertSame;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -268,4 +273,23 @@ public class PulsarSourceTest {
         }
 
     }
+
+    @Test
+    public void testPreserveOriginalSchema() throws Exception {
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs(false);
+        pulsarConfig.setTypeClassName(GenericRecord.class.getName());
+
+        @Cleanup
+        PulsarSource<GenericRecord> pulsarSource = new 
PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), 
Thread.currentThread().getContextClassLoader());
+
+        pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
+        Consumer consumer = mock(Consumer.class);
+        MessageImpl messageImpl = mock(MessageImpl.class);
+        Schema schema = mock(Schema.class);
+        when(messageImpl.getSchema()).thenReturn(schema);
+        pulsarSource.received(consumer, (Message) messageImpl);
+        verify(messageImpl.getSchema(), times(1));
+        Record<GenericRecord> pushed = pulsarSource.read();
+        assertSame(pushed.getSchema(), schema);
+    }
 }

Reply via email to