igalshilman commented on a change in pull request #152:
URL: https://github.com/apache/flink-statefun/pull/152#discussion_r490279224
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
##########
@@ -61,4 +64,26 @@ private static void validateParentFirstClassloaderPatterns(Configuration configu
}
return parentFirstClassloaderPatterns;
}
+
+ private static void validateCustomPayloadSerializerClassName(Configuration configuration) {
+
+ MessageFactoryType factoryType =
+ configuration.get(StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER);
+ String customPayloadSerializerClassName =
+ configuration.get(StatefulFunctionsConfig.USER_MESSAGE_CUSTOM_PAYLOAD_SERIALIZER_CLASS);
+
+ if (factoryType == MessageFactoryType.WITH_CUSTOM_PAYLOADS) {
+ if (StringUtils.isNullOrWhitespaceOnly(customPayloadSerializerClassName)) {
Review comment:
I think this could simply be replaced with a call to `.trim()` followed by a
check that the resulting string is empty.
That would avoid this dependency on `StringUtils` (I know it is currently there).
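A minimal sketch of the suggestion, under a few assumptions: the class and method names (`SerializerClassNameCheck`, `isNullOrBlank`) are hypothetical and not part of the PR, and an explicit null check is kept because calling `.trim()` on `null` would throw. Note also that `String.trim()` only strips characters up to U+0020, so behavior may differ slightly from the existing check for other Unicode whitespace.

```java
// Hypothetical helper illustrating the suggestion; the names are not from the PR.
final class SerializerClassNameCheck {

  private SerializerClassNameCheck() {}

  // Returns true when the value is null, empty, or consists only of
  // characters that String.trim() strips (code points <= U+0020).
  // The explicit null check is required because null.trim() would throw.
  static boolean isNullOrBlank(String value) {
    return value == null || value.trim().isEmpty();
  }
}
```

The validator could then call something like `isNullOrBlank(customPayloadSerializerClassName)` in place of the `StringUtils` call.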
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageFactoryKey.java
##########
@@ -0,0 +1,43 @@
+package org.apache.flink.statefun.flink.core.message;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class MessageFactoryKey implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final MessageFactoryType type;
+ private final String customPayloadSerializerClassName;
+
+ private MessageFactoryKey(MessageFactoryType type, String customPayloadSerializerClassName) {
+ this.type = type;
+ this.customPayloadSerializerClassName = customPayloadSerializerClassName;
+ }
+
+ public static MessageFactoryKey forType(
+ MessageFactoryType type, String customPayloadSerializerClassName) {
+ return new MessageFactoryKey(type, customPayloadSerializerClassName);
+ }
+
+ public MessageFactoryType getType() {
+ return this.type;
+ }
+
+ public String getCustomPayloadSerializerClassName() {
Review comment:
Can you add a `Preconditions.checkState` here to verify that the serializer is
indeed custom?
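Roughly what I have in mind, as a self-contained sketch rather than the final code. Assumptions: `MessageFactoryTypeSketch` and `MessageFactoryKeySketch` are simplified stand-ins for the real classes, `SOME_OTHER_TYPE` is a hypothetical placeholder for the non-custom types, and the local `checkState` helper stands in for a `Preconditions.checkState`-style call that throws `IllegalStateException`.

```java
import java.util.Objects;

// Simplified stand-in for the real enum; only WITH_CUSTOM_PAYLOADS appears in the diff.
enum MessageFactoryTypeSketch {
  WITH_CUSTOM_PAYLOADS,
  SOME_OTHER_TYPE // hypothetical placeholder for any non-custom type
}

// Simplified stand-in for MessageFactoryKey, showing only the guarded getter.
final class MessageFactoryKeySketch {
  private final MessageFactoryTypeSketch type;
  private final String customPayloadSerializerClassName;

  MessageFactoryKeySketch(MessageFactoryTypeSketch type, String customPayloadSerializerClassName) {
    this.type = Objects.requireNonNull(type, "type");
    this.customPayloadSerializerClassName = customPayloadSerializerClassName;
  }

  // Local stand-in for a Preconditions.checkState-style helper (an assumption,
  // kept here so the sketch compiles without any external dependency).
  private static void checkState(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }

  String getCustomPayloadSerializerClassName() {
    // Fail fast if the key does not describe a custom payload serializer.
    checkState(
        type == MessageFactoryTypeSketch.WITH_CUSTOM_PAYLOADS,
        "The custom payload serializer class name is only defined for WITH_CUSTOM_PAYLOADS");
    return customPayloadSerializerClassName;
  }
}
```

With this guard, asking a non-custom key for its serializer class name fails immediately instead of silently returning `null`.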
##########
File path:
statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/message/JavaPayloadSerializer.java
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.statefun.flink.core.message;
Review comment:
Missing license header.
##########
File path:
statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessageTypeSerializer.java
##########
@@ -101,45 +102,67 @@ public int hashCode() {
@Override
public TypeSerializerSnapshot<Message> snapshotConfiguration() {
- return new Snapshot(messageFactoryType);
+ return new Snapshot(messageFactoryKey);
}
private MessageFactory factory() {
if (factory == null) {
- factory = MessageFactory.forType(messageFactoryType);
+ factory = MessageFactory.forKey(messageFactoryKey);
}
return factory;
}
public static final class Snapshot implements TypeSerializerSnapshot<Message> {
- private MessageFactoryType messageFactoryType;
+ private MessageFactoryKey messageFactoryKey;
@SuppressWarnings("unused")
public Snapshot() {}
- Snapshot(MessageFactoryType messageFactoryType) {
- this.messageFactoryType = messageFactoryType;
+ Snapshot(MessageFactoryKey messageFactoryKey) {
+ this.messageFactoryKey = messageFactoryKey;
+ }
+
+ // used in unit tests
+ MessageFactoryKey getMessageFactoryKey() {
Review comment:
Can you annotate this method with `@VisibleForTesting`?
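Something along these lines, as a sketch only. Assumptions: the `VisibleForTesting` annotation declared below is a local stand-in so the example compiles on its own (in the project the existing annotation would be imported instead), `SnapshotSketch` is a hypothetical stand-in for the snapshot class, and a `String` replaces `MessageFactoryKey` for brevity.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

// Local stand-in marker annotation; an assumption made to keep the sketch self-contained.
@Documented
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@interface VisibleForTesting {}

// Hypothetical stand-in for the snapshot class in the diff.
final class SnapshotSketch {
  // A String is used here instead of MessageFactoryKey for brevity.
  private final String messageFactoryKey;

  SnapshotSketch(String messageFactoryKey) {
    this.messageFactoryKey = messageFactoryKey;
  }

  // The annotation replaces the "used in unit tests" comment and documents
  // why this accessor is package-private rather than private.
  @VisibleForTesting
  String getMessageFactoryKey() {
    return messageFactoryKey;
  }
}
```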