TheNeuralBit commented on a change in pull request #16923:
URL: https://github.com/apache/beam/pull/16923#discussion_r813234030



##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -1081,6 +1081,14 @@ message StandardCoders {
     // Components: the user key coder.
     // Experimental.
     SHARDED_KEY = 15 [(beam_urn) = "beam:coder:sharded_key:v1"];
+
+    //Wraps a coder of a potentially null value
+    //
+    // A Nullable coder encodes nullable values of wrapped coder value that does
+    // not tolerate null values. A Nullable coder uses exactly 1 byte per entry
+    // to indicate whether the value is null, then adds the encoding of the
+    // inner coder for non-null values.

Review comment:
       Could you have this clarify that a marker byte of 1 means non-null? It would
also be nice to update the row coder doc (under "Nullable types in container
types (ArrayType, MapType)") to refer to this coder.

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -776,11 +777,11 @@ private static Coder resolveCoder(Class deserializer) {
               continue;
             }
             if (returnType.equals(byte[].class)) {
-              return ByteArrayCoder.of();
+              return NullableCoder.of(ByteArrayCoder.of());
             } else if (returnType.equals(Integer.class)) {
-              return VarIntCoder.of();
+              return NullableCoder.of(VarIntCoder.of());
             } else if (returnType.equals(Long.class)) {
-              return VarLongCoder.of();
+              return NullableCoder.of(VarLongCoder.of());

Review comment:
       Is this going to break the Go Kafka connector?
   
   CC: @youngoli 

##########
File path: sdks/python/apache_beam/coders/typecoders.py
##########
@@ -138,6 +137,10 @@ def get_coder(self, typehint):
         return coders.IterableCoder.from_type_hint(typehint, self)
       elif isinstance(typehint, typehints.ListConstraint):
         return coders.ListCoder.from_type_hint(typehint, self)
+      elif (isinstance(typehint, typehints.UnionConstraint) and
+            typehint.contains_type(type(None) and
+                                   len(list(typehint._inner_types())) == 2)):

Review comment:
       Could you extract this logic into an `is_optional` helper in
`typehints.py`?
   
   It also looks like there may be a paren mistake: the `len` check is inside
the `contains_type` call, so it is evaluated as part of that call's argument
rather than as a separate condition.
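
As a rough illustration of both points, the sketch below uses the standard library's `typing` module rather than Beam's `typehints.UnionConstraint`, so it only approximates what a helper in `typehints.py` would look like. The name `is_optional` comes from the comment above; its body and the variable `misparenthesized_argument` are assumptions made for illustration.

```python
import typing


def is_optional(typehint) -> bool:
    """Return True if typehint is Union[T, None] for exactly one T."""
    if typing.get_origin(typehint) is not typing.Union:
        return False
    args = typing.get_args(typehint)
    return len(args) == 2 and type(None) in args


# Effect of the misplaced paren: NoneType (a class) is truthy, so
# `type(None) and <bool>` evaluates to the bool. The call would therefore
# receive True or False as its argument instead of NoneType.
misparenthesized_argument = type(None) and (len([bytes, type(None)]) == 2)
```

Pulling the check into a named helper also makes the intent testable on its own, independent of the coder registry.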

##########
File path: sdks/python/apache_beam/coders/typecoders_test.py
##########
@@ -140,6 +140,14 @@ def test_list_coder(self):
     self.assertIs(
         list, type(expected_coder.decode(expected_coder.encode(values))))
 
+  def test_nullable_coder(self):
+    expected_coder = coders.NullableCoder(coders.BytesCoder())
+    real_coder = typecoders.registry.get_coder(
+        typehints.UnionConstraint([type(None), type(bytes)]))

Review comment:
       nit: could you construct this with `Optional` instead of using `Union`
directly?
   
   Also I think `bytes` is already a type:
   ```py
   >>> bytes
   <class 'bytes'>
   >>> type(bytes)
   <class 'type'>
   ```
   
   Shouldn't this be `Optional[bytes]`?
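
The same difference can be seen with the standard library's `typing` module. This is only an analogy, since the test under review builds a Beam `UnionConstraint` rather than a `typing.Union`; the variable names are illustrative.

```python
import typing

# What the test effectively asks for: type(bytes) is `type`,
# so the union is over NoneType and `type`.
as_written = typing.Union[type(None), type(bytes)]

# What was presumably intended: a bytes value that may be None.
as_intended = typing.Optional[bytes]
```

Here `as_written` is equivalent to `Optional[type]`, not `Optional[bytes]`.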



