[
https://issues.apache.org/jira/browse/TEZ-1923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14269939#comment-14269939
]
Siddharth Seth commented on TEZ-1923:
-------------------------------------
{code}
+ if (usedMemory > memoryLimit) {
+ LOG.info("Starting inMemoryMerger's merge since usedMemory=" +
+ memoryLimit + " > memoryLimit=" + memoryLimit +
+ ". commitMemory=" + commitMemory + ", mergeThreshold=" +
mergeThreshold);
+ startMemToDiskMerge();
+ }
{code}
This will, at best, attempt to start the memToDiskMerger - there's no guarantee
that it'll actually run since one may already be in progress. It ends up not
waiting for the MemToMemMerger to complete - which would free up some memory -
and potentially trigger another merge based on thresholds. The usedMemory at
this point will be determined by a race between the current thread and the
memtomemmerge thread (whether the unconditional reserve has been done yet or
not). Meanwhile, Fetchers block in any case - since memory isn't available. I
think it's better to leave this section of the patch out - to be fixed in the
MemToMem merger jiras.
{code}
+ if ((usedMemory + mergeOutputSize) > memoryLimit) {
+ LOG.info("Not enough memory to carry out mem-to-mem merging.
usedMemory=" + usedMemory +
+ " > memoryLimit=" + memoryLimit);
+ return;
+ }
{code}
usedMemory may not be visible correctly - since it isn't inside the main
MergeManager lock. This could also be part of the MemToMemMerger fixes.
{code}merger.waitForShuffleToMergeMemory();{code}
Would this be a problem in terms of connection timeouts - since this wait is
while the connection is established. IThis could be in the run() method similar
to merger.waitForInMemoryMerge() instead.
> FetcherOrderedGrouped gets into infinite loop due to memory pressure
> --------------------------------------------------------------------
>
> Key: TEZ-1923
> URL: https://issues.apache.org/jira/browse/TEZ-1923
> Project: Apache Tez
> Issue Type: Bug
> Reporter: Rajesh Balamohan
> Assignee: Rajesh Balamohan
> Attachments: TEZ-1923.1.patch, TEZ-1923.2.patch
>
>
> - Ran a comparatively large job (temp table creation) at 10 TB scale.
> - Turned on intermediate mem-to-mem
> (tez.runtime.shuffle.memory-to-memory.enable=true and
> tez.runtime.shuffle.memory-to-memory.segments=4)
> - Some reducers get lots of data and quickly gets into infinite loop
> {code}
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2]
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned
> Status.WAIT ...
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2]
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 3ms
> 2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
> sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2]
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned
> Status.WAIT ...
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2]
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 1ms
> 2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
> sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2]
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned
> Status.WAIT ...
> 2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2]
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 2ms
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
> sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2]
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned
> Status.WAIT ...
> 2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2]
> orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 5ms
> 2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for
> url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true
> sent hash and receievd reply 0 ms
> 2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2]
> orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned
> Status.WAIT ...
> {code}
> Additional debug/patch statements revealed that InMemoryMerge is not invoked
> appropriately and not releasing the memory back for fetchers to proceed. e.g
> debug/patch messages are given below
> {code}
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:48,332 INFO
> [fetcher [Map_1] #2] orderedgrouped.MergeManager:
> Patch..usedMemory=1551867234, memoryLimit=1073741824, commitMemory=883028388,
> mergeThreshold=708669632 <<=== InMemoryMerge would be started in this case
> as commitMemory >= mergeThreshold
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:52,900 INFO
> [fetcher [Map_1] #2] orderedgrouped.MergeManager:
> Patch..usedMemory=1273349784, memoryLimit=1073741824, commitMemory=347296632,
> mergeThreshold=708669632 <<=== InMemoryMerge would *NOT* be started in this
> case as commitMemory < mergeThreshold. But the usedMemory is higher than
> memoryLimit. Fetchers would keep waiting indefinitely until memory is
> released. InMemoryMerge will not kick in and not release memory.
> syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:53,163 INFO
> [fetcher [Map_1] #1] orderedgrouped.MergeManager:
> Patch..usedMemory=1191994052, memoryLimit=1073741824, commitMemory=523155206,
> mergeThreshold=708669632 <<=== InMemoryMerge would *NOT* be started in this
> case as commitMemory < mergeThreshold. But the usedMemory is higher than
> memoryLimit. Fetchers would keep waiting indefinitely until memory is
> released. InMemoryMerge will not kick in and not release memory.
> {code}
> In MergeManager, in memory merging is invoked under the following condition
> {code}
> if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold)
> {code}
> Attaching the sample hive command just for reference
> {code}
> $HIVE_HOME/bin/hive -hiveconf tez.runtime.io.sort.factor=200 --hiveconf
> hive.tez.auto.reducer.parallelism=false --hiveconf
> tez.am.heartbeat.interval-ms.max=20 --hiveconf tez.runtime.io.sort.mb=1200
> --hiveconf tez.runtime.sort.threads=2 --hiveconf hive.tez.container.size=4096
> --hiveconf tez.runtime.shuffle.memory-to-memory.enable=true --hiveconf
> tez.runtime.shuffle.memory-to-memory.segments=4
> create table testData as select
> ss_sold_date_sk,ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_quantity,ss_sold_date
> from store_sales distribute by ss_sold_date;
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)