This is an automated email from the ASF dual-hosted git repository.
freeznet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a045e4c2795 [feat][fn] Add message decoding failure handling (#25972)
a045e4c2795 is described below
commit a045e4c279505fa842da0135c839c78c4cf6c3c4
Author: Rui Fu <[email protected]>
AuthorDate: Thu Jun 11 16:37:50 2026 +0800
[feat][fn] Add message decoding failure handling (#25972)
---
.../functions/instance/JavaInstanceRunnable.java | 28 +++++++++-
.../instance/JavaInstanceRunnableTest.java | 63 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 2 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 84b680a60a3..b52f188ba74 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -51,6 +52,7 @@ import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
@@ -561,12 +563,34 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// check record is valid
if (record == null) {
throw new IllegalArgumentException("The record returned by the
source cannot be null");
- } else if (record.getValue() == null) {
- throw new IllegalArgumentException("The value in the record
returned by the source cannot be null");
+ }
+ // Eagerly access the value here so a malformed/poison message
surfaces with enough
+ // context (message id, topic, key, schema version) to be located and
skipped, instead
+ // of bubbling up as an opaque crash that names no message.
+ try {
+ if (record.getValue() == null) {
+ throw new IllegalArgumentException("The value in the record
returned by the source cannot be null");
+ }
+ } catch (Exception e) {
+ logInputValueDecodeFailure(record, e);
+ throw e;
}
return record;
}
+    /**
+     * Logs a warning for an input record whose value could not be obtained. The warning carries
+     * whatever coordinates the record exposes (topic, message id, partition key, schema version),
+     * so the offending message can be located and skipped.
+     *
+     * <p>Every coordinate is optional. A missing topic, key, underlying message or schema version
+     * is logged as {@code null} instead of failing, so this is safe to call from an error path.
+     *
+     * <p>The caller routes two kinds of failure here:
+     * <ul>
+     *   <li>exceptions thrown while {@code record.getValue()} decodes the payload;</li>
+     *   <li>the {@link IllegalArgumentException} raised when that value is {@code null}.</li>
+     * </ul>
+     * The log text is worded to fit both cases rather than always claiming a decode failure.
+     *
+     * @param record the input record whose value could not be read; must not be {@code null}
+     * @param e      the exception raised while reading or validating the value
+     */
+    private void logInputValueDecodeFailure(Record<?> record, Exception e) {
+        log.warn()
+                .attr("topic", record.getTopicName().orElse(null))
+                .attr("messageId", record.getMessage()
+                        .map(m -> String.valueOf(m.getMessageId()))
+                        .orElse(null))
+                .attr("partitionKey", record.getKey().orElse(null))
+                // Schema versions are raw byte[]s; hex-encode them so they are readable in logs.
+                // A null schema version makes map() yield empty, which is logged as null.
+                .attr("schemaVersion", record.getMessage()
+                        .map(Message::getSchemaVersion)
+                        .map(sv -> HexFormat.of().formatHex(sv))
+                        .orElse(null))
+                .exception(e)
+                .log("Failed to read the value of the input message; the message cannot be processed");
+    }
+
/**
* NOTE: this method is be synchronized because it is potentially called
by two different places
* one inside the run/finally clause and one inside the
ThreadRuntime::stop.
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index c29ff358d58..f30e0e6c6f7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -27,12 +27,14 @@ import static org.mockito.Mockito.when;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.protobuf.Any;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
@@ -41,8 +43,11 @@ import lombok.CustomLog;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.schema.SchemaType;
@@ -539,6 +544,64 @@ public class JavaInstanceRunnableTest {
});
}
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testReadInputLogsAndRethrowsOnValueDecodeFailure() throws Exception {
+        SchemaSerializationException decodeError = new SchemaSerializationException("boom");
+
+        // Case 1: full message coordinates present -> the original decode exception is
+        // rethrown unchanged.
+        Record<?> record = mock(Record.class);
+        when(record.getValue()).thenThrow(decodeError);
+        when(record.getTopicName()).thenReturn(Optional.of("persistent://t/n/topic-partition-0"));
+        when(record.getKey()).thenReturn(Optional.of("eyJpZCI6NjU2MzY0Nn0="));
+        Message<?> message = mock(Message.class);
+        when(message.getMessageId()).thenReturn(new MessageIdImpl(232155494L, 7641L, 0));
+        when(message.getSchemaVersion()).thenReturn(new byte[]{0, 0, 0, 0, 0, 0, 0, 1});
+        when(record.getMessage()).thenReturn((Optional) Optional.of(message));
+        assertReadInputRethrows(record, decodeError);
+
+        // Case 2: missing coordinates (empty message, no topic/key) must not NPE while logging,
+        // and the decode exception is still rethrown.
+        Record<?> record2 = mock(Record.class);
+        when(record2.getValue()).thenThrow(decodeError);
+        when(record2.getTopicName()).thenReturn(Optional.empty());
+        when(record2.getKey()).thenReturn(Optional.empty());
+        when(record2.getMessage()).thenReturn(Optional.empty());
+        assertReadInputRethrows(record2, decodeError);
+    }
+
+    @Test
+    public void testReadInputRejectsNullValue() throws Exception {
+        // An unstubbed Mockito mock returns null from getValue(). readInput() must reject that
+        // value with an IllegalArgumentException, and the rejection passes through the same
+        // catch/log/rethrow path as a decode failure.
+        // NOTE: relies on Mockito's default answer returning Optional.empty() for the
+        // Optional-returning coordinate getters that the failure logging reads.
+        Record<?> record = mock(Record.class);
+        JavaInstanceRunnable runnable = createRunnable((String) null);
+        setPrivateSource(runnable, record);
+        InvocationTargetException ite =
+                Assert.expectThrows(InvocationTargetException.class, () -> invokeReadInput(runnable));
+        Assert.assertEquals(ite.getCause().getClass(), IllegalArgumentException.class);
+        Assert.assertEquals(ite.getCause().getMessage(),
+                "The value in the record returned by the source cannot be null");
+    }
+
+    /**
+     * Runs {@code readInput()} on a fresh runnable whose source yields {@code record}. Asserts
+     * that the exact {@code expected} instance comes back as the cause of the reflective call.
+     */
+    private void assertReadInputRethrows(Record<?> record, Exception expected) throws Exception {
+        JavaInstanceRunnable runnable = createRunnable((String) null);
+        setPrivateSource(runnable, record);
+        InvocationTargetException ite =
+                Assert.expectThrows(InvocationTargetException.class, () -> invokeReadInput(runnable));
+        Assert.assertSame(ite.getCause(), expected);
+    }
+
+    /**
+     * Installs a mocked {@link Source}, whose {@code read()} returns {@code record}, into the
+     * runnable's private {@code source} field via reflection.
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private void setPrivateSource(JavaInstanceRunnable runnable, Record<?> record) throws Exception {
+        Source<?> stubbedSource = mock(Source.class);
+        when(stubbedSource.read()).thenReturn((Record) record);
+
+        Field sourceField = JavaInstanceRunnable.class.getDeclaredField("source");
+        sourceField.setAccessible(true);
+        sourceField.set(runnable, stubbedSource);
+        // Put the access check back once the private field has been written.
+        sourceField.setAccessible(false);
+    }
+
+    /**
+     * Reflectively calls the private {@code readInput()} on {@code runnable}. Any exception it
+     * throws surfaces wrapped in an {@link java.lang.reflect.InvocationTargetException}.
+     */
+    private void invokeReadInput(JavaInstanceRunnable runnable) throws Exception {
+        // readInput()'s finally block resets the thread context classloader. Snapshot the test
+        // thread's loader up front and reinstate it afterwards so later tests do not see null.
+        final Thread testThread = Thread.currentThread();
+        final ClassLoader savedLoader = testThread.getContextClassLoader();
+        Method readInput = JavaInstanceRunnable.class.getDeclaredMethod("readInput");
+        readInput.setAccessible(true);
+        try {
+            readInput.invoke(runnable);
+        } finally {
+            testThread.setContextClassLoader(savedLoader);
+        }
+    }
+
@AfterClass
public void cleanupInstanceCache() {
InstanceCache.shutdown();