cameronlee314 commented on a change in pull request #1426: URL: https://github.com/apache/samza/pull/1426#discussion_r482317863
##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -83,21 +83,17 @@ public Object fromBytes(byte[] bytes) {
           throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
       // This will catch the following exceptions:
       // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      // For these cases, we WILL NOT fall back to user-provided serde. Thus, we are not compatible with upgrade
+      // directly from samza version older than 0.13.1.

Review comment:
Consider also adding a little clarification about what versions older than 0.13.1 did to make those versions incompatible with this new code.

##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +128,14 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test (expected = SamzaException.class)
+  public void testUserMessageSerdeException() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
+    IntermediateMessageSerde imserde2 = new IntermediateMessageSerde(new JsonSerdeV2<>());

Review comment:
Instead of depending on `JsonSerdeV2` for this test, consider creating a mock `Serde` to do what you need.
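
As a rough illustration of the mock idea, here is a minimal, self-contained sketch. Everything in it is a hypothetical stand-in rather than Samza code: the one-method `Serde` interface, the hand-rolled `RecordingSerde`, and `SketchIntermediateSerde`, which only imitates the no-fallback behavior described in the diff under the assumption that the first byte is a message-type ordinal and the rest is the payload. In the real test, a Mockito mock of Samza's `Serde` would probably play the role of `RecordingSerde`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Samza's Serde, reduced to the one method used here.
interface Serde<T> {
  T fromBytes(byte[] bytes);
}

// Hand-rolled recording mock: remembers every byte array it is asked to deserialize.
class RecordingSerde implements Serde<Object> {
  final List<byte[]> calls = new ArrayList<>();

  @Override
  public Object fromBytes(byte[] bytes) {
    calls.add(bytes.clone());
    return "decoded";
  }
}

// Simplified imitation of the new behavior; NOT the real IntermediateMessageSerde.
// Assumption: byte 0 is a message-type ordinal, the remaining bytes are the payload.
class SketchIntermediateSerde {
  enum MessageType { USER_MESSAGE, WATERMARK, END_OF_STREAM }

  private final Serde<Object> userMessageSerde;

  SketchIntermediateSerde(Serde<Object> userMessageSerde) {
    this.userMessageSerde = userMessageSerde;
  }

  Object fromBytes(byte[] bytes) {
    try {
      // An out-of-range ordinal throws ArrayIndexOutOfBoundsException here.
      MessageType type = MessageType.values()[bytes[0]];
      if (type != MessageType.USER_MESSAGE) {
        // Control messages are left out of this sketch.
        throw new UnsupportedOperationException("not covered by this sketch: " + type);
      }
      return userMessageSerde.fromBytes(Arrays.copyOfRange(bytes, 1, bytes.length));
    } catch (Exception e) {
      // No fallback to userMessageSerde.fromBytes(bytes): the failure is surfaced instead.
      throw new RuntimeException(e);
    }
  }
}

class MockSerdeSketch {
  public static void main(String[] args) {
    RecordingSerde mock = new RecordingSerde();
    SketchIntermediateSerde serde = new SketchIntermediateSerde(mock);

    // Valid type byte: the mock should see only the payload bytes.
    serde.fromBytes(new byte[] {0, 1, 2});
    System.out.println("payload seen by mock: " + Arrays.toString(mock.calls.get(0)));

    // Invalid type byte: the call should fail without touching the mock again.
    try {
      serde.fromBytes(new byte[] {9, 1});
    } catch (RuntimeException e) {
      System.out.println("rejected, mock calls so far: " + mock.calls.size());
    }
  }
}
```

Because the mock records its inputs, the test can assert on exactly which byte arrays reached the user serde, which no longer depends on how any particular serde implementation reacts to bad data.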
In general, using mocks is helpful so that you don't rely on a specific implementation of something which is subject to change in the future. For example, what if `JsonSerdeV2` was changed to stop throwing an exception for certain kinds of data, or if it was changed to throw a different kind of exception than `SamzaException`?

##########
File path: samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +128,14 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test (expected = SamzaException.class)
+  public void testUserMessageSerdeException() {

Review comment:
It seems like this test would have succeeded with the previous version of `IntermediateMessageSerde` as well. Could you please try to come up with a test that would only succeed with your new version of `IntermediateMessageSerde`? For example, you could mock the `imserde2` and then verify that it only gets called with a certain byte array.

##########
File path: samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -83,21 +83,17 @@ public Object fromBytes(byte[] bytes) {
           throw new UnsupportedOperationException(String.format("Message type %s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
       // This will catch the following exceptions:
       // 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      // For these cases, we WILL NOT fall back to user-provided serde. Thus, we are not compatible with upgrade
+      // directly from samza version older than 0.13.1.
+      LOGGER.error("Error deserializing with intermediate message serde. If you are upgrading from samza version older"
+          + " than 0.13.1, please upgrade to samza 1.5 first.");

Review comment:
Just to clarify: Is the idea that this PR will go into Samza 1.6? So upgrading to samza 1.5 first will convert all intermediate messages into the new format, which means going to 1.6 after that will then be compatible? Maybe consider adding a comment here about why upgrading to Samza 1.5 will help.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org