Repository: beam Updated Branches: refs/heads/master ddde35327 -> 9db5f746a
[BEAM-1273] Error with FlinkPipelineOptions serialization after setStateBackend Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5bbadf5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5bbadf5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5bbadf5 Branch: refs/heads/master Commit: b5bbadf59625563d1755f66a24d27c96c5fd3492 Parents: ddde353 Author: Alexey Diomin <[email protected]> Authored: Mon Jan 16 14:46:08 2017 +0400 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Jan 23 17:22:58 2017 +0100 ---------------------------------------------------------------------- .../beam/runners/flink/FlinkPipelineOptions.java | 6 +++--- .../apache/beam/runners/flink/PipelineOptionsTest.java | 13 +++++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b5bbadf5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 3bb358e..ef9afea 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -89,13 +89,13 @@ public interface FlinkPipelineOptions void setObjectReuse(Boolean reuse); /** - * Sets a state backend to store Beam's state during computation. + * State backend to store Beam's state during computation. * Note: Only applicable when executing in streaming mode. - * @param stateBackend The state backend to use */ @Description("Sets the state backend to use in streaming mode. " + "Otherwise the default is read from the Flink config.") - void setStateBackend(AbstractStateBackend stateBackend); + @JsonIgnore AbstractStateBackend getStateBackend(); + void setStateBackend(AbstractStateBackend stateBackend); } http://git-wip-us.apache.org/repos/asf/beam/blob/b5bbadf5/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 4c97cc7..23bc6a2 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.flink; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.Collections; @@ -40,6 +41,7 @@ import org.apache.commons.lang.SerializationUtils; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.joda.time.Instant; @@ -80,6 +82,17 @@ public class PipelineOptionsTest { } @Test + public void testIgnoredFieldSerialization() { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setStateBackend(new MemoryStateBackend()); + + FlinkPipelineOptions deserialized = + new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class); + + assertNull(deserialized.getStateBackend()); + } + + @Test public void testCaching() { PipelineOptions deserializedOptions = serializedOptions.getPipelineOptions().as(PipelineOptions.class);
