Author: rohini
Date: Tue Oct 9 02:43:02 2018
New Revision: 1843214

URL: http://svn.apache.org/viewvc?rev=1843214&view=rev
Log:
PIG-5359: Reduce time spent in split serialization (satishsaley via rohini)
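The gist of the change: TezDagBuilder previously called inputSplitInfo.getSplitsProto(), which serialized every split just to measure the total size, and when that size exceeded PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD the proto was discarded and Hadoop's JobSplitWriter re-serialized all the splits to disk. Now splits are serialized incrementally, serialization stops once the threshold is crossed, and whatever was already serialized is reused when writing job.split. Below is a schematic, self-contained model of the new flow; the sizes, the List standing in for MRSplitsProto.Builder, and the println sinks are stand-ins, not Pig's actual code:

    import java.util.ArrayList;
    import java.util.List;

    public class SplitSpillFlow {
        public static void main(String[] args) {
            long[] splitSizes = {400, 500, 700, 100}; // pretend serialized split sizes
            long spillThreshold = 1000;               // pretend threshold
            List<Long> proto = new ArrayList<>();     // stands in for MRSplitsProto.Builder
            long serializedSize = 0;
            boolean allSerialized = true;
            for (int i = 0; i < splitSizes.length; i++) {
                serializedSize += splitSizes[i];      // each split is serialized at most once
                proto.add(splitSizes[i]);
                // Stop early once over the threshold, unless this was the last split.
                if (serializedSize > spillThreshold && i != splitSizes.length - 1) {
                    allSerialized = false;
                    break;
                }
            }
            if (allSerialized) {
                // Small enough: ship the splits to the AM inside the vertex payload.
                System.out.println("RPC path: " + proto.size() + " splits, " + serializedSize + " bytes");
            } else {
                // Too big: write job.split to disk, reusing the already serialized splits.
                System.out.println("Disk path: " + proto.size() + " splits already serialized");
            }
        }
    }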
Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SerializationInfo.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezInputHelper.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezJobSplitWriter.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1843214&r1=1843213&r2=1843214&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct 9 02:43:02 2018
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
 
 IMPROVEMENTS
 
+PIG-5359: Reduce time spent in split serialization (satishsaley via rohini)
+
 PIG-5357: BagFactory interface should support creating a distinct bag from a set (jtolar via rohini)
 
 PIG-5354: Show fieldname and a line number for casting errors (knoguchi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1843214&r1=1843213&r2=1843214&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Oct 9 02:43:02 2018
@@ -108,6 +108,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezInputHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -119,6 +120,7 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.UDFContextSeparator.UDFType;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
@@ -947,30 +949,33 @@ public class TezDagBuilder extends TezOp
 
         // Currently inputSplitInfo is always InputSplitInfoMem at this point
         if (inputSplitInfo instanceof InputSplitInfoMem) {
-            MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
-            int splitsSerializedSize = splitsProto.getSerializedSize();
-            if(splitsSerializedSize > spillThreshold) {
+            MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+            Pair<Long, Boolean> serializationInfo = TezInputHelper.createSplitsProto(inputSplitInfo, pigContextConf, splitsBuilder,
+                    spillThreshold);
+            MRSplitsProto splitsProto = splitsBuilder.build();
+            if(!serializationInfo.second) {
+                //write to disk
                 inputPayLoad.setBoolean(
                         org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
                         false);
-                // Write splits to disk
-                Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
-                log.info("Writing input splits to " + inputSplitsDir
+                // Write splits to disk
+                Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
+                log.info("Writing input splits to " + inputSplitsDir
                         + " for vertex " + vertex.getName()
-                        + " as the serialized size in memory is "
-                        + splitsSerializedSize + ". Configured "
+                        + " as the partially serialized size in memory is "
+                        + serializationInfo.first + ". Configured "
                         + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
                         + " is " + spillThreshold);
-                inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
-                        (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs);
-                additionalLocalResources = new HashMap<String, LocalResource>();
-                MRToTezHelper.updateLocalResourcesForInputSplits(
+                inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
+                        (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs, splitsProto);
+                additionalLocalResources = new HashMap<String, LocalResource>();
+                MRToTezHelper.updateLocalResourcesForInputSplits(
                         fs, inputSplitInfo, additionalLocalResources);
-                inputSplitInDiskVertices.add(vertex.getName());
+                inputSplitInDiskVertices.add(vertex.getName());
             } else {
-                // Send splits via RPC to AM
-                userPayLoadBuilder.setSplits(splitsProto);
+                // Send splits via RPC to AM
+                userPayLoadBuilder.setSplits(splitsProto);
             }
             //Free up memory
             tezOp.getLoaderInfo().setInputSplitInfo(null);
log.info("Writing input splits to " + inputSplitsDir + // Write splits to disk + Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc); + log.info("Writing input splits to " + inputSplitsDir + " for vertex " + vertex.getName() - + " as the serialized size in memory is " - + splitsSerializedSize + ". Configured " + + " as the partially serialized size in memory is " + + serializationInfo.first + ". Configured " + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD + " is " + spillThreshold); - inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk( - (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs); - additionalLocalResources = new HashMap<String, LocalResource>(); - MRToTezHelper.updateLocalResourcesForInputSplits( + inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk( + (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, payloadConf, fs, splitsProto); + additionalLocalResources = new HashMap<String, LocalResource>(); + MRToTezHelper.updateLocalResourcesForInputSplits( fs, inputSplitInfo, additionalLocalResources); - inputSplitInDiskVertices.add(vertex.getName()); + inputSplitInDiskVertices.add(vertex.getName()); } else { - // Send splits via RPC to AM - userPayLoadBuilder.setSplits(splitsProto); + // Send splits via RPC to AM + userPayLoadBuilder.setSplits(splitsProto); } //Free up memory tezOp.getLoaderInfo().setInputSplitInfo(null); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1843214&r1=1843213&r2=1843214&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Tue Oct 9 02:43:02 2018 @@ -39,6 +39,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POSimpleTezLoad; +import org.apache.pig.backend.hadoop.executionengine.tez.util.TezInputHelper; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.DependencyOrderWalker; @@ -169,7 +170,7 @@ public class LoaderProcessor extends Tez tezOp.getLoaderInfo().setInpLimits(inpLimits); // Not using MRInputAMSplitGenerator because delegation tokens are // fetched in FileInputFormat - tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf, false, 0)); + tezOp.getLoaderInfo().setInputSplitInfo(TezInputHelper.generateInputSplitsToMem(conf)); // TODO: Can be set to -1 if TEZ-601 gets fixed and getting input // splits can be moved to if(loads) block below int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks(); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1843214&r1=1843213&r2=1843214&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original) +++ 
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1843214&r1=1843213&r2=1843214&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Tue Oct 9 02:43:02 2018
@@ -39,7 +39,6 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -55,6 +54,7 @@ import org.apache.tez.mapreduce.hadoop.D
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 
 @InterfaceAudience.Private
 public class MRToTezHelper {
@@ -62,7 +62,6 @@ public class MRToTezHelper {
     private static final Log LOG = LogFactory.getLog(MRToTezHelper.class);
     private static final String JOB_SPLIT_RESOURCE_NAME = MRJobConfig.JOB_SPLIT;
    private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = MRJobConfig.JOB_SPLIT_METAINFO;
-
     private static Map<String, String> mrAMParamToTezAMParamMap = new HashMap<String, String>();
     private static Map<String, String> mrMapParamToTezVertexParamMap = new HashMap<String, String>();
     private static Map<String, String> mrReduceParamToTezVertexParamMap = new HashMap<String, String>();
@@ -297,14 +296,23 @@ public class MRToTezHelper {
     }
 
     /**
-     * Write input splits (job.split and job.splitmetainfo) to disk
+     * Write input splits (job.split and job.splitmetainfo) to disk. It uses the already
+     * serialized splits from the given MRSplitsProto.
+     * @param infoMem
+     * @param inputSplitsDir
+     * @param jobConf
+     * @param fs
+     * @param splitsProto MRSplitsProto containing already serialized splits
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
      */
     public static InputSplitInfoDisk writeInputSplitInfoToDisk(
             InputSplitInfoMem infoMem, Path inputSplitsDir, JobConf jobConf,
-            FileSystem fs) throws IOException, InterruptedException {
+            FileSystem fs, MRSplitsProto splitsProto) throws IOException, InterruptedException {
 
         InputSplit[] splits = infoMem.getNewFormatSplits();
-        JobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits);
+        TezJobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits, splitsProto);
 
         return new InputSplitInfoDisk(
                 JobSubmissionFiles.getJobSplitFile(inputSplitsDir),
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SerializationInfo.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SerializationInfo.java?rev=1843214&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SerializationInfo.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SerializationInfo.java Tue Oct 9 02:43:02 2018
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+/**
+ * Wrapper class to contain information about serialization.
+ */
+public class SerializationInfo {
+    private boolean allSerialized;
+    private long serializedSize = 0L;
+    /**
+     * @return true if all splits are serialized, otherwise false.
+     */
+    public boolean isAllSerialized() {
+        return allSerialized;
+    }
+    public void setAllSerialized(boolean allSerialized) {
+        this.allSerialized = allSerialized;
+    }
+    /**
+     * @return size of the serialized splits.
+     */
+    public long getSerializedSize() {
+        return serializedSize;
+    }
+    public void setSerializedSize(long serializedSize) {
+        this.serializedSize = serializedSize;
+    }
+    /**
+     * Increment the serialized size.
+     * @param increment
+     */
+    public void incrSerializedSize(long increment) {
+        this.serializedSize += increment;
+    }
+}
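SerializationInfo is a plain mutable holder for the two values the serialization pass needs to report: how many bytes have been serialized and whether every split made it in. Note that nothing else in this diff references it; TezInputHelper.createSplitsProto below returns a Pair<Long, Boolean> carrying the same information. A trivial sketch of how the class would be used (hypothetical wiring, not part of the commit; assumes the class is on the classpath):

    import org.apache.pig.backend.hadoop.executionengine.tez.util.SerializationInfo;

    public class SerializationInfoDemo {
        public static void main(String[] args) {
            SerializationInfo info = new SerializationInfo();
            info.incrSerializedSize(400);   // pretend sizes of two serialized splits
            info.incrSerializedSize(900);
            info.setAllSerialized(false);   // stopped before the remaining splits
            // Prints: 1300 bytes, allSerialized=false
            System.out.println(info.getSerializedSize() + " bytes, allSerialized="
                    + info.isAllSerialized());
        }
    }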
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezInputHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezInputHelper.java?rev=1843214&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezInputHelper.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezInputHelper.java Tue Oct 9 02:43:02 2018
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.impl.util.Pair;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+public class TezInputHelper {
+
+    private static final Log LOG = LogFactory.getLog(TezInputHelper.class);
+
+    /**
+     * Creates input splits similar to
+     * {@link org.apache.tez.mapreduce.hadoop.MRInputHelpers#generateInputSplitsToMem},
+     * but only supports the mapreduce API and does not group splits or create a
+     * {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto},
+     * which is an expensive operation.
+     *
+     * @param conf an instance of Configuration. This Configuration instance
+     *     should contain adequate information to be able to generate splits,
+     *     like the InputFormat being used and related configuration.
+     * @return an instance of {@link InputSplitInfoMem} which supports a subset
+     *     of the APIs defined on {@link InputSplitInfo}
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf)
+            throws IOException, ClassNotFoundException, InterruptedException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Generating mapreduce api input splits");
+        }
+        Job job = Job.getInstance(conf);
+        org.apache.hadoop.mapreduce.InputSplit[] splits = generateSplits(job);
+        return new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
+                splits.length, job.getCredentials(), job.getConfiguration());
+    }
+
+    private static org.apache.hadoop.mapreduce.InputSplit[] generateSplits(JobContext jobContext)
+            throws ClassNotFoundException, IOException, InterruptedException {
+        Configuration conf = jobContext.getConfiguration();
+
+        // This is the real input format.
+        org.apache.hadoop.mapreduce.InputFormat<?, ?> inputFormat = null;
+        try {
+            inputFormat = ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
+        } catch (ClassNotFoundException e) {
+            throw new TezUncheckedException(e);
+        }
+
+        List<org.apache.hadoop.mapreduce.InputSplit> splitList = inputFormat.getSplits(jobContext);
+        org.apache.hadoop.mapreduce.InputSplit[] splits =
+                splitList.toArray(new org.apache.hadoop.mapreduce.InputSplit[splitList.size()]);
+
+        // Sort the splits by size so that the biggest go first.
+        Arrays.sort(splits, new InputSplitComparator());
+        return splits;
+    }
+
+    /**
+     * Comparator for org.apache.hadoop.mapreduce.InputSplit, ordering by
+     * decreasing length.
+     */
+    private static class InputSplitComparator implements Comparator<org.apache.hadoop.mapreduce.InputSplit> {
+        @Override
+        public int compare(org.apache.hadoop.mapreduce.InputSplit o1, org.apache.hadoop.mapreduce.InputSplit o2) {
+            try {
+                long len1 = o1.getLength();
+                long len2 = o2.getLength();
+                if (len1 < len2) {
+                    return 1;
+                } else if (len1 == len2) {
+                    return 0;
+                } else {
+                    return -1;
+                }
+            } catch (IOException ie) {
+                throw new RuntimeException("exception in InputSplit compare", ie);
+            } catch (InterruptedException ie) {
+                throw new RuntimeException("exception in InputSplit compare", ie);
+            }
+        }
+    }
+
+    private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
+            org.apache.hadoop.mapreduce.InputSplit[] newFormatSplits) throws IOException, InterruptedException {
+        List<TaskLocationHint> listLocationHint = new ArrayList<>(newFormatSplits.length);
+        for (org.apache.hadoop.mapreduce.InputSplit input : newFormatSplits) {
+            listLocationHint.add(TaskLocationHint.createTaskLocationHint(
+                    new HashSet<String>(Arrays.asList(input.getLocations())), null));
+        }
+        return listLocationHint;
+    }
+
+    /**
+     * Creates MRSplitsProto from inputSplitInfo and adds it into splitsBuilder.
+     * @param inputSplitInfo
+     * @param conf
+     * @param splitsBuilder
+     * @param spillThreshold
+     * @return a Pair whose first element (a long) is the serialized size and whose
+     *     second element (a boolean) is true if all splits were serialized. The
+     *     second element is false if only some of the splits were serialized
+     *     because the spillThreshold was reached.
+     */
+    public static Pair<Long, Boolean> createSplitsProto(InputSplitInfo inputSplitInfo, Configuration conf,
+            MRSplitsProto.Builder splitsBuilder, long spillThreshold) {
+        try {
+            return createSplitsProto(inputSplitInfo.getNewFormatSplits(), new SerializationFactory(conf),
+                    splitsBuilder, spillThreshold);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates an MRSplitProto for each given org.apache.hadoop.mapreduce.InputSplit
+     * and adds it into splitsBuilder.
+     * @param inputSplits
+     * @param serializationFactory
+     * @param splitsBuilder
+     * @param spillThreshold
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private static Pair<Long, Boolean> createSplitsProto(InputSplit[] inputSplits,
+            SerializationFactory serializationFactory, MRSplitsProto.Builder splitsBuilder,
+            long spillThreshold) throws IOException, InterruptedException {
+        MRSplitProto split = null;
+        long serializedSize = 0;
+        boolean allSerialized = true;
+        for (int i = 0; i < inputSplits.length; i++) {
+            split = MRInputHelpers.createSplitProto(inputSplits[i], serializationFactory);
+            serializedSize += split.getSerializedSize();
+            splitsBuilder.addSplits(split);
+            // Check the threshold only after adding the split: serializedSize may end
+            // up slightly over spillThreshold, but we don't want to waste an already
+            // serialized split.
+            if (serializedSize > spillThreshold && i != (inputSplits.length - 1)) {
+                allSerialized = false;
+                break;
+            }
+        }
+        return new Pair<Long, Boolean>(serializedSize, allSerialized);
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezJobSplitWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezJobSplitWriter.java?rev=1843214&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezJobSplitWriter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezJobSplitWriter.java Tue Oct 9 02:43:02 2018
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+public class TezJobSplitWriter {
+
+    private static final Log LOG = LogFactory.getLog(TezJobSplitWriter.class);
+    private static final int splitVersion = 1;
+
+    private static final byte[] SPLIT_FILE_HEADER;
+    static {
+        try {
+            SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
+        } catch (UnsupportedEncodingException u) {
+            throw new RuntimeException(u);
+        }
+    }
+
+    static final byte[] META_SPLIT_FILE_HEADER;
+    static {
+        try {
+            META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
+        } catch (UnsupportedEncodingException u) {
+            throw new RuntimeException(u);
+        }
+    }
+
+    /**
+     * Create split files and write splits as well as split metadata.
+     * @param jobSubmitDir
+     * @param conf
+     * @param fs
+     * @param splits
+     * @param splitsProto
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, Configuration conf, FileSystem fs,
+            T[] splits, MRSplitsProto splitsProto) throws IOException, InterruptedException {
+        FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+        SplitMetaInfo[] info = writeSplits(conf, splits, out, splitsProto);
+        out.close();
+        writeJobSplitMetaInfo(fs, JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
+                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);
+    }
+
+    private static FSDataOutputStream createFile(FileSystem fs, Path splitFile, Configuration job) throws IOException {
+        FSDataOutputStream out = FileSystem.create(fs, splitFile,
+                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+        int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
+        fs.setReplication(splitFile, (short) replication);
+        writeSplitHeader(out);
+        return out;
+    }
+
+    private static void writeSplitHeader(FSDataOutputStream out) throws IOException {
+        out.write(SPLIT_FILE_HEADER);
+        out.writeInt(splitVersion);
+    }
+
+    /**
+     * Writes the splits that are already serialized in <code>splitsProto</code>
+     * as-is, then serializes and writes the remaining splits.
+     * @param conf
+     * @param array
+     * @param out
+     * @param splitsProto
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @SuppressWarnings("unchecked")
+    private static <T extends InputSplit> SplitMetaInfo[] writeSplits(Configuration conf, T[] array,
+            FSDataOutputStream out, MRSplitsProto splitsProto) throws IOException, InterruptedException {
+        SplitMetaInfo[] info = null;
+        if (array.length != 0) {
+            info = new SplitMetaInfo[array.length];
+            SerializationFactory factory = new SerializationFactory(conf);
+            int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
+            long offset = out.getPos();
+            int i = 0;
+            // First write the splits that were already serialized into splitsProto.
+            for (MRSplitProto splitProto : splitsProto.getSplitsList()) {
+                long prevCount = out.getPos();
+                Text.writeString(out, splitProto.getSplitClassName());
+                splitProto.getSplitBytes().writeTo(out);
+                info[i] = createSplitMetaInfo(array[i], offset, maxBlockLocations);
+                offset += out.getPos() - prevCount;
+                i++;
+            }
+            // Then serialize and write the remaining splits.
+            while (i < array.length) {
+                long prevCount = out.getPos();
+                Text.writeString(out, array[i].getClass().getName());
+                Serializer<T> serializer = factory.getSerializer((Class<T>) array[i].getClass());
+                serializer.open(out);
+                serializer.serialize(array[i]);
+                info[i] = createSplitMetaInfo(array[i], offset, maxBlockLocations);
+                offset += out.getPos() - prevCount;
+                i++;
+            }
+        }
+        LOG.info("Size of serialized job.split file is " + out.getPos());
+        return info;
+    }
+
+    /**
+     * Creates the SplitMetaInfo for the given split, truncating its locations
+     * to maxBlockLocations.
+     * @param split
+     * @param offset
+     * @param maxBlockLocations
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private static <T extends InputSplit> SplitMetaInfo createSplitMetaInfo(T split,
+            long offset, int maxBlockLocations) throws IOException, InterruptedException {
+        String[] locations = split.getLocations();
+        if (locations.length > maxBlockLocations) {
+            LOG.warn("Max block location exceeded for split: " + split + " splitsize: " + locations.length
+                    + " maxsize: " + maxBlockLocations);
+            locations = Arrays.copyOf(locations, maxBlockLocations);
+        }
+        return new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
+    }
+
+    private static void writeJobSplitMetaInfo(FileSystem fs, Path filename, FsPermission p, int splitMetaInfoVersion,
+            JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
+        // Write the splits meta-info to a file for the AM.
+        FSDataOutputStream out = FileSystem.create(fs, filename, p);
+        out.write(META_SPLIT_FILE_HEADER);
+        WritableUtils.writeVInt(out, splitMetaInfoVersion);
+        WritableUtils.writeVInt(out, allSplitMetaInfo.length);
+        for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
+            splitMetaInfo.write(out);
+        }
+        out.close();
+    }
+}
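For reference, the on-disk layout produced by TezJobSplitWriter, read directly off the code above (the field labels are descriptive, not part of any format spec):

    // job.split (createFile + writeSplits):
    //   "SPL"                      3-byte SPLIT_FILE_HEADER
    //   int   splitVersion         always 1
    //   per split:
    //     Text  splitClassName     via Text.writeString
    //     bytes splitData          the split bytes stored in the MRSplitProto for
    //                              splits present in splitsProto, Hadoop
    //                              Serializer output for the rest
    //
    // job.splitmetainfo (writeJobSplitMetaInfo):
    //   "META-SPL"                 8-byte META_SPLIT_FILE_HEADER
    //   vint  splitMetaInfoVersion always 1
    //   vint  numSplits
    //   per split:
    //     JobSplit.SplitMetaInfo   locations (truncated to maxBlockLocations),
    //                              offset into job.split, split length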