This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new cc36756d0 [ISSUE#4568] Support adding extensions from ConnectRecord to 
CloudEvent (#4569)
cc36756d0 is described below

commit cc36756d076c50bad23ee2978091ee6a9772792d
Author: hhuang <[email protected]>
AuthorDate: Tue Nov 21 11:10:44 2023 +0800

    [ISSUE#4568] Support adding extensions from ConnectRecord to CloudEvent 
(#4569)
    
    * feat() : Support adding extensions from ConnectRecord to CloudEvent
    
    * feat() : Determine whether there are primitive types
    
    * fix() : Resolve ambiguity in method names in AbstractTCPServer
    
    * feat : more put and get method in KeyValue
    
    * feat : simplify Number type put method
---
 .../apache/eventmesh/openconnect/SourceWorker.java |  26 ++-
 .../eventmesh/openconnect/util/CloudEventUtil.java |  11 +-
 .../offsetmgmt/api/data/ConnectRecord.java         |   9 +-
 .../offsetmgmt/api/data/DefaultKeyValue.java       | 223 +++++++++++++++++----
 .../openconnect/offsetmgmt/api/data/KeyValue.java  |  40 +++-
 5 files changed, 254 insertions(+), 55 deletions(-)

diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index ee0a8a865..d95f9c189 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -41,6 +41,7 @@ import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.DefaultOffsetMana
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetManagementService;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReaderImpl;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageWriterImpl;
+import org.apache.eventmesh.openconnect.util.CloudEventUtil;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -233,16 +234,23 @@ public class SourceWorker implements ConnectorWorker {
     }
 
     private CloudEvent convertRecordToEvent(ConnectRecord connectRecord) {
+        CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1();
+
+        cloudEventBuilder.withId(UUID.randomUUID().toString())
+                .withSubject(config.getPubSubConfig().getSubject())
+                .withSource(URI.create("/"))
+                .withDataContentType("application/cloudevents+json")
+                .withType(CLOUD_EVENTS_PROTOCOL_NAME)
+                
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
+                .withExtension("ttl", 10000);
+
+        for (String key : connectRecord.getExtensions().keySet()) {
+            if 
(CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) {
+                cloudEventBuilder.withExtension(key, 
connectRecord.getExtension(key));
+            }
+        }
 
-        return CloudEventBuilder.v1()
-            .withId(UUID.randomUUID().toString())
-            .withSubject(config.getPubSubConfig().getSubject())
-            .withSource(URI.create("/"))
-            .withDataContentType("application/cloudevents+json")
-            .withType(CLOUD_EVENTS_PROTOCOL_NAME)
-            
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
-            .withExtension("ttl", 10000)
-            .build();
+        return cloudEventBuilder.build();
     }
 
     private SendResult convertToSendResult(CloudEvent event) {
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
index 64cdd788a..54be8acc0 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
@@ -21,6 +21,7 @@ import 
org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.time.OffsetDateTime;
 import java.util.Objects;
 
 import io.cloudevents.CloudEvent;
@@ -53,7 +54,9 @@ public class CloudEventUtil {
                     cloudEventBuilder.withType(connectRecord.getExtension(s));
                     break;
                 default:
-                    cloudEventBuilder.withExtension(s, 
connectRecord.getExtension(s));
+                    if 
(validateExtensionType(connectRecord.getExtensionObj(s))) {
+                        cloudEventBuilder.withExtension(s, 
connectRecord.getExtension(s));
+                    }
             }
         });
         return cloudEventBuilder.build();
@@ -74,4 +77,10 @@ public class CloudEventUtil {
         connectRecord.addExtension("datacontenttype", 
event.getDataContentType());
         return connectRecord;
     }
+
+    public static boolean validateExtensionType(Object obj) {
+        return obj instanceof String || obj instanceof Number || obj 
instanceof Boolean
+                || obj instanceof URI || obj instanceof OffsetDateTime || obj 
instanceof byte[];
+    }
+
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
index 68a4f0537..119f058b5 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/ConnectRecord.java
@@ -83,7 +83,7 @@ public class ConnectRecord {
         }
         Set<String> keySet = extensions.keySet();
         for (String key : keySet) {
-            this.extensions.put(key, extensions.getString(key));
+            this.extensions.put(key, extensions.getObject(key));
         }
     }
 
@@ -101,6 +101,13 @@ public class ConnectRecord {
         return this.extensions.getString(key);
     }
 
+    public <T> T getExtension(String key, Class<T> c) {
+        if (this.extensions == null) {
+            return null;
+        }
+        return this.extensions.getObject(key, c);
+    }
+
     public Object getExtensionObj(String key) {
         if (this.extensions == null) {
             return null;
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
index 39f65e86b..a0390c189 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
@@ -17,78 +17,113 @@
 
 package org.apache.eventmesh.openconnect.offsetmgmt.api.data;
 
+import java.net.URI;
+import java.time.OffsetDateTime;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class DefaultKeyValue implements KeyValue {
 
-    private Map<String, Object> properties = new ConcurrentHashMap<>();
+    private final Map<String, Object> properties;
 
-    @Override
-    public boolean getBoolean(String key) {
-        if (!properties.containsKey(key)) {
-            return false;
-        }
-        return Boolean.parseBoolean(String.valueOf(properties.get(key)));
+    public DefaultKeyValue() {
+        properties = new ConcurrentHashMap<>();
     }
 
     @Override
-    public boolean getBoolean(String key, boolean defaultValue) {
-        return properties.containsKey(key) ? getBoolean(key) : defaultValue;
+    public KeyValue put(String key, Boolean value) {
+        properties.put(key, value);
+        return this;
     }
 
     @Override
-    public short getShort(String key) {
-        if (!properties.containsKey(key)) {
-            return 0;
-        }
-        return Short.parseShort(String.valueOf(properties.get(key)));
+    public KeyValue put(String key, Number value) {
+        properties.put(key, value);
+        return this;
+
     }
 
     @Override
-    public short getShort(String key, short defaultValue) {
-        return properties.containsKey(key) ? getShort(key) : defaultValue;
+    public KeyValue put(String key, byte[] value) {
+        properties.put(key, value);
+        return this;
     }
 
     @Override
-    public KeyValue put(String key, boolean value) {
-        properties.put(key, String.valueOf(value));
+    public KeyValue put(String key, String value) {
+        properties.put(key, value);
         return this;
     }
 
     @Override
-    public KeyValue put(String key, short value) {
-        properties.put(key, String.valueOf(value));
+    public KeyValue put(String key, URI value) {
+        properties.put(key, value);
         return this;
     }
 
-    public DefaultKeyValue() {
-        properties = new ConcurrentHashMap<String, Object>();
+    @Override
+    public KeyValue put(String key, OffsetDateTime value) {
+        properties.put(key, value);
+        return this;
     }
 
     @Override
-    public KeyValue put(String key, int value) {
-        properties.put(key, String.valueOf(value));
+    public KeyValue put(String key, Object value) {
+        properties.put(key, value);
         return this;
     }
 
     @Override
-    public KeyValue put(String key, long value) {
-        properties.put(key, String.valueOf(value));
-        return this;
+    public boolean getBoolean(String key) {
+        if (!properties.containsKey(key)) {
+            return false;
+        }
+        Object val = properties.get(key);
+        if (val instanceof Boolean) {
+            return (Boolean) val;
+        }
+        return false;
     }
 
     @Override
-    public KeyValue put(String key, double value) {
-        properties.put(key, String.valueOf(value));
-        return this;
+    public boolean getBoolean(String key, boolean defaultValue) {
+        return properties.containsKey(key) ? getBoolean(key) : defaultValue;
     }
 
     @Override
-    public KeyValue put(String key, Object value) {
-        properties.put(key, value);
-        return this;
+    public byte getByte(String key) {
+        if (!properties.containsKey(key)) {
+            return 0;
+        }
+        Object val = properties.get(key);
+        if (val instanceof Byte) {
+            return (Byte) val;
+        }
+        return 0;
+    }
+
+    @Override
+    public byte getByte(String key, byte defaultValue) {
+        return properties.containsKey(key) ? getByte(key) : defaultValue;
+
+    }
+
+    @Override
+    public short getShort(String key) {
+        if (!properties.containsKey(key)) {
+            return 0;
+        }
+        Object val = properties.get(key);
+        if (val instanceof Short) {
+            return (Short) val;
+        }
+        return 0;
+    }
+
+    @Override
+    public short getShort(String key, short defaultValue) {
+        return properties.containsKey(key) ? getShort(key) : defaultValue;
     }
 
     @Override
@@ -96,7 +131,11 @@ public class DefaultKeyValue implements KeyValue {
         if (!properties.containsKey(key)) {
             return 0;
         }
-        return Integer.parseInt(String.valueOf(properties.get(key)));
+        Object val = properties.get(key);
+        if (val instanceof Integer) {
+            return (Integer) val;
+        }
+        return 0;
     }
 
     @Override
@@ -109,7 +148,11 @@ public class DefaultKeyValue implements KeyValue {
         if (!properties.containsKey(key)) {
             return 0;
         }
-        return Long.parseLong(String.valueOf(properties.get(key)));
+        Object val = properties.get(key);
+        if (val instanceof Long) {
+            return (Long) val;
+        }
+        return 0;
     }
 
     @Override
@@ -117,12 +160,33 @@ public class DefaultKeyValue implements KeyValue {
         return properties.containsKey(key) ? getLong(key) : defaultValue;
     }
 
+    @Override
+    public float getFloat(String key) {
+        if (!properties.containsKey(key)) {
+            return 0;
+        }
+        Object val = properties.get(key);
+        if (val instanceof Float) {
+            return (Float) val;
+        }
+        return 0;
+    }
+
+    @Override
+    public float getFloat(String key, float defaultValue) {
+        return properties.containsKey(key) ? getFloat(key) : defaultValue;
+    }
+
     @Override
     public double getDouble(String key) {
         if (!properties.containsKey(key)) {
             return 0;
         }
-        return Double.parseDouble(String.valueOf(properties.get(key)));
+        Object val = properties.get(key);
+        if (val instanceof Double) {
+            return (Double) val;
+        }
+        return 0;
     }
 
     @Override
@@ -131,13 +195,32 @@ public class DefaultKeyValue implements KeyValue {
     }
 
     @Override
-    public Object getObject(String key) {
-        return properties.get(key);
+    public byte[] getBytes(String key) {
+        if (!properties.containsKey(key)) {
+            return new byte[]{};
+        }
+        Object val = properties.get(key);
+        if (val instanceof byte[]) {
+            return (byte[]) val;
+        }
+        return new byte[]{};
+    }
+
+    @Override
+    public byte[] getBytes(String key, byte[] defaultValue) {
+        return properties.containsKey(key) ? getBytes(key) : defaultValue;
     }
 
     @Override
     public String getString(String key) {
-        return String.valueOf(properties.get(key));
+        if (!properties.containsKey(key)) {
+            return "";
+        }
+        Object val = properties.get(key);
+        if (val instanceof String) {
+            return (String) val;
+        }
+        return "";
     }
 
     @Override
@@ -145,6 +228,68 @@ public class DefaultKeyValue implements KeyValue {
         return properties.containsKey(key) ? getString(key) : defaultValue;
     }
 
+    @Override
+    public URI getURI(String key) {
+        if (!properties.containsKey(key)) {
+            return null;
+        }
+        Object val = properties.get(key);
+        if (val instanceof URI) {
+            return (URI) val;
+        }
+        return null;
+    }
+
+    @Override
+    public URI getURI(String key, URI defaultValue) {
+        return properties.containsKey(key) ? getURI(key) : defaultValue;
+    }
+
+    @Override
+    public OffsetDateTime getOffsetDateTime(String key) {
+        if (!properties.containsKey(key)) {
+            return null;
+        }
+        Object val = properties.get(key);
+        if (val instanceof OffsetDateTime) {
+            return (OffsetDateTime) val;
+        }
+        return null;
+    }
+
+    @Override
+    public OffsetDateTime getOffsetDateTime(String key, OffsetDateTime 
defaultValue) {
+        return properties.containsKey(key) ? getOffsetDateTime(key) : 
defaultValue;
+    }
+
+    @Override
+    public Object getObject(String key) {
+        return properties.getOrDefault(key, null);
+    }
+
+    @Override
+    public Object getObject(String key, Object defaultValue) {
+        return properties.getOrDefault(key, defaultValue);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T getObject(String key, Class<T> c) {
+        if (!properties.containsKey(key)) {
+            return null;
+        }
+        Object val = properties.get(key);
+        if (val.getClass() == c) {
+            return (T) val;
+        }
+        return null;
+    }
+
+    @Override
+    public <T> T getObject(String key, T defaultValue, Class<T> c) {
+        return properties.containsKey(key) ? getObject(key, c) : defaultValue;
+    }
+
     @Override
     public Set<String> keySet() {
         return properties.keySet();
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
index 9cc8893a0..1cff3ddfc 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/KeyValue.java
@@ -17,6 +17,8 @@
 
 package org.apache.eventmesh.openconnect.offsetmgmt.api.data;
 
+import java.net.URI;
+import java.time.OffsetDateTime;
 import java.util.Set;
 
 /**
@@ -24,15 +26,17 @@ import java.util.Set;
  */
 public interface KeyValue {
 
-    KeyValue put(String key, boolean value);
+    KeyValue put(String key, Boolean value);
 
-    KeyValue put(String key, short value);
+    KeyValue put(String key, Number value);
 
-    KeyValue put(String key, int value);
+    KeyValue put(String key, byte[] value);
 
-    KeyValue put(String key, long value);
+    KeyValue put(String key, String value);
 
-    KeyValue put(String key, double value);
+    KeyValue put(String key, URI value);
+
+    KeyValue put(String key, OffsetDateTime value);
 
     KeyValue put(String key, Object value);
 
@@ -40,6 +44,10 @@ public interface KeyValue {
 
     boolean getBoolean(String key, boolean defaultValue);
 
+    byte getByte(String key);
+
+    byte getByte(String key, byte defaultValue);
+
     short getShort(String key);
 
     short getShort(String key, short defaultValue);
@@ -52,16 +60,38 @@ public interface KeyValue {
 
     long getLong(String key, long defaultValue);
 
+    float getFloat(String key);
+
+    float getFloat(String key, float defaultValue);
+
     double getDouble(String key);
 
     double getDouble(String key, double defaultValue);
 
+    byte[] getBytes(String key);
+
+    byte[] getBytes(String key, byte[] defaultValue);
+
     String getString(String key);
 
     String getString(String key, String defaultValue);
 
+    URI getURI(String key);
+
+    URI getURI(String key, URI defaultValue);
+
+    OffsetDateTime getOffsetDateTime(String key);
+
+    OffsetDateTime getOffsetDateTime(String key, OffsetDateTime defaultValue);
+
     Object getObject(String key);
 
+    Object getObject(String key, Object defaultValue);
+
+    <T> T getObject(String key, Class<T> c);
+
+    <T> T getObject(String key, T defaultValue, Class<T> c);
+
     Set<String> keySet();
 
     boolean containsKey(String key);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to