This is an automated email from the ASF dual-hosted git repository.

freeznet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a045e4c2795 [feat][fn] Add message decoding failure handling (#25972)
a045e4c2795 is described below

commit a045e4c279505fa842da0135c839c78c4cf6c3c4
Author: Rui Fu <[email protected]>
AuthorDate: Thu Jun 11 16:37:50 2026 +0800

    [feat][fn] Add message decoding failure handling (#25972)
---
 .../functions/instance/JavaInstanceRunnable.java   | 28 +++++++++-
 .../instance/JavaInstanceRunnableTest.java         | 63 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 84b680a60a3..b52f188ba74 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HexFormat;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -51,6 +52,7 @@ import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -561,12 +563,34 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         // check record is valid
         if (record == null) {
             throw new IllegalArgumentException("The record returned by the 
source cannot be null");
-        } else if (record.getValue() == null) {
-            throw new IllegalArgumentException("The value in the record 
returned by the source cannot be null");
+        }
+        // Eagerly access the value here so a malformed/poison message 
surfaces with enough
+        // context (message id, topic, key, schema version) to be located and 
skipped, instead
+        // of bubbling up as an opaque crash that names no message.
+        try {
+            if (record.getValue() == null) {
+                throw new IllegalArgumentException("The value in the record 
returned by the source cannot be null");
+            }
+        } catch (Exception e) {
+            logInputValueDecodeFailure(record, e);
+            throw e;
         }
         return record;
     }
 
+    private void logInputValueDecodeFailure(Record<?> record, Exception e) {
+        log.warn()
+                .attr("topic", record.getTopicName().orElse(null))
+                .attr("messageId", record.getMessage().map(m -> 
String.valueOf(m.getMessageId())).orElse(null))
+                .attr("partitionKey", record.getKey().orElse(null))
+                .attr("schemaVersion", record.getMessage()
+                        .map(Message::getSchemaVersion)
+                        .map(sv -> HexFormat.of().formatHex(sv))
+                        .orElse(null))
+                .exception(e)
+                .log("Failed to decode the value of the input message; the 
message cannot be processed");
+    }
+
     /**
      * NOTE: this method must be synchronized because it is potentially called 
by two different places:
      *       one inside the run/finally clause and one inside the 
ThreadRuntime::stop.
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index c29ff358d58..f30e0e6c6f7 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -27,12 +27,14 @@ import static org.mockito.Mockito.when;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.protobuf.Any;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,8 +43,11 @@ import lombok.CustomLog;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -539,6 +544,64 @@ public class JavaInstanceRunnableTest {
                 });
     }
 
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testReadInputLogsAndRethrowsOnValueDecodeFailure() throws 
Exception {
+        SchemaSerializationException decodeError = new 
SchemaSerializationException("boom");
+
+        // Case 1: full message coordinates present -> the original decode 
exception is rethrown unchanged.
+        Record<?> record = mock(Record.class);
+        when(record.getValue()).thenThrow(decodeError);
+        
when(record.getTopicName()).thenReturn(Optional.of("persistent://t/n/topic-partition-0"));
+        when(record.getKey()).thenReturn(Optional.of("eyJpZCI6NjU2MzY0Nn0="));
+        Message<?> message = mock(Message.class);
+        when(message.getMessageId()).thenReturn(new MessageIdImpl(232155494L, 
7641L, 0));
+        when(message.getSchemaVersion()).thenReturn(new byte[]{0, 0, 0, 0, 0, 
0, 0, 1});
+        when(record.getMessage()).thenReturn((Optional) Optional.of(message));
+
+        JavaInstanceRunnable runnable = createRunnable((String) null);
+        setPrivateSource(runnable, record);
+        InvocationTargetException ite =
+                Assert.expectThrows(InvocationTargetException.class, () -> 
invokeReadInput(runnable));
+        Assert.assertSame(ite.getCause(), decodeError);
+
+        // Case 2: missing coordinates (empty message, no topic/key) must not 
NPE and still rethrows.
+        Record<?> record2 = mock(Record.class);
+        when(record2.getValue()).thenThrow(decodeError);
+        when(record2.getTopicName()).thenReturn(Optional.empty());
+        when(record2.getKey()).thenReturn(Optional.empty());
+        when(record2.getMessage()).thenReturn(Optional.empty());
+
+        JavaInstanceRunnable runnable2 = createRunnable((String) null);
+        setPrivateSource(runnable2, record2);
+        InvocationTargetException ite2 =
+                Assert.expectThrows(InvocationTargetException.class, () -> 
invokeReadInput(runnable2));
+        Assert.assertSame(ite2.getCause(), decodeError);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void setPrivateSource(JavaInstanceRunnable runnable, Record<?> 
record) throws Exception {
+        Source<?> source = mock(Source.class);
+        when(source.read()).thenReturn((Record) record);
+        Field field = JavaInstanceRunnable.class.getDeclaredField("source");
+        field.setAccessible(true);
+        field.set(runnable, source);
+        field.setAccessible(false);
+    }
+
+    private void invokeReadInput(JavaInstanceRunnable runnable) throws 
Exception {
+        Method method = 
JavaInstanceRunnable.class.getDeclaredMethod("readInput");
+        method.setAccessible(true);
+        // readInput()'s finally block resets the context classloader; 
preserve and restore the
+        // test thread's classloader so it is not left null for subsequent 
tests.
+        ClassLoader original = Thread.currentThread().getContextClassLoader();
+        try {
+            method.invoke(runnable);
+        } finally {
+            Thread.currentThread().setContextClassLoader(original);
+        }
+    }
+
     @AfterClass
     public void cleanupInstanceCache() {
         InstanceCache.shutdown();

Reply via email to