riteshghorse commented on code in PR #22919:
URL: https://github.com/apache/beam/pull/22919#discussion_r957414548
##########
sdks/go/pkg/beam/core/state/state_test.go:
##########
@@ -684,3 +684,171 @@ func TestMapPut(t *testing.T) {
}
}
}
+
+func TestSetContains(t *testing.T) {
+ is := make(map[string]interface{})
+ im := make(map[string]map[string]interface{})
+ ts := make(map[string][]Transaction)
+ es := make(map[string]error)
+ ca := make(map[string]bool)
+ eo := make(map[string]bool)
+ ts["no_transactions"] = nil
+ im["basic_set"] = map[string]interface{}{}
+ ts["basic_set"] = []Transaction{{Key: "basic_set", Type:
TransactionTypeSet, Val: true, MapKey: "foo"}, {Key: "basic_set", Type:
TransactionTypeSet, Val: true, MapKey: "bar"}}
+ im["basic_clear"] = map[string]interface{}{"foo": true, "bar": true}
+ ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type:
TransactionTypeClear, Val: nil, MapKey: "foo"}}
+ im["set_then_clear"] = map[string]interface{}{"foo": true, "bar": true}
+ ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type:
TransactionTypeSet, Val: true, MapKey: "foo"}, {Key: "set_then_clear", Type:
TransactionTypeClear, Val: nil, MapKey: "foo"}}
+ im["err"] = map[string]interface{}{"foo": true}
+ ts["err"] = []Transaction{{Key: "err", Type: TransactionTypeSet, Val:
true}}
+ es["err"] = errFake
+
+ f := fakeProvider{
+ initialMapState: im,
+ initialState: is,
+ transactions: ts,
+ err: es,
+ createAccumForKey: ca,
+ extractOutForKey: eo,
+ }
+
+ var tests = []struct {
+ vs Set[string]
+ fooOk bool
+ barOk bool
+ err error
+ }{
+ {MakeSetState[string]("no_transactions"), false, false, nil},
+ {MakeSetState[string]("basic_set"), true, true, nil},
+ {MakeSetState[string]("basic_clear"), false, true, nil},
+ {MakeSetState[string]("set_then_clear"), false, true, nil},
+ {MakeSetState[string]("err"), false, false, errFake},
+ }
+
+ for _, tt := range tests {
+ ok, err := tt.vs.Contains(&f, "foo")
+ if err != nil && tt.err == nil {
+ t.Errorf("Set.Contains() returned error %v for state
key %v entry foo when it shouldn't have", err, tt.vs.Key)
+ } else if err == nil && tt.err != nil {
+ t.Errorf("Set.Contains() returned no error for state
key %v entry foo when it should have returned %v", tt.vs.Key, err)
+ } else if ok && !tt.fooOk {
+ t.Errorf("Set.Contains() returned true for state key %v
entry foo when it shouldn't have", tt.vs.Key)
+ } else if !ok && tt.fooOk {
+ t.Errorf("Set.Contains() returned false for state key
%v entry foo when it should have returned true", tt.vs.Key)
+ }
+ ok, err = tt.vs.Contains(&f, "bar")
+ if err != nil && tt.err == nil {
+ t.Errorf("Set.Contains() returned error %v for state
key %v entry bar when it shouldn't have", err, tt.vs.Key)
+ } else if err == nil && tt.err != nil {
+ t.Errorf("Set.Contains() returned no error for state
key %v entry bar when it should have returned %v", tt.vs.Key, err)
+ } else if ok && !tt.barOk {
+ t.Errorf("Set.Contains() returned true for state key %v
entry bar when it shouldn't have", tt.vs.Key)
+ } else if !ok && tt.barOk {
+ t.Errorf("Set.Contains() returned false for state key
%v entry bar when it should have returned true", tt.vs.Key)
+ }
+ }
+}
+
+func TestSetKeys(t *testing.T) {
+ is := make(map[string]interface{})
+ im := make(map[string]map[string]interface{})
+ ts := make(map[string][]Transaction)
+ es := make(map[string]error)
+ ca := make(map[string]bool)
+ eo := make(map[string]bool)
+ ts["no_transactions"] = nil
+ im["basic_set"] = map[string]interface{}{"foo": true}
+ ts["basic_set"] = []Transaction{{Key: "basic_set", Type:
TransactionTypeSet, Val: true, MapKey: "foo"}, {Key: "basic_set", Type:
TransactionTypeSet, Val: true, MapKey: "bar"}}
+ im["basic_clear"] = map[string]interface{}{"foo": true, "bar": true}
+ ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type:
TransactionTypeClear, Val: nil, MapKey: "foo"}}
+ im["set_then_clear"] = map[string]interface{}{"foo": true, "bar": true}
+ ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type:
TransactionTypeSet, Val: true, MapKey: "foo"}, {Key: "set_then_clear", Type:
TransactionTypeClear, Val: nil, MapKey: "foo"}}
+ im["err"] = map[string]interface{}{"foo": true}
+ ts["err"] = []Transaction{{Key: "err", Type: TransactionTypeSet, Val:
true}}
+ es["err"] = errFake
+
+ f := fakeProvider{
+ initialMapState: im,
+ initialState: is,
+ transactions: ts,
+ err: es,
+ createAccumForKey: ca,
+ extractOutForKey: eo,
+ }
+
+ var tests = []struct {
+ vs Set[string]
+ keys []string
+ ok bool
+ err error
+ }{
+ {MakeSetState[string]("no_transactions"), []string{}, false,
nil},
+ {MakeSetState[string]("basic_set"), []string{"foo", "bar"},
true, nil},
+ {MakeSetState[string]("basic_clear"), []string{"bar"}, true,
nil},
+ {MakeSetState[string]("set_then_clear"), []string{"bar"}, true,
nil},
+ {MakeSetState[string]("err"), []string{}, false, errFake},
+ }
+
+ for _, tt := range tests {
+ val, ok, err := tt.vs.Keys(&f)
+ if err != nil && tt.err == nil {
+ t.Errorf("Set.Keys() returned error %v for state key %v
when it shouldn't have", err, tt.vs.Key)
+ } else if err == nil && tt.err != nil {
+ t.Errorf("Set.Keys() returned no error for state key %v
when it should have returned %v", tt.vs.Key, err)
+ } else if ok && !tt.ok {
+ t.Errorf("Set.Keys() returned a value %v for state key
%v when it shouldn't have", val, tt.vs.Key)
+ } else if len(val) != len(tt.keys) {
+ t.Errorf("Set.Keys()=%v, want %v for state key %v",
val, tt.keys, tt.vs.Key)
+ } else {
+ eq := true
+ for idx, v := range val {
+ if v != tt.keys[idx] {
+ eq = false
+ }
+ }
+ if !eq {
+ t.Errorf("Map.Keys()=%v, want %v for state key
%v", val, tt.keys, tt.vs.Key)
+ }
+ }
+ }
+}
+
+func TestMapAdd(t *testing.T) {
Review Comment:
```suggestion
func TestSetAdd(t *testing.T) {
```
##########
sdks/go/pkg/beam/core/state/state_test.go:
##########
@@ -684,3 +684,171 @@ func TestMapPut(t *testing.T) {
}
}
}
+
+func TestSetContains(t *testing.T) {
+ is := make(map[string]interface{})
+ im := make(map[string]map[string]interface{})
+ ts := make(map[string][]Transaction)
+ es := make(map[string]error)
+ ca := make(map[string]bool)
+ eo := make(map[string]bool)
+ ts["no_transactions"] = nil
+ im["basic_set"] = map[string]interface{}{}
+ ts["basic_set"] = []Transaction{{Key: "basic_set", Type:
TransactionTypeSet, Val: true, MapKey: "foo"}, {Key: "basic_set", Type:
TransactionTypeSet, Val: true, MapKey: "bar"}}
+ im["basic_clear"] = map[string]interface{}{"foo": true, "bar": true}
+ ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type:
TransactionTypeClear, Val: nil, MapKey: "foo"}}
+ im["set_then_clear"] = map[string]interface{}{"foo": true, "bar": true}
+ ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type:
TransactionTypeSet, Val: true, MapKey: "foo"}, {Key: "set_then_clear", Type:
TransactionTypeClear, Val: nil, MapKey: "foo"}}
+ im["err"] = map[string]interface{}{"foo": true}
+ ts["err"] = []Transaction{{Key: "err", Type: TransactionTypeSet, Val:
true}}
+ es["err"] = errFake
+
+ f := fakeProvider{
+ initialMapState: im,
+ initialState: is,
+ transactions: ts,
+ err: es,
+ createAccumForKey: ca,
+ extractOutForKey: eo,
+ }
+
+ var tests = []struct {
+ vs Set[string]
+ fooOk bool
+ barOk bool
+ err error
+ }{
+ {MakeSetState[string]("no_transactions"), false, false, nil},
+ {MakeSetState[string]("basic_set"), true, true, nil},
+ {MakeSetState[string]("basic_clear"), false, true, nil},
+ {MakeSetState[string]("set_then_clear"), false, true, nil},
+ {MakeSetState[string]("err"), false, false, errFake},
+ }
+
+ for _, tt := range tests {
+ ok, err := tt.vs.Contains(&f, "foo")
+ if err != nil && tt.err == nil {
+ t.Errorf("Set.Contains() returned error %v for state
key %v entry foo when it shouldn't have", err, tt.vs.Key)
+ } else if err == nil && tt.err != nil {
+ t.Errorf("Set.Contains() returned no error for state
key %v entry foo when it should have returned %v", tt.vs.Key, err)
+ } else if ok && !tt.fooOk {
+ t.Errorf("Set.Contains() returned true for state key %v
entry foo when it shouldn't have", tt.vs.Key)
+ } else if !ok && tt.fooOk {
+ t.Errorf("Set.Contains() returned false for state key
%v entry foo when it should have returned true", tt.vs.Key)
+ }
+ ok, err = tt.vs.Contains(&f, "bar")
+ if err != nil && tt.err == nil {
+ t.Errorf("Set.Contains() returned error %v for state
key %v entry bar when it shouldn't have", err, tt.vs.Key)
+ } else if err == nil && tt.err != nil {
+ t.Errorf("Set.Contains() returned no error for state
key %v entry bar when it should have returned %v", tt.vs.Key, err)
+ } else if ok && !tt.barOk {
+ t.Errorf("Set.Contains() returned true for state key %v
entry bar when it shouldn't have", tt.vs.Key)
+ } else if !ok && tt.barOk {
+ t.Errorf("Set.Contains() returned false for state key
%v entry bar when it should have returned true", tt.vs.Key)
+ }
+ }
+}
+
+func TestSetKeys(t *testing.T) {
+ is := make(map[string]interface{})
+ im := make(map[string]map[string]interface{})
+ ts := make(map[string][]Transaction)
+ es := make(map[string]error)
+ ca := make(map[string]bool)
+ eo := make(map[string]bool)
+ ts["no_transactions"] = nil
+ im["basic_set"] = map[string]interface{}{"foo": true}
+ ts["basic_set"] = []Transaction{{Key: "basic_set", Type:
TransactionTypeSet, Val: true, MapKey: "foo"}, {Key: "basic_set", Type:
TransactionTypeSet, Val: true, MapKey: "bar"}}
+ im["basic_clear"] = map[string]interface{}{"foo": true, "bar": true}
+ ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type:
TransactionTypeClear, Val: nil, MapKey: "foo"}}
+ im["set_then_clear"] = map[string]interface{}{"foo": true, "bar": true}
+ ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type:
TransactionTypeSet, Val: true, MapKey: "foo"}, {Key: "set_then_clear", Type:
TransactionTypeClear, Val: nil, MapKey: "foo"}}
+ im["err"] = map[string]interface{}{"foo": true}
+ ts["err"] = []Transaction{{Key: "err", Type: TransactionTypeSet, Val:
true}}
+ es["err"] = errFake
+
+ f := fakeProvider{
+ initialMapState: im,
+ initialState: is,
+ transactions: ts,
+ err: es,
+ createAccumForKey: ca,
+ extractOutForKey: eo,
+ }
+
+ var tests = []struct {
+ vs Set[string]
+ keys []string
+ ok bool
+ err error
+ }{
+ {MakeSetState[string]("no_transactions"), []string{}, false,
nil},
+ {MakeSetState[string]("basic_set"), []string{"foo", "bar"},
true, nil},
+ {MakeSetState[string]("basic_clear"), []string{"bar"}, true,
nil},
+ {MakeSetState[string]("set_then_clear"), []string{"bar"}, true,
nil},
+ {MakeSetState[string]("err"), []string{}, false, errFake},
+ }
+
+ for _, tt := range tests {
+ val, ok, err := tt.vs.Keys(&f)
+ if err != nil && tt.err == nil {
+ t.Errorf("Set.Keys() returned error %v for state key %v
when it shouldn't have", err, tt.vs.Key)
+ } else if err == nil && tt.err != nil {
+ t.Errorf("Set.Keys() returned no error for state key %v
when it should have returned %v", tt.vs.Key, err)
+ } else if ok && !tt.ok {
+ t.Errorf("Set.Keys() returned a value %v for state key
%v when it shouldn't have", val, tt.vs.Key)
+ } else if len(val) != len(tt.keys) {
+ t.Errorf("Set.Keys()=%v, want %v for state key %v",
val, tt.keys, tt.vs.Key)
+ } else {
+ eq := true
+ for idx, v := range val {
+ if v != tt.keys[idx] {
+ eq = false
+ }
+ }
+ if !eq {
+ t.Errorf("Map.Keys()=%v, want %v for state key
%v", val, tt.keys, tt.vs.Key)
Review Comment:
```suggestion
t.Errorf("Set.Keys()=%v, want %v for state key
%v", val, tt.keys, tt.vs.Key)
```
##########
sdks/go/pkg/beam/core/runtime/exec/translate.go:
##########
@@ -495,10 +495,22 @@ func (b *builder) makeLink(from string, id linkID) (Node,
error) {
} else if ms :=
spec.GetMapSpec(); ms != nil {
cID =
ms.ValueCoderId
kcID =
ms.KeyCoderId
+ } else if ss :=
spec.GetSetSpec(); ss != nil {
+ kcID =
ss.ElementCoderId
+ } else {
+ a :=
spec.GetSpec().(*pipepb.StateSpec_SetSpec)
Review Comment:
Is it possible to use `spec.GetSpec()` directly in the error statement below,
instead of checking for the `SetSpec` value here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]