[
https://issues.apache.org/jira/browse/PIG-4421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
liyunzhang_intel updated PIG-4421:
----------------------------------
Attachment: PIG-4421_5.patch
The difference between PIG-4421_4.patch and PIG-4421_5.patch:
PIG-4421_5.patch fixes the TestSkewedJoin.testSkewedJoinManyReducers unit test failure.
*Why does TestSkewedJoin.testSkewedJoinManyReducers fail with PIG-4421_4.patch?*
Let me explain with an example:
{code}
bin/testSkewedJoinManyReducers.pig:
A = load './testSkewedJoin6.txt' as (id,name);
B = load './testSkewedJoin7.txt' as (id,name);
C = join A by id, B by id using 'skewed' parallel 300;
store C into './testSkewedJoinManyReducers.out';
{code}
{code}
cat bin/testSkewedJoin6.txt:
237 0
237 1
237 2
237 3
237 4
{code}
{code}
cat bin/testSkewedJoin7.txt:
237 0
237 1
{code}
The result of testSkewedJoinManyReducers.pig is sometimes:
{code}
237 0 237 0
237 0 237 1
237 1 237 0
237 1 237 1
237 2 237 0
237 2 237 1
237 3 237 0
237 3 237 1
237 4 237 0
237 4 237 1
{code}
and sometimes:
{code}
237 0 237 0
237 0 237 1
237 0 237 2
237 0 237 3
237 0 237 4
237 1 237 0
237 1 237 1
237 1 237 2
237 1 237 3
237 1 237 4
{code}
*The first result is the left join, which is correct, while the second is the
right join (B's tuples come before A's), which is not correct.* The cause is the following code:
{code}
SparkLauncher.java#physicalToRDD
private void physicalToRDD(PhysicalPlan plan,
PhysicalOperator physicalOperator,
Map<OperatorKey, RDD<Tuple>> rdds,
List<RDD<Tuple>> rddsFromPredeSparkOper,
Map<Class<? extends PhysicalOperator>, POConverter>
convertMap)
throws IOException {
RDD<Tuple> nextRDD = null;
List<PhysicalOperator> predecessors = plan
.getPredecessors(physicalOperator);
// The predecessors are not sorted by OperatorKey. For example,
// physicalOperator(scope-9) and physicalOperator(scope-10) are both
// predecessors of physicalOperator(scope-13). Sometimes the list is
// [scope-9, scope-10], sometimes it is [scope-10, scope-9]. This
// turns the left join into a right join.
…..
}
{code}
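To illustrate why the order of the inputs matters, here is a minimal sketch. It is not Pig's skewed join implementation: `JoinInputOrderDemo` and its `join` method are hypothetical names, and the join is a plain nested loop that assumes every tuple has the same key, as in the example data above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinInputOrderDemo {

    // Pairs every tuple of the first input with every tuple of the second.
    // All keys are equal in the example data, so no key check is done here.
    static List<String> join(List<String> first, List<String> second) {
        List<String> out = new ArrayList<>();
        for (String l : first) {
            for (String r : second) {
                out.add(l + " " + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("237 0", "237 1", "237 2", "237 3", "237 4");
        List<String> b = Arrays.asList("237 0", "237 1");

        // Inputs in the order [A, B]: A's fields come first in each row.
        join(a, b).forEach(System.out::println);
        System.out.println("--");
        // Inputs in the order [B, A]: B's fields come first in each row.
        join(b, a).forEach(System.out::println);
    }
}
```

With the inputs passed as [A, B] the rows match the first result above; with [B, A] they match the second.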
The solution is to sort the predecessors:
{code}
SparkLauncher.java#physicalToRDD
private void physicalToRDD(PhysicalPlan plan,
PhysicalOperator physicalOperator,
Map<OperatorKey, RDD<Tuple>> rdds,
List<RDD<Tuple>> rddsFromPredeSparkOper,
Map<Class<? extends PhysicalOperator>, POConverter>
convertMap)
throws IOException {
RDD<Tuple> nextRDD = null;
List<PhysicalOperator> predecessors = plan
.getPredecessors(physicalOperator);
+ if (predecessors != null) {
+ Collections.sort(predecessors);
+ }
…..
}
{code}
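The effect of the sort can be sketched in isolation. The `Op` class below is a hypothetical stand-in for a physical operator, and the assumption that operators compare by a (scope, id) key with a numeric id is mine; it is not copied from the Pig sources.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SortedPredecessorsDemo {

    // Hypothetical stand-in for a physical operator, ordered by (scope, id).
    static final class Op implements Comparable<Op> {
        final String scope;
        final long id;

        Op(String scope, long id) {
            this.scope = scope;
            this.id = id;
        }

        @Override
        public int compareTo(Op other) {
            int byScope = scope.compareTo(other.scope);
            // Compare ids numerically so that 9 sorts before 10.
            return byScope != 0 ? byScope : Long.compare(id, other.id);
        }

        @Override
        public String toString() {
            return scope + "-" + id;
        }
    }

    // Returns a sorted copy; a null input is passed through unchanged.
    static List<Op> sortedCopy(List<Op> predecessors) {
        if (predecessors == null) {
            return null;
        }
        List<Op> copy = new ArrayList<>(predecessors);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        Op left = new Op("scope", 9);
        Op right = new Op("scope", 10);
        // Both input orders print [scope-9, scope-10].
        System.out.println(sortedCopy(Arrays.asList(left, right)));
        System.out.println(sortedCopy(Arrays.asList(right, left)));
    }
}
```

Whichever order the list arrives in, the sorted result is the same, so the left input of the join is always the operator with the smaller key.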
> implement visitSkewedJoin in SparkCompiler
> ------------------------------------------
>
> Key: PIG-4421
> URL: https://issues.apache.org/jira/browse/PIG-4421
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-4421.patch, PIG-4421_2.patch, PIG-4421_3.patch,
> PIG-4421_4.patch, PIG-4421_5.patch
>
>
> If visitSkewedJoin is not implemented, the following unit tests will fail:
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithGroup
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinMapKey
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinManyReducers
> org.apache.pig.test.TestSkewedJoin.testNonExistingInputPathInSkewJoin
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOneValue
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinWithNoProperties
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinEmptyInput
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinNullKeys
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinOuter
> org.apache.pig.test.TestSkewedJoin.testRecursiveFileListing
> org.apache.pig.test.TestSkewedJoin.testSkewedJoinReducers
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinWithGroup
> org.apache.pig.test.TestJoinSmoke.testSkewedJoinOuter