youngoli commented on a change in pull request #13245:
URL: https://github.com/apache/beam/pull/13245#discussion_r517781915
##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -131,8 +130,19 @@ func (fn *sourceFn) ProcessElement(rt *sdf.LockRTracker, config SourceConfig, em
for i := rt.GetRestriction().(offsetrange.Restriction).Start; rt.TryClaim(i) == true; i++ {
key := make([]byte, config.KeySize)
val := make([]byte, config.ValueSize)
- if _, err := fn.rng.Read(key); err != nil {
- return err
+ generator := sourceFn{}
+ generator.rng = rand.New(rand.NewSource(i))
+ randomSample := generator.rng.Float64()
Review comment:
It's unnecessary to create a new sourceFn here just to use the rng
field. You should be able to get the same behavior by using the rand.Rand
object directly.
```suggestion
generator := rand.New(rand.NewSource(i))
randomSample := generator.Float64()
```
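The following is a minimal sketch of the suggested pattern. It assumes the only goal is a per-position sample: a `*rand.Rand` built directly from `rand.NewSource(i)` returns the same `Float64()` value every time position `i` is processed, and no wrapper struct is needed. The helper name `sampleAt` is hypothetical and is not part of the Beam SDK.

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampleAt is a hypothetical helper that returns a pseudo-random float in
// [0.0, 1.0) derived only from the restriction position i. It uses the
// *rand.Rand directly instead of wrapping it in a sourceFn.
func sampleAt(i int64) float64 {
	generator := rand.New(rand.NewSource(i))
	return generator.Float64()
}

func main() {
	// The same position always yields the same sample.
	fmt.Println(sampleAt(3) == sampleAt(3)) // true
}
```

This only removes the extra struct. It still creates and seeds a generator per position, which the later comment on this hunk questions separately.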
##########
File path: sdks/go/pkg/beam/io/synthetic/source_test.go
##########
@@ -162,6 +163,59 @@ func TestSourceConfig_BuildFromJSON(t *testing.T) {
}
}
+// TestSourceConfig_NumHotKeys tests that setting the number of hot keys
+// for a synthetic source works correctly.
+func TestSourceConfigBuilder_NumHotKeys(t *testing.T) {
+ tests := []struct {
+ elms int
+ hotKeys int
+ }{
+ {elms: 15, hotKeys: 2},
+ {elms: 30, hotKeys: 10},
+ {elms: 50, hotKeys: 25},
+ }
+ for _, test := range tests {
+ test := test
+ t.Run(fmt.Sprintf("(elm = %v)", test.hotKeys), func(t *testing.T) {
+ dfn := sourceFn{}
+ cfg := DefaultSourceConfig()
+ cfg.NumElements(test.elms)
+ cfg.HotKeyFraction(1.0)
+ cfg.NumHotKeys(test.hotKeys)
+
+ keys, _, err := simulateSourceFn(t, &dfn, cfg.Build())
+ if err != nil {
+ t.Errorf("Failure processing sourceFn: %v", err)
+ }
+
+ m := make(map[string]int)
+ for _, key := range keys {
+ encoded := hex.EncodeToString(key)
+ m[encoded]++
+ }
+
+ numOfHotKeys := 0
+ numOfAllKeys := 0
+ for _, element := range m {
+ numOfAllKeys += element
+ if element > 1 {
+ numOfHotKeys += 1
+ }
+ }
+
+ if numOfAllKeys != test.elms {
Review comment:
I don't think the numOfAllKeys check here is necessary. It essentially
boils down to counting the number of elements emitted, which other unit tests
already do. The only reason I can see for it is if you suspect that
enabling hot keys might cause elements to be dropped when they otherwise
wouldn't be, but based on your implementation I don't think that's something to worry about.
##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -26,13 +26,12 @@ import (
"bytes"
"encoding/json"
"fmt"
+ "github.com/apache/beam/sdks/go/pkg/beam"
Review comment:
Nit: The style in the Go SDK is to separate standard Go package imports
from all other imports. So in this case, it's actually the beam/core/sdf import that
should be moved to the lower group, separate from the standard Go packages.
##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -131,8 +130,19 @@ func (fn *sourceFn) ProcessElement(rt *sdf.LockRTracker, config SourceConfig, em
for i := rt.GetRestriction().(offsetrange.Restriction).Start; rt.TryClaim(i) == true; i++ {
key := make([]byte, config.KeySize)
val := make([]byte, config.ValueSize)
- if _, err := fn.rng.Read(key); err != nil {
- return err
+ generator := sourceFn{}
+ generator.rng = rand.New(rand.NewSource(i))
Review comment:
I would avoid making local instances of rand.Rand here unless you have a
very good reason. Is the reason you're creating new, local instances of
rand.Rand and seeding them with the restriction positions because you want to
get completely deterministic behavior, regardless of any splitting that might
occur on the restriction?
If so, be aware that this implementation will produce identical
results for every input (in this case, every SourceConfig). For example, if you make
two SourceConfigs for 10 elements each, then both of them will seed these
generators with the numbers 0-9, which can lead to strangely identical results.
If you want purely deterministic output that is still different per
SourceConfig, then each SourceConfig will probably need some kind of unique but
deterministic salt that can be added to these numbers (example:
`rand.NewSource(i + fn.salt)`).
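The sketch below illustrates the seeding concern. It assumes restriction positions start at 0, as in the 10-element example. `samples` and its `salt` parameter are hypothetical stand-ins for the per-SourceConfig salt idea, not existing fields.

```go
package main

import (
	"fmt"
	"math/rand"
)

// samples draws one Float64 per position in [0, n), seeding a fresh
// generator with position+salt, as in the `rand.NewSource(i + fn.salt)`
// example. The salt parameter is hypothetical.
func samples(n, salt int64) []float64 {
	out := make([]float64, n)
	for i := int64(0); i < n; i++ {
		out[i] = rand.New(rand.NewSource(i + salt)).Float64()
	}
	return out
}

// equal reports whether two sample slices are element-wise identical.
func equal(a, b []float64) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	// Two 10-element configs without a salt both see seeds 0-9 and match exactly.
	fmt.Println(equal(samples(10, 0), samples(10, 0))) // true
	// Distinct, well-separated salts shift the seeds, so the outputs are expected to diverge.
	fmt.Println(equal(samples(10, 0), samples(10, 1000)))
}
```

One caveat with a plain additive salt: salts closer together than the element count still produce overlapping seed ranges. For example, with 10 elements, salt 0 and salt 5 share seeds 5-9. The salts would therefore need to be spaced at least `NumElements` apart, or combined with the position in some other way.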
On the other hand, if this check and the hotkey distributions don't need to
be deterministic, then I would simplify a bit and just use the existing Rand
object (`fn.rng`) more. Or, if you need to customize the rng for test purposes,
you can add more `randWrapper` fields to the sourceFn struct. I think the only
spot where you would need deterministic values in that case is for filling out
the actual hotkey value, and even then I think there's room for optimizing so
we don't constantly create and reseed a rand.Rand.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.