[ https://issues.apache.org/jira/browse/TEZ-3207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ming Ma updated TEZ-3207:
-------------------------
Attachment: TEZ-3207-2.patch
Here is an updated patch to address [~sseth]'s comments.
bq. can we include the numPending partitions, and num pending segments
{{toDetailedString}} already includes the mapping from partitions to segments.
I have updated {{toString}} to include the partition ids as well. If that is not
enough, we can add more.
bq. host = new InputHost(identifier); could be host = identifier ?
They are different classes, so I'm not sure how you could cast one to the other.
However, the updated patch has {{InputHost}} extend {{HostPort}}, which at least
removes the redundant {{InputHost#getHost}} and similar methods. We can do the
same for {{MapHost}}, used in the sorted case, in a separate jira; that requires
refactoring anyway to decouple the partition id from {{HostPort}}.
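A minimal sketch of the {{InputHost}}/{{HostPort}} relationship described above, assuming simplified versions of both classes: the fields, the {{addKnownInput}} helper and the string attempt ids are hypothetical stand-ins, not the actual Tez code. It also illustrates {{toString}} listing only the partition ids while {{toDetailedString}} prints the full partition mapping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical, simplified host/port value class (not the real Tez HostPort).
class HostPort {
  private final String host;
  private final int port;

  HostPort(String host, int port) {
    this.host = host;
    this.port = port;
  }

  String getHost() { return host; }
  int getPort() { return port; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof HostPort)) return false;
    HostPort other = (HostPort) o;
    return port == other.port && host.equals(other.host);
  }

  @Override
  public int hashCode() { return Objects.hash(host, port); }

  @Override
  public String toString() { return host + ":" + port; }
}

// Hypothetical InputHost: inherits getHost()/getPort() instead of redeclaring
// them, and only adds per-partition bookkeeping.
class InputHost extends HostPort {
  // partition id -> source attempt ids (strings stand in for InputAttemptIdentifier)
  private final Map<Integer, List<String>> partitionToInputs = new TreeMap<>();

  InputHost(String host, int port) { super(host, port); }

  void addKnownInput(int partition, String srcAttempt) {
    partitionToInputs.computeIfAbsent(partition, p -> new ArrayList<>()).add(srcAttempt);
  }

  // Short form: host:port plus the partition ids only.
  @Override
  public String toString() {
    return "InputHost " + super.toString() + " partitions=" + partitionToInputs.keySet();
  }

  // Long form: the full partition -> inputs mapping.
  String toDetailedString() {
    return "InputHost " + super.toString() + " " + partitionToInputs;
  }
}
```

Because equality is inherited from {{HostPort}}, two {{InputHost}} objects with the same host and port compare equal regardless of their pending inputs; that suits a host-keyed lookup, but is worth keeping in mind.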
bq. On the test - can we reduce the sleep interval since this is a poll.
Fixed. BTW, Hadoop Common has {{GenericTestUtils#waitFor}}. It would be nice if
Tez could use that, or have its own test utility library like it. That seems like
a separate jira to clean up the overall Tez code.
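A sketch of what a Tez-side polling helper might look like, modeled loosely on Hadoop's {{GenericTestUtils#waitFor}}; the {{WaitUtils}} class name and its exact signature are assumptions, not an existing Tez API. The caller polls a condition at a short interval and gets a {{TimeoutException}} if it never holds, rather than sleeping for a fixed period.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a condition until it holds or a deadline passes.
final class WaitUtils {
  private WaitUtils() {}

  static void waitFor(BooleanSupplier check, long checkEveryMillis, long waitForMillis)
      throws TimeoutException, InterruptedException {
    if (checkEveryMillis <= 0) {
      throw new IllegalArgumentException("checkEveryMillis must be positive");
    }
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitForMillis);
    // Check first, so an already-true condition returns without sleeping.
    while (!check.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        throw new TimeoutException("Condition not met within " + waitForMillis + " ms");
      }
      Thread.sleep(checkEveryMillis);
    }
  }
}
```

In a test this would replace a loop such as "sleep, then check" with something like {{WaitUtils.waitFor(() -> fetchedCount() == expected, 10, 10000)}}, where {{fetchedCount}} is a hypothetical accessor.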
The test case has been updated to include your delayed event delivery
suggestion. It has also been modified to verify that {{ShuffleManager}} can fetch
from multiple mappers, each of which has multiple partitions.
> Add support for fetching multiple partitions from the same source task to UnorderedKVInput
> ------------------------------------------------------------------------------------------
>
> Key: TEZ-3207
> URL: https://issues.apache.org/jira/browse/TEZ-3207
> Project: Apache Tez
> Issue Type: Bug
> Reporter: Ming Ma
> Assignee: Ming Ma
> Attachments: TEZ-3207-2.patch, TEZ-3207.patch
>
>
> The ordered grouped {{ShuffleScheduler}} supports fetching multiple
> partitions from the same source task. The unordered {{ShuffleManager}},
> however, supports only one partition per source task, because in the
> following code {{identifier}} does not take the partition id into account.
> {noformat}
> public void addKnownInput(String hostName, int port,
>     InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
>   String identifier = InputHost.createIdentifier(hostName, port);
>   InputHost host = knownSrcHosts.get(identifier);
>   ....
> }
> {noformat}
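One possible way to let a single host entry serve several partitions, sketched under simplifying assumptions: {{KnownInputs}} and {{pendingForHost}} are hypothetical names, strings stand in for {{InputAttemptIdentifier}}, and this is not necessarily how the patch restructures {{ShuffleManager}}. The identifier stays host:port as in the snippet above, but the inputs under each host are grouped by {{srcPhysicalIndex}}.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of known-input bookkeeping (not the Tez classes).
class KnownInputs {
  // host:port identifier -> (partition id -> source attempts for that partition)
  private final Map<String, Map<Integer, List<String>>> knownSrcHosts = new HashMap<>();

  static String createIdentifier(String hostName, int port) {
    return hostName + ":" + port;
  }

  void addKnownInput(String hostName, int port, String srcAttempt, int srcPhysicalIndex) {
    String identifier = createIdentifier(hostName, port);
    // Still one entry per host, keyed only by host:port...
    Map<Integer, List<String>> partitions =
        knownSrcHosts.computeIfAbsent(identifier, id -> new TreeMap<>());
    // ...but each partition keeps its own list of pending source attempts.
    partitions.computeIfAbsent(srcPhysicalIndex, p -> new ArrayList<>()).add(srcAttempt);
  }

  // Pending inputs for a host, grouped by partition; empty if the host is unknown.
  Map<Integer, List<String>> pendingForHost(String hostName, int port) {
    return knownSrcHosts.getOrDefault(createIdentifier(hostName, port), Collections.emptyMap());
  }
}
```

Keeping the host as the outer key means a fetcher can still be scheduled per host, while the per-partition grouping records which partitions to request from that host.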
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)