galenwarren commented on a change in pull request #152:
URL: https://github.com/apache/flink-statefun/pull/152#discussion_r491706635
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageTypeSerializer.java
##########
@@ -101,45 +102,67 @@ public int hashCode() {
@Override
public TypeSerializerSnapshot<Message> snapshotConfiguration() {
- return new Snapshot(messageFactoryType);
+ return new Snapshot(messageFactoryKey);
}
private MessageFactory factory() {
if (factory == null) {
- factory = MessageFactory.forType(messageFactoryType);
+ factory = MessageFactory.forKey(messageFactoryKey);
}
return factory;
}
public static final class Snapshot implements TypeSerializerSnapshot<Message> {
- private MessageFactoryType messageFactoryType;
+ private MessageFactoryKey messageFactoryKey;
@SuppressWarnings("unused")
public Snapshot() {}
- Snapshot(MessageFactoryType messageFactoryType) {
- this.messageFactoryType = messageFactoryType;
+ Snapshot(MessageFactoryKey messageFactoryKey) {
+ this.messageFactoryKey = messageFactoryKey;
+ }
+
+ // used in unit tests
+ MessageFactoryKey getMessageFactoryKey() {
Review comment:
Done
##########
File path:
statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/message/JavaPayloadSerializer.java
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.statefun.flink.core.message;
Review comment:
Fixed
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
##########
@@ -61,4 +64,26 @@ private static void validateParentFirstClassloaderPatterns(Configuration configu
}
return parentFirstClassloaderPatterns;
}
+
+ private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
+
+ MessageFactoryType factoryType =
+ configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+ String customPayloadSerializerClassName =
+ configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+ if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+ if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {
Review comment:
A question here -- as it stands now, the
USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS config option has no default
value, so if not set, its value will be null. So I believe I'd have to first
check for null and then check length on the trimmed string. I'm happy to do
that, but I thought I'd check first ...
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##########
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final MessageFactoryType type;
+ private final String customPayloadSerializerClassName;
+
+ private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) {
+ this.type = type;
+ this.customPayloadSerializerClassName = customPayloadSerializerClassName;
+ }
+
+ public static MessageFactoryKey forType(
+ MessageFactoryType type, String customPayloadSerializerClassName) {
+ return new MessageFactoryKey(type, customPayloadSerializerClassName);
+ }
+
+ public MessageFactoryType getType() {
+ return this.type;
+ }
+
+ public String getCustomPayloadSerializerClassName() {
Review comment:
As it stands now, this method gets called from
MessageTypeSerializer.Snapshot.writeSnapshot, for all types of message
serializers, not just custom ones. For non-custom ones, a null is returned and
this is the value that is written into the snapshot, after the
MessageFactoryType. So, the version 2 serialized format is always a
MessageFactoryType value and then the custom payload serializer class name
(string, which can be null), for all types of serializers.
The writeSnapshot method could be changed to only call
getCustomPayloadSerializerClassName in the event that the serializer is custom,
and then to either 1) force a null serializer class name to be written to the
snapshot or 2) skip the write of the serializer class name altogether in the
non-custom case. This would make the read/writeSnapshot code and testing a bit
more complicated, but it would definitely be doable. Would one of those options
be better?
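
To make the layout concrete, here is a rough sketch of the version 2 format as
described above: the type first, then a nullable class name. It is only an
illustration under assumptions. It uses plain java.io streams instead of Flink's
data views, a presence flag for the nullable string, and hypothetical names
(SnapshotFormatSketch, FactoryType, OTHER, Key, writeForcingNull) that are not
part of this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the version 2 snapshot layout; not the PR's code.
final class SnapshotFormatSketch {

  // Stand-in for MessageFactoryType. OTHER is a placeholder for any non-custom type.
  enum FactoryType { WITH_CUSTOM_PAYLOADS, OTHER }

  // The two fields carried by the version 2 format.
  static final class Key {
    final FactoryType type;
    final String customPayloadSerializerClassName; // null for non-custom types

    Key(FactoryType type, String customPayloadSerializerClassName) {
      this.type = type;
      this.customPayloadSerializerClassName = customPayloadSerializerClassName;
    }
  }

  // Current behavior: always write the type, then the (possibly null) class name.
  static byte[] write(Key key) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(key.type.name());
      // Assumption: a boolean presence flag encodes null; the real encoding may differ.
      out.writeBoolean(key.customPayloadSerializerClassName != null);
      if (key.customPayloadSerializerClassName != null) {
        out.writeUTF(key.customPayloadSerializerClassName);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Option 1: same layout, but the class name is forced to null for non-custom types.
  static byte[] writeForcingNull(Key key) {
    String className =
        key.type == FactoryType.WITH_CUSTOM_PAYLOADS
            ? key.customPayloadSerializerClassName
            : null;
    return write(new Key(key.type, className));
  }

  // Reads the fields back in the same order they were written.
  static Key read(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      FactoryType type = FactoryType.valueOf(in.readUTF());
      String className = in.readBoolean() ? in.readUTF() : null;
      return new Key(type, className);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Key restored = read(write(new Key(FactoryType.OTHER, null)));
    System.out.println(restored.type + " / " + restored.customPayloadSerializerClassName);
  }
}
```

Because the reader always consumes both fields, option 1 leaves the read path
unchanged. Option 2 (skipping the write entirely) would require the reader to
branch on the type before deciding whether a class name follows.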
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
##########
@@ -61,4 +64,26 @@ private static void validateParentFirstClassloaderPatterns(Configuration configu
}
return parentFirstClassloaderPatterns;
}
+
+ private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
+
+ MessageFactoryType factoryType =
+ configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+ String customPayloadSerializerClassName =
+ configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+ if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+ if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {
Review comment:
A question here -- as it stands now, the
USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS config option has no default
value, so if not set, its value will be null. So I believe I'd have to first
check for null and then check length on the trimmed string. I'm happy to do
that, but I thought I'd check first in case that affects things ...
Or should USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS have an empty string
as the default value?
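
For reference, the null-then-trim check I have in mind would look roughly like
the sketch below. It is a standalone illustration, not the validator code in
this PR: SerializerClassNameCheck, isNullOrBlank and validate are hypothetical
names, the boolean parameter stands in for the WITH_CUSTOM_PAYLOADS comparison,
and IllegalArgumentException is a placeholder for whatever exception the
validator should actually throw.

```java
// Hypothetical helper illustrating the check; not Flink's StringUtils.
final class SerializerClassNameCheck {

  // True when the value is null, empty, or only whitespace.
  // The null test must come first, since an unset option yields null.
  static boolean isNullOrBlank(String value) {
    return value == null || value.trim().isEmpty();
  }

  // Rejects a missing class name only when custom payloads are selected.
  static void validate(boolean customPayloadsSelected, String className) {
    if (customPayloadsSelected && isNullOrBlank(className)) {
      throw new IllegalArgumentException(
          "A custom payload serializer class name is required for custom payloads.");
    }
  }

  public static void main(String[] args) {
    validate(false, null); // non-custom: an unset class name is accepted
    validate(true, "com.example.MySerializer"); // custom with a class name: accepted
    System.out.println("unset option is blank: " + isNullOrBlank(null));
  }
}
```

If the option defaulted to an empty string instead, the null test would become
redundant and only the trimmed-length test would be needed.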
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
##########
@@ -61,4 +64,26 @@ private static void validateParentFirstClassloaderPatterns(Configuration configu
}
return parentFirstClassloaderPatterns;
}
+
+ private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
+
+ MessageFactoryType factoryType =
+ configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+ String customPayloadSerializerClassName =
+ configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+ if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+ if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {
Review comment:
OK will do
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##########
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
Review comment:
Yes, and it will be included in the next push
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##########
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final MessageFactoryType type;
+ private final String customPayloadSerializerClassName;
+
+ private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) {
+ this.type = type;
+ this.customPayloadSerializerClassName = customPayloadSerializerClassName;
+ }
+
+ public static MessageFactoryKey forType(
+ MessageFactoryType type, String customPayloadSerializerClassName) {
+ return new MessageFactoryKey(type, customPayloadSerializerClassName);
+ }
+
+ public MessageFactoryType getType() {
+ return this.type;
+ }
+
+ public String getCustomPayloadSerializerClassName() {
Review comment:
For some reason, before I couldn't comment here, but now I can. Weird
glitch. Anyway, I left a top-level comment/question for you on this one, just
FYI.
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##########
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final MessageFactoryType type;
+ private final String customPayloadSerializerClassName;
+
+ private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) {
+ this.type = type;
Review comment:
Yes, will be fixed in next push