[https://issues.apache.org/jira/browse/BEAM-10529?focusedWorklogId=750301&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-750301]
ASF GitHub Bot logged work on BEAM-10529:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Mar/22 17:46
Start Date: 30/Mar/22 17:46
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #16923:
URL: https://github.com/apache/beam/pull/16923#discussion_r838700467
##########
File path: sdks/go/pkg/beam/core/graph/coder/coder.go
##########
@@ -394,6 +395,21 @@ func NewKV(components []*Coder) *Coder {
}
}
+func NewN(component *Coder) *Coder {
+ coders := make([]*Coder, 1)
+ coders[0] = component
+ checkCodersNotNil(coders)
+ return &Coder{
+ Kind: Nullable,
+ T: typex.New(typex.NullableType, component.T),
+ Components: []*Coder{component},
Review comment:
```suggestion
Components: coders,
```
##########
File path: sdks/go/pkg/beam/core/graph/coder/coder.go
##########
@@ -394,6 +395,21 @@ func NewKV(components []*Coder) *Coder {
}
}
+func NewN(component *Coder) *Coder {
+ coders := make([]*Coder, 1)
+ coders[0] = component
Review comment:
```suggestion
coders := []*Coder{component}
```
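For reference, here is a minimal sketch of `NewN` with both suggestions applied: the slice is built with a composite literal and then reused for `Components`. The `Coder` and `Kind` types below are simplified, hypothetical stand-ins for the real ones in `sdks/go/pkg/beam/core/graph/coder`; the full-type field `T` is omitted and the kind string values are invented, so treat this as an illustration of the shape only.

```go
package main

import "fmt"

// Kind is a simplified, hypothetical stand-in for the coder package's Kind.
type Kind string

// Hypothetical kind values used only in this sketch.
const (
	Bytes    Kind = "bytes"
	Nullable Kind = "nullable"
)

// Coder is a simplified, hypothetical stand-in for coder.Coder. The real type
// also carries a full type (T), which is omitted here.
type Coder struct {
	Kind       Kind
	Components []*Coder
}

// checkCodersNotNil panics if any component coder is nil, mirroring the
// helper called in the diff.
func checkCodersNotNil(list []*Coder) {
	for i, c := range list {
		if c == nil {
			panic(fmt.Sprintf("nil coder at index %d", i))
		}
	}
}

// NewN wraps component in a nullable coder. The slice is built once with a
// composite literal and reused for Components, as the review suggests.
func NewN(component *Coder) *Coder {
	coders := []*Coder{component}
	checkCodersNotNil(coders)
	return &Coder{
		Kind:       Nullable,
		Components: coders,
	}
}

func main() {
	n := NewN(&Coder{Kind: Bytes})
	fmt.Println(n.Kind, len(n.Components), n.Components[0].Kind)
}
```

Reusing `coders` means the slice that was nil-checked is exactly the slice stored on the coder, so the check and the stored value cannot drift apart.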
##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -1085,6 +1082,14 @@ message StandardCoders {
// Components: the user key coder.
// Experimental.
SHARDED_KEY = 15 [(beam_urn) = "beam:coder:sharded_key:v1"];
+
+ // Wraps a coder of a potentially null value.
+ // A Nullable Type is encoded by:
+ // - A one byte null indicator, 0x00 for null values, or 0x01 for present
+ // values.
+ // - For present values the null indicator is followed by the value
+ // encoded with its corresponding coder.
Review comment:
While it feels redundant, please include the conventional `// Components: Coder for the value.` line, or similar.
It codifies that this coder has exactly one component (not 0, 2, or an arbitrary number), and being consistent makes the docs easier to read.
##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -609,6 +621,56 @@ func convertIfNeeded(v interface{}, allocated *FullValue) *FullValue {
return allocated
}
+type nullableEncoder struct {
+ inner ElementEncoder
+ be boolEncoder
+}
+
+func (n nullableEncoder) Encode(value *FullValue, writer io.Writer) error {
+ if value.Elm == nil {
+ if err := n.be.Encode(&FullValue{Elm: false}, writer); err != nil {
+ return err
+ }
+ return nil
+ }
+ if err := n.be.Encode(&FullValue{Elm: true}, writer); err != nil {
+ return err
+ }
+ if err := n.inner.Encode(value, writer); err != nil {
+ return err
+ }
+ return nil
+}
+
+type nullableDecoder struct {
+ inner ElementDecoder
+ bd boolDecoder
+}
+
+func (n nullableDecoder) Decode(reader io.Reader) (*FullValue, error) {
+ hasValue, err := n.bd.Decode(reader)
+ if err != nil {
+ return nil, err
+ }
+ if !hasValue.Elm.(bool) {
+ return &FullValue{}, nil
+ }
+ val, err := n.inner.Decode(reader)
+ if err != nil {
+ return nil, err
+ }
+ return val, nil
+}
+
+func (n nullableDecoder) DecodeTo(reader io.Reader, value *FullValue) error {
Review comment:
Please put these methods on pointer receivers rather than value receivers
(*nullableDecoder here and *nullableEncoder above). This keeps them consistent
with the rest of the file, which helps readability, and avoids subtle
usage bugs.
##########
File path: sdks/go/pkg/beam/core/graph/coder/coder_test.go
##########
@@ -517,6 +524,59 @@ func TestNewKV(t *testing.T) {
}
}
+func TestNewNullable(t *testing.T) {
+ bytes := NewBytes()
+
+ tests := []struct {
+ name string
+ component *Coder
+ shouldpanic bool
+ want *Coder
+ }{{
Review comment:
+1
In this case, split the `}}` onto separate lines, then run
[`gofmt`](https://go.dev/blog/gofmt) on this file.
I'd strongly recommend configuring your IDE to do this automatically on save
anyway.
##########
File path: sdks/go/test/regression/coders/fromyaml/fromyaml.go
##########
@@ -218,6 +218,19 @@ func diff(c Coder, elem *exec.FullValue, eg yaml.MapItem) bool {
}
return pass
+ case "beam:coder:nullable:v1":
+ if elem.Elm == nil || eg.Value == nil {
+ got, want = elem.Elm, eg.Value
+ } else {
+ got = string(elem.Elm.([]byte))
Review comment:
This code has design problems, but they predate John's addition, so
he's largely following the existing structure. It was originally an
unscheduled task that I had to do in a hurry, with little oversight, and
improving it has never been a priority. It can certainly be improved
significantly, but likely in a separate PR.
So in general, I agree with Danny here. But specifically:
1. As this is special-purpose comparison code, it's true that it may blow up,
but only with a runtime type assertion error, and only when someone is
adding new or unusual cases to it. The [deferred
recover](https://github.com/apache/beam/blob/488e1d165e121d757fac8b9260d004de93dbd338/sdks/go/test/regression/coders/fromyaml/fromyaml.go#L122)
at the top of the encoder case loop will narrow down where the problem is, if
not the exact example that blew up, which should be enough information to
identify the offending types.
2. Agreed, there's probably a separate bug going on in this case. But this
is how the plain `bytes` coder handles it. IIRC there was something very
odd with the YAML parsing that led to flaky interpretations of the types, and
it was easier to work around it at the time.
3. Here I disagree: downstream, `want` and `got` are compared
with `cmp`, which produces a much clearer error message than anything we
could add here, along with the full coder information.
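A sketch of the nullable comparison with the deferred-recover idea from point 1 folded in. The `want = egValue.(string)` assertion is an assumption (the diff is cut off at that point), `reflect.DeepEqual` stands in for the `cmp` comparison the real code performs downstream only to keep the example dependency-free, and `compareNullableBytes` is a hypothetical name.

```go
package main

import (
	"fmt"
	"reflect"
)

// compareNullableBytes mirrors the shape of the "beam:coder:nullable:v1" case:
// if either side is nil, the raw values are compared; otherwise the decoded
// []byte is compared as a string against the YAML value.
func compareNullableBytes(elm, egValue interface{}) (pass bool, err error) {
	// Convert a failed type assertion into an error naming both types, in the
	// spirit of the deferred recover at the top of the fromyaml loop.
	defer func() {
		if r := recover(); r != nil {
			pass, err = false, fmt.Errorf("unexpected types %T and %T: %v", elm, egValue, r)
		}
	}()
	var got, want interface{}
	if elm == nil || egValue == nil {
		got, want = elm, egValue
	} else {
		got = string(elm.([]byte))
		want = egValue.(string) // assumed: the YAML example value is a string
	}
	// The real code compares with cmp downstream; reflect.DeepEqual keeps
	// this sketch free of third-party imports.
	return reflect.DeepEqual(got, want), nil
}

func main() {
	for _, c := range []struct{ elm, eg interface{} }{
		{nil, nil},
		{[]byte("abc"), "abc"},
		{nil, "abc"},
		{42, "abc"},
	} {
		pass, err := compareNullableBytes(c.elm, c.eg)
		fmt.Println(pass, err)
	}
}
```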
Issue Time Tracking
-------------------
Worklog Id: (was: 750301)
Time Spent: 17h 20m (was: 17h 10m)
> Kafka XLang fails for 'empty' key/values
> ----------------------------------------
>
> Key: BEAM-10529
> URL: https://issues.apache.org/jira/browse/BEAM-10529
> Project: Beam
> Issue Type: Bug
> Components: cross-language, io-java-kafka
> Reporter: Luke Cwik
> Assignee: John Casey
> Priority: P1
> Time Spent: 17h 20m
> Remaining Estimate: 0h
>
> According to the Javadoc, ByteArrayDeserializer and StringDeserializer can
> return null [1, 2], but we aren't using NullableCoder.of(ByteArrayCoder.of())
> in the expansion [3]. Note that KafkaIO does this correctly in its regular
> coder inference logic [4].
> 1: [https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/ByteArrayDeserializer.html#deserialize-java.lang.String-byte:A-]
> 2: [https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/StringDeserializer.html#deserialize-java.lang.String-byte:A-]
> 3: [https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L478]
> 4: [https://github.com/apache/beam/blob/af2d6b0379d64b522ecb769d88e9e7e7b8900208/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/LocalDeserializerProvider.java#L85]