Satish Subhashrao Saley created PIG-5359:
--------------------------------------------

             Summary: Reduce time spent in split serialization
                 Key: PIG-5359
                 URL: https://issues.apache.org/jira/browse/PIG-5359
             Project: Pig
          Issue Type: Improvement
            Reporter: Satish Subhashrao Saley
            Assignee: Satish Subhashrao Saley


1. Unnecessary serialization of splits in Tez.
 In LoaderProcessor, pig calls
 
[https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java#L172]
{code:java}
tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf,
 false, 0));
{code}
It ends up serializing the splits, just to print log.

[https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L317]
{code:java}
  public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf,
      boolean groupSplits, boolean sortSplits, int targetTasks)
      throws IOException, ClassNotFoundException, InterruptedException {
      ....
      ....
          LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", 
SerializedSize: "
        + splitInfoMem.getSplitsProto().getSerializedSize());
    return splitInfoMem;

{code}
[https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L106]
{code:java}
  public MRSplitsProto getSplitsProto() {
    if (isNewSplit) {
      try {
        return createSplitsProto(newFormatSplits, new 
SerializationFactory(conf));

{code}
[https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L152-L170]
{code:java}
  private static MRSplitsProto createSplitsProto(
      org.apache.hadoop.mapreduce.InputSplit[] newSplits,
      SerializationFactory serializationFactory) throws IOException,
      InterruptedException {
    MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();

    for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
      splitsBuilder.addSplits(MRInputHelpers.createSplitProto(newSplit, 
serializationFactory));
    }
    return splitsBuilder.build();
  }

{code}
[https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L221-L259]

2. In TezDagBuilder, if splitsSerializedSize > spillThreshold, then the 
InputSplits serialized in MRSplitsProto are not used by Pig and it serializes 
again directly to disk via JobSplitWriter.createSplitFiles. So the InputSplit 
serialization logic is called again which is wasteful and expensive in cases 
like HCat.

[https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L946-L947]
{code:java}
MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
int splitsSerializedSize = splitsProto.getSerializedSize();
{code}
The getSplitsProto, creates MRSplitsProto which consists of list of 
MRSplitProto. MRSplitProto has serialized bytes of each InputFormat. If 
splitsSerializedSize > spillThreshold, pig writes the splits to disk via
{code:java}
if(splitsSerializedSize > spillThreshold) {
    inputPayLoad.setBoolean(
            
org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
            false);
    // Write splits to disk
    Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
    log.info("Writing input splits to " + inputSplitsDir
            + " for vertex " + vertex.getName()
            + " as the serialized size in memory is "
            + splitsSerializedSize + ". Configured "
            + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
            + " is " + spillThreshold);
    inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
            (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs);

{code}
[https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L960]
 
[https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java#L302-L314]

Solution:
 1. Do not serialize the split in LoaderProcessor.java
 2. In TezDagBuilder.java, serialize each input split and keep adding its size 
and if it exceeds spillThreshold, then write the splits to disk reusing the 
serialized buffers for each split.

 

Thank you [~rohini] for identifying the issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to