[ https://issues.apache.org/jira/browse/PIG-5044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15803560#comment-15803560 ]
liyunzhang_intel commented on PIG-5044:
---------------------------------------
[~kexianda]: please help review.
Changes in PIG-5044_3.patch:
1. Add POSparkSampleSort.java, SparkSampleSortConverter.java and
POPoissonSampleSpark.java. These classes create a sample from the input file
and sort the sample to produce the key distribution file for SkewedJoin.
2. Add commented-out code showing how to use it in SkewedJoin (PIG-4855).
@xiandake, please apply the commented-out code in PIG-4855's patch.
3. An example to explain the feature. The following is a skewed join case that
I ran after integrating the patches of PIG-4855 and PIG-5044.
Script:
{code}
A = load './SkewedJoinInput1.txt' as (id,name,n);
B = load './SkewedJoinInput2.txt' as (id,name);
D = join A by id, B by id using 'skewed' parallel 300;
store D into './testSkewedJoin.out';
explain D;
{code}
In the Spark plan there are 2 SparkOperators (scope-58 and scope-75). scope-58
samples the data (column "id") from A (SkewedJoinInput1.txt) and sorts the
sample to produce the key distribution file.
scope-75 joins the two tables (A, B) using the key distribution file
generated in scope-58.
{code}
Spark node scope-58
BroadcastSpark - scope-74
|
|---New For Each(false)[tuple] - scope-73
| |
| POUserFunc(org.apache.pig.impl.builtin.PartitionSkewedKeys)[tuple] - scope-72
| |
| |---Project[tuple][*] - scope-71
|
|---New For Each(false,false)[tuple] - scope-70
| |
| Constant(300) - scope-69
| |
| Project[bag][1] - scope-68
|
|---POSparkSampleSort [tuple]() - scope-54
| |
| Project[bytearray][0] - scope-52
|
|---New For Each(true,true)[tuple] - scope-67
| |
| Project[bytearray][0] - scope-52
| |
| POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-65
| |
| |---Project[tuple][*] - scope-64
|
|---PoissonSample - scope-66
|
|---A: New For Each(false,false,false)[bag] - scope-63
| |
| Project[bytearray][0] - scope-60
| |
| Project[bytearray][1] - scope-61
| |
| Project[bytearray][2] - scope-62
|
|---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-59--------
Spark node scope-75
D: Store(hdfs://zly1.sh.intel.com:8020/user/root/testSkewedJoin.out:org.apache.pig.builtin.PigStorage) - scope-55
|
|---D: SkewedJoin[tuple] - scope-54
| |
| Project[bytearray][0] - scope-52
| |
| Project[bytearray][0] - scope-53
|
|---A: New For Each(false,false,false)[bag] - scope-45
| | |
| | Project[bytearray][0] - scope-39
| | |
| | Project[bytearray][1] - scope-41
| | |
| | Project[bytearray][2] - scope-43
| |
| |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-38
|
|---B: New For Each(false,false)[bag] - scope-51
| |
| Project[bytearray][0] - scope-47
| |
| Project[bytearray][1] - scope-49
|
|---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-46--------
{code}
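To make the job of the sampling stage (scope-58) more concrete, here is a rough Python sketch of the idea: sample the join key, sort the sample, and derive a key distribution that tells the join stage how many partitions each skewed key should be spread over. This is not Pig code. The function names `sample_keys` and `key_distribution` and the input rows are hypothetical, plain Bernoulli sampling stands in for PoissonSample, and the proportional rule below is only an assumption for illustration; the real logic of PartitionSkewedKeys is not reproduced here.

```python
import math
import random
from itertools import groupby


def sample_keys(rows, key_index, rate, rng):
    """Keep the join key of each row with probability `rate`.

    Plain Bernoulli sampling, used here only as a stand-in for PoissonSample.
    """
    return [row[key_index] for row in rows if rng.random() < rate]


def key_distribution(sampled_keys, parallelism):
    """Map each skewed key to the number of partitions it should be spread over.

    ASSUMPTION: a key gets partitions in proportion to its share of the sample.
    This is an illustrative rule, not the logic of PartitionSkewedKeys.
    """
    total = len(sampled_keys)
    if total == 0:
        return {}
    # Number of sampled rows one partition would hold if the load were even.
    budget = math.ceil(total / parallelism)
    distribution = {}
    # groupby only merges adjacent equal keys, hence the sort (the "sample sort" step).
    for key, run in groupby(sorted(sampled_keys)):
        count = sum(1 for _ in run)
        partitions = min(parallelism, math.ceil(count / budget))
        if partitions > 1:  # keys that fit in one partition need no special handling
            distribution[key] = partitions
    return distribution


# Hypothetical rows shaped like A = (id, name, n); id "1" is heavily skewed.
rows = [("1", "x", 0)] * 90 + [("2", "y", 0)] * 5 + [("3", "z", 0)] * 5
keys = sample_keys(rows, key_index=0, rate=1.0, rng=random.Random(0))
dist = key_distribution(keys, parallelism=10)
```

In this sketch only keys that would overflow a single partition end up in the distribution. As I understand the skewed join design, the join stage (scope-75) would then spread the rows of A for such a key over that many partitions and send the matching rows of B to each of them.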
> Create SparkCompiler#getSamplingJob in spark mode
> -------------------------------------------------
>
> Key: PIG-5044
> URL: https://issues.apache.org/jira/browse/PIG-5044
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
> Fix For: spark-branch
>
> Attachments: PIG-5044_2.patch, PIG-5044_3.patch
>
>
> Like MRCompiler#getSamplingJob, we need a function in spark mode that samples
> data from a file, sorts the sampled data and generates output with the
> UDF org.apache.pig.impl.builtin.FindQuantiles.