akupchinskiy commented on PR #2010:
URL: https://github.com/apache/datafusion-comet/pull/2010#issuecomment-3077747073

   > Thanks @akupchinskiy! I have a few questions. The big one for me is: does the seed state per partition match Spark's behavior, in particular the life cycle? If the seed gets reset at a different interval, it seems like a complex query would yield different results.
   
   I believe partition-dependent state is a cornerstone of the lifecycle of all nondeterministic functions in Spark. Basically, a state initialization hook is part of [the common trait of all nondeterministic functions](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala#L520).
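
   As a rough sketch of that lifecycle (simplified pseudocode of the pattern, with `java.util.Random` standing in for Spark's XorShift generator):

   ```scala
   import java.util.Random

   // Sketch of the per-partition lifecycle of a nondeterministic expression.
   class RandSketch(seed: Long) {
     private var rng: Random = _

     // Mirrors Nondeterministic.initialize(partitionIndex): called once per
     // task, before any row is evaluated. The same (seed, partitionIndex)
     // pair always yields the same generator, which is why a task retry
     // reproduces the same values.
     def initialize(partitionIndex: Int): Unit = {
       rng = new Random(seed + partitionIndex)
     }

     // Every call advances the generator state, so a value also depends on
     // how many rows were evaluated before it within the partition.
     def eval(): Double = rng.nextDouble()
   }
   ```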
 
   
   Regarding the seed reset, could you elaborate more? The seed is evaluated at planning time, and there is no place where it would be reset. When a task fails, it is reinitialized with the same value.
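
   For illustration (a minimal snippet assuming a `spark` session in scope, like the examples further down):

   ```scala
   import org.apache.spark.sql.functions.rand

   // rand() with no argument still receives a concrete seed, chosen once
   // while the expression is built; the plan carries that literal seed, so
   // task retries re-evaluate with the same value.
   val df = spark.range(10).select(rand().as("r"))
   ```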
   
   > Put differently: `rand` and `randn` have fairly simple semantics to implement in native code. What are the implications of these non-deterministic functions returning different samples than Spark?

   Returning different samples than Spark does not seem to be a big problem. Yet, having reproducible samples can be critical for ML workloads or for idempotent writes that rely on the generated value as part of a key. I don't see how we can guarantee reproducibility without inter-batch state management that is resilient to changes in runtime parameters (batch size in this particular case).
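
   As a hypothetical example of such a write (all paths and column names are illustrative):

   ```scala
   import org.apache.spark.sql.functions.{col, concat_ws, rand}

   // If rand(42) produced different samples on a re-run, `row_key` would
   // change and the append below would stop being idempotent.
   val keyed = spark.read.parquet("/data/input")
     .withColumn("sample", rand(42))
     .withColumn("row_key", concat_ws("-", col("id"), col("sample")))
   keyed.write.mode("append").parquet("/data/output")
   ```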
   
   One thing to add: there are indeed complex queries where the output is not guaranteed to be the same even for the same seed. Under the hood, `rand` is just a plain function of two arguments (sketched after the list below): the seed, which differs per partition, and the row number within the partition. The second argument is the one that makes the whole thing truly nondeterministic, since there is no guarantee on the order of incoming rows fetched from remote workers. Yet, there are cases where the row number is stable across different executions:
   
   1. Evaluation happens before any shuffle: `spark.read.parquet(...).select(rand(42))`
   2. The evaluation order is enforced by an explicit local sort: `spark.read.parquet(...).repartition(42).sortWithinPartitions("some_unique_key").select(rand(42))`
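
   Here is a rough sketch of that two-argument view (illustrative only; `java.util.Random` again stands in for Spark's XorShift generator):

   ```scala
   import java.util.Random

   // The i-th value of a partition is fully determined by
   // (seed + partitionIndex) and i. Change the row order and a given row
   // gets a different i, hence a different sample; that is exactly what
   // scenarios 1 and 2 above prevent.
   def randAt(seed: Long, partitionIndex: Int, rowIndex: Int): Double = {
     val rng = new Random(seed + partitionIndex)
     (0 until rowIndex).foreach(_ => rng.nextDouble()) // skip preceding rows
     rng.nextDouble()
   }
   ```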
  
   
   In those scenarios we do have reproducibility, and I believe a native implementation should preserve this property.
   

