Mateusz Jukiewicz commented on SPARK-23298:

Not sure, but it seems like this could be related to SPARK-23207.

> distinct.count on Dataset/DataFrame yields non-deterministic results
> --------------------------------------------------------------------
>                 Key: SPARK-23298
>                 URL: https://issues.apache.org/jira/browse/SPARK-23298
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, SQL, YARN
>    Affects Versions: 2.1.0, 2.2.0
>         Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> YARN version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>            Reporter: Mateusz Jukiewicz
>            Priority: Major
> This is what happens:
> {code:java}
> /* Example spark-shell launch command:
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m 
> */
> val dataset = spark.read.textFile("/text_dataset.out")
> dataset.distinct.count
> // res0: Long = 24025868
> dataset.distinct.count
> // res1: Long = 24014227{code}
> The _text_dataset.out_ file contains one string per line. The strings 
> contain alphanumeric characters, colons, and spaces, and no line exceeds 
> 1200 characters. I don't think the content matters, though: the issue 
> appeared on various other datasets, and I just tried to narrow it down to 
> the simplest possible case.
> The observations regarding the issue are as follows:
>  * I managed to reproduce it on both Spark 2.2 and Spark 2.1.
>  * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
>  * The issue is not reproducible on a single machine (e.g. a laptop) in Spark 
> local mode.
>  * It seems that once the correct count is computed, the issue cannot be 
> reproduced again in the same Spark session. In other words, I was able to 
> get 2-3 incorrect distinct.count results in a row, but once it returned the 
> right value, it kept returning the correct value. I had to restart 
> spark-shell to observe the problem again.
>  * The issue appears on both Dataset and DataFrame (i.e. using read.textFile 
> or read.text, respectively).
>  * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count).
>  * Not a single container failed in any of the invalid executions.
>  * YARN doesn't show any warnings or errors for those invalid executions.
>  * The execution plan determined for both valid and invalid executions was 
> always the same (it's shown in the _SQL_ tab of the UI).
>  * The number returned in the invalid executions was always greater than the 
> correct number (24 014 227).
>  * This occurs even though the input is already completely deduplicated (i.e. 
> _distinct.count_ shouldn't change anything).
>  * The input isn't replicated (i.e. there's only one copy of each file block 
> on the HDFS).
>  * The problem is probably not related to reading from HDFS. Spark always 
> read all input records correctly (as shown in the UI); the record count only 
> changed after the exchange phase:
>  ** correct execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24014227 _(second stage)_
>  ** incorrect execution:
>  Input Size / Records: 3.9 GB / 24014227 _(first stage)_
>  Shuffle Write: 3.3 GB / 24014227 _(first stage)_
>  Shuffle Read: 3.3 GB / 24020150 _(second stage)_
>  * The problem might be related to the way Encoders hash data internally, 
> for the following reasons:
>  ** in a simple `distinct.count` invocation, there are in total three 
> hash-related stages (called `HashAggregate`),
>  ** excerpt from scaladoc for `distinct` method says:
> {code:java}
>    * @note Equality checking is performed directly on the encoded 
> representation of the data
>    * and thus is not affected by a custom `equals` function defined on 
> `T`.{code}
>  * One of my suspicions was the number of partitions we're using (2154). 
> This is greater than 2000, which means that a different data structure 
> (_HighlyCompressedMapStatus_ instead of _CompressedMapStatus_) is used for 
> bookkeeping during the shuffle. Unfortunately, the problem still occurs 
> after decreasing the number of partitions below this threshold.
>  * It's easier to reproduce the issue with a large number of partitions.
>  * Another suspicion was that it's somehow related to the number of blocks 
> on HDFS (974). I was able to reproduce the problem with both fewer and more 
> partitions than this value, so I don't think that's the cause.
>  * Final note: it looks like, for some reason, the data gets duplicated 
> during the shuffle data exchange (shuffle read sees more records than 
> shuffle write has written).
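The 2000-partition threshold mentioned in the observations above can be modeled in a few lines. This is a minimal, hypothetical sketch rather than Spark's code: it assumes the strict `> 2000` comparison that `MapStatus.apply` in Spark 2.1/2.2 uses to choose between `CompressedMapStatus` and `HighlyCompressedMapStatus`, and the names `MapStatusChoice` and `choose` are illustrative only.

```scala
// Hypothetical, simplified model of how Spark 2.1/2.2 pick a MapStatus
// implementation for a map task's output. Names are illustrative, not Spark API.
object MapStatusChoice {
  // Assumed hard-coded threshold on the number of reduce partitions.
  val Threshold = 2000

  sealed trait StatusKind
  case object Compressed extends StatusKind       // per-block sizes kept
  case object HighlyCompressed extends StatusKind // average size + empty-block bitmap

  // Strict "greater than": exactly 2000 partitions still use Compressed.
  def choose(numReducePartitions: Int): StatusKind =
    if (numReducePartitions > Threshold) HighlyCompressed else Compressed

  def main(args: Array[String]): Unit = {
    // 2154 is the partition count used in the report.
    Seq(1999, 2000, 2001, 2154).foreach { n =>
      println(s"$n partitions -> ${choose(n)}")
    }
  }
}
```

Under this assumption, the report's 2154 partitions select `HighlyCompressed`, while any run with 2000 or fewer partitions stays on the per-block `Compressed` path, which is what lowering the partition count below the threshold was meant to test.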
> Please let me know if you have any other questions.
> I couldn't find much about similar problems on the web. The only thing I 
> found was a thread on the Spark mailing list where someone using PySpark 
> found that one of their executors was hashing things differently from the 
> other one, which caused a similar issue.
> I didn't include a reproducible example because the input is just a long 
> file of strings, and since this occurred on many different datasets, I 
> doubt it's data-related. If an example is necessary, though, please let me 
> know and I will try to prepare one.
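The record counts quoted in the report (the Spark UI figures for the incorrect execution and the spark-shell results) can be compared directly. The sketch below only does that arithmetic. The report does not say that the incorrect UI figures and `res0` come from the same run, so the two differences are not expected to match. `ShuffleCountCheck` is a hypothetical name.

```scala
// Arithmetic on the record counts quoted in the report; nothing here calls Spark.
object ShuffleCountCheck {
  val correctCount = 24014227L // correct distinct count (input is already deduplicated)
  val shuffleWrite = 24014227L // Shuffle Write records, first stage, incorrect execution
  val shuffleRead  = 24020150L // Shuffle Read records, second stage, incorrect execution
  val badResult    = 24025868L // res0 from the spark-shell session

  // Records the reduce side read that the map side never wrote.
  def extraShuffleRecords: Long = shuffleRead - shuffleWrite

  // How far the incorrect distinct.count overshoots the correct value.
  def overcount: Long = badResult - correctCount

  def main(args: Array[String]): Unit = {
    println(s"extra shuffle records: $extraShuffleRecords")
    println(s"distinct.count overcount: $overcount")
  }
}
```

Running `main` prints 5923 extra shuffle records and an overcount of 11641.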

This message was sent by Atlassian JIRA