This is an automated email from the ASF dual-hosted git repository.

lrhkobe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a2aea81 [ISSUE #4519]Rust sdk support CloudEvents (#4520)
40a2aea81 is described below

commit 40a2aea812e75895e70e42be99c3736bcfcb10b1
Author: mxsm <[email protected]>
AuthorDate: Thu Nov 2 10:00:23 2023 +0800

    [ISSUE #4519]Rust sdk support CloudEvents (#4520)
    
    * [ISSUE #4519]Rust sdk support CloudEvents
    
    * optimize code
    
    * fix complie error
---
 eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml       |  23 ++-
 .../examples/grpc/producer_example.rs              | 101 +++++++---
 .../eventmesh-sdk-rust/rust-toolchain.toml         |   2 +-
 eventmesh-sdks/eventmesh-sdk-rust/src/common.rs    |   2 +-
 ...ge_utils.rs => grpc_eventmesh_message_utils.rs} | 222 ++++++++++++++++++---
 .../eventmesh-sdk-rust/src/common/protocol_key.rs  |   4 +
 eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs      |   6 +-
 .../eventmesh-sdk-rust/src/grpc/grpc_consumer.rs   |   6 +-
 .../eventmesh-sdk-rust/src/grpc/grpc_producer.rs   |  15 +-
 .../src/grpc/{eventmesh_message.rs => impl.rs}     |   4 +-
 .../grpc_producer_impl.rs}                         |  67 ++++---
 eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs       |  18 +-
 eventmesh-sdks/eventmesh-sdk-rust/src/model.rs     |   1 +
 .../eventmesh_message.rs => model/event_clouds.rs} |  13 +-
 .../eventmesh-sdk-rust/src/model/message.rs        |  16 +-
 .../tests/eventmesh_message_utils_test.rs          |   2 +-
 16 files changed, 383 insertions(+), 119 deletions(-)

diff --git a/eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml 
b/eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml
index 0dd9fa939..55b4881a4 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml
+++ b/eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml
@@ -26,15 +26,16 @@ description = "Rust client for Apache EventMesh"
 license = "Apache-2.0"
 keywords = ["EventMesh", "SDK", "rust-client", "rust", "eventmesh-rust-sdk"]
 readme = "./README.md"
-homepage      = "https://github.com/apache/eventmesh";
-repository    = "https://github.com/apache/eventmesh";
+homepage = "https://github.com/apache/eventmesh";
+repository = "https://github.com/apache/eventmesh";
 
 [features]
-default = ["grpc","eventmesh_message"]
-full = ["grpc","eventmesh_message"]
-eventmesh_message=[]
-tls=[]
-grpc=[]
+default = ["grpc", "eventmesh_message"]
+full = ["grpc", "eventmesh_message","cloud_events"]
+eventmesh_message = []
+cloud_events = []
+tls = []
+grpc = []
 
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 [dependencies]
@@ -57,6 +58,9 @@ serde_json = "1.0"
 tracing = "0.1"
 tracing-subscriber = "0.3"
 
+#cloudEvents
+cloudevents-sdk = "0.7.0"
+
 # tools crate
 thiserror = "1.0"
 bytes = "1"
@@ -65,6 +69,7 @@ uuid = { version = "1.4.1", features = ["v4"] }
 local-ip-address = "0.5.6"
 futures = "0.3"
 log = "0.4.20"
+chrono = "0.4"
 
 [build-dependencies]
 tonic-build = "0.10"
@@ -72,9 +77,9 @@ tonic-build = "0.10"
 [[example]]
 name = "producer_example"
 path = "examples/grpc/producer_example.rs"
-required-features = ["grpc","eventmesh_message"]
+required-features = ["grpc", "eventmesh_message","cloud_events"]
 
 [[example]]
 name = "consumer_example"
 path = "examples/grpc/consumer_example.rs"
-required-features = ["grpc","eventmesh_message"]
\ No newline at end of file
+required-features = ["grpc", "eventmesh_message"]
\ No newline at end of file
diff --git 
a/eventmesh-sdks/eventmesh-sdk-rust/examples/grpc/producer_example.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/examples/grpc/producer_example.rs
index bf21bd08c..959c1a6a0 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/examples/grpc/producer_example.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/examples/grpc/producer_example.rs
@@ -15,11 +15,15 @@
  * limitations under the License.
  */
 use std::time::{SystemTime, UNIX_EPOCH};
+
+use chrono::Utc;
+use cloudevents::{EventBuilder, EventBuilderV10};
 use tracing::info;
 
+use eventmesh::common::ProtocolKey;
 use eventmesh::config::EventMeshGrpcClientConfig;
 use eventmesh::grpc::grpc_producer::EventMeshGrpcProducer;
-use eventmesh::grpc::GrpcEventMeshMessageProducer;
+use eventmesh::grpc::GrpcEventMeshProducer;
 use eventmesh::log;
 use eventmesh::model::message::EventMeshMessage;
 
@@ -27,35 +31,72 @@ use eventmesh::model::message::EventMeshMessage;
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     log::init_logger();
 
-    let grpc_client_config = EventMeshGrpcClientConfig::new();
-    let mut producer = GrpcEventMeshMessageProducer::new(grpc_client_config);
-
     //Publish Message
-    info!("Publish Message to EventMesh........");
-    let message = EventMeshMessage::default()
-        .with_biz_seq_no("1")
-        .with_content("123")
-        
.with_create_time(SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as 
u64)
-        .with_topic("123")
-        .with_unique_id("1111");
-    let response = producer.publish(message.clone()).await?;
-    info!("Publish Message to EventMesh return result: {}", response);
-
-    //Publish batch message
-    info!("Publish batch message to EventMesh........");
-    let messages = vec![message.clone(), message.clone(), message.clone()];
-    let response = producer.publish_batch(messages).await?;
-    info!(
-        "Publish batch message to EventMesh return result: {}",
-        response
-    );
-
-    //Publish batch message
-    info!("Publish request reply message to EventMesh........");
-    let response = producer.request_reply(message.clone(), 1000).await?;
-    info!(
-        "Publish request reply message to EventMesh return result: {}",
-        response
-    );
+    #[cfg(feature = "eventmesh_message")]
+    {
+        let grpc_client_config = EventMeshGrpcClientConfig::new();
+        let mut producer = GrpcEventMeshProducer::new(grpc_client_config);
+        info!("Publish Message to EventMesh........");
+        let message = EventMeshMessage::default()
+            .with_biz_seq_no("1")
+            .with_content("123")
+            
.with_create_time(SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as 
u64)
+            .with_topic("123")
+            .with_unique_id("1111");
+        let response = producer.publish(message.clone()).await?;
+        info!("Publish Message to EventMesh return result: {}", response);
+
+        //Publish batch message
+        info!("Publish batch message to EventMesh........");
+        let messages = vec![message.clone(), message.clone(), message.clone()];
+        let response = producer.publish_batch(messages).await?;
+        info!(
+            "Publish batch message to EventMesh return result: {}",
+            response
+        );
+
+        //Publish batch message
+        info!("Publish request reply message to EventMesh........");
+        let response = producer.request_reply(message.clone(), 1000).await?;
+        info!(
+            "Publish request reply message to EventMesh return result: {}",
+            response
+        );
+    }
+
+    #[cfg(feature = "cloud_events")]
+    {
+        let grpc_client_config = EventMeshGrpcClientConfig::new();
+        let mut producer = GrpcEventMeshProducer::new(grpc_client_config);
+        info!("Publish Message to EventMesh........");
+        let message = EventBuilderV10::new()
+            .id("my_event.my_application")
+            .source("http://localhost:8080";)
+            .subject("mxsm")
+            .ty("example.demo")
+            .time(Utc::now())
+            .data(ProtocolKey::CLOUDEVENT_CONTENT_TYPE, "{\"aaa\":\"1111\"}")
+            .build()?;
+        let response = producer.publish(message.clone()).await?;
+        info!("Publish Message to EventMesh return result: {}", response);
+
+        //Publish batch message
+        info!("Publish batch message to EventMesh........");
+        let messages = vec![message.clone(), message.clone(), message.clone()];
+        let response = producer.publish_batch(messages).await?;
+        info!(
+            "Publish batch message to EventMesh return result: {}",
+            response
+        );
+
+        //Publish batch message
+        info!("Publish request reply message to EventMesh........");
+        let response = producer.request_reply(message.clone(), 1000).await?;
+        info!(
+            "Publish request reply message to EventMesh return result: {}",
+            response
+        );
+    }
+
     Ok(())
 }
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/rust-toolchain.toml 
b/eventmesh-sdks/eventmesh-sdk-rust/rust-toolchain.toml
index 15d04a135..50096cb30 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/rust-toolchain.toml
+++ b/eventmesh-sdks/eventmesh-sdk-rust/rust-toolchain.toml
@@ -18,4 +18,4 @@
 
 [toolchain]
 # TODO: we can remove this toolchain file when AFIT and RPITIT hits stable.
-channel = "nightly"
+channel = "nightly-x86_64-pc-windows-msvc"
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/common.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/common.rs
index 1dabb9d28..7b3ea0d01 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/common.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/common.rs
@@ -20,7 +20,7 @@
 pub mod constants;
 
 /// Eventmesh message utilities.
-pub mod eventmesh_message_utils;
+pub mod grpc_eventmesh_message_utils;
 
 /// Local IP helper.
 pub(crate) mod local_ip;
diff --git 
a/eventmesh-sdks/eventmesh-sdk-rust/src/common/eventmesh_message_utils.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/common/grpc_eventmesh_message_utils.rs
similarity index 69%
rename from 
eventmesh-sdks/eventmesh-sdk-rust/src/common/eventmesh_message_utils.rs
rename to 
eventmesh-sdks/eventmesh-sdk-rust/src/common/grpc_eventmesh_message_utils.rs
index 878b0690b..c1ff9a322 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/common/eventmesh_message_utils.rs
+++ 
b/eventmesh-sdks/eventmesh-sdk-rust/src/common/grpc_eventmesh_message_utils.rs
@@ -20,12 +20,13 @@ use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
 use std::time::{SystemTime, UNIX_EPOCH};
 
+use cloudevents::Data::String as EventString;
+use cloudevents::{AttributesReader, Data, Event, EventBuilder, 
EventBuilderV10};
 use tonic::transport::Uri;
 
 use crate::common::constants::{DataContentType, SpecVersion, 
DEFAULT_EVENTMESH_MESSAGE_TTL};
 use crate::common::{ProtocolKey, RandomStringUtils};
 use crate::config::EventMeshGrpcClientConfig;
-use crate::model::convert::FromPbCloudEvent;
 use crate::model::message::EventMeshMessage;
 use crate::model::response::EventMeshResponse;
 use crate::model::subscription::SubscriptionItem;
@@ -181,27 +182,28 @@ impl EventMeshCloudEventUtils {
     pub fn build_event_mesh_cloud_event<T>(
         message: T,
         client_config: &EventMeshGrpcClientConfig,
-        protocol_type: EventMeshProtocolType,
     ) -> Option<PbCloudEvent>
     where
-        T: Debug + Any,
+        T: Any,
     {
         let message_any = &message as &dyn Any;
-        match protocol_type {
-            EventMeshProtocolType::CloudEvents => None,
-            EventMeshProtocolType::EventMeshMessage => {
-                if let Some(em_message) = 
message_any.downcast_ref::<EventMeshMessage>() {
-                    
Some(Self::switch_event_mesh_message_2_event_mesh_cloud_event(
-                        em_message,
-                        client_config,
-                        protocol_type,
-                    ))
-                } else {
-                    None
-                }
-            }
-            _ => None,
+
+        if let Some(em_message) = 
message_any.downcast_ref::<EventMeshMessage>() {
+            return 
Some(Self::switch_event_mesh_message_2_event_mesh_cloud_event(
+                em_message,
+                client_config,
+                EventMeshProtocolType::EventMeshMessage,
+            ));
         }
+        if let Some(cloud_event) = message_any.downcast_ref::<Event>() {
+            return Some(Self::switch_cloud_event_2_event_mesh_cloud_event(
+                cloud_event,
+                client_config,
+                EventMeshProtocolType::CloudEvents,
+            ));
+        }
+
+        None
     }
 
     pub fn switch_event_mesh_message_2_event_mesh_cloud_event(
@@ -330,12 +332,127 @@ impl EventMeshCloudEventUtils {
         }
     }
 
-    pub fn build_message_from_event_mesh_cloud_event<T>(
-        cloud_event: &PbCloudEvent,
+    pub fn switch_cloud_event_2_event_mesh_cloud_event(
+        message: &Event,
+        client_config: &EventMeshGrpcClientConfig,
         protocol_type: EventMeshProtocolType,
-    ) -> Option<T>
+    ) -> PbCloudEvent {
+        let mut attribute_value_map =
+            Self::build_common_cloud_event_attributes(client_config, 
protocol_type);
+        let ttl = message
+            .extension(ProtocolKey::TTL)
+            .map_or(DEFAULT_EVENTMESH_MESSAGE_TTL.to_string(), |value| {
+                value.to_string()
+            });
+        let seq_num = message
+            .extension(ProtocolKey::SEQ_NUM)
+            .map_or(RandomStringUtils::generate_num(30), |value| {
+                value.to_string()
+            });
+        let unique_id = message.id().to_string();
+        attribute_value_map
+            .entry(ProtocolKey::DATA_CONTENT_TYPE.to_string())
+            .or_insert_with(|| PbCloudEventAttributeValue {
+                attr: 
Some(PbAttr::CeString(DataContentType::TEXT_PLAIN.to_string())),
+            });
+
+        attribute_value_map.insert(
+            ProtocolKey::TTL.to_string(),
+            PbCloudEventAttributeValue {
+                attr: Some(PbAttr::CeString(ttl.to_string())),
+            },
+        );
+        attribute_value_map.insert(
+            ProtocolKey::SEQ_NUM.to_string(),
+            PbCloudEventAttributeValue {
+                attr: Some(PbAttr::CeString(seq_num.to_string())),
+            },
+        );
+
+        attribute_value_map.insert(
+            ProtocolKey::SEQ_NUM.to_string(),
+            PbCloudEventAttributeValue {
+                attr: Some(PbAttr::CeString(seq_num.to_string())),
+            },
+        );
+
+        attribute_value_map.insert(
+            ProtocolKey::PROTOCOL_DESC.to_string(),
+            PbCloudEventAttributeValue {
+                attr: Some(PbAttr::CeString(
+                    ProtocolKey::PROTOCOL_DESC_GRPC_CLOUD_EVENT.to_string(),
+                )),
+            },
+        );
+
+        attribute_value_map.insert(
+            ProtocolKey::UNIQUE_ID.to_string(),
+            PbCloudEventAttributeValue {
+                attr: Some(PbAttr::CeString(unique_id.to_string())),
+            },
+        );
+        attribute_value_map.insert(
+            ProtocolKey::PRODUCERGROUP.to_string(),
+            PbCloudEventAttributeValue {
+                attr: Some(PbAttr::CeString(
+                    client_config.producer_group.clone().unwrap(),
+                )),
+            },
+        );
+
+        attribute_value_map.insert(
+            ProtocolKey::SUBJECT.to_string(),
+            PbCloudEventAttributeValue {
+                attr: 
Some(PbAttr::CeString(message.subject().unwrap().to_string())),
+            },
+        );
+
+        attribute_value_map.insert(
+            ProtocolKey::DATA_CONTENT_TYPE.to_string(),
+            PbCloudEventAttributeValue {
+                attr: 
Some(PbAttr::CeString(DataContentType::TEXT_PLAIN.to_string())),
+            },
+        );
+        message.iter_extensions().for_each(|(key, value)| {
+            attribute_value_map.insert(
+                key.to_string(),
+                PbCloudEventAttributeValue {
+                    attr: Some(PbAttr::CeString(value.to_string())),
+                },
+            );
+        });
+
+        let data = {
+            if let Some(content) = message.data() {
+                match content {
+                    Data::Binary(bytes) => 
Some(PbData::ProtoData(prost_types::Any {
+                        type_url: String::from(""),
+                        value: bytes.clone(),
+                    })),
+                    EventString(string) => 
Some(PbData::TextData(string.clone())),
+                    Data::Json(_json) => None,
+                }
+            } else {
+                None
+            }
+        };
+        PbCloudEvent {
+            id: RandomStringUtils::generate_uuid(),
+            source: Uri::builder()
+                .path_and_query("/")
+                .build()
+                .unwrap()
+                .to_string(),
+            spec_version: SpecVersion::V1.to_string(),
+            r#type: Self::CLOUD_EVENT_TYPE.to_string(),
+            attributes: attribute_value_map,
+            data,
+        }
+    }
+
+    pub fn build_message_from_event_mesh_cloud_event<T>(cloud_event: 
&PbCloudEvent) -> Option<T>
     where
-        T: FromPbCloudEvent<T>,
+        T: Any + Debug + From<PbCloudEvent>,
     {
         let seq = EventMeshCloudEventUtils::get_seq_num(cloud_event);
         let unique_id = EventMeshCloudEventUtils::get_unique_id(cloud_event);
@@ -343,11 +460,7 @@ impl EventMeshCloudEventUtils {
         if seq.is_empty() || unique_id.is_empty() {
             return None;
         }
-        match protocol_type {
-            EventMeshProtocolType::CloudEvents => None,
-            EventMeshProtocolType::EventMeshMessage => 
T::from_pb_cloud_event(cloud_event),
-            _ => None,
-        }
+        Some(T::from(cloud_event.clone()))
     }
 
     pub(crate) fn switch_event_mesh_cloud_event_2_event_mesh_message(
@@ -376,6 +489,51 @@ impl EventMeshCloudEventUtils {
         }
     }
 
+    pub(crate) fn switch_event_mesh_cloud_event_2_cloud_event(cloud_event: 
PbCloudEvent) -> Event {
+        let topic = EventMeshCloudEventUtils::get_subject(&cloud_event);
+        let unique_id = EventMeshCloudEventUtils::get_unique_id(&cloud_event);
+        let content = EventMeshCloudEventUtils::get_text_data(&cloud_event);
+        let source = EventMeshCloudEventUtils::get_source(&cloud_event);
+
+        let mut builder = EventBuilderV10::new()
+            .id(unique_id)
+            .subject(topic)
+            .source(source)
+            .ty(ProtocolKey::CLOUD_EVENTS_PROTOCOL_NAME)
+            .data(DataContentType::JSON, content);
+
+        for (key, value) in cloud_event.attributes {
+            builder = builder.extension(key.as_str(), 
value.attr.clone().unwrap().to_string());
+        }
+
+        builder.build().unwrap()
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn switch_cloud_event_2_event_mesh_message(cloud_event: Event) 
-> EventMeshMessage {
+        let mut prop = HashMap::new();
+        cloud_event.iter_attributes().for_each(|(key, value)| {
+            prop.insert(key.to_string(), value.to_string());
+        });
+        let topic = cloud_event.subject().unwrap().to_string();
+        let biz_seq_no = cloud_event
+            .extension(ProtocolKey::SEQ_NUM)
+            .unwrap()
+            .to_string();
+        let unique_id = cloud_event.id().to_string();
+        let content = cloud_event.data().unwrap().to_string();
+        EventMeshMessage {
+            biz_seq_no: Some(biz_seq_no),
+            unique_id: Some(unique_id),
+            topic: Some(topic),
+            content: Some(content),
+            prop,
+            create_time: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .map_or_else(|_err| 0u64, |time| time.as_millis() as u64),
+        }
+    }
+
     pub fn get_seq_num(cloud_event: &PbCloudEvent) -> String {
         cloud_event
             .attributes
@@ -444,6 +602,18 @@ impl EventMeshCloudEventUtils {
             .to_string()
     }
 
+    pub fn get_source(cloud_event: &PbCloudEvent) -> String {
+        cloud_event.attributes.get(ProtocolKey::SOURCE).map_or_else(
+            || String::new(),
+            |ce| {
+                ce.attr
+                    .clone()
+                    .unwrap_or(PbAttr::CeString(String::new()))
+                    .to_string()
+            },
+        )
+    }
+
     pub fn get_response(cloud_event: &PbCloudEvent) -> EventMeshResponse {
         let code = cloud_event
             .attributes
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/common/protocol_key.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/common/protocol_key.rs
index fd4a378f5..048af3019 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/common/protocol_key.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/common/protocol_key.rs
@@ -57,4 +57,8 @@ impl ProtocolKey {
 
     //protocol desc
     pub const PROTOCOL_DESC_GRPC_CLOUD_EVENT: &'static str = 
"grpc-cloud-event";
+
+    pub const CLOUD_EVENTS_PROTOCOL_NAME: &'static str = "cloudevents";
+
+    pub const CLOUDEVENT_CONTENT_TYPE: &'static str = 
"application/cloudevents+json";
 }
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs
index b053fe0e6..30046f8c5 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs
@@ -18,7 +18,7 @@
 //! gRPC client implementations.
 
 /// EventMesh message types.
-pub(crate) mod eventmesh_message;
+pub(crate) mod r#impl;
 
 /// gRPC consumer client.
 pub mod grpc_consumer;
@@ -29,6 +29,6 @@ pub mod grpc_producer;
 /// Protobuf generated definitions.
 pub(crate) mod pb;
 
-#[cfg(all(feature = "eventmesh_message", feature = "grpc"))]
+#[cfg(feature = "grpc")]
 /// Re-export gRPC eventmesh message producer when features enabled.
-pub use 
crate::grpc::eventmesh_message::eventmesh_message_producer::GrpcEventMeshMessageProducer;
+pub use crate::grpc::r#impl::grpc_producer_impl::GrpcEventMeshProducer;
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_consumer.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_consumer.rs
index 1beee1a88..bbdd48e4b 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_consumer.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_consumer.rs
@@ -23,7 +23,7 @@ use tonic::codegen::tokio_stream::StreamExt;
 use tracing::error;
 
 use crate::common::constants::{DataContentType, SDK_STREAM_URL};
-use crate::common::eventmesh_message_utils::EventMeshCloudEventUtils;
+use crate::common::grpc_eventmesh_message_utils::EventMeshCloudEventUtils;
 use crate::common::{ProtocolKey, ReceiveMessageListener};
 use crate::config::EventMeshGrpcClientConfig;
 use crate::error::EventMeshError::{EventMeshLocal, InvalidArgs};
@@ -132,7 +132,7 @@ impl EventMeshGrpcConsumer {
                 let eventmesh_message =
                     
EventMeshCloudEventUtils::build_message_from_event_mesh_cloud_event::<
                         EventMeshMessage,
-                    >(&received, EventMeshProtocolType::EventMeshMessage);
+                    >(&received);
                 if eventmesh_message.is_none() {
                     continue;
                 }
@@ -145,7 +145,7 @@ impl EventMeshGrpcConsumer {
 
                 let handled_msg = 
listener_inner.handle(eventmesh_message.unwrap());
                 if let Ok(msg_option) = handled_msg {
-                    if let Some(msg) = msg_option {
+                    if let Some(_msg) = msg_option {
                         let properties = HashMap::<String, String>::new();
                         let reply = SubscriptionReply::new(
                             EventMeshCloudEventUtils::get_subject(&received),
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_producer.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_producer.rs
index ecd8e9e96..867588fb8 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_producer.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_producer.rs
@@ -17,16 +17,23 @@
 //! Trait for gRPC eventmesh producer.
 
 use crate::model::response::EventMeshResponse;
+use std::future::Future;
 
 /// Trait for gRPC eventmesh producer.
-#[tonic::async_trait]
 pub trait EventMeshGrpcProducer<M> {
     /// Publish a message.
-    async fn publish(&mut self, message: M) -> 
crate::Result<EventMeshResponse>;
+    fn publish(&mut self, message: M) -> impl Future<Output = 
crate::Result<EventMeshResponse>>;
 
     /// Publish a batch of messages.
-    async fn publish_batch(&mut self, messages: Vec<M>) -> 
crate::Result<EventMeshResponse>;
+    fn publish_batch(
+        &mut self,
+        messages: Vec<M>,
+    ) -> impl Future<Output = crate::Result<EventMeshResponse>>;
 
     /// Request reply for a message.
-    async fn request_reply(&mut self, message: M, time_out: u64) -> 
crate::Result<M>;
+    fn request_reply(
+        &mut self,
+        message: M,
+        time_out: u64,
+    ) -> impl Future<Output = crate::Result<M>>;
 }
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl.rs
similarity index 89%
copy from eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
copy to eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl.rs
index a954c46de..edb47941c 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl.rs
@@ -15,5 +15,5 @@
  * limitations under the License.
  */
 
-#[cfg(all(feature = "eventmesh_message", feature = "grpc"))]
-pub mod eventmesh_message_producer;
+#[cfg(feature = "grpc")]
+pub mod grpc_producer_impl;
diff --git 
a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message/eventmesh_message_producer.rs
 b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl/grpc_producer_impl.rs
similarity index 77%
rename from 
eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message/eventmesh_message_producer.rs
rename to eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl/grpc_producer_impl.rs
index 40b3bee34..f81c3f65e 100644
--- 
a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message/eventmesh_message_producer.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl/grpc_producer_impl.rs
@@ -14,13 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-#![cfg(all(feature = "eventmesh_message", feature = "grpc"))]
+use std::any::Any;
+use std::fmt::Debug;
+use std::marker::PhantomData;
 
 use tonic::transport::Uri;
 
 use crate::common::constants::{DataContentType, DEFAULT_EVENTMESH_MESSAGE_TTL};
-use crate::common::eventmesh_message_utils::EventMeshCloudEventUtils;
+use crate::common::grpc_eventmesh_message_utils::EventMeshCloudEventUtils;
 use crate::config::EventMeshGrpcClientConfig;
 use crate::error::EventMeshError;
 use crate::grpc::grpc_producer::EventMeshGrpcProducer;
@@ -33,23 +34,30 @@ use crate::proto_cloud_event::{
 };
 
 /// gRPC EventMesh message producer.
-pub struct GrpcEventMeshMessageProducer {
+pub struct GrpcEventMeshProducer<M> {
     /// gRPC client.
     inner: GrpcClient,
 
     /// gRPC configuration.
     grpc_config: EventMeshGrpcClientConfig,
+
+    _mark: PhantomData<M>,
 }
 
-impl GrpcEventMeshMessageProducer {
+impl<M> GrpcEventMeshProducer<M>
+where
+    M: Any,
+{
     pub fn new(grpc_config: EventMeshGrpcClientConfig) -> Self {
         let client = GrpcClient::new(&grpc_config).unwrap();
         Self {
             inner: client,
             grpc_config,
+            _mark: PhantomData::<M>,
         }
     }
 
+    #[allow(dead_code)]
     fn build_event_mesh_cloud_event(&mut self, message: EventMeshMessage) -> 
PbCloudEvent {
         let mut event = EventMeshCloudEventBuilder::default()
             .with_env(self.grpc_config.env.clone())
@@ -87,7 +95,7 @@ impl GrpcEventMeshMessageProducer {
 
     fn build_event_mesh_cloud_event_batch(
         &mut self,
-        messages: Vec<EventMeshMessage>,
+        messages: Vec<M>,
     ) -> Option<PbCloudEventBatch> {
         if messages.is_empty() {
             return None;
@@ -95,7 +103,10 @@ impl GrpcEventMeshMessageProducer {
 
         let events = messages
             .into_iter()
-            .map(|msg| self.build_event_mesh_cloud_event(msg))
+            .map(|msg| {
+                EventMeshCloudEventUtils::build_event_mesh_cloud_event(msg, 
&self.grpc_config)
+                    .unwrap()
+            })
             .collect();
 
         let mut cloud_event_batch = PbCloudEventBatch::default();
@@ -107,15 +118,14 @@ impl GrpcEventMeshMessageProducer {
 
 /// gRPC EventMesh message producer implementation.
 #[allow(unused_variables)]
-#[tonic::async_trait]
-impl EventMeshGrpcProducer<EventMeshMessage> for GrpcEventMeshMessageProducer {
+impl<M> EventMeshGrpcProducer<M> for GrpcEventMeshProducer<M>
+where
+    M: Any + Debug + From<PbCloudEvent>,
+{
     /// Publish a message.
-    async fn publish(&mut self, message: EventMeshMessage) -> 
crate::Result<EventMeshResponse> {
-        let event = EventMeshCloudEventUtils::build_event_mesh_cloud_event(
-            message,
-            &self.grpc_config,
-            EventMeshProtocolType::EventMeshMessage,
-        );
+    async fn publish(&mut self, message: M) -> 
crate::Result<EventMeshResponse> {
+        let event =
+            EventMeshCloudEventUtils::build_event_mesh_cloud_event(message, 
&self.grpc_config);
         if event.is_none() {
             return Err(EventMeshError::EventMeshLocal(
                 "Create Event Mesh cloud event Error".to_string(),
@@ -127,10 +137,7 @@ impl EventMeshGrpcProducer<EventMeshMessage> for 
GrpcEventMeshMessageProducer {
     }
 
     /// Publish a batch of messages.
-    async fn publish_batch(
-        &mut self,
-        messages: Vec<EventMeshMessage>,
-    ) -> crate::Result<EventMeshResponse> {
+    async fn publish_batch(&mut self, messages: Vec<M>) -> 
crate::Result<EventMeshResponse> {
         let events = self.build_event_mesh_cloud_event_batch(messages);
         if events.is_none() {
             return Err(EventMeshError::EventMeshLocal("Vec is 
empty".to_string()).into());
@@ -140,19 +147,13 @@ impl EventMeshGrpcProducer<EventMeshMessage> for 
GrpcEventMeshMessageProducer {
     }
 
     /// Request reply for a message.
-    async fn request_reply(
-        &mut self,
-        message: EventMeshMessage,
-        time_out: u64,
-    ) -> crate::Result<EventMeshMessage> {
-        let event = self.build_event_mesh_cloud_event(message);
-        let result = self.inner.request_reply_inner(event, time_out).await?;
-        Ok(
-            
EventMeshCloudEventUtils::build_message_from_event_mesh_cloud_event(
-                &result,
-                EventMeshProtocolType::EventMeshMessage,
-            )
-            .unwrap(),
-        )
+    async fn request_reply(&mut self, message: M, time_out: u64) -> 
crate::Result<M> {
+        let event =
+            EventMeshCloudEventUtils::build_event_mesh_cloud_event(message, 
&self.grpc_config);
+        let result = self
+            .inner
+            .request_reply_inner(event.unwrap(), time_out)
+            .await?;
+        
Ok(EventMeshCloudEventUtils::build_message_from_event_mesh_cloud_event(&result).unwrap())
     }
 }
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs
index 452403393..a03f8cfa2 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs
@@ -19,7 +19,6 @@
 
 /// Re-export eventmesh main.
 pub use eventmesh::main;
-
 /// Re-export eventmesh as tokio.
 pub use tokio as eventmesh;
 
@@ -51,19 +50,20 @@ pub mod model;
 
 /// Module contains Protobuf CloudEvent related types and builder.
 pub mod proto_cloud_event {
-    use crate::common::ProtocolKey;
+    use cloudevents::Event;
 
+    use crate::common::ProtocolKey;
     /// Protobuf CloudEvent attribute value enum.    
     pub use 
crate::grpc::pb::cloud_events::cloud_event::cloud_event_attribute_value::Attr 
as PbAttr;
     use 
crate::grpc::pb::cloud_events::cloud_event::cloud_event_attribute_value::Attr;
     pub use 
crate::grpc::pb::cloud_events::cloud_event::CloudEventAttributeValue as 
PbCloudEventAttributeValue;
     pub use crate::grpc::pb::cloud_events::cloud_event::Data as PbData;
     use crate::grpc::pb::cloud_events::cloud_event::{CloudEventAttributeValue, 
Data};
-
     /// Protobuf CloudEvent message.
     pub use crate::grpc::pb::cloud_events::{
         CloudEvent as PbCloudEvent, CloudEventBatch as PbCloudEventBatch,
     };
+    use crate::model::message::EventMeshMessage;
 
     impl ToString for PbAttr {
         /// Convert Protobuf attribute to String.
@@ -80,6 +80,18 @@ pub mod proto_cloud_event {
         }
     }
 
+    impl From<EventMeshMessage> for PbCloudEvent {
+        fn from(_value: EventMeshMessage) -> Self {
+            todo!()
+        }
+    }
+
+    impl From<Event> for PbCloudEvent {
+        fn from(_value: Event) -> Self {
+            todo!()
+        }
+    }
+
     impl ToString for PbData {
         /// Convert Protobuf data to String.
         fn to_string(&self) -> String {
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/model.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/model.rs
index 95c722afe..299c11147 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/model.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/model.rs
@@ -53,5 +53,6 @@ impl EventMeshProtocolType {
 pub mod message;
 
 pub(crate) mod convert;
+pub mod event_clouds;
 pub(crate) mod response;
 pub mod subscription;
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/model/event_clouds.rs
similarity index 70%
rename from eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
rename to eventmesh-sdks/eventmesh-sdk-rust/src/model/event_clouds.rs
index a954c46de..324ea00a9 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/model/event_clouds.rs
@@ -15,5 +15,14 @@
  * limitations under the License.
  */
 
-#[cfg(all(feature = "eventmesh_message", feature = "grpc"))]
-pub mod eventmesh_message_producer;
+ use crate::common::grpc_eventmesh_message_utils::EventMeshCloudEventUtils;
+ use cloudevents::Event;
+ 
+ use crate::proto_cloud_event::PbCloudEvent;
+ 
+ impl From<PbCloudEvent> for Event {
+     fn from(value: PbCloudEvent) -> Self {
+         
EventMeshCloudEventUtils::switch_event_mesh_cloud_event_2_cloud_event(value)
+     }
+ }
+ 
\ No newline at end of file
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/model/message.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/src/model/message.rs
index a3a0f7be8..361684063 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/model/message.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/model/message.rs
@@ -21,9 +21,10 @@ use std::collections::HashMap;
 use std::fmt;
 use std::time::{SystemTime, UNIX_EPOCH};
 
+use cloudevents::Event;
 use serde::{Deserialize, Serialize};
 
-use crate::common::eventmesh_message_utils::EventMeshCloudEventUtils;
+use crate::common::grpc_eventmesh_message_utils::EventMeshCloudEventUtils;
 use crate::model::convert::FromPbCloudEvent;
 use crate::proto_cloud_event::PbCloudEvent;
 
@@ -145,6 +146,19 @@ impl FromPbCloudEvent<EventMeshMessage> for 
EventMeshMessage {
     }
 }
 
+impl From<PbCloudEvent> for EventMeshMessage {
+    fn from(value: PbCloudEvent) -> Self {
+        
EventMeshCloudEventUtils::switch_event_mesh_cloud_event_2_event_mesh_message(&value)
+    }
+}
+
+#[cfg(feature = "cloud_events")]
+impl From<Event> for EventMeshMessage {
+    fn from(value: Event) -> Self {
+        
EventMeshCloudEventUtils::switch_cloud_event_2_event_mesh_message(value)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git 
a/eventmesh-sdks/eventmesh-sdk-rust/tests/eventmesh_message_utils_test.rs 
b/eventmesh-sdks/eventmesh-sdk-rust/tests/eventmesh_message_utils_test.rs
index 10c171090..fc523880c 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/tests/eventmesh_message_utils_test.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/tests/eventmesh_message_utils_test.rs
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-use eventmesh::common::eventmesh_message_utils::{EventMeshCloudEventUtils, 
ProtoSupport};
+use 
eventmesh::common::grpc_eventmesh_message_utils::{EventMeshCloudEventUtils, 
ProtoSupport};
 use eventmesh::config::EventMeshGrpcClientConfig;
 use eventmesh::model::EventMeshProtocolType;
 use eventmesh::proto_cloud_event::PbAttr;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to