tszerszen commented on a change in pull request #13245:
URL: https://github.com/apache/beam/pull/13245#discussion_r517835487



##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -131,8 +130,19 @@ func (fn *sourceFn) ProcessElement(rt *sdf.LockRTracker, config SourceConfig, em
        for i := rt.GetRestriction().(offsetrange.Restriction).Start; rt.TryClaim(i) == true; i++ {
                key := make([]byte, config.KeySize)
                val := make([]byte, config.ValueSize)
-               if _, err := fn.rng.Read(key); err != nil {
-                       return err
+               generator := sourceFn{}
+               generator.rng = rand.New(rand.NewSource(i))

Review comment:
       @youngoli thank you for your feedback. I have the most experience 
with Python, so I followed the Python implementation when writing this.
   
   In Python the generator is seeded with each index, so I did the same in Go: 
I seed the generator with a NewSource on each iteration. The other SDKs' 
implementations have no salt in the config, and the idea was to stay 
consistent with them so that the SDKs can be compared.
   ```python
     def _gen_kv_pair(self, generator, index):
       generator.seed(index)
       rand = generator.random_sample()
   
       # Determines whether to generate hot key or not.
       if rand < self._hot_key_fraction:
         # Generate hot key.
         # An integer is randomly selected from the range [0, numHotKeys-1]
         # with equal probability.
         generator_hot = Generator(index % self._num_hot_keys)
         bytes_ = generator_hot.bytes(self._key_size), generator.bytes(
           self._value_size)
       else:
         bytes_ = generator.bytes(self.element_size)
         bytes_ = bytes_[:self._key_size], bytes_[self._key_size:]
       return bytes_
   
     def read(self, range_tracker):
       index = range_tracker.start_position()
       generator = Generator()
       while range_tracker.try_claim(index):
         time.sleep(self._sleep_per_input_record_sec)
         yield self._gen_kv_pair(generator, index)
         index += 1
   ```
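
A minimal Go sketch of the per-index seeding described above, for comparison with the Python snippet. `genKV` is a hypothetical helper, not part of the Beam SDK or this PR; it only illustrates that seeding a fresh `math/rand` generator with the element index makes the output a pure function of that index. It does not attempt to reproduce the bytes the Python SDK generates, since the two languages use different random number generators.

```go
package main

import (
	"fmt"
	"math/rand"
)

// genKV is a hypothetical helper (an assumption for illustration, not Beam
// code). It derives a key/value pair purely from the element index, so the
// same index always regenerates the same bytes.
func genKV(index int64, keySize, valueSize int) (key, val []byte) {
	// Fresh generator seeded with the index, mirroring
	// rand.New(rand.NewSource(i)) in the diff.
	rng := rand.New(rand.NewSource(index))

	key = make([]byte, keySize)
	val = make([]byte, valueSize)

	// (*rand.Rand).Read always fills the slice and returns a nil error,
	// so the return values are ignored here.
	rng.Read(key)
	rng.Read(val)
	return key, val
}

func main() {
	k1, v1 := genKV(7, 8, 16)
	k2, v2 := genKV(7, 8, 16)

	// Same index, same bytes: prints "true true".
	fmt.Println(string(k1) == string(k2), string(v1) == string(v2))
}
```

The trade-off is one generator allocation per element; in exchange, an element's bytes do not depend on which elements were produced before it, which matters when a restriction is split or retried.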
   





