[
https://issues.apache.org/jira/browse/SPARK-23298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360415#comment-16360415
]
Mateusz Jukiewicz commented on SPARK-23298:
-------------------------------------------
Not sure, but it seems like it could be related to SPARK-23207.
> distinct.count on Dataset/DataFrame yields non-deterministic results
> --------------------------------------------------------------------
>
> Key: SPARK-23298
> URL: https://issues.apache.org/jira/browse/SPARK-23298
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, SQL, YARN
> Affects Versions: 2.1.0, 2.2.0
> Environment: Spark 2.2.0 or 2.1.0
> Java 1.8.0_144
> Yarn version:
> {code:java}
> Hadoop 2.6.0-cdh5.12.1
> Subversion http://github.com/cloudera/hadoop -r
> 520d8b072e666e9f21d645ca6a5219fc37535a52
> Compiled by jenkins on 2017-08-24T16:43Z
> Compiled with protoc 2.5.0
> From source with checksum de51bf9693ab9426379a1cd28142cea0
> This command was run using
> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.12.1.jar{code}
>
>
> Reporter: Mateusz Jukiewicz
> Priority: Major
>
> This is what happens:
> {code:java}
> /* Example spark-shell start command
> /opt/spark/bin/spark-shell \
> --num-executors 269 \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.kryoserializer.buffer.max=512m
> */
> val dataset = spark.read.textFile("/text_dataset.out")
> dataset.distinct.count
> // res0: Long = 24025868
> dataset.distinct.count
> // res1: Long = 24014227{code}
> The _text_dataset.out_ file is a dataset with one string per line. Each
> string contains alphanumeric characters as well as colons and spaces, and no
> line exceeds 1200 characters. I don't think that's important, though, as the
> issue appeared on various other datasets; I just tried to narrow it down to
> the simplest possible case.
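> For reference, here is a minimal sketch of how a comparable input file could
> be generated (the record count, string layout, and output path are
> illustrative assumptions, not the actual dataset):
> {code:java}
> // Hypothetical generator: one unique alphanumeric string (with colons and
> // spaces) per line, roughly matching the dataset described above.
> import scala.util.Random
> import spark.implicits._
>
> val numRecords = 24014227L  // assumed size, matching the reported count
> spark.range(numRecords).map { i =>
>   val rnd = new Random(i)
>   s"id:$i key:${rnd.alphanumeric.take(32).mkString} value:${rnd.nextInt(1000000)}"
> }.write.mode("overwrite").text("/text_dataset_synthetic.out"){code}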
> The observations regarding the issue are as follows:
> * I managed to reproduce it on both spark 2.2 and spark 2.1.
> * The issue occurs in YARN cluster mode (I haven't tested YARN client mode).
> * The issue is not reproducible on a single machine (e.g. laptop) in spark
> local mode.
> * It seems that once the correct count is computed, it is not possible to
> reproduce the issue in the same spark session. In other words, I was able to
> get 2-3 incorrect distinct.count results consecutively, but once it returned
> the correct value, it kept returning it. I had to re-run spark-shell to
> observe the problem again.
> * The issue appears on both Dataset and DataFrame (i.e. using read.text or
> read.textFile).
> * The issue is not reproducible on RDD (i.e. dataset.rdd.distinct.count); a
> sketch comparing the three code paths is included after this list.
> * Not a single container failed in any of the invalid executions.
> * YARN didn't show any warnings or errors for the invalid executions.
> * The execution plan determined for both valid and invalid executions was
> always the same (it's shown in the _SQL_ tab of the UI).
> * The number returned in the invalid executions was always greater than the
> correct number (24 014 227).
> * This occurs even though the input is already completely deduplicated (i.e.
> _distinct.count_ shouldn't change anything).
> * The input isn't replicated (i.e. there's only one copy of each file block
> on the HDFS).
> * The problem is probably not related to reading from HDFS. Spark was always
> able to correctly read all input records (as shown in the UI), and the number
> only became wrong after the exchange phase:
> ** correct execution:
> Input Size / Records: 3.9 GB / 24014227 _(first stage)_
> Shuffle Write: 3.3 GB / 24014227 _(first stage)_
> Shuffle Read: 3.3 GB / 24014227 _(second stage)_
> ** incorrect execution:
> Input Size / Records: 3.9 GB / 24014227 _(first stage)_
> Shuffle Write: 3.3 GB / 24014227 _(first stage)_
> Shuffle Read: 3.3 GB / 24020150 _(second stage)_
> * The problem might be related to the way Encoders hash the data internally.
> The reasons for this suspicion are:
> ** in a simple `distinct.count` invocation, there are in total three
> hash-related stages (called `HashAggregate`),
> ** the scaladoc excerpt for the `distinct` method says:
> {code:java}
> * @note Equality checking is performed directly on the encoded representation of the data
> * and thus is not affected by a custom `equals` function defined on `T`.{code}
> * One of my suspicions was the number of partitions we're using (2154). This
> is greater than 2000, which means that a different data structure (i.e.
> _HighlyCompressedMapStatus_ instead of _CompressedMapStatus_) will be used for
> book-keeping during the shuffle. Unfortunately, the problem still occurs after
> decreasing the number of partitions below this threshold.
> * It's easier to reproduce the issue with a large number of partitions.
> * Another suspicion of mine was that it's somehow related to the number of
> blocks on HDFS (974). I was able to reproduce the problem with both fewer and
> more partitions than this value, so I don't think this is the case.
> * Final note: It looks like for some reason the data gets duplicated in the
> process of data exchange during the shuffle (because shuffle read sees more
> elements than shuffle write has written).
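> As mentioned in the list above, here is a minimal diagnostic sketch comparing
> the three code paths and repeating the count within one session (the input
> path and the number of repetitions are placeholders):
> {code:java}
> // Sketch: compare distinct counts across the Dataset, DataFrame and RDD paths.
> val path = "/text_dataset.out"
>
> val dsCount  = spark.read.textFile(path).distinct.count     // Dataset[String]
> val dfCount  = spark.read.text(path).distinct.count         // DataFrame
> val rddCount = spark.read.textFile(path).rdd.distinct.count // RDD (stable)
> println(s"Dataset: $dsCount, DataFrame: $dfCount, RDD: $rddCount")
>
> // The physical plan is identical across valid and invalid executions:
> spark.read.textFile(path).distinct.explain()
>
> // Repeat in the same session; in the bad case the first few results differ
> // before the count settles on the correct value.
> (1 to 5).foreach { i =>
>   println(s"run $i: " + spark.read.textFile(path).distinct.count)
> }{code}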
> Please let me know if you have any other questions.
> I couldn't find much about similar problems on the Web. The only thing I
> found was a thread on the Spark mailing list where a PySpark user found that
> one of their executors was hashing things differently than the others, which
> caused a similar issue.
> I didn't include a reproducible example because the input is just a long
> file of strings, and since this occurred on many different datasets, I doubt
> it's data-related. If an example is necessary though, please let me know and
> I will try to prepare one.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]