This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a46b82ac [ISSUE #4596] Fix NPE thrown when SourceWorker#convertRecordToEvent 
converts a ConnectRecord to a CloudEvent (#4597)
2a46b82ac is described below

commit 2a46b82aca6a8f7a8286627a0df561dbe2d29099
Author: mxsm <[email protected]>
AuthorDate: Sun Dec 3 19:19:05 2023 +0800

    [ISSUE #4596] Fix NPE thrown when SourceWorker#convertRecordToEvent 
converts a ConnectRecord to a CloudEvent (#4597)
    
    * [ISSUE #4596] Fix NPE thrown when SourceWorker#convertRecordToEvent 
converts a ConnectRecord to a CloudEvent
    
    * Fix NPE thrown when CloudEventUtil#convertRecordToEvent converts a 
ConnectRecord to a CloudEvent
---
 .../apache/eventmesh/openconnect/SourceWorker.java | 13 +++++++------
 .../eventmesh/openconnect/util/CloudEventUtil.java | 22 +++++++++++-----------
 2 files changed, 18 insertions(+), 17 deletions(-)

diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index 71c3fea4d..2445382e7 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -244,12 +244,13 @@ public class SourceWorker implements ConnectorWorker {
             
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
             .withExtension("ttl", 10000);
 
-        for (String key : connectRecord.getExtensions().keySet()) {
-            if 
(CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) {
-                cloudEventBuilder.withExtension(key, 
connectRecord.getExtension(key));
+        if (connectRecord.getExtensions() != null) {
+            for (String key : connectRecord.getExtensions().keySet()) {
+                if 
(CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) {
+                    cloudEventBuilder.withExtension(key, 
connectRecord.getExtension(key));
+                }
             }
         }
-
         return cloudEventBuilder.build();
     }
 
@@ -329,7 +330,7 @@ public class SourceWorker implements ConnectorWorker {
             log.info("{} Committing offsets for {} acknowledged messages", 
this, committableOffsets.numCommittableMessages());
             if (committableOffsets.hasPending()) {
                 log.debug("{} There are currently {} pending messages spread 
across {} source partitions whose offsets will not be committed. "
-                    + "The source partition with the most pending messages is 
{}, with {} pending messages",
+                        + "The source partition with the most pending messages 
is {}, with {} pending messages",
                     this,
                     committableOffsets.numUncommittableMessages(),
                     committableOffsets.numDeques(),
@@ -337,7 +338,7 @@ public class SourceWorker implements ConnectorWorker {
                     committableOffsets.largestDequeSize());
             } else {
                 log.debug("{} There are currently no pending messages for this 
offset commit; "
-                    + "all messages dispatched to the task's producer since 
the last commit have been acknowledged",
+                        + "all messages dispatched to the task's producer 
since the last commit have been acknowledged",
                     this);
             }
         }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
index 5691d43fd..64e5a9167 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.time.OffsetDateTime;
 import java.util.Objects;
+import java.util.Optional;
 
 import io.cloudevents.CloudEvent;
 import io.cloudevents.core.builder.CloudEventBuilder;
@@ -33,32 +34,31 @@ import lombok.extern.slf4j.Slf4j;
 public class CloudEventUtil {
 
     public static CloudEvent convertRecordToEvent(ConnectRecord connectRecord) 
{
-        CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1()
-            .withData((byte[]) connectRecord.getData());
-        connectRecord.getExtensions().keySet().forEach(s -> {
-            switch (s) {
+        final CloudEventBuilder cloudEventBuilder = 
CloudEventBuilder.v1().withData((byte[]) connectRecord.getData());
+        
Optional.ofNullable(connectRecord.getExtensions()).ifPresent((extensions) -> 
extensions.keySet().forEach(key -> {
+            switch (key) {
                 case "id":
-                    cloudEventBuilder.withId(connectRecord.getExtension(s));
+                    cloudEventBuilder.withId(connectRecord.getExtension(key));
                     break;
                 case "topic":
-                    
cloudEventBuilder.withSubject(connectRecord.getExtension(s));
+                    
cloudEventBuilder.withSubject(connectRecord.getExtension(key));
                     break;
                 case "source":
                     try {
-                        cloudEventBuilder.withSource(new 
URI(connectRecord.getExtension(s)));
+                        cloudEventBuilder.withSource(new 
URI(connectRecord.getExtension(key)));
                     } catch (URISyntaxException e) {
                         throw new RuntimeException(e);
                     }
                     break;
                 case "type":
-                    cloudEventBuilder.withType(connectRecord.getExtension(s));
+                    
cloudEventBuilder.withType(connectRecord.getExtension(key));
                     break;
                 default:
-                    if 
(validateExtensionType(connectRecord.getExtensionObj(s))) {
-                        cloudEventBuilder.withExtension(s, 
connectRecord.getExtension(s));
+                    if 
(validateExtensionType(connectRecord.getExtensionObj(key))) {
+                        cloudEventBuilder.withExtension(key, 
connectRecord.getExtension(key));
                     }
             }
-        });
+        }));
         return cloudEventBuilder.build();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to