This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new de77f87a1 [INLONG-6639][Sort] Fix DynamicPulsarDeserializationSchema deserialize NPE problem (#6641)
de77f87a1 is described below

commit de77f87a13f3f0aff8f249a0e5dbbf4b50c29eee
Author: wangpeix <[email protected]>
AuthorDate: Mon Nov 28 13:24:08 2022 +0800

    [INLONG-6639][Sort] Fix DynamicPulsarDeserializationSchema deserialize NPE problem (#6641)
---
 .../inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 73e841f88..4e9431f55 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -127,7 +127,9 @@ public class DynamicPulsarDeserializationSchema implements PulsarDeserialization
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
             valueDeserialization.deserialize(message.getData(), new CallbackCollector<>(inputRow -> {
-                sourceMetricData.outputMetricsWithEstimate(inputRow);
+                if (sourceMetricData != null) {
+                    sourceMetricData.outputMetricsWithEstimate(inputRow);
+                }
                 collector.collect(inputRow);
             }));
             return;

Reply via email to