Repository: incubator-beam
Updated Branches:
  refs/heads/master b7b68e6fb -> 2584ebeb8


Do not modify the context in NullableCoder

The NullableCoder does not encode any elements after the subcoder
encodes the input value for non-null values. As a result, the subcoder
should see the entire input stream if it is available.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c056b17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c056b17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c056b17

Branch: refs/heads/master
Commit: 2c056b171b6e329ca5c025eb6fbc81cff29a8950
Parents: b7b68e6
Author: Thomas Groh <tg...@google.com>
Authored: Fri Sep 23 08:58:07 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Fri Oct 14 10:27:15 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/NullableCoder.java   |  4 +-
 .../beam/sdk/coders/NullableCoderTest.java      | 49 +++++++++++++++++---
 2 files changed, 44 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c056b17/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 9c6c7c0..29b697c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -79,7 +79,7 @@ public class NullableCoder<T> extends StandardCoder<T> {
       outStream.write(ENCODE_NULL);
     } else {
       outStream.write(ENCODE_PRESENT);
-      valueCoder.encode(value, outStream, context.nested());
+      valueCoder.encode(value, outStream, context);
     }
   }
 
@@ -94,7 +94,7 @@ public class NullableCoder<T> extends StandardCoder<T> {
             "NullableCoder expects either a byte valued %s (null) or %s 
(present), got %s",
             ENCODE_NULL, ENCODE_PRESENT, b));
     }
-    return valueCoder.decode(inStream, context.nested());
+    return valueCoder.decode(inStream, context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c056b17/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 61e7e41..425d5ba 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.theInstance;
 import static org.junit.Assert.assertEquals;
@@ -26,9 +27,13 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.junit.Rule;
 import org.junit.Test;
@@ -76,14 +81,15 @@ public class NullableCoderTest {
    * @see org.apache.beam.sdk.coders.PrintBase64Encodings
    */
   private static final List<String> TEST_ENCODINGS = Arrays.asList(
-      "AQA",
-      "AQFh",
-      "AQIxMw",
-      "AQVoZWxsbw",
+      "AQ",
+      "AWE",
+      "ATEz",
+      "AWhlbGxv",
       "AA",
-      "AShhIGxvbmdlciBzdHJpbmcgd2l0aCBzcGFjZXMgYW5kIGFsbCB0aGF0",
-      "ARlhIHN0cmluZyB3aXRoIGEgCiBuZXdsaW5l",
-      "AQ_jgrnjgr_jg6rjg7PjgrA");
+      "AWEgbG9uZ2VyIHN0cmluZyB3aXRoIHNwYWNlcyBhbmQgYWxsIHRoYXQ",
+      "AWEgc3RyaW5nIHdpdGggYSAKIG5ld2xpbmU",
+      "AeOCueOCv-ODquODs-OCsA"
+  );
 
   @Test
   public void testWireFormatEncode() throws Exception {
@@ -135,8 +141,37 @@ public class NullableCoderTest {
   }
 
   @Test
+  public void testSubcoderRecievesEntireStream() throws Exception {
+    NullableCoder<String> coder = NullableCoder.of(new 
EntireStreamExpectingCoder());
+
+    CoderProperties.coderDecodeEncodeEqualInContext(coder, Context.OUTER, 
null);
+    CoderProperties.coderDecodeEncodeEqualInContext(coder, Context.OUTER, 
"foo");
+  }
+
+  @Test
   public void testNestedNullableCoder() {
     NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of());
     assertThat(NullableCoder.of(coder), theInstance(coder));
   }
+
+  private static class EntireStreamExpectingCoder extends 
DeterministicStandardCoder<String> {
+    @Override
+    public void encode(
+        String value, OutputStream outStream, Context context) throws 
IOException {
+      checkArgument(context.isWholeStream, "Expected to get entire stream");
+      StringUtf8Coder.of().encode(value, outStream, context);
+    }
+
+    @Override
+    public String decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      checkArgument(context.isWholeStream, "Expected to get entire stream");
+      return StringUtf8Coder.of().decode(inStream, context);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.emptyList();
+    }
+  }
 }

Reply via email to