[
https://issues.apache.org/jira/browse/HIVE-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827369#comment-15827369
]
Ferdinand Xu commented on HIVE-15527:
-------------------------------------
Hi [~csun], thank you for working on this issue and we can reproduce OOM caused
by Arraylist when data increased 100TB at a skewed data. We're trying to use
your patch to see whether it works. BTW, a minor issue exists in your patch.
If SPARK_SHUFFLE_BUFFER_SIZE is not set, the default long value will be passed
to parseLong(String) method.
{noformat}
long maxBufferSize =
Long.parseLong(this.jobConf.get(HiveConf.ConfVars.SPARK_SHUFFLE_BUFFER_SIZE.varname));
{noformat}
> Memory usage is unbound in SortByShuffler for Spark
> ---------------------------------------------------
>
> Key: HIVE-15527
> URL: https://issues.apache.org/jira/browse/HIVE-15527
> Project: Hive
> Issue Type: Improvement
> Components: Spark
> Affects Versions: 1.1.0
> Reporter: Xuefu Zhang
> Assignee: Chao Sun
> Attachments: HIVE-15527.0.patch, HIVE-15527.0.patch,
> HIVE-15527.1.patch, HIVE-15527.2.patch, HIVE-15527.3.patch,
> HIVE-15527.4.patch, HIVE-15527.5.patch, HIVE-15527.6.patch,
> HIVE-15527.7.patch, HIVE-15527.8.patch, HIVE-15527.patch
>
>
> In SortByShuffler.java, an ArrayList is used to back the iterator for values
> that have the same key in shuffled result produced by spark transformation
> sortByKey. It's possible that memory can be exhausted because of a large key
> group.
> {code}
> @Override
> public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
> // TODO: implement this by accumulating rows with the same key
> into a list.
> // Note that this list needs to improved to prevent excessive
> memory usage, but this
> // can be done in later phase.
> while (it.hasNext()) {
> Tuple2<HiveKey, BytesWritable> pair = it.next();
> if (curKey != null && !curKey.equals(pair._1())) {
> HiveKey key = curKey;
> List<BytesWritable> values = curValues;
> curKey = pair._1();
> curValues = new ArrayList<BytesWritable>();
> curValues.add(pair._2());
> return new Tuple2<HiveKey, Iterable<BytesWritable>>(key,
> values);
> }
> curKey = pair._1();
> curValues.add(pair._2());
> }
> if (curKey == null) {
> throw new NoSuchElementException();
> }
> // if we get here, this should be the last element we have
> HiveKey key = curKey;
> curKey = null;
> return new Tuple2<HiveKey, Iterable<BytesWritable>>(key,
> curValues);
> }
> {code}
> Since the output from sortByKey is already sorted on key, it's possible to
> backup the value iterable using the same input iterator.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)