This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d12af34 SAMZA-2587: IntermediateMessageSerde exception handling (#1426)
d12af34 is described below
commit d12af34de53338048e9eb81f8e46019cd9f5ba06
Author: Yixing Zhang <[email protected]>
AuthorDate: Thu Oct 1 15:26:44 2020 -0700
SAMZA-2587: IntermediateMessageSerde exception handling (#1426)
Upgrade Instructions: For users upgrading directly from Samza version 0.13.0
or older: a message-type byte was added to intermediate messages in Samza
0.13.1. Messages written by Samza 0.13.0 or older do not begin with this
MessageType byte. Thus, upgrading directly from those versions will fail.
There are three ways to fix this issue:
a) Reset the checkpoint to consume from the newest message in the
intermediate stream.
b) Clean all existing messages from the intermediate stream.
c) Run the application on any version between 0.13.1 and 1.5 until all old
messages in the intermediate stream have reached their retention time.
Co-authored-by: Yixing Zhang <[email protected]>
---
.../serializers/IntermediateMessageSerde.java | 29 ++++++++++++----------
.../serializers/TestIntermediateMessageSerde.java | 27 ++++++++++++++++++++
2 files changed, 43 insertions(+), 13 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
index a8f9852..83a0a35 100644
---
a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
+++
b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@ -67,7 +67,21 @@ public class IntermediateMessageSerde implements
Serde<Object> {
public Object fromBytes(byte[] bytes) {
try {
final Object object;
- final MessageType type = MessageType.values()[bytes[0]];
+ final MessageType type;
+ try {
+ type = MessageType.values()[bytes[0]];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ // The message type was introduced in samza 0.13.1. For samza 0.13.0
or older versions, the first byte of
+ // MessageType doesn't exist in the bytes. Thus, upgrading from those
versions will get this exception.
+ // There are three ways to solve this issue:
+ // a) Reset checkpoint to consume from newest message in the
intermediate stream
+ // b) clean all existing messages in the intermediate stream
+ // c) Run the application in any version between 0.13.1 and 1.5 until
all old messages in intermediate stream
+ // has reached retention time.
+ throw new SamzaException("Error reading the message type from
intermediate message. This may happen if you "
+ + "have recently upgraded from samza version older than 0.13.1 or
there are still old messages in the "
+ + "intermediate stream.", e);
+ }
final byte[] data = Arrays.copyOfRange(bytes, 1, bytes.length);
switch (type) {
case USER_MESSAGE:
@@ -83,21 +97,10 @@ public class IntermediateMessageSerde implements
Serde<Object> {
throw new UnsupportedOperationException(String.format("Message type
%s is not supported", type.name()));
}
return object;
-
} catch (UnsupportedOperationException ue) {
throw new SamzaException(ue);
} catch (Exception e) {
- // This will catch the following exceptions:
- // 1) the first byte is not a valid type so it will cause
ArrayOutOfBound exception
- // 2) the first byte happens to be a valid type, but the deserialization
fails with certain exception
- // For these cases, we fall back to user-provided serde
- try {
- return userMessageSerde.fromBytes(bytes);
- } catch (Exception umse) {
- LOGGER.error("Error deserializing from both intermediate message serde
and user message serde. "
- + "Original exception: ", e);
- throw umse;
- }
+ throw e;
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
index 7a3faca..22250fc 100644
---
a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
+++
b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
@@ -24,15 +24,20 @@ import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.Arrays;
import org.apache.samza.serializers.IntermediateMessageSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.MessageType;
import org.apache.samza.system.WatermarkMessage;
+import org.junit.Assert;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
public class TestIntermediateMessageSerde {
@@ -126,4 +131,26 @@ public class TestIntermediateMessageSerde {
assertEquals(de.getTaskName(), taskName);
assertEquals(de.getVersion(), 1);
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testUserMessageSerdeException() {
+ Serde<?> mockUserMessageSerde = mock(Serde.class);
+ when(mockUserMessageSerde.fromBytes(anyObject())).then(new
Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ byte[] bytes = invocation.getArgumentAt(0, byte[].class);
+ if (Arrays.equals(bytes, new byte[]{1, 2})) {
+ throw new IllegalArgumentException("User message serde failed to
deserialize this message.");
+ } else {
+ // Intermediate message serde shouldn't try to deserialize user
message with wrong bytes
+ Assert.fail();
+ return null;
+ }
+ }
+ });
+
+ IntermediateMessageSerde imserde = new
IntermediateMessageSerde(mockUserMessageSerde);
+ byte[] bytes = new byte[]{0, 1, 2};
+ imserde.fromBytes(bytes);
+ }
}