cameronlee314 commented on a change in pull request #1426:
URL: https://github.com/apache/samza/pull/1426#discussion_r482317863



##########
File path: 
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -83,21 +83,17 @@ public Object fromBytes(byte[] bytes) {
           throw new UnsupportedOperationException(String.format("Message type 
%s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
       // This will catch the following exceptions:
       // 1) the first byte is not a valid type so it will cause 
ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization 
fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde 
and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      // For these cases, we WILL NOT fall back to user-provided serde. Thus, 
we are not compatible with upgrade
+      // directly from samza version older than 0.13.1.

Review comment:
       Consider also adding a little clarification about what versions older 
than 0.13.1 did to make those versions incompatible with this new code.

##########
File path: 
samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +128,14 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test (expected = SamzaException.class)
+  public void testUserMessageSerdeException() {
+    IntermediateMessageSerde imserde = new IntermediateMessageSerde(new 
ObjectSerde());
+    IntermediateMessageSerde imserde2 = new IntermediateMessageSerde(new 
JsonSerdeV2<>());

Review comment:
       Instead of depending on `JsonSerdeV2` for this test, consider creating a 
mock `Serde` to do what you need.
   In general, using mocks is helpful so that you don't rely on a specific 
implementation of something which is subject to change in the future. For 
example, what if `JsonSerdeV2` was changed to stop throwing an exception for 
certain kinds of data, or if it was changed to throw a different kind of 
exception than `SamzaException`?

##########
File path: 
samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
##########
@@ -126,4 +128,14 @@ public void testEndOfStreamMessageSerde() {
     assertEquals(de.getTaskName(), taskName);
     assertEquals(de.getVersion(), 1);
   }
+
+  @Test (expected = SamzaException.class)
+  public void testUserMessageSerdeException() {

Review comment:
       It seems like this test would have succeeded with the previous version 
of `IntermediateMessageSerde` as well. Could you please try to come up with a 
test that would only succeed with your new version of 
`IntermediateMessageSerde`?
   For example, you could mock the `imserde2` and then verify that it only gets 
called with a certain byte array.

##########
File path: 
samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
##########
@@ -83,21 +83,17 @@ public Object fromBytes(byte[] bytes) {
           throw new UnsupportedOperationException(String.format("Message type 
%s is not supported", type.name()));
       }
       return object;
-
     } catch (UnsupportedOperationException ue) {
       throw new SamzaException(ue);
     } catch (Exception e) {
       // This will catch the following exceptions:
       // 1) the first byte is not a valid type so it will cause 
ArrayOutOfBound exception
       // 2) the first byte happens to be a valid type, but the deserialization 
fails with certain exception
-      // For these cases, we fall back to user-provided serde
-      try {
-        return userMessageSerde.fromBytes(bytes);
-      } catch (Exception umse) {
-        LOGGER.error("Error deserializing from both intermediate message serde 
and user message serde. "
-            + "Original exception: ", e);
-        throw umse;
-      }
+      // For these cases, we WILL NOT fall back to user-provided serde. Thus, 
we are not compatible with upgrade
+      // directly from samza version older than 0.13.1.
+      LOGGER.error("Error deserializing with intermediate message serde. If 
you are upgrading from samza version older"
+          + " than 0.13.1, please upgrade to samza 1.5 first.");

Review comment:
       Just to clarify: Is the idea that this PR will go into Samza 1.6? So 
upgrading to samza 1.5 first will convert all intermediate messages into the 
new format, which means going to 1.6 after that will then be compatible?
   Maybe consider adding a comment here about why upgrading to Samza 1.5 will 
help.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to