[
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon resolved SPARK-23298.
----------------------------------
Resolution: Incomplete
> distinct.count on Dataset/DataFrame yields non-deterministic results
> --------------------------------------------------------------------
>
> Key: SPARK-23298
> URL: https://issues.apache.org/jira/browse/SPARK-23298
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, SQL, YARN
> Affects Versions: 2.1.0, 2.2.0
> Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>
>
> Reporter: Mateusz Jukiewicz
> Priority: Major
> Labels: Correctness, CorrectnessBug, bulk-closed, correctness
>
> This is what happens (EDIT - managed to get a reproducible example):
> {code:java}
> /* Example spark-shell startup command
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m
> // The spark.sql.shuffle.partitions is 2154 here, if that matters
> */
> val df = spark.range(10000000)
>   .withColumn("col1", (rand() * 1000).cast("long"))
>   .withColumn("col2", (rand() * 1000).cast("long"))
>   .drop("id")
> df.repartition(5240).write.parquet("/test.parquet")
> // Then, ideally in a new session
> val df = spark.read.parquet("/test.parquet")
> df.distinct.count
> // res1: Long = 1001256
>
> df.distinct.count
> // res2: Long = 999955 {code}
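>
> To make the flakiness easier to spot, the two counts above can be compared
> programmatically. This is only a minimal sketch on top of the repro (the
> loop and the printed comparison are mine, not part of the original report);
> note that, per the observations below, a session that once returns the
> correct count tends to keep returning it, so several fresh sessions may be
> needed:
> {code:java}
> // Re-read the freshly written parquet data and count distinct rows
> // several times. On an affected cluster the values can differ;
> // on a healthy one they are all identical.
> val df = spark.read.parquet("/test.parquet")
> val counts = (1 to 5).map(_ => df.distinct.count)
> println(counts.mkString(", "))
> println(s"deterministic: ${counts.distinct.size == 1}")
> {code}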
> -The _text_dataset.out_ file is a dataset with one string per line. The
> string has alphanumeric characters as well as colons and spaces. The line
> length does not exceed 1200. I don't think that's important, though, as the
> issue appeared on various other datasets; I just tried to narrow it down to
> the simplest possible case.- (the case is now fully reproducible with the
> above code)
> The observations regarding the issue are as follows:
> * I managed to reproduce it on both Spark 2.2 and Spark 2.1.
> * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
> * The issue is not reproducible on a single machine (e.g. a laptop) in Spark
> local mode.
> * It seems that once the correct count is computed, it is not possible to
> reproduce the issue in the same Spark session. In other words, I was able to
> get 2-3 incorrect distinct.count results consecutively, but once it got it
> right, it always returned the correct value. I had to re-run spark-shell to
> observe the problem again.
> * The issue appears on both Dataset and DataFrame (i.e. using read.text or
> read.textFile).
> * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count);
> see the comparison sketch after this list.
> * Not a single container has failed in those multiple invalid executions.
> * YARN doesn't show any warnings or errors in those invalid executions.
> * The execution plan determined for both valid and invalid executions was
> always the same (it's shown in the _SQL_ tab of the UI).
> * The number returned in the invalid executions was always greater than the
> correct number (24 014 227).
> * This occurs even though the input is already completely deduplicated (i.e.
> _distinct.count_ shouldn't change anything).
> * The input isn't replicated (i.e. there's only one copy of each file block
> on the HDFS).
> * The problem is probably not related to reading from HDFS. Spark always
> read all input records correctly (as shown in the UI); the record count only
> diverged after the exchange phase:
> ** correct execution:
> Input Size / Records: 3.9 GB / 24014227 _(first stage)_
> Shuffle Write: 3.3 GB / 24014227 _(first stage)_
> Shuffle Read: 3.3 GB / 24014227 _(second stage)_
> ** incorrect execution:
> Input Size / Records: 3.9 GB / 24014227 _(first stage)_
> Shuffle Write: 3.3 GB / 24014227 _(first stage)_
> Shuffle Read: 3.3 GB / 24020150 _(second stage)_
> * The problem might be related to the way Encoders hash the data internally.
> The reasons for this suspicion are:
> ** in a simple `distinct.count` invocation there are three hash-related
> stages in total (called `HashAggregate`),
> ** the excerpt from the scaladoc for the `distinct` method says (a small
> illustration of this is appended at the end of this description):
> {code:java}
> * @note Equality checking is performed directly on the encoded
> * representation of the data and thus is not affected by a custom
> * `equals` function defined on `T`.{code}
> * One of my suspicions was the number of partitions we're using (2154). This
> is greater than 2000, which means that a different data structure (i.e.
> _HighlyCompressedMapStatus_ instead of _CompressedMapStatus_) will be used
> for book-keeping during the shuffle. Unfortunately, even after decreasing
> the number below this threshold, the problem still occurs.
> * It's easier to reproduce the issue with a large number of partitions.
> * Another suspicion of mine was that it's somehow related to the number of
> blocks on the HDFS (974). I was able to reproduce the problem with both
> fewer and more partitions than this value, so I don't think this is the case.
> * Final note: It looks like, for some reason, the data gets duplicated
> during the shuffle exchange (the shuffle read sees more elements than the
> shuffle write has written).
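>
> For reference, two of the checks mentioned above (inspecting the physical
> plan with its _HashAggregate_ stages, and comparing against the RDD-based
> count, which always stayed correct) can be run as follows. This is only a
> sketch built on the repro data; the plan-shape comment reflects what the
> Spark 2.2 console typically prints, not captured output:
> {code:java}
> val df = spark.read.parquet("/test.parquet")
>
> // Inspect the deduplication plan: a partial HashAggregate, an
> // Exchange, and a final HashAggregate over all columns.
> df.distinct.explain()
>
> // The Dataset/DataFrame path, which is affected...
> val sqlCount = df.distinct.count
> // ...versus the RDD path, which always returned the correct value.
> val rddCount = df.rdd.distinct().count()
> println(s"sql=$sqlCount rdd=$rddCount")
>
> // Per the partition-count suspicion above, the 2000-partition
> // threshold can be tested by lowering the shuffle parallelism, e.g.:
> // spark.conf.set("spark.sql.shuffle.partitions", "1999")
> {code}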
> Please let me know if you have any other questions.
> I couldn't find much about similar problems on the Web; the only thing I
> found was a thread on the Spark mailing list where someone using PySpark
> found that one of their executors was hashing things differently than the
> others, which caused a similar issue.
> I didn't include a reproducible example initially, as this was just a long
> file with strings, and as this occurred on many different datasets, I doubt
> it's data-related. If that's necessary though, please let me know and I will
> try to prepare an example.
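>
> As an aside on the `distinct` scaladoc note quoted above, here is a small
> self-contained illustration (my own, not from the original report) of
> equality being checked on the encoded representation rather than via a
> custom `equals`:
> {code:java}
> // A type whose custom equals/hashCode treat all instances as equal.
> case class Box(v: Int) {
>   override def equals(o: Any): Boolean = o.isInstanceOf[Box]
>   override def hashCode: Int = 0
> }
> import spark.implicits._
> val ds = Seq(Box(1), Box(2)).toDS()
> // Dataset.distinct compares encoded rows and ignores the custom
> // equals, so both elements survive:
> println(ds.distinct.count)         // 2
> // The RDD path uses the objects' own equals/hashCode:
> println(ds.rdd.distinct().count()) // 1
> {code}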