galenwarren commented on a change in pull request #152:
URL: https://github.com/apache/flink-statefun/pull/152#discussion_r491706635



##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageTypeSerializer.java
##########
@@ -101,45 +102,67 @@ public int hashCode() {
 
   @Override
   public TypeSerializerSnapshot<Message> snapshotConfiguration() {
-    return new Snapshot(messageFactoryType);
+    return new Snapshot(messageFactoryKey);
   }
 
   private MessageFactory factory() {
     if (factory == null) {
-      factory = MessageFactory.forType(messageFactoryType);
+      factory = MessageFactory.forKey(messageFactoryKey);
     }
     return factory;
   }
 
   public static final class Snapshot implements TypeSerializerSnapshot<Message> {
-    private MessageFactoryType messageFactoryType;
+    private MessageFactoryKey messageFactoryKey;
 
     @SuppressWarnings("unused")
     public Snapshot() {}
 
-    Snapshot(MessageFactoryType messageFactoryType) {
-      this.messageFactoryType = messageFactoryType;
+    Snapshot(MessageFactoryKey messageFactoryKey) {
+      this.messageFactoryKey = messageFactoryKey;
+    }
+
+    // used in unit tests
+    MessageFactoryKey getMessageFactoryKey() {

Review comment:
       Done

##########
File path: 
statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/message/JavaPayloadSerializer.java
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.statefun.flink.core.message;

Review comment:
       Fixed

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
##########
@@ -61,4 +64,26 @@ private static void validateParentFirstClassloaderPatterns(Configuration configu
     }
     return parentFirstClassloaderPatterns;
   }
+
+  private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
+
+    MessageFactoryType factoryType =
+        configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+    String customPayloadSerializerClassName =
+        configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+    if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+      if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {

Review comment:
       A question here -- as it stands now, the 
USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS config option has no default 
value, so if not set, its value will be null. So I believe I'd have to first 
check for null and then check length on the trimmed string. I'm happy to do 
that, but I thought I'd check first ...

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##########
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageFactoryType type;
+  private final String customPayloadSerializerClassName;
+
+  private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) {
+    this.type = type;
+    this.customPayloadSerializerClassName = customPayloadSerializerClassName;
+  }
+
+  public static MessageFactoryKey forType(
+      MessageFactoryType type, String customPayloadSerializerClassName) {
+    return new MessageFactoryKey(type, customPayloadSerializerClassName);
+  }
+
+  public MessageFactoryType getType() {
+    return this.type;
+  }
+
+  public String getCustomPayloadSerializerClassName() {

Review comment:
       As it stands now, this method gets called from 
MessageTypeSerializer.Snapshot.writeSnapshot for all types of message 
serializers, not just custom ones. For non-custom ones, it returns null, and 
that null is the value written into the snapshot after the 
MessageFactoryType. So the version 2 serialized format is always a 
MessageFactoryType value followed by the custom payload serializer class name 
(a string, which can be null), for all types of serializers.
   
   The writeSnapshot method could be changed to call 
getCustomPayloadSerializerClassName only when the serializer is custom, and 
then either 1) force a null serializer class name to be written to the 
snapshot or 2) skip the write of the serializer class name altogether in the 
non-custom case. This would make the read/writeSnapshot code and testing a bit 
more complicated, but it would definitely be doable. Would one of those options 
be better?
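
   To make the layout under discussion concrete, here is a minimal sketch of 
a "type, then nullable class name" format using plain java.io streams. It is 
not the PR's actual writeSnapshot code: the class SnapshotFormatSketch, the 
presence flag used to encode null, and the use of a plain string for the type 
are all assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch, not the real MessageTypeSerializer.Snapshot.
final class SnapshotFormatSketch {

  // Writes the type name, then a presence flag, then the class name if present.
  // The same layout is used for custom and non-custom serializers.
  static byte[] write(String typeName, String customClassName) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(typeName);
      out.writeBoolean(customClassName != null); // assumed null encoding
      if (customClassName != null) {
        out.writeUTF(customClassName);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Reads back {typeName, customClassName}; the second element may be null.
  static String[] read(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      String typeName = in.readUTF();
      String customClassName = in.readBoolean() ? in.readUTF() : null;
      return new String[] {typeName, customClassName};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

   With a layout like this, the reader needs no branch on the factory type, 
which is the simplicity that options 1) and 2) would trade away.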

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
##########
@@ -61,4 +64,26 @@ private static void validateParentFirstClassloaderPatterns(Configuration configu
     }
     return parentFirstClassloaderPatterns;
   }
+
+  private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
+
+    MessageFactoryType factoryType =
+        configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+    String customPayloadSerializerClassName =
+        configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+    if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+      if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {

Review comment:
       A question here -- as it stands now, the 
USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS config option has no default 
value, so if not set, its value will be null. So I believe I'd have to first 
check for null and then check length on the trimmed string. I'm happy to do 
that, but I thought I'd check first in case that affects things ...
   
   Or should USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS have an empty string 
as the default value?
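
   For reference, the "check for null, then check the trimmed length" variant 
could look roughly like the sketch below. The class SerializerClassNameCheck, 
the boolean parameter standing in for the factory-type comparison, and the 
exception message are hypothetical; this is not the validator code from the 
PR.

```java
// Hypothetical sketch of a null-safe blank check for the config value.
final class SerializerClassNameCheck {

  // True when the value is null, empty, or whitespace only.
  // The null test must come first so that trim() is never called on null.
  static boolean isNullOrBlank(String value) {
    return value == null || value.trim().isEmpty();
  }

  // Rejects a missing class name only when custom payloads are selected.
  static void validate(boolean customPayloadsSelected, String className) {
    if (customPayloadsSelected && isNullOrBlank(className)) {
      throw new IllegalArgumentException(
          "A custom payload serializer class name must be set for custom payloads.");
    }
  }
}
```

   The same check works unchanged if the option is later given an empty 
string as its default, since null and "" are both treated as missing.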

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
##########
@@ -61,4 +64,26 @@ private static void validateParentFirstClassloaderPatterns(Configuration configu
     }
     return parentFirstClassloaderPatterns;
   }
+
+  private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
+
+    MessageFactoryType factoryType =
+        configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+    String customPayloadSerializerClassName =
+        configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+    if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+      if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {

Review comment:
       OK will do

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##########
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {

Review comment:
       Yes, and it will be included in the next push

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##########
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageFactoryType type;
+  private final String customPayloadSerializerClassName;
+
+  private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) {
+    this.type = type;
+    this.customPayloadSerializerClassName = customPayloadSerializerClassName;
+  }
+
+  public static MessageFactoryKey forType(
+      MessageFactoryType type, String customPayloadSerializerClassName) {
+    return new MessageFactoryKey(type, customPayloadSerializerClassName);
+  }
+
+  public MessageFactoryType getType() {
+    return this.type;
+  }
+
+  public String getCustomPayloadSerializerClassName() {

Review comment:
       For some reason, before I couldn't comment here, but now I can. Weird 
glitch. Anyway, I left a top-level comment/question for you on this one, just 
FYI.

##########
File path: 
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##########
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageFactoryType type;
+  private final String customPayloadSerializerClassName;
+
+  private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) {
+    this.type = type;

Review comment:
       Yes, will be fixed in next push



