youngoli commented on a change in pull request #13245:
URL: https://github.com/apache/beam/pull/13245#discussion_r517781915



##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -131,8 +130,19 @@ func (fn *sourceFn) ProcessElement(rt *sdf.LockRTracker, 
config SourceConfig, em
        for i := rt.GetRestriction().(offsetrange.Restriction).Start; 
rt.TryClaim(i) == true; i++ {
                key := make([]byte, config.KeySize)
                val := make([]byte, config.ValueSize)
-               if _, err := fn.rng.Read(key); err != nil {
-                       return err
+               generator := sourceFn{}
+               generator.rng = rand.New(rand.NewSource(i))
+               randomSample := generator.rng.Float64()

Review comment:
       It's unnecessary to create a new sourceFn here just to use the rng 
field. You should be able to get the same behavior by using the rand.Rand 
object directly.
   ```suggestion
                generator := rand.New(rand.NewSource(i))
                randomSample := generator.Float64()
   ```

##########
File path: sdks/go/pkg/beam/io/synthetic/source_test.go
##########
@@ -162,6 +163,59 @@ func TestSourceConfig_BuildFromJSON(t *testing.T) {
        }
 }
 
+// TestSourceConfig_NumHotKeys tests that setting the number of hot keys
+// for a synthetic source works correctly.
+func TestSourceConfigBuilder_NumHotKeys(t *testing.T) {
+       tests := []struct {
+               elms    int
+               hotKeys int
+       }{
+               {elms: 15, hotKeys: 2},
+               {elms: 30, hotKeys: 10},
+               {elms: 50, hotKeys: 25},
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(fmt.Sprintf("(elm = %v)", test.hotKeys), func(t 
*testing.T) {
+                       dfn := sourceFn{}
+                       cfg := DefaultSourceConfig()
+                       cfg.NumElements(test.elms)
+                       cfg.HotKeyFraction(1.0)
+                       cfg.NumHotKeys(test.hotKeys)
+
+                       keys, _, err := simulateSourceFn(t, &dfn, cfg.Build())
+                       if err != nil {
+                               t.Errorf("Failure processing sourceFn: %v", err)
+                       }
+
+                       m := make(map[string]int)
+                       for _, key := range keys {
+                               encoded := hex.EncodeToString(key)
+                               m[encoded]++
+                       }
+
+                       numOfHotKeys := 0
+                       numOfAllKeys := 0
+                       for _, element := range m {
+                               numOfAllKeys += element
+                               if element > 1 {
+                                       numOfHotKeys += 1
+                               }
+                       }
+
+                       if numOfAllKeys != test.elms {

Review comment:
       I don't think the numOfAllKeys check here is necessary. It essentially 
boils down to counting the number of elements emitted which is already done by 
other unit tests. The only reason I can see for this is if you suspect that 
enabling hot keys might cause elements to drop, when they wouldn't otherwise, 
but based on your implementation I don't think that's something to worry about.

##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -26,13 +26,12 @@ import (
        "bytes"
        "encoding/json"
        "fmt"
+       "github.com/apache/beam/sdks/go/pkg/beam"

Review comment:
       Nit: The style in the Go SDK is to separate standard Go package imports 
from other others. So in this case it's actually the beam/core/sdf import that 
should be moved to the lower group, separate from the standard Go packages.

##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -131,8 +130,19 @@ func (fn *sourceFn) ProcessElement(rt *sdf.LockRTracker, 
config SourceConfig, em
        for i := rt.GetRestriction().(offsetrange.Restriction).Start; 
rt.TryClaim(i) == true; i++ {
                key := make([]byte, config.KeySize)
                val := make([]byte, config.ValueSize)
-               if _, err := fn.rng.Read(key); err != nil {
-                       return err
+               generator := sourceFn{}
+               generator.rng = rand.New(rand.NewSource(i))

Review comment:
       I would avoid making local instances of rand.Rand here unless you have a 
very good reason here. Is the reason you're creating new, local instances of 
rand.Rand and seeding them with the restriction positions because you want to 
get completely deterministic behavior, regardless of any splitting that might 
occur on the restriction?
   
   If so: Then be aware that this implementation is going to get identical 
results per input (in this case, per SourceConfig). For example, if you make 
two SourceConfigs for 10 elements each, then both of them will seed these 
generators with the numbers 0-9, which can lead to strangely identical results. 
If you want purely deterministic output that is still different per 
SourceConfig, then each SourceConfig will probably need some kind of unique but 
deterministic salt that can be added to these numbers (example: 
`rand.NewSource(i + fn.salt))`).
   
   On the other hand, if this check and the hotkey distributions don't need to 
be deterministic, then I would simplify a bit and just use the existing Rand 
object (`fn.rng`) more. Or, if you need to customize the rng for test purposes, 
you can add more `randWrapper` fields to the SourceFn struct. I think the only 
spot where you would need deterministic values in that case is for filling out 
the actual hotkey value, and even then I think there's room for optimizing so 
we don't constantly create and reseed a rand.Rand.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to