This is an automated email from the ASF dual-hosted git repository.
mmodzelewski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 7c00c5d8a fix(connectors): log underlying errors instead of discarding
them in SDK poll/consume paths (#3200)
7c00c5d8a is described below
commit 7c00c5d8a5b28d1d791c19478dd3e2e30522c4cb
Author: Atharva Lade <[email protected]>
AuthorDate: Thu May 14 01:04:06 2026 -0500
fix(connectors): log underlying errors instead of discarding them in SDK
poll/consume paths (#3200)
Closes #3171
---
core/connectors/sdk/src/sink.rs | 89 ++++++++++++++++++++++++---------------
core/connectors/sdk/src/source.rs | 18 +++++---
2 files changed, 68 insertions(+), 39 deletions(-)
diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index 6e3d83bb1..0a59e8c63 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -146,30 +146,39 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
std::slice::from_raw_parts(messages_meta_ptr,
messages_meta_len);
let messages_slice = std::slice::from_raw_parts(messages_ptr,
messages_len);
- let Ok(topic_metadata) =
postcard::from_bytes::<TopicMetadata>(topic_meta_slice) else {
- error!(
- "Failed to decode topic metadata by sink connector with
ID: {}",
- self.id
- );
- return -1;
+ let topic_metadata = match
postcard::from_bytes::<TopicMetadata>(topic_meta_slice) {
+ Ok(meta) => meta,
+ Err(err) => {
+ error!(
+ "Failed to decode topic metadata by sink connector
with ID: {}. {err}",
+ self.id
+ );
+ return -1;
+ }
};
- let Ok(messages_metadata) =
- postcard::from_bytes::<MessagesMetadata>(messages_meta_slice)
- else {
- error!(
- "Failed to decode messages metadata by sink connector with
ID: {} from stream: {}, topic: {}",
- self.id, topic_metadata.stream, topic_metadata.topic
- );
- return -1;
+ let messages_metadata = match
postcard::from_bytes::<MessagesMetadata>(
+ messages_meta_slice,
+ ) {
+ Ok(meta) => meta,
+ Err(err) => {
+ error!(
+ "Failed to decode messages metadata by sink connector
with ID: {} from stream: {}, topic: {}. {err}",
+ self.id, topic_metadata.stream, topic_metadata.topic
+ );
+ return -1;
+ }
};
- let Ok(raw_messages) =
postcard::from_bytes::<RawMessages>(messages_slice) else {
- error!(
- "Failed to decode raw messages by sink connector with ID:
{} from stream: {}, topic: {}",
- self.id, topic_metadata.stream, topic_metadata.topic
- );
- return -1;
+ let raw_messages = match
postcard::from_bytes::<RawMessages>(messages_slice) {
+ Ok(messages) => messages,
+ Err(err) => {
+ error!(
+ "Failed to decode raw messages by sink connector with
ID: {} from stream: {}, topic: {}. {err}",
+ self.id, topic_metadata.stream, topic_metadata.topic
+ );
+ return -1;
+ }
};
let mut messages = Vec::with_capacity(raw_messages.messages.len());
@@ -177,22 +186,27 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
let headers = if message.headers.is_empty() {
None
} else {
- let Ok(headers) = postcard::from_bytes(&message.headers)
else {
+ match postcard::from_bytes(&message.headers) {
+ Ok(headers) => Some(headers),
+ Err(err) => {
+ error!(
+ "Failed to decode message headers by sink
connector with ID: {} from stream: {}, topic: {}. {err}",
+ self.id, topic_metadata.stream,
topic_metadata.topic
+ );
+ continue;
+ }
+ }
+ };
+
+ let payload = match
messages_metadata.schema.try_into_payload(message.payload) {
+ Ok(payload) => payload,
+ Err(err) => {
error!(
- "Failed to decode message headers by sink
connector with ID: {} from stream: {}, topic: {}",
+ "Failed to decode message payload by sink
connector with ID: {} from stream: {}, topic: {}. {err}",
self.id, topic_metadata.stream,
topic_metadata.topic
);
continue;
- };
- Some(headers)
- };
-
- let Ok(payload) =
messages_metadata.schema.try_into_payload(message.payload) else {
- error!(
- "Failed to decode message payload by sink connector
with ID: {} from stream: {}, topic: {}",
- self.id, topic_metadata.stream, topic_metadata.topic
- );
- continue;
+ }
};
messages.push(ConsumedMessage {
@@ -209,7 +223,16 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
let runtime = get_runtime();
let result =
runtime.block_on(sink.consume(&topic_metadata,
messages_metadata, messages));
- if result.is_ok() { 0 } else { 1 }
+ match result {
+ Ok(()) => 0,
+ Err(err) => {
+ error!(
+ "Failed to consume messages by sink connector with ID:
{} from stream: {}, topic: {}. {err}",
+ self.id, topic_metadata.stream, topic_metadata.topic
+ );
+ 1
+ }
+ }
}
}
}
diff --git a/core/connectors/sdk/src/source.rs
b/core/connectors/sdk/src/source.rs
index 0209027e0..c465d8a2e 100644
--- a/core/connectors/sdk/src/source.rs
+++ b/core/connectors/sdk/src/source.rs
@@ -182,14 +182,20 @@ async fn handle_messages<T: Source>(
break;
}
messages = source.poll() => {
- let Ok(messages) = messages else {
- error!("Failed to poll messages for source connector with
ID: {plugin_id}");
- continue;
+ let messages = match messages {
+ Ok(messages) => messages,
+ Err(err) => {
+ error!("Failed to poll messages for source connector
with ID: {plugin_id}. {err}");
+ continue;
+ }
};
- let Ok(messages) = postcard::to_allocvec(&messages) else {
- error!("Failed to serialize messages for source connector
with ID: {plugin_id}");
- continue;
+ let messages = match postcard::to_allocvec(&messages) {
+ Ok(messages) => messages,
+ Err(err) => {
+ error!("Failed to serialize messages for source
connector with ID: {plugin_id}. {err}");
+ continue;
+ }
};
callback(plugin_id, messages.as_ptr(), messages.len());