[
https://issues.apache.org/jira/browse/PIG-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208067#comment-15208067
]
Xianda Ke commented on PIG-4848:
--------------------------------
In MR mode, the flag is set to true internally for a merge join.
{code}
MRCompiler.visitMergeJoin() {
    //...
    curMROp.noCombineSmallSplits();
    //...
}
{code}
and
{code}
JobControlCompiler.getJob() {
    //..
    if (!mro.combineSmallSplits() ||
            pigContext.getProperties().getProperty("pig.splitCombination",
                    "true").equals("false"))
        conf.setBoolean("pig.noSplitCombination", true);
    //..
}
{code}
However, this no longer works in MR mode. The output is still out of order,
because Hadoop sorts Pig's input splits again, by size, at job submission.
{code:title=org.apache.hadoop.mapreduce.JobSubmitter.java}
writeNewSplits() {
    List<InputSplit> splits = input.getSplits(job);
    //...
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
            jobSubmitDir.getFileSystem(conf), array);
}
{code}
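To illustrate the effect of that sort, here is a minimal standalone sketch. It is
not Hadoop code: FakeSplit, submitOrder and the byte sizes are assumptions made up
for illustration (the sizes roughly match the three input files in the scenario
quoted below). It only mimics a largest-first sort over splits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SplitOrderSketch {

    // Hypothetical stand-in for an InputSplit: a file name plus its length in bytes.
    static final class FakeSplit {
        final String name;
        final long length;

        FakeSplit(String name, long length) {
            this.name = name;
            this.length = length;
        }
    }

    // Sorts the splits largest-first (similar in spirit to the SplitComparator
    // call above) and returns the split names in the resulting order.
    static List<String> submitOrder(List<FakeSplit> splits) {
        FakeSplit[] array = splits.toArray(new FakeSplit[0]);
        Arrays.sort(array,
                Comparator.comparingLong((FakeSplit s) -> s.length).reversed());
        List<String> names = new ArrayList<>();
        for (FakeSplit s : array) {
            names.add(s.name);
        }
        return names;
    }

    public static void main(String[] args) {
        // Assumed sizes: "1 1\n" and "2 2\n" are 4 bytes, "33 33\n" is 6 bytes.
        List<FakeSplit> splits = Arrays.asList(
                new FakeSplit("input1", 4L),
                new FakeSplit("input2", 4L),
                new FakeSplit("input3", 6L));
        System.out.println(submitOrder(splits)); // prints [input3, input1, input2]
    }
}
```

With these assumed sizes the largest file moves to the front, which matches the
order seen in the actual result: (33,33,33,33) comes first.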
In spark mode, there is no such sorting. If we set pig.noSplitCombination=true
internally, it should work.
> pig.noSplitCombination=true should always be set internally for a merge join
> ----------------------------------------------------------------------------
>
> Key: PIG-4848
> URL: https://issues.apache.org/jira/browse/PIG-4848
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Xianda Ke
> Assignee: Xianda Ke
> Fix For: spark-branch
>
>
> In spark mode, for a merge join, the flag is NOT set to true internally. The
> input splits will be in the order of file size. The output is out of order.
> Scenario:
> cat input1
> {code}
> 1 1
> {code}
> cat input2
> {code}
> 2 2
> {code}
> cat input3
> {code}
> 33 33
> {code}
> A = LOAD 'input*' as (a:int, b:int);
> B = LOAD 'input*' as (a:int, b:int);
> C = JOIN A BY $0, B BY $0 USING 'merge';
> DUMP C;
> expected result:
> {code}
> (1,1,1,1)
> (2,2,2,2)
> (33,33,33,33)
> {code}
> actual result:
> {code}
> (33,33,33,33)
> (1,1,1,1)
> (2,2,2,2)
> {code}
> In MR mode, the flag is set to true internally for a merge join (see
> PIG-2773). However, it doesn't work now. The output is still out of order,
> because the splits are sorted again by hadoop-client. In spark mode, we
> can solve this issue.