This is an automated email from the ASF dual-hosted git repository.
lrhkobe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 40a2aea81 [ISSUE #4519]Rust sdk support CloudEvents (#4520)
40a2aea81 is described below
commit 40a2aea812e75895e70e42be99c3736bcfcb10b1
Author: mxsm <[email protected]>
AuthorDate: Thu Nov 2 10:00:23 2023 +0800
[ISSUE #4519]Rust sdk support CloudEvents (#4520)
* [ISSUE #4519]Rust sdk support CloudEvents
* optimize code
* fix complie error
---
eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml | 23 ++-
.../examples/grpc/producer_example.rs | 101 +++++++---
.../eventmesh-sdk-rust/rust-toolchain.toml | 2 +-
eventmesh-sdks/eventmesh-sdk-rust/src/common.rs | 2 +-
...ge_utils.rs => grpc_eventmesh_message_utils.rs} | 222 ++++++++++++++++++---
.../eventmesh-sdk-rust/src/common/protocol_key.rs | 4 +
eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs | 6 +-
.../eventmesh-sdk-rust/src/grpc/grpc_consumer.rs | 6 +-
.../eventmesh-sdk-rust/src/grpc/grpc_producer.rs | 15 +-
.../src/grpc/{eventmesh_message.rs => impl.rs} | 4 +-
.../grpc_producer_impl.rs} | 67 ++++---
eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs | 18 +-
eventmesh-sdks/eventmesh-sdk-rust/src/model.rs | 1 +
.../eventmesh_message.rs => model/event_clouds.rs} | 13 +-
.../eventmesh-sdk-rust/src/model/message.rs | 16 +-
.../tests/eventmesh_message_utils_test.rs | 2 +-
16 files changed, 383 insertions(+), 119 deletions(-)
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml
b/eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml
index 0dd9fa939..55b4881a4 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml
+++ b/eventmesh-sdks/eventmesh-sdk-rust/Cargo.toml
@@ -26,15 +26,16 @@ description = "Rust client for Apache EventMesh"
license = "Apache-2.0"
keywords = ["EventMesh", "SDK", "rust-client", "rust", "eventmesh-rust-sdk"]
readme = "./README.md"
-homepage = "https://github.com/apache/eventmesh"
-repository = "https://github.com/apache/eventmesh"
+homepage = "https://github.com/apache/eventmesh"
+repository = "https://github.com/apache/eventmesh"
[features]
-default = ["grpc","eventmesh_message"]
-full = ["grpc","eventmesh_message"]
-eventmesh_message=[]
-tls=[]
-grpc=[]
+default = ["grpc", "eventmesh_message"]
+full = ["grpc", "eventmesh_message","cloud_events"]
+eventmesh_message = []
+cloud_events = []
+tls = []
+grpc = []
# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
@@ -57,6 +58,9 @@ serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = "0.3"
+#cloudEvents
+cloudevents-sdk = "0.7.0"
+
# tools crate
thiserror = "1.0"
bytes = "1"
@@ -65,6 +69,7 @@ uuid = { version = "1.4.1", features = ["v4"] }
local-ip-address = "0.5.6"
futures = "0.3"
log = "0.4.20"
+chrono = "0.4"
[build-dependencies]
tonic-build = "0.10"
@@ -72,9 +77,9 @@ tonic-build = "0.10"
[[example]]
name = "producer_example"
path = "examples/grpc/producer_example.rs"
-required-features = ["grpc","eventmesh_message"]
+required-features = ["grpc", "eventmesh_message","cloud_events"]
[[example]]
name = "consumer_example"
path = "examples/grpc/consumer_example.rs"
-required-features = ["grpc","eventmesh_message"]
\ No newline at end of file
+required-features = ["grpc", "eventmesh_message"]
\ No newline at end of file
diff --git
a/eventmesh-sdks/eventmesh-sdk-rust/examples/grpc/producer_example.rs
b/eventmesh-sdks/eventmesh-sdk-rust/examples/grpc/producer_example.rs
index bf21bd08c..959c1a6a0 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/examples/grpc/producer_example.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/examples/grpc/producer_example.rs
@@ -15,11 +15,15 @@
* limitations under the License.
*/
use std::time::{SystemTime, UNIX_EPOCH};
+
+use chrono::Utc;
+use cloudevents::{EventBuilder, EventBuilderV10};
use tracing::info;
+use eventmesh::common::ProtocolKey;
use eventmesh::config::EventMeshGrpcClientConfig;
use eventmesh::grpc::grpc_producer::EventMeshGrpcProducer;
-use eventmesh::grpc::GrpcEventMeshMessageProducer;
+use eventmesh::grpc::GrpcEventMeshProducer;
use eventmesh::log;
use eventmesh::model::message::EventMeshMessage;
@@ -27,35 +31,72 @@ use eventmesh::model::message::EventMeshMessage;
async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::init_logger();
- let grpc_client_config = EventMeshGrpcClientConfig::new();
- let mut producer = GrpcEventMeshMessageProducer::new(grpc_client_config);
-
//Publish Message
- info!("Publish Message to EventMesh........");
- let message = EventMeshMessage::default()
- .with_biz_seq_no("1")
- .with_content("123")
-
.with_create_time(SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as
u64)
- .with_topic("123")
- .with_unique_id("1111");
- let response = producer.publish(message.clone()).await?;
- info!("Publish Message to EventMesh return result: {}", response);
-
- //Publish batch message
- info!("Publish batch message to EventMesh........");
- let messages = vec![message.clone(), message.clone(), message.clone()];
- let response = producer.publish_batch(messages).await?;
- info!(
- "Publish batch message to EventMesh return result: {}",
- response
- );
-
- //Publish batch message
- info!("Publish request reply message to EventMesh........");
- let response = producer.request_reply(message.clone(), 1000).await?;
- info!(
- "Publish request reply message to EventMesh return result: {}",
- response
- );
+ #[cfg(feature = "eventmesh_message")]
+ {
+ let grpc_client_config = EventMeshGrpcClientConfig::new();
+ let mut producer = GrpcEventMeshProducer::new(grpc_client_config);
+ info!("Publish Message to EventMesh........");
+ let message = EventMeshMessage::default()
+ .with_biz_seq_no("1")
+ .with_content("123")
+
.with_create_time(SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as
u64)
+ .with_topic("123")
+ .with_unique_id("1111");
+ let response = producer.publish(message.clone()).await?;
+ info!("Publish Message to EventMesh return result: {}", response);
+
+ //Publish batch message
+ info!("Publish batch message to EventMesh........");
+ let messages = vec![message.clone(), message.clone(), message.clone()];
+ let response = producer.publish_batch(messages).await?;
+ info!(
+ "Publish batch message to EventMesh return result: {}",
+ response
+ );
+
+ //Publish batch message
+ info!("Publish request reply message to EventMesh........");
+ let response = producer.request_reply(message.clone(), 1000).await?;
+ info!(
+ "Publish request reply message to EventMesh return result: {}",
+ response
+ );
+ }
+
+ #[cfg(feature = "cloud_events")]
+ {
+ let grpc_client_config = EventMeshGrpcClientConfig::new();
+ let mut producer = GrpcEventMeshProducer::new(grpc_client_config);
+ info!("Publish Message to EventMesh........");
+ let message = EventBuilderV10::new()
+ .id("my_event.my_application")
+ .source("http://localhost:8080")
+ .subject("mxsm")
+ .ty("example.demo")
+ .time(Utc::now())
+ .data(ProtocolKey::CLOUDEVENT_CONTENT_TYPE, "{\"aaa\":\"1111\"}")
+ .build()?;
+ let response = producer.publish(message.clone()).await?;
+ info!("Publish Message to EventMesh return result: {}", response);
+
+ //Publish batch message
+ info!("Publish batch message to EventMesh........");
+ let messages = vec![message.clone(), message.clone(), message.clone()];
+ let response = producer.publish_batch(messages).await?;
+ info!(
+ "Publish batch message to EventMesh return result: {}",
+ response
+ );
+
+ //Publish batch message
+ info!("Publish request reply message to EventMesh........");
+ let response = producer.request_reply(message.clone(), 1000).await?;
+ info!(
+ "Publish request reply message to EventMesh return result: {}",
+ response
+ );
+ }
+
Ok(())
}
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/rust-toolchain.toml
b/eventmesh-sdks/eventmesh-sdk-rust/rust-toolchain.toml
index 15d04a135..50096cb30 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/rust-toolchain.toml
+++ b/eventmesh-sdks/eventmesh-sdk-rust/rust-toolchain.toml
@@ -18,4 +18,4 @@
[toolchain]
# TODO: we can remove this toolchain file when AFIT and RPITIT hits stable.
-channel = "nightly"
+channel = "nightly-x86_64-pc-windows-msvc"
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/common.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/common.rs
index 1dabb9d28..7b3ea0d01 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/common.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/common.rs
@@ -20,7 +20,7 @@
pub mod constants;
/// Eventmesh message utilities.
-pub mod eventmesh_message_utils;
+pub mod grpc_eventmesh_message_utils;
/// Local IP helper.
pub(crate) mod local_ip;
diff --git
a/eventmesh-sdks/eventmesh-sdk-rust/src/common/eventmesh_message_utils.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/common/grpc_eventmesh_message_utils.rs
similarity index 69%
rename from
eventmesh-sdks/eventmesh-sdk-rust/src/common/eventmesh_message_utils.rs
rename to
eventmesh-sdks/eventmesh-sdk-rust/src/common/grpc_eventmesh_message_utils.rs
index 878b0690b..c1ff9a322 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/common/eventmesh_message_utils.rs
+++
b/eventmesh-sdks/eventmesh-sdk-rust/src/common/grpc_eventmesh_message_utils.rs
@@ -20,12 +20,13 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::time::{SystemTime, UNIX_EPOCH};
+use cloudevents::Data::String as EventString;
+use cloudevents::{AttributesReader, Data, Event, EventBuilder,
EventBuilderV10};
use tonic::transport::Uri;
use crate::common::constants::{DataContentType, SpecVersion,
DEFAULT_EVENTMESH_MESSAGE_TTL};
use crate::common::{ProtocolKey, RandomStringUtils};
use crate::config::EventMeshGrpcClientConfig;
-use crate::model::convert::FromPbCloudEvent;
use crate::model::message::EventMeshMessage;
use crate::model::response::EventMeshResponse;
use crate::model::subscription::SubscriptionItem;
@@ -181,27 +182,28 @@ impl EventMeshCloudEventUtils {
pub fn build_event_mesh_cloud_event<T>(
message: T,
client_config: &EventMeshGrpcClientConfig,
- protocol_type: EventMeshProtocolType,
) -> Option<PbCloudEvent>
where
- T: Debug + Any,
+ T: Any,
{
let message_any = &message as &dyn Any;
- match protocol_type {
- EventMeshProtocolType::CloudEvents => None,
- EventMeshProtocolType::EventMeshMessage => {
- if let Some(em_message) =
message_any.downcast_ref::<EventMeshMessage>() {
-
Some(Self::switch_event_mesh_message_2_event_mesh_cloud_event(
- em_message,
- client_config,
- protocol_type,
- ))
- } else {
- None
- }
- }
- _ => None,
+
+ if let Some(em_message) =
message_any.downcast_ref::<EventMeshMessage>() {
+ return
Some(Self::switch_event_mesh_message_2_event_mesh_cloud_event(
+ em_message,
+ client_config,
+ EventMeshProtocolType::EventMeshMessage,
+ ));
}
+ if let Some(cloud_event) = message_any.downcast_ref::<Event>() {
+ return Some(Self::switch_cloud_event_2_event_mesh_cloud_event(
+ cloud_event,
+ client_config,
+ EventMeshProtocolType::CloudEvents,
+ ));
+ }
+
+ None
}
pub fn switch_event_mesh_message_2_event_mesh_cloud_event(
@@ -330,12 +332,127 @@ impl EventMeshCloudEventUtils {
}
}
- pub fn build_message_from_event_mesh_cloud_event<T>(
- cloud_event: &PbCloudEvent,
+ pub fn switch_cloud_event_2_event_mesh_cloud_event(
+ message: &Event,
+ client_config: &EventMeshGrpcClientConfig,
protocol_type: EventMeshProtocolType,
- ) -> Option<T>
+ ) -> PbCloudEvent {
+ let mut attribute_value_map =
+ Self::build_common_cloud_event_attributes(client_config,
protocol_type);
+ let ttl = message
+ .extension(ProtocolKey::TTL)
+ .map_or(DEFAULT_EVENTMESH_MESSAGE_TTL.to_string(), |value| {
+ value.to_string()
+ });
+ let seq_num = message
+ .extension(ProtocolKey::SEQ_NUM)
+ .map_or(RandomStringUtils::generate_num(30), |value| {
+ value.to_string()
+ });
+ let unique_id = message.id().to_string();
+ attribute_value_map
+ .entry(ProtocolKey::DATA_CONTENT_TYPE.to_string())
+ .or_insert_with(|| PbCloudEventAttributeValue {
+ attr:
Some(PbAttr::CeString(DataContentType::TEXT_PLAIN.to_string())),
+ });
+
+ attribute_value_map.insert(
+ ProtocolKey::TTL.to_string(),
+ PbCloudEventAttributeValue {
+ attr: Some(PbAttr::CeString(ttl.to_string())),
+ },
+ );
+ attribute_value_map.insert(
+ ProtocolKey::SEQ_NUM.to_string(),
+ PbCloudEventAttributeValue {
+ attr: Some(PbAttr::CeString(seq_num.to_string())),
+ },
+ );
+
+ attribute_value_map.insert(
+ ProtocolKey::SEQ_NUM.to_string(),
+ PbCloudEventAttributeValue {
+ attr: Some(PbAttr::CeString(seq_num.to_string())),
+ },
+ );
+
+ attribute_value_map.insert(
+ ProtocolKey::PROTOCOL_DESC.to_string(),
+ PbCloudEventAttributeValue {
+ attr: Some(PbAttr::CeString(
+ ProtocolKey::PROTOCOL_DESC_GRPC_CLOUD_EVENT.to_string(),
+ )),
+ },
+ );
+
+ attribute_value_map.insert(
+ ProtocolKey::UNIQUE_ID.to_string(),
+ PbCloudEventAttributeValue {
+ attr: Some(PbAttr::CeString(unique_id.to_string())),
+ },
+ );
+ attribute_value_map.insert(
+ ProtocolKey::PRODUCERGROUP.to_string(),
+ PbCloudEventAttributeValue {
+ attr: Some(PbAttr::CeString(
+ client_config.producer_group.clone().unwrap(),
+ )),
+ },
+ );
+
+ attribute_value_map.insert(
+ ProtocolKey::SUBJECT.to_string(),
+ PbCloudEventAttributeValue {
+ attr:
Some(PbAttr::CeString(message.subject().unwrap().to_string())),
+ },
+ );
+
+ attribute_value_map.insert(
+ ProtocolKey::DATA_CONTENT_TYPE.to_string(),
+ PbCloudEventAttributeValue {
+ attr:
Some(PbAttr::CeString(DataContentType::TEXT_PLAIN.to_string())),
+ },
+ );
+ message.iter_extensions().for_each(|(key, value)| {
+ attribute_value_map.insert(
+ key.to_string(),
+ PbCloudEventAttributeValue {
+ attr: Some(PbAttr::CeString(value.to_string())),
+ },
+ );
+ });
+
+ let data = {
+ if let Some(content) = message.data() {
+ match content {
+ Data::Binary(bytes) =>
Some(PbData::ProtoData(prost_types::Any {
+ type_url: String::from(""),
+ value: bytes.clone(),
+ })),
+ EventString(string) =>
Some(PbData::TextData(string.clone())),
+ Data::Json(_json) => None,
+ }
+ } else {
+ None
+ }
+ };
+ PbCloudEvent {
+ id: RandomStringUtils::generate_uuid(),
+ source: Uri::builder()
+ .path_and_query("/")
+ .build()
+ .unwrap()
+ .to_string(),
+ spec_version: SpecVersion::V1.to_string(),
+ r#type: Self::CLOUD_EVENT_TYPE.to_string(),
+ attributes: attribute_value_map,
+ data,
+ }
+ }
+
+ pub fn build_message_from_event_mesh_cloud_event<T>(cloud_event:
&PbCloudEvent) -> Option<T>
where
- T: FromPbCloudEvent<T>,
+ T: Any + Debug + From<PbCloudEvent>,
{
let seq = EventMeshCloudEventUtils::get_seq_num(cloud_event);
let unique_id = EventMeshCloudEventUtils::get_unique_id(cloud_event);
@@ -343,11 +460,7 @@ impl EventMeshCloudEventUtils {
if seq.is_empty() || unique_id.is_empty() {
return None;
}
- match protocol_type {
- EventMeshProtocolType::CloudEvents => None,
- EventMeshProtocolType::EventMeshMessage =>
T::from_pb_cloud_event(cloud_event),
- _ => None,
- }
+ Some(T::from(cloud_event.clone()))
}
pub(crate) fn switch_event_mesh_cloud_event_2_event_mesh_message(
@@ -376,6 +489,51 @@ impl EventMeshCloudEventUtils {
}
}
+ pub(crate) fn switch_event_mesh_cloud_event_2_cloud_event(cloud_event:
PbCloudEvent) -> Event {
+ let topic = EventMeshCloudEventUtils::get_subject(&cloud_event);
+ let unique_id = EventMeshCloudEventUtils::get_unique_id(&cloud_event);
+ let content = EventMeshCloudEventUtils::get_text_data(&cloud_event);
+ let source = EventMeshCloudEventUtils::get_source(&cloud_event);
+
+ let mut builder = EventBuilderV10::new()
+ .id(unique_id)
+ .subject(topic)
+ .source(source)
+ .ty(ProtocolKey::CLOUD_EVENTS_PROTOCOL_NAME)
+ .data(DataContentType::JSON, content);
+
+ for (key, value) in cloud_event.attributes {
+ builder = builder.extension(key.as_str(),
value.attr.clone().unwrap().to_string());
+ }
+
+ builder.build().unwrap()
+ }
+
+ #[allow(dead_code)]
+ pub(crate) fn switch_cloud_event_2_event_mesh_message(cloud_event: Event)
-> EventMeshMessage {
+ let mut prop = HashMap::new();
+ cloud_event.iter_attributes().for_each(|(key, value)| {
+ prop.insert(key.to_string(), value.to_string());
+ });
+ let topic = cloud_event.subject().unwrap().to_string();
+ let biz_seq_no = cloud_event
+ .extension(ProtocolKey::SEQ_NUM)
+ .unwrap()
+ .to_string();
+ let unique_id = cloud_event.id().to_string();
+ let content = cloud_event.data().unwrap().to_string();
+ EventMeshMessage {
+ biz_seq_no: Some(biz_seq_no),
+ unique_id: Some(unique_id),
+ topic: Some(topic),
+ content: Some(content),
+ prop,
+ create_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .map_or_else(|_err| 0u64, |time| time.as_millis() as u64),
+ }
+ }
+
pub fn get_seq_num(cloud_event: &PbCloudEvent) -> String {
cloud_event
.attributes
@@ -444,6 +602,18 @@ impl EventMeshCloudEventUtils {
.to_string()
}
+ pub fn get_source(cloud_event: &PbCloudEvent) -> String {
+ cloud_event.attributes.get(ProtocolKey::SOURCE).map_or_else(
+ || String::new(),
+ |ce| {
+ ce.attr
+ .clone()
+ .unwrap_or(PbAttr::CeString(String::new()))
+ .to_string()
+ },
+ )
+ }
+
pub fn get_response(cloud_event: &PbCloudEvent) -> EventMeshResponse {
let code = cloud_event
.attributes
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/common/protocol_key.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/common/protocol_key.rs
index fd4a378f5..048af3019 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/common/protocol_key.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/common/protocol_key.rs
@@ -57,4 +57,8 @@ impl ProtocolKey {
//protocol desc
pub const PROTOCOL_DESC_GRPC_CLOUD_EVENT: &'static str =
"grpc-cloud-event";
+
+ pub const CLOUD_EVENTS_PROTOCOL_NAME: &'static str = "cloudevents";
+
+ pub const CLOUDEVENT_CONTENT_TYPE: &'static str =
"application/cloudevents+json";
}
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs
index b053fe0e6..30046f8c5 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc.rs
@@ -18,7 +18,7 @@
//! gRPC client implementations.
/// EventMesh message types.
-pub(crate) mod eventmesh_message;
+pub(crate) mod r#impl;
/// gRPC consumer client.
pub mod grpc_consumer;
@@ -29,6 +29,6 @@ pub mod grpc_producer;
/// Protobuf generated definitions.
pub(crate) mod pb;
-#[cfg(all(feature = "eventmesh_message", feature = "grpc"))]
+#[cfg(feature = "grpc")]
/// Re-export gRPC eventmesh message producer when features enabled.
-pub use
crate::grpc::eventmesh_message::eventmesh_message_producer::GrpcEventMeshMessageProducer;
+pub use crate::grpc::r#impl::grpc_producer_impl::GrpcEventMeshProducer;
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_consumer.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_consumer.rs
index 1beee1a88..bbdd48e4b 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_consumer.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_consumer.rs
@@ -23,7 +23,7 @@ use tonic::codegen::tokio_stream::StreamExt;
use tracing::error;
use crate::common::constants::{DataContentType, SDK_STREAM_URL};
-use crate::common::eventmesh_message_utils::EventMeshCloudEventUtils;
+use crate::common::grpc_eventmesh_message_utils::EventMeshCloudEventUtils;
use crate::common::{ProtocolKey, ReceiveMessageListener};
use crate::config::EventMeshGrpcClientConfig;
use crate::error::EventMeshError::{EventMeshLocal, InvalidArgs};
@@ -132,7 +132,7 @@ impl EventMeshGrpcConsumer {
let eventmesh_message =
EventMeshCloudEventUtils::build_message_from_event_mesh_cloud_event::<
EventMeshMessage,
- >(&received, EventMeshProtocolType::EventMeshMessage);
+ >(&received);
if eventmesh_message.is_none() {
continue;
}
@@ -145,7 +145,7 @@ impl EventMeshGrpcConsumer {
let handled_msg =
listener_inner.handle(eventmesh_message.unwrap());
if let Ok(msg_option) = handled_msg {
- if let Some(msg) = msg_option {
+ if let Some(_msg) = msg_option {
let properties = HashMap::<String, String>::new();
let reply = SubscriptionReply::new(
EventMeshCloudEventUtils::get_subject(&received),
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_producer.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_producer.rs
index ecd8e9e96..867588fb8 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_producer.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/grpc_producer.rs
@@ -17,16 +17,23 @@
//! Trait for gRPC eventmesh producer.
use crate::model::response::EventMeshResponse;
+use std::future::Future;
/// Trait for gRPC eventmesh producer.
-#[tonic::async_trait]
pub trait EventMeshGrpcProducer<M> {
/// Publish a message.
- async fn publish(&mut self, message: M) ->
crate::Result<EventMeshResponse>;
+ fn publish(&mut self, message: M) -> impl Future<Output =
crate::Result<EventMeshResponse>>;
/// Publish a batch of messages.
- async fn publish_batch(&mut self, messages: Vec<M>) ->
crate::Result<EventMeshResponse>;
+ fn publish_batch(
+ &mut self,
+ messages: Vec<M>,
+ ) -> impl Future<Output = crate::Result<EventMeshResponse>>;
/// Request reply for a message.
- async fn request_reply(&mut self, message: M, time_out: u64) ->
crate::Result<M>;
+ fn request_reply(
+ &mut self,
+ message: M,
+ time_out: u64,
+ ) -> impl Future<Output = crate::Result<M>>;
}
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl.rs
similarity index 89%
copy from eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
copy to eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl.rs
index a954c46de..edb47941c 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl.rs
@@ -15,5 +15,5 @@
* limitations under the License.
*/
-#[cfg(all(feature = "eventmesh_message", feature = "grpc"))]
-pub mod eventmesh_message_producer;
+#[cfg(feature = "grpc")]
+pub mod grpc_producer_impl;
diff --git
a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message/eventmesh_message_producer.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl/grpc_producer_impl.rs
similarity index 77%
rename from
eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message/eventmesh_message_producer.rs
rename to eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl/grpc_producer_impl.rs
index 40b3bee34..f81c3f65e 100644
---
a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message/eventmesh_message_producer.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/impl/grpc_producer_impl.rs
@@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-#![cfg(all(feature = "eventmesh_message", feature = "grpc"))]
+use std::any::Any;
+use std::fmt::Debug;
+use std::marker::PhantomData;
use tonic::transport::Uri;
use crate::common::constants::{DataContentType, DEFAULT_EVENTMESH_MESSAGE_TTL};
-use crate::common::eventmesh_message_utils::EventMeshCloudEventUtils;
+use crate::common::grpc_eventmesh_message_utils::EventMeshCloudEventUtils;
use crate::config::EventMeshGrpcClientConfig;
use crate::error::EventMeshError;
use crate::grpc::grpc_producer::EventMeshGrpcProducer;
@@ -33,23 +34,30 @@ use crate::proto_cloud_event::{
};
/// gRPC EventMesh message producer.
-pub struct GrpcEventMeshMessageProducer {
+pub struct GrpcEventMeshProducer<M> {
/// gRPC client.
inner: GrpcClient,
/// gRPC configuration.
grpc_config: EventMeshGrpcClientConfig,
+
+ _mark: PhantomData<M>,
}
-impl GrpcEventMeshMessageProducer {
+impl<M> GrpcEventMeshProducer<M>
+where
+ M: Any,
+{
pub fn new(grpc_config: EventMeshGrpcClientConfig) -> Self {
let client = GrpcClient::new(&grpc_config).unwrap();
Self {
inner: client,
grpc_config,
+ _mark: PhantomData::<M>,
}
}
+ #[allow(dead_code)]
fn build_event_mesh_cloud_event(&mut self, message: EventMeshMessage) ->
PbCloudEvent {
let mut event = EventMeshCloudEventBuilder::default()
.with_env(self.grpc_config.env.clone())
@@ -87,7 +95,7 @@ impl GrpcEventMeshMessageProducer {
fn build_event_mesh_cloud_event_batch(
&mut self,
- messages: Vec<EventMeshMessage>,
+ messages: Vec<M>,
) -> Option<PbCloudEventBatch> {
if messages.is_empty() {
return None;
@@ -95,7 +103,10 @@ impl GrpcEventMeshMessageProducer {
let events = messages
.into_iter()
- .map(|msg| self.build_event_mesh_cloud_event(msg))
+ .map(|msg| {
+ EventMeshCloudEventUtils::build_event_mesh_cloud_event(msg,
&self.grpc_config)
+ .unwrap()
+ })
.collect();
let mut cloud_event_batch = PbCloudEventBatch::default();
@@ -107,15 +118,14 @@ impl GrpcEventMeshMessageProducer {
/// gRPC EventMesh message producer implementation.
#[allow(unused_variables)]
-#[tonic::async_trait]
-impl EventMeshGrpcProducer<EventMeshMessage> for GrpcEventMeshMessageProducer {
+impl<M> EventMeshGrpcProducer<M> for GrpcEventMeshProducer<M>
+where
+ M: Any + Debug + From<PbCloudEvent>,
+{
/// Publish a message.
- async fn publish(&mut self, message: EventMeshMessage) ->
crate::Result<EventMeshResponse> {
- let event = EventMeshCloudEventUtils::build_event_mesh_cloud_event(
- message,
- &self.grpc_config,
- EventMeshProtocolType::EventMeshMessage,
- );
+ async fn publish(&mut self, message: M) ->
crate::Result<EventMeshResponse> {
+ let event =
+ EventMeshCloudEventUtils::build_event_mesh_cloud_event(message,
&self.grpc_config);
if event.is_none() {
return Err(EventMeshError::EventMeshLocal(
"Create Event Mesh cloud event Error".to_string(),
@@ -127,10 +137,7 @@ impl EventMeshGrpcProducer<EventMeshMessage> for
GrpcEventMeshMessageProducer {
}
/// Publish a batch of messages.
- async fn publish_batch(
- &mut self,
- messages: Vec<EventMeshMessage>,
- ) -> crate::Result<EventMeshResponse> {
+ async fn publish_batch(&mut self, messages: Vec<M>) ->
crate::Result<EventMeshResponse> {
let events = self.build_event_mesh_cloud_event_batch(messages);
if events.is_none() {
return Err(EventMeshError::EventMeshLocal("Vec is
empty".to_string()).into());
@@ -140,19 +147,13 @@ impl EventMeshGrpcProducer<EventMeshMessage> for
GrpcEventMeshMessageProducer {
}
/// Request reply for a message.
- async fn request_reply(
- &mut self,
- message: EventMeshMessage,
- time_out: u64,
- ) -> crate::Result<EventMeshMessage> {
- let event = self.build_event_mesh_cloud_event(message);
- let result = self.inner.request_reply_inner(event, time_out).await?;
- Ok(
-
EventMeshCloudEventUtils::build_message_from_event_mesh_cloud_event(
- &result,
- EventMeshProtocolType::EventMeshMessage,
- )
- .unwrap(),
- )
+ async fn request_reply(&mut self, message: M, time_out: u64) ->
crate::Result<M> {
+ let event =
+ EventMeshCloudEventUtils::build_event_mesh_cloud_event(message,
&self.grpc_config);
+ let result = self
+ .inner
+ .request_reply_inner(event.unwrap(), time_out)
+ .await?;
+
Ok(EventMeshCloudEventUtils::build_message_from_event_mesh_cloud_event(&result).unwrap())
}
}
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs
index 452403393..a03f8cfa2 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/lib.rs
@@ -19,7 +19,6 @@
/// Re-export eventmesh main.
pub use eventmesh::main;
-
/// Re-export eventmesh as tokio.
pub use tokio as eventmesh;
@@ -51,19 +50,20 @@ pub mod model;
/// Module contains Protobuf CloudEvent related types and builder.
pub mod proto_cloud_event {
- use crate::common::ProtocolKey;
+ use cloudevents::Event;
+ use crate::common::ProtocolKey;
/// Protobuf CloudEvent attribute value enum.
pub use
crate::grpc::pb::cloud_events::cloud_event::cloud_event_attribute_value::Attr
as PbAttr;
use
crate::grpc::pb::cloud_events::cloud_event::cloud_event_attribute_value::Attr;
pub use
crate::grpc::pb::cloud_events::cloud_event::CloudEventAttributeValue as
PbCloudEventAttributeValue;
pub use crate::grpc::pb::cloud_events::cloud_event::Data as PbData;
use crate::grpc::pb::cloud_events::cloud_event::{CloudEventAttributeValue,
Data};
-
/// Protobuf CloudEvent message.
pub use crate::grpc::pb::cloud_events::{
CloudEvent as PbCloudEvent, CloudEventBatch as PbCloudEventBatch,
};
+ use crate::model::message::EventMeshMessage;
impl ToString for PbAttr {
/// Convert Protobuf attribute to String.
@@ -80,6 +80,18 @@ pub mod proto_cloud_event {
}
}
+ impl From<EventMeshMessage> for PbCloudEvent {
+ fn from(_value: EventMeshMessage) -> Self {
+ todo!()
+ }
+ }
+
+ impl From<Event> for PbCloudEvent {
+ fn from(_value: Event) -> Self {
+ todo!()
+ }
+ }
+
impl ToString for PbData {
/// Convert Protobuf data to String.
fn to_string(&self) -> String {
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/model.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/model.rs
index 95c722afe..299c11147 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/model.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/model.rs
@@ -53,5 +53,6 @@ impl EventMeshProtocolType {
pub mod message;
pub(crate) mod convert;
+pub mod event_clouds;
pub(crate) mod response;
pub mod subscription;
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/model/event_clouds.rs
similarity index 70%
rename from eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
rename to eventmesh-sdks/eventmesh-sdk-rust/src/model/event_clouds.rs
index a954c46de..324ea00a9 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/grpc/eventmesh_message.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/model/event_clouds.rs
@@ -15,5 +15,14 @@
* limitations under the License.
*/
-#[cfg(all(feature = "eventmesh_message", feature = "grpc"))]
-pub mod eventmesh_message_producer;
+ use crate::common::grpc_eventmesh_message_utils::EventMeshCloudEventUtils;
+ use cloudevents::Event;
+
+ use crate::proto_cloud_event::PbCloudEvent;
+
+ impl From<PbCloudEvent> for Event {
+ fn from(value: PbCloudEvent) -> Self {
+
EventMeshCloudEventUtils::switch_event_mesh_cloud_event_2_cloud_event(value)
+ }
+ }
+
\ No newline at end of file
diff --git a/eventmesh-sdks/eventmesh-sdk-rust/src/model/message.rs
b/eventmesh-sdks/eventmesh-sdk-rust/src/model/message.rs
index a3a0f7be8..361684063 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/src/model/message.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/src/model/message.rs
@@ -21,9 +21,10 @@ use std::collections::HashMap;
use std::fmt;
use std::time::{SystemTime, UNIX_EPOCH};
+use cloudevents::Event;
use serde::{Deserialize, Serialize};
-use crate::common::eventmesh_message_utils::EventMeshCloudEventUtils;
+use crate::common::grpc_eventmesh_message_utils::EventMeshCloudEventUtils;
use crate::model::convert::FromPbCloudEvent;
use crate::proto_cloud_event::PbCloudEvent;
@@ -145,6 +146,19 @@ impl FromPbCloudEvent<EventMeshMessage> for
EventMeshMessage {
}
}
+impl From<PbCloudEvent> for EventMeshMessage {
+ fn from(value: PbCloudEvent) -> Self {
+
EventMeshCloudEventUtils::switch_event_mesh_cloud_event_2_event_mesh_message(&value)
+ }
+}
+
+#[cfg(feature = "cloud_events")]
+impl From<Event> for EventMeshMessage {
+ fn from(value: Event) -> Self {
+
EventMeshCloudEventUtils::switch_cloud_event_2_event_mesh_message(value)
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git
a/eventmesh-sdks/eventmesh-sdk-rust/tests/eventmesh_message_utils_test.rs
b/eventmesh-sdks/eventmesh-sdk-rust/tests/eventmesh_message_utils_test.rs
index 10c171090..fc523880c 100644
--- a/eventmesh-sdks/eventmesh-sdk-rust/tests/eventmesh_message_utils_test.rs
+++ b/eventmesh-sdks/eventmesh-sdk-rust/tests/eventmesh_message_utils_test.rs
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-use eventmesh::common::eventmesh_message_utils::{EventMeshCloudEventUtils,
ProtoSupport};
+use
eventmesh::common::grpc_eventmesh_message_utils::{EventMeshCloudEventUtils,
ProtoSupport};
use eventmesh::config::EventMeshGrpcClientConfig;
use eventmesh::model::EventMeshProtocolType;
use eventmesh::proto_cloud_event::PbAttr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]