This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 2a46b82ac [ISSUE #4596]Fix SourceWorker#convertRecordToEvent method
converts ConnectRecord to CloudEvent throw NPE (#4597)
2a46b82ac is described below
commit 2a46b82aca6a8f7a8286627a0df561dbe2d29099
Author: mxsm <[email protected]>
AuthorDate: Sun Dec 3 19:19:05 2023 +0800
[ISSUE #4596]Fix SourceWorker#convertRecordToEvent method converts
ConnectRecord to CloudEvent throw NPE (#4597)
* [ISSUE #4596]Fix SourceWorker#convertRecordToEvent method converts
ConnectRecord to CloudEvent throw NPE
* fix CloudEventUtil#convertRecordToEvent method converts ConnectRecord to
CloudEvent throw NPE
---
.../apache/eventmesh/openconnect/SourceWorker.java | 13 +++++++------
.../eventmesh/openconnect/util/CloudEventUtil.java | 22 +++++++++++-----------
2 files changed, 18 insertions(+), 17 deletions(-)
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index 71c3fea4d..2445382e7 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -244,12 +244,13 @@ public class SourceWorker implements ConnectorWorker {
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
.withExtension("ttl", 10000);
- for (String key : connectRecord.getExtensions().keySet()) {
- if
(CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) {
- cloudEventBuilder.withExtension(key,
connectRecord.getExtension(key));
+ if (connectRecord.getExtensions() != null) {
+ for (String key : connectRecord.getExtensions().keySet()) {
+ if
(CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) {
+ cloudEventBuilder.withExtension(key,
connectRecord.getExtension(key));
+ }
}
}
-
return cloudEventBuilder.build();
}
@@ -329,7 +330,7 @@ public class SourceWorker implements ConnectorWorker {
log.info("{} Committing offsets for {} acknowledged messages",
this, committableOffsets.numCommittableMessages());
if (committableOffsets.hasPending()) {
log.debug("{} There are currently {} pending messages spread
across {} source partitions whose offsets will not be committed. "
- + "The source partition with the most pending messages is
{}, with {} pending messages",
+ + "The source partition with the most pending messages
is {}, with {} pending messages",
this,
committableOffsets.numUncommittableMessages(),
committableOffsets.numDeques(),
@@ -337,7 +338,7 @@ public class SourceWorker implements ConnectorWorker {
committableOffsets.largestDequeSize());
} else {
log.debug("{} There are currently no pending messages for this
offset commit; "
- + "all messages dispatched to the task's producer since
the last commit have been acknowledged",
+ + "all messages dispatched to the task's producer
since the last commit have been acknowledged",
this);
}
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
index 5691d43fd..64e5a9167 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.time.OffsetDateTime;
import java.util.Objects;
+import java.util.Optional;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
@@ -33,32 +34,31 @@ import lombok.extern.slf4j.Slf4j;
public class CloudEventUtil {
public static CloudEvent convertRecordToEvent(ConnectRecord connectRecord)
{
- CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1()
- .withData((byte[]) connectRecord.getData());
- connectRecord.getExtensions().keySet().forEach(s -> {
- switch (s) {
+ final CloudEventBuilder cloudEventBuilder =
CloudEventBuilder.v1().withData((byte[]) connectRecord.getData());
+
Optional.ofNullable(connectRecord.getExtensions()).ifPresent((extensions) ->
extensions.keySet().forEach(key -> {
+ switch (key) {
case "id":
- cloudEventBuilder.withId(connectRecord.getExtension(s));
+ cloudEventBuilder.withId(connectRecord.getExtension(key));
break;
case "topic":
-
cloudEventBuilder.withSubject(connectRecord.getExtension(s));
+
cloudEventBuilder.withSubject(connectRecord.getExtension(key));
break;
case "source":
try {
- cloudEventBuilder.withSource(new
URI(connectRecord.getExtension(s)));
+ cloudEventBuilder.withSource(new
URI(connectRecord.getExtension(key)));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
break;
case "type":
- cloudEventBuilder.withType(connectRecord.getExtension(s));
+
cloudEventBuilder.withType(connectRecord.getExtension(key));
break;
default:
- if
(validateExtensionType(connectRecord.getExtensionObj(s))) {
- cloudEventBuilder.withExtension(s,
connectRecord.getExtension(s));
+ if
(validateExtensionType(connectRecord.getExtensionObj(key))) {
+ cloudEventBuilder.withExtension(key,
connectRecord.getExtension(key));
}
}
- });
+ }));
return cloudEventBuilder.build();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]