Repository: beam
Updated Branches:
  refs/heads/master ddde35327 -> 9db5f746a


[BEAM-1273] Error with FlinkPipelineOptions serialization after setStateBackend


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5bbadf5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5bbadf5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5bbadf5

Branch: refs/heads/master
Commit: b5bbadf59625563d1755f66a24d27c96c5fd3492
Parents: ddde353
Author: Alexey Diomin <[email protected]>
Authored: Mon Jan 16 14:46:08 2017 +0400
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Jan 23 17:22:58 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkPipelineOptions.java       |  6 +++---
 .../apache/beam/runners/flink/PipelineOptionsTest.java | 13 +++++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b5bbadf5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 3bb358e..ef9afea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -89,13 +89,13 @@ public interface FlinkPipelineOptions
   void setObjectReuse(Boolean reuse);
 
   /**
-   * Sets a state backend to store Beam's state during computation.
+   * State backend to store Beam's state during computation.
    * Note: Only applicable when executing in streaming mode.
-   * @param stateBackend The state backend to use
    */
   @Description("Sets the state backend to use in streaming mode. "
       + "Otherwise the default is read from the Flink config.")
-  void setStateBackend(AbstractStateBackend stateBackend);
+  @JsonIgnore
   AbstractStateBackend getStateBackend();
+  void setStateBackend(AbstractStateBackend stateBackend);
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b5bbadf5/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 4c97cc7..23bc6a2 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Collections;
@@ -40,6 +41,7 @@ import org.apache.commons.lang.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.joda.time.Instant;
@@ -80,6 +82,17 @@ public class PipelineOptionsTest {
   }
 
   @Test
+  public void testIgnoredFieldSerialization() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setStateBackend(new MemoryStateBackend());
+
+    FlinkPipelineOptions deserialized =
+        new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
+
+    assertNull(deserialized.getStateBackend());
+  }
+
+  @Test
   public void testCaching() {
     PipelineOptions deserializedOptions =
         serializedOptions.getPipelineOptions().as(PipelineOptions.class);

Reply via email to