This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new cc36756d0 [ISSUE#4568] Support adding extensions from ConnectRcord to
CloudEvent (#4569)
cc36756d0 is described below
commit cc36756d076c50bad23ee2978091ee6a9772792d
Author: hhuang <[email protected]>
AuthorDate: Tue Nov 21 11:10:44 2023 +0800
[ISSUE#4568] Support adding extensions from ConnectRcord to CloudEvent
(#4569)
* feat() : Support adding extensions from ConnectRcord to CloudEvent
* feat() : Determine whether there are primitive types
* fix() : Resolve ambiguity in method names in AbstractTCPServer
* feat : more put and get method in KeyValue
* feat : simplify Number type put method
---
.../apache/eventmesh/openconnect/SourceWorker.java | 26 ++-
.../eventmesh/openconnect/util/CloudEventUtil.java | 11 +-
.../offsetmgmt/api/data/ConnectRecord.java | 9 +-
.../offsetmgmt/api/data/DefaultKeyValue.java | 223 +++++++++++++++++----
.../openconnect/offsetmgmt/api/data/KeyValue.java | 40 +++-
5 files changed, 254 insertions(+), 55 deletions(-)
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index ee0a8a865..d95f9c189 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -41,6 +41,7 @@ import
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetMana
import
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService;
import
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReaderImpl;
import
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageWriterImpl;
+import org.apache.eventmesh.openconnect.util.CloudEventUtil;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;
import org.apache.commons.collections4.CollectionUtils;
@@ -233,16 +234,23 @@ public class SourceWorker implements ConnectorWorker {
}
private CloudEvent convertRecordToEvent(ConnectRecord connectRecord) {
+ CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1();
+
+ cloudEventBuilder.withId(UUID.randomUUID().toString())
+ .withSubject(config.getPubSubConfig().getSubject())
+ .withSource(URI.create("/"))
+ .withDataContentType("application/cloudevents+json")
+ .withType(CLOUD_EVENTS_PROTOCOL_NAME)
+
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
+ .withExtension("ttl", 10000);
+
+ for (String key : connectRecord.getExtensions().keySet()) {
+ if
(CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) {
+ cloudEventBuilder.withExtension(key,
connectRecord.getExtension(key));
+ }
+ }
- return CloudEventBuilder.v1()
- .withId(UUID.randomUUID().toString())
- .withSubject(config.getPubSubConfig().getSubject())
- .withSource(URI.create("/"))
- .withDataContentType("application/cloudevents+json")
- .withType(CLOUD_EVENTS_PROTOCOL_NAME)
-
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
- .withExtension("ttl", 10000)
- .build();
+ return cloudEventBuilder.build();
}
private SendResult convertToSendResult(CloudEvent event) {
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
index 64cdd788a..54be8acc0 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
@@ -21,6 +21,7 @@ import
org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.net.URI;
import java.net.URISyntaxException;
+import java.time.OffsetDateTime;
import java.util.Objects;
import io.cloudevents.CloudEvent;
@@ -53,7 +54,9 @@ public class CloudEventUtil {
cloudEventBuilder.withType(connectRecord.getExtension(s));
break;
default:
- cloudEventBuilder.withExtension(s,
connectRecord.getExtension(s));
+ if
(validateExtensionType(connectRecord.getExtensionObj(s))) {
+ cloudEventBuilder.withExtension(s,
connectRecord.getExtension(s));
+ }
}
});
return cloudEventBuilder.build();
@@ -74,4 +77,10 @@ public class CloudEventUtil {
connectRecord.addExtension("datacontenttype",
event.getDataContentType());
return connectRecord;
}
+
+ public static boolean validateExtensionType(Object obj) {
+ return obj instanceof String || obj instanceof Number || obj
instanceof Boolean
+ || obj instanceof URI || obj instanceof OffsetDateTime || obj
instanceof byte[];
+ }
+
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index 68a4f0537..119f058b5 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -83,7 +83,7 @@ public class ConnectRecord {
}
Set<String> keySet = extensions.keySet();
for (String key : keySet) {
- this.extensions.put(key, extensions.getString(key));
+ this.extensions.put(key, extensions.getObject(key));
}
}
@@ -101,6 +101,13 @@ public class ConnectRecord {
return this.extensions.getString(key);
}
+ public <T> T getExtension(String key, Class<T> c) {
+ if (this.extensions == null) {
+ return null;
+ }
+ return this.extensions.getObject(key, c);
+ }
+
public Object getExtensionObj(String key) {
if (this.extensions == null) {
return null;
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
index 39f65e86b..a0390c189 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
@@ -17,78 +17,113 @@
package org.apache.eventmesh.openconnect.offsetmgmt.api.data;
+import java.net.URI;
+import java.time.OffsetDateTime;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class DefaultKeyValue implements KeyValue {
- private Map<String, Object> properties = new ConcurrentHashMap<>();
+ private final Map<String, Object> properties;
- @Override
- public boolean getBoolean(String key) {
- if (!properties.containsKey(key)) {
- return false;
- }
- return Boolean.parseBoolean(String.valueOf(properties.get(key)));
+ public DefaultKeyValue() {
+ properties = new ConcurrentHashMap<>();
}
@Override
- public boolean getBoolean(String key, boolean defaultValue) {
- return properties.containsKey(key) ? getBoolean(key) : defaultValue;
+ public KeyValue put(String key, Boolean value) {
+ properties.put(key, value);
+ return this;
}
@Override
- public short getShort(String key) {
- if (!properties.containsKey(key)) {
- return 0;
- }
- return Short.parseShort(String.valueOf(properties.get(key)));
+ public KeyValue put(String key, Number value) {
+ properties.put(key, value);
+ return this;
+
}
@Override
- public short getShort(String key, short defaultValue) {
- return properties.containsKey(key) ? getShort(key) : defaultValue;
+ public KeyValue put(String key, byte[] value) {
+ properties.put(key, value);
+ return this;
}
@Override
- public KeyValue put(String key, boolean value) {
- properties.put(key, String.valueOf(value));
+ public KeyValue put(String key, String value) {
+ properties.put(key, value);
return this;
}
@Override
- public KeyValue put(String key, short value) {
- properties.put(key, String.valueOf(value));
+ public KeyValue put(String key, URI value) {
+ properties.put(key, value);
return this;
}
- public DefaultKeyValue() {
- properties = new ConcurrentHashMap<String, Object>();
+ @Override
+ public KeyValue put(String key, OffsetDateTime value) {
+ properties.put(key, value);
+ return this;
}
@Override
- public KeyValue put(String key, int value) {
- properties.put(key, String.valueOf(value));
+ public KeyValue put(String key, Object value) {
+ properties.put(key, value);
return this;
}
@Override
- public KeyValue put(String key, long value) {
- properties.put(key, String.valueOf(value));
- return this;
+ public boolean getBoolean(String key) {
+ if (!properties.containsKey(key)) {
+ return false;
+ }
+ Object val = properties.get(key);
+ if (val instanceof Boolean) {
+ return (Boolean) val;
+ }
+ return false;
}
@Override
- public KeyValue put(String key, double value) {
- properties.put(key, String.valueOf(value));
- return this;
+ public boolean getBoolean(String key, boolean defaultValue) {
+ return properties.containsKey(key) ? getBoolean(key) : defaultValue;
}
@Override
- public KeyValue put(String key, Object value) {
- properties.put(key, value);
- return this;
+ public byte getByte(String key) {
+ if (!properties.containsKey(key)) {
+ return 0;
+ }
+ Object val = properties.get(key);
+ if (val instanceof Byte) {
+ return (Byte) val;
+ }
+ return 0;
+ }
+
+ @Override
+ public byte getByte(String key, byte defaultValue) {
+ return properties.containsKey(key) ? getByte(key) : defaultValue;
+
+ }
+
+ @Override
+ public short getShort(String key) {
+ if (!properties.containsKey(key)) {
+ return 0;
+ }
+ Object val = properties.get(key);
+ if (val instanceof Short) {
+ return (Short) val;
+ }
+ return 0;
+ }
+
+ @Override
+ public short getShort(String key, short defaultValue) {
+ return properties.containsKey(key) ? getShort(key) : defaultValue;
}
@Override
@@ -96,7 +131,11 @@ public class DefaultKeyValue implements KeyValue {
if (!properties.containsKey(key)) {
return 0;
}
- return Integer.parseInt(String.valueOf(properties.get(key)));
+ Object val = properties.get(key);
+ if (val instanceof Integer) {
+ return (Integer) val;
+ }
+ return 0;
}
@Override
@@ -109,7 +148,11 @@ public class DefaultKeyValue implements KeyValue {
if (!properties.containsKey(key)) {
return 0;
}
- return Long.parseLong(String.valueOf(properties.get(key)));
+ Object val = properties.get(key);
+ if (val instanceof Long) {
+ return (Long) val;
+ }
+ return 0;
}
@Override
@@ -117,12 +160,33 @@ public class DefaultKeyValue implements KeyValue {
return properties.containsKey(key) ? getLong(key) : defaultValue;
}
+ @Override
+ public float getFloat(String key) {
+ if (!properties.containsKey(key)) {
+ return 0;
+ }
+ Object val = properties.get(key);
+ if (val instanceof Float) {
+ return (Float) val;
+ }
+ return 0;
+ }
+
+ @Override
+ public float getFloat(String key, float defaultValue) {
+ return properties.containsKey(key) ? getFloat(key) : defaultValue;
+ }
+
@Override
public double getDouble(String key) {
if (!properties.containsKey(key)) {
return 0;
}
- return Double.parseDouble(String.valueOf(properties.get(key)));
+ Object val = properties.get(key);
+ if (val instanceof Double) {
+ return (Double) val;
+ }
+ return 0;
}
@Override
@@ -131,13 +195,32 @@ public class DefaultKeyValue implements KeyValue {
}
@Override
- public Object getObject(String key) {
- return properties.get(key);
+ public byte[] getBytes(String key) {
+ if (!properties.containsKey(key)) {
+ return new byte[]{};
+ }
+ Object val = properties.get(key);
+ if (val instanceof byte[]) {
+ return (byte[]) val;
+ }
+ return new byte[]{};
+ }
+
+ @Override
+ public byte[] getBytes(String key, byte[] defaultValue) {
+ return properties.containsKey(key) ? getBytes(key) : defaultValue;
}
@Override
public String getString(String key) {
- return String.valueOf(properties.get(key));
+ if (!properties.containsKey(key)) {
+ return "";
+ }
+ Object val = properties.get(key);
+ if (val instanceof String) {
+ return (String) val;
+ }
+ return "";
}
@Override
@@ -145,6 +228,68 @@ public class DefaultKeyValue implements KeyValue {
return properties.containsKey(key) ? getString(key) : defaultValue;
}
+ @Override
+ public URI getURI(String key) {
+ if (!properties.containsKey(key)) {
+ return null;
+ }
+ Object val = properties.get(key);
+ if (val instanceof URI) {
+ return (URI) val;
+ }
+ return null;
+ }
+
+ @Override
+ public URI getURI(String key, URI defaultValue) {
+ return properties.containsKey(key) ? getURI(key) : defaultValue;
+ }
+
+ @Override
+ public OffsetDateTime getOffsetDateTime(String key) {
+ if (!properties.containsKey(key)) {
+ return null;
+ }
+ Object val = properties.get(key);
+ if (val instanceof OffsetDateTime) {
+ return (OffsetDateTime) val;
+ }
+ return null;
+ }
+
+ @Override
+ public OffsetDateTime getOffsetDateTime(String key, OffsetDateTime
defaultValue) {
+ return properties.containsKey(key) ? getOffsetDateTime(key) :
defaultValue;
+ }
+
+ @Override
+ public Object getObject(String key) {
+ return properties.getOrDefault(key, null);
+ }
+
+ @Override
+ public Object getObject(String key, Object defaultValue) {
+ return properties.getOrDefault(key, defaultValue);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T getObject(String key, Class<T> c) {
+ if (!properties.containsKey(key)) {
+ return null;
+ }
+ Object val = properties.get(key);
+ if (val.getClass() == c) {
+ return (T) val;
+ }
+ return null;
+ }
+
+ @Override
+ public <T> T getObject(String key, T defaultValue, Class<T> c) {
+ return properties.containsKey(key) ? getObject(key, c) : defaultValue;
+ }
+
@Override
public Set<String> keySet() {
return properties.keySet();
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
index 9cc8893a0..1cff3ddfc 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
@@ -17,6 +17,8 @@
package org.apache.eventmesh.openconnect.offsetmgmt.api.data;
+import java.net.URI;
+import java.time.OffsetDateTime;
import java.util.Set;
/**
@@ -24,15 +26,17 @@ import java.util.Set;
*/
public interface KeyValue {
- KeyValue put(String key, boolean value);
+ KeyValue put(String key, Boolean value);
- KeyValue put(String key, short value);
+ KeyValue put(String key, Number value);
- KeyValue put(String key, int value);
+ KeyValue put(String key, byte[] value);
- KeyValue put(String key, long value);
+ KeyValue put(String key, String value);
- KeyValue put(String key, double value);
+ KeyValue put(String key, URI value);
+
+ KeyValue put(String key, OffsetDateTime value);
KeyValue put(String key, Object value);
@@ -40,6 +44,10 @@ public interface KeyValue {
boolean getBoolean(String key, boolean defaultValue);
+ byte getByte(String key);
+
+ byte getByte(String key, byte defaultValue);
+
short getShort(String key);
short getShort(String key, short defaultValue);
@@ -52,16 +60,38 @@ public interface KeyValue {
long getLong(String key, long defaultValue);
+ float getFloat(String key);
+
+ float getFloat(String key, float defaultValue);
+
double getDouble(String key);
double getDouble(String key, double defaultValue);
+ byte[] getBytes(String key);
+
+ byte[] getBytes(String key, byte[] defaultValue);
+
String getString(String key);
String getString(String key, String defaultValue);
+ URI getURI(String key);
+
+ URI getURI(String key, URI defaultValue);
+
+ OffsetDateTime getOffsetDateTime(String key);
+
+ OffsetDateTime getOffsetDateTime(String key, OffsetDateTime defaultValue);
+
Object getObject(String key);
+ Object getObject(String key, Object defaultValue);
+
+ <T> T getObject(String key, Class<T> c);
+
+ <T> T getObject(String key, T defaultValue, Class<T> c);
+
Set<String> keySet();
boolean containsKey(String key);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]