[FLINK-8916][REST] Write/read checkpointing mode enum in lower case This closes #5679.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ad2489a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ad2489a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ad2489a Branch: refs/heads/release-1.5 Commit: 3ad2489ac4cac39c6e3e82f9745e1f52ad0b0f5f Parents: ed04f9c Author: yanghua <[email protected]> Authored: Mon Mar 12 12:12:56 2018 +0800 Committer: zentol <[email protected]> Committed: Wed Mar 14 20:47:27 2018 +0100 ---------------------------------------------------------------------- .../checkpoints/CheckpointConfigInfo.java | 46 +++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3ad2489a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java index 7a5d99f..b0f6abf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfo.java @@ -24,7 +24,16 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; - +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; import java.util.Objects; /** @@ -145,8 +154,43 @@ public class CheckpointConfigInfo implements ResponseBody { /** * Processing mode. */ + @JsonSerialize(using = ProcessingModeSerializer.class) + @JsonDeserialize(using = ProcessingModeDeserializer.class) public enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE } + + /** + * JSON serializer for {@link ProcessingMode}. + */ + public static class ProcessingModeSerializer extends StdSerializer<ProcessingMode> { + + public ProcessingModeSerializer() { + super(ProcessingMode.class); + } + + @Override + public void serialize(ProcessingMode mode, JsonGenerator generator, SerializerProvider serializerProvider) + throws IOException { + generator.writeString(mode.name().toLowerCase()); + } + } + + /** + * Processing mode deserializer. + */ + public static class ProcessingModeDeserializer extends StdDeserializer<ProcessingMode> { + + public ProcessingModeDeserializer() { + super(ProcessingMode.class); + } + + @Override + public ProcessingMode deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException { + return ProcessingMode.valueOf(jsonParser.getValueAsString().toUpperCase()); + } + } + }
