This is an automated email from the ASF dual-hosted git repository.

mmodzelewski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c00c5d8a fix(connectors): log underlying errors instead of discarding 
them in SDK poll/consume paths (#3200)
7c00c5d8a is described below

commit 7c00c5d8a5b28d1d791c19478dd3e2e30522c4cb
Author: Atharva Lade <[email protected]>
AuthorDate: Thu May 14 01:04:06 2026 -0500

    fix(connectors): log underlying errors instead of discarding them in SDK 
poll/consume paths (#3200)
    
    Closes #3171
---
 core/connectors/sdk/src/sink.rs   | 89 ++++++++++++++++++++++++---------------
 core/connectors/sdk/src/source.rs | 18 +++++---
 2 files changed, 68 insertions(+), 39 deletions(-)

diff --git a/core/connectors/sdk/src/sink.rs b/core/connectors/sdk/src/sink.rs
index 6e3d83bb1..0a59e8c63 100644
--- a/core/connectors/sdk/src/sink.rs
+++ b/core/connectors/sdk/src/sink.rs
@@ -146,30 +146,39 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
                 std::slice::from_raw_parts(messages_meta_ptr, 
messages_meta_len);
             let messages_slice = std::slice::from_raw_parts(messages_ptr, 
messages_len);
 
-            let Ok(topic_metadata) = 
postcard::from_bytes::<TopicMetadata>(topic_meta_slice) else {
-                error!(
-                    "Failed to decode topic metadata by sink connector with 
ID: {}",
-                    self.id
-                );
-                return -1;
+            let topic_metadata = match 
postcard::from_bytes::<TopicMetadata>(topic_meta_slice) {
+                Ok(meta) => meta,
+                Err(err) => {
+                    error!(
+                        "Failed to decode topic metadata by sink connector 
with ID: {}. {err}",
+                        self.id
+                    );
+                    return -1;
+                }
             };
 
-            let Ok(messages_metadata) =
-                postcard::from_bytes::<MessagesMetadata>(messages_meta_slice)
-            else {
-                error!(
-                    "Failed to decode messages metadata by sink connector with 
ID: {} from stream: {}, topic: {}",
-                    self.id, topic_metadata.stream, topic_metadata.topic
-                );
-                return -1;
+            let messages_metadata = match 
postcard::from_bytes::<MessagesMetadata>(
+                messages_meta_slice,
+            ) {
+                Ok(meta) => meta,
+                Err(err) => {
+                    error!(
+                        "Failed to decode messages metadata by sink connector 
with ID: {} from stream: {}, topic: {}. {err}",
+                        self.id, topic_metadata.stream, topic_metadata.topic
+                    );
+                    return -1;
+                }
             };
 
-            let Ok(raw_messages) = 
postcard::from_bytes::<RawMessages>(messages_slice) else {
-                error!(
-                    "Failed to decode raw messages by sink connector with ID: 
{} from stream: {}, topic: {}",
-                    self.id, topic_metadata.stream, topic_metadata.topic
-                );
-                return -1;
+            let raw_messages = match 
postcard::from_bytes::<RawMessages>(messages_slice) {
+                Ok(messages) => messages,
+                Err(err) => {
+                    error!(
+                        "Failed to decode raw messages by sink connector with 
ID: {} from stream: {}, topic: {}. {err}",
+                        self.id, topic_metadata.stream, topic_metadata.topic
+                    );
+                    return -1;
+                }
             };
 
             let mut messages = Vec::with_capacity(raw_messages.messages.len());
@@ -177,22 +186,27 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
                 let headers = if message.headers.is_empty() {
                     None
                 } else {
-                    let Ok(headers) = postcard::from_bytes(&message.headers) 
else {
+                    match postcard::from_bytes(&message.headers) {
+                        Ok(headers) => Some(headers),
+                        Err(err) => {
+                            error!(
+                                "Failed to decode message headers by sink 
connector with ID: {} from stream: {}, topic: {}. {err}",
+                                self.id, topic_metadata.stream, 
topic_metadata.topic
+                            );
+                            continue;
+                        }
+                    }
+                };
+
+                let payload = match 
messages_metadata.schema.try_into_payload(message.payload) {
+                    Ok(payload) => payload,
+                    Err(err) => {
                         error!(
-                            "Failed to decode message headers by sink 
connector with ID: {} from stream: {}, topic: {}",
+                            "Failed to decode message payload by sink 
connector with ID: {} from stream: {}, topic: {}. {err}",
                             self.id, topic_metadata.stream, 
topic_metadata.topic
                         );
                         continue;
-                    };
-                    Some(headers)
-                };
-
-                let Ok(payload) = 
messages_metadata.schema.try_into_payload(message.payload) else {
-                    error!(
-                        "Failed to decode message payload by sink connector 
with ID: {} from stream: {}, topic: {}",
-                        self.id, topic_metadata.stream, topic_metadata.topic
-                    );
-                    continue;
+                    }
                 };
 
                 messages.push(ConsumedMessage {
@@ -209,7 +223,16 @@ impl<T: Sink + std::fmt::Debug> SinkContainer<T> {
             let runtime = get_runtime();
             let result =
                 runtime.block_on(sink.consume(&topic_metadata, 
messages_metadata, messages));
-            if result.is_ok() { 0 } else { 1 }
+            match result {
+                Ok(()) => 0,
+                Err(err) => {
+                    error!(
+                        "Failed to consume messages by sink connector with ID: 
{} from stream: {}, topic: {}. {err}",
+                        self.id, topic_metadata.stream, topic_metadata.topic
+                    );
+                    1
+                }
+            }
         }
     }
 }
diff --git a/core/connectors/sdk/src/source.rs 
b/core/connectors/sdk/src/source.rs
index 0209027e0..c465d8a2e 100644
--- a/core/connectors/sdk/src/source.rs
+++ b/core/connectors/sdk/src/source.rs
@@ -182,14 +182,20 @@ async fn handle_messages<T: Source>(
                 break;
             }
             messages = source.poll() => {
-                let Ok(messages) = messages else {
-                    error!("Failed to poll messages for source connector with 
ID: {plugin_id}");
-                    continue;
+                let messages = match messages {
+                    Ok(messages) => messages,
+                    Err(err) => {
+                        error!("Failed to poll messages for source connector 
with ID: {plugin_id}. {err}");
+                        continue;
+                    }
                 };
 
-                let Ok(messages) = postcard::to_allocvec(&messages) else {
-                    error!("Failed to serialize messages for source connector 
with ID: {plugin_id}");
-                    continue;
+                let messages = match postcard::to_allocvec(&messages) {
+                    Ok(messages) => messages,
+                    Err(err) => {
+                        error!("Failed to serialize messages for source 
connector with ID: {plugin_id}. {err}");
+                        continue;
+                    }
                 };
 
                 callback(plugin_id, messages.as_ptr(), messages.len());

Reply via email to