[ https://issues.apache.org/jira/browse/PIG-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646844#comment-16646844 ]

Satish Subhashrao Saley commented on PIG-5359:
----------------------------------------------

Updated the amend patch since TestTezAutoParallelism was failing.

> Reduce time spent in split serialization
> ----------------------------------------
>
>                 Key: PIG-5359
>                 URL: https://issues.apache.org/jira/browse/PIG-5359
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Satish Subhashrao Saley
>            Assignee: Satish Subhashrao Saley
>            Priority: Major
>             Fix For: 0.18.0
>
>         Attachments: PIG-5359-3.patch, PIG-5359-amend-1.patch
>
>
> 1. Unnecessary serialization of splits in Tez.
>  In LoaderProcessor, pig calls
>  
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java#L172]
> {code:java}
> tezOp.getLoaderInfo().setInputSplitInfo(
>         MRInputHelpers.generateInputSplitsToMem(conf, false, 0));
> {code}
> It ends up serializing the splits just to print a log line.
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L317]
> {code:java}
>   public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf,
>       boolean groupSplits, boolean sortSplits, int targetTasks)
>       throws IOException, ClassNotFoundException, InterruptedException {
>       ....
>       ....
>           LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
>               + splitInfoMem.getSplitsProto().getSerializedSize());
>     return splitInfoMem;
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L106]
> {code:java}
>   public MRSplitsProto getSplitsProto() {
>     if (isNewSplit) {
>       try {
>         return createSplitsProto(newFormatSplits,
>             new SerializationFactory(conf));
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L152-L170]
> {code:java}
>   private static MRSplitsProto createSplitsProto(
>       org.apache.hadoop.mapreduce.InputSplit[] newSplits,
>       SerializationFactory serializationFactory) throws IOException,
>       InterruptedException {
>     MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
>     for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
>       splitsBuilder.addSplits(
>           MRInputHelpers.createSplitProto(newSplit, serializationFactory));
>     }
>     return splitsBuilder.build();
>   }
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L221-L259]
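> For context, the linked createSplitProto is roughly the following (paraphrased here, not the exact Tez source; error handling omitted). It pushes the split through its configured Hadoop Serializer into a ByteString, which is the per-split work that is expensive for loaders like HCat:
> {code:java}
> MRSplitProto.Builder builder = MRSplitProto.newBuilder();
> builder.setSplitClassName(newSplit.getClass().getName());
> // The split object is serialized via Hadoop's serialization framework into
> // a ByteString that becomes the split bytes of the MRSplitProto.
> Serializer<org.apache.hadoop.mapreduce.InputSplit> serializer =
>     serializationFactory.getSerializer(
>         (Class<org.apache.hadoop.mapreduce.InputSplit>) newSplit.getClass());
> ByteString.Output out = ByteString.newOutput();
> serializer.open(out);
> serializer.serialize(newSplit);
> builder.setSplitBytes(out.toByteString());
> return builder.build();
> {code}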
> 2. In TezDagBuilder, if splitsSerializedSize > spillThreshold, the InputSplits 
> already serialized into MRSplitsProto are not used by Pig; instead it serializes 
> them again directly to disk via JobSplitWriter.createSplitFiles. The InputSplit 
> serialization logic therefore runs a second time, which is wasteful and 
> expensive in cases like HCat.
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L946-L947]
> {code:java}
> MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
> int splitsSerializedSize = splitsProto.getSerializedSize();
> {code}
> getSplitsProto creates a MRSplitsProto, which consists of a list of MRSplitProto 
> messages; each MRSplitProto holds the serialized bytes of one InputSplit. If 
> splitsSerializedSize > spillThreshold, pig writes the splits to disk via
> {code:java}
> if(splitsSerializedSize > spillThreshold) {
>     inputPayLoad.setBoolean(
>             org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
>             false);
>     // Write splits to disk
>     Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
>     log.info("Writing input splits to " + inputSplitsDir
>             + " for vertex " + vertex.getName()
>             + " as the serialized size in memory is "
>             + splitsSerializedSize + ". Configured "
>             + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
>             + " is " + spillThreshold);
>     inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
>             (InputSplitInfoMem) inputSplitInfo, inputSplitsDir, payloadConf, fs);
> {code}
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L960]
>  
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java#L302-L314]
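> For reference, a rough sketch of the duplicated work (simplified, not the actual MRToTezHelper code; variable names are illustrative): the splits were already serialized once into MRSplitsProto just to measure their size, but the disk path goes back to the raw InputSplit objects and serializes each of them again through Hadoop's JobSplitWriter.
> {code:java}
> // Inside the spill path: the MRSplitsProto bytes built by getSplitsProto()
> // are discarded, and every InputSplit is serialized a second time while
> // being written to the split file on disk.
> org.apache.hadoop.mapreduce.InputSplit[] splits =
>         ((InputSplitInfoMem) inputSplitInfo).getNewFormatSplits();
> JobSplitWriter.createSplitFiles(inputSplitsDir, payloadConf, fs, splits);
> {code}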
> Solution:
>  1. Do not serialize the splits in LoaderProcessor.java.
>  2. In TezDagBuilder.java, serialize the input splits one at a time while 
> accumulating their serialized size; if the total exceeds spillThreshold, write 
> the splits to disk, reusing the already serialized buffer of each split (see 
> the sketch below).
>  
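> A rough sketch of idea 2 (illustrative only, not the actual patch; it assumes MRInputHelpers.createSplitProto can be called per split, and the variable names are made up):
> {code:java}
> // Serialize one split at a time and keep a running total of the proto sizes.
> // Once the total crosses spillThreshold we know the splits must go to disk,
> // so the per-split protos that were already produced can be reused instead
> // of serializing every split a second time.
> SerializationFactory serializationFactory = new SerializationFactory(conf);
> List<MRSplitProto> serializedSplits = new ArrayList<MRSplitProto>();
> long serializedSize = 0;
> boolean spillToDisk = false;
> for (org.apache.hadoop.mapreduce.InputSplit split : splits) {
>     MRSplitProto splitProto =
>             MRInputHelpers.createSplitProto(split, serializationFactory);
>     serializedSplits.add(splitProto);
>     serializedSize += splitProto.getSerializedSize();
>     if (serializedSize > spillThreshold) {
>         spillToDisk = true;
>         break;
>     }
> }
> if (spillToDisk) {
>     // write the splits to disk as before, reusing the protos already held in
>     // serializedSplits for the splits that were serialized above
> } else {
>     MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
>     for (MRSplitProto splitProto : serializedSplits) {
>         splitsBuilder.addSplits(splitProto);
>     }
>     // ship splitsBuilder.build() to the vertex via events, as today
> }
> {code}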
> Thank you [~rohini] for identifying the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
