TEZ-3430. Make split sorting optional. Add missing newly added test.
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/04d609e7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/04d609e7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/04d609e7 Branch: refs/heads/TEZ-3334 Commit: 04d609e7f222d5c830e846a2bea4eb770279ed00 Parents: c1a7f10 Author: Ming Ma <[email protected]> Authored: Tue Oct 18 08:47:42 2016 -0700 Committer: Ming Ma <[email protected]> Committed: Tue Oct 18 08:48:18 2016 -0700 ---------------------------------------------------------------------- .../common/TestMRInputAMSplitGenerator.java | 241 +++++++++++++++++++ 1 file changed, 241 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/04d609e7/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java new file mode 100644 index 0000000..bd4e5a9 --- /dev/null +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputAMSplitGenerator.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.mapreduce.common;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.split.TezGroupedSplit;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.UserPayload;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.lib.MRInputUtils;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.junit.Test;

import com.google.protobuf.ByteString;

/**
 * Tests {@link MRInputAMSplitGenerator} with every combination of the
 * {@code groupSplits} and {@code sortSplits} options of the MRInput config
 * builder, checking the events it emits and the order of the splits they
 * carry.
 */
public class TestMRInputAMSplitGenerator {

  /** Configuration key under which the test publishes the split lengths. */
  private static final String SPLITS_LENGTHS = "splits.length";

  @Test(timeout = 5000)
  public void testGroupSplitsDisabledSortSplitsEnabled()
      throws Exception {
    testGroupSplitsAndSortSplits(false, true);
  }

  @Test(timeout = 5000)
  public void testGroupSplitsDisabledSortSplitsDisabled()
      throws Exception {
    testGroupSplitsAndSortSplits(false, false);
  }

  @Test(timeout = 5000)
  public void testGroupSplitsEnabledSortSplitsEnabled()
      throws Exception {
    testGroupSplitsAndSortSplits(true, true);
  }

  @Test(timeout = 5000)
  public void testGroupSplitsEnabledSortSplitsDisabled()
      throws Exception {
    testGroupSplitsAndSortSplits(true, false);
  }

  /**
   * Runs the split generator over three test splits of lengths 1000, 2000
   * and 3000 and verifies the generated events.
   *
   * <p>The first event must be an {@link InputConfigureVertexTasksEvent},
   * followed by one {@link InputDataInformationEvent} per split. The first
   * and the last split are then checked against the order expected for the
   * given {@code sortSplitsEnabled} setting.
   *
   * @param groupSplitsEnabled value passed to {@code groupSplits(...)} on
   *                           the MRInput config builder
   * @param sortSplitsEnabled  value passed to {@code sortSplits(...)} on
   *                           the MRInput config builder
   * @throws Exception if split generation or event decoding fails
   */
  private void testGroupSplitsAndSortSplits(boolean groupSplitsEnabled,
      boolean sortSplitsEnabled) throws Exception {
    Configuration conf = new Configuration();
    String[] splitLengths = new String[] {"1000", "2000", "3000"};
    conf.setStrings(SPLITS_LENGTHS, splitLengths);
    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(
        conf, InputFormatForTest.class).
        groupSplits(groupSplitsEnabled).sortSplits(sortSplitsEnabled).build();
    UserPayload userPayload = dataSource.getInputDescriptor().getUserPayload();

    InputInitializerContext context =
        new TezTestUtils.TezRootInputInitializerContextForTest(userPayload);
    MRInputAMSplitGenerator splitGenerator =
        new MRInputAMSplitGenerator(context);

    List<Event> events = splitGenerator.initialize();

    // One configure-vertex event followed by one data event per split.
    assertEquals(splitLengths.length + 1, events.size());
    assertTrue(events.get(0) instanceof InputConfigureVertexTasksEvent);
    for (int i = 1; i < splitLengths.length + 1; i++) {
      assertTrue(events.get(i) instanceof InputDataInformationEvent);
      InputDataInformationEvent diEvent =
          (InputDataInformationEvent) (events.get(i));
      // The split is expected in serialized form only.
      assertNull(diEvent.getDeserializedUserPayload());
      assertNotNull(diEvent.getUserPayload());
      MRSplitProto eventProto = MRSplitProto.parseFrom(ByteString.copyFrom(
          diEvent.getUserPayload()));
      InputSplit is = MRInputUtils.getNewSplitDetailsFromEvent(
          eventProto, new Configuration());
      if (groupSplitsEnabled) {
        // For this configuration, there is no actual split grouping:
        // each grouped split wraps a single original split.
        is = ((TezGroupedSplit) is).getGroupedSplits().get(0);
      }
      assertTrue(is instanceof InputSplitForTest);
      // InputFormatForTest returns its splits in ascending order of size,
      // with identifiers 1..n. MRInputAMSplitGenerator may reorder that
      // list depending on sortSplitsEnabled.
      if (i == 1) {
        // The first split returned from MRInputAMSplitGenerator.
        // When split sorting is enabled, it is expected to be the last
        // (largest) split in the list returned from the InputFormat.
        assertEquals(sortSplitsEnabled ? splitLengths.length : 1,
            ((InputSplitForTest) is).getIdentifier());
      } else if (i == splitLengths.length) {
        // The last split returned from MRInputAMSplitGenerator.
        // When split sorting is enabled, it is expected to be the first
        // (smallest) split in the list returned from the InputFormat.
        assertEquals(sortSplitsEnabled ? 1 : splitLengths.length,
            ((InputSplitForTest) is).getIdentifier());
      }
    }
  }

  /**
   * Input format that creates one {@link InputSplitForTest} per length
   * configured under {@link #SPLITS_LENGTHS}, in configuration order, with
   * identifiers starting at 1.
   */
  private static class InputFormatForTest
      extends InputFormat<IntWritable, IntWritable> {

    /**
     * Returns a reader that yields exactly one (0, 0) record; both
     * arguments are ignored.
     */
    @Override
    public RecordReader<IntWritable, IntWritable> createRecordReader(
        InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      return new RecordReader<IntWritable, IntWritable>() {

        /** Set once the single record has been handed out. */
        private boolean done = false;

        @Override
        public void close() throws IOException {
          // Nothing to release.
        }

        @Override
        public IntWritable getCurrentKey() throws IOException,
            InterruptedException {
          return new IntWritable(0);
        }

        @Override
        public IntWritable getCurrentValue() throws IOException,
            InterruptedException {
          return new IntWritable(0);
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          // Progress is the fraction of data consumed: 1 once the single
          // record has been read, 0 before that.
          return done ? 1 : 0;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
          // No state to set up.
        }

        @Override
        public boolean nextKeyValue() throws IOException,
            InterruptedException {
          if (!done) {
            done = true;
            return true;
          }
          return false;
        }
      };
    }

    /**
     * Builds the split list from the integer lengths stored under
     * {@link #SPLITS_LENGTHS} in the job configuration. The split at index
     * {@code i} receives identifier {@code i + 1}.
     */
    @Override
    public List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException {
      List<InputSplit> list = new ArrayList<InputSplit>();
      int[] lengths = context.getConfiguration().getInts(SPLITS_LENGTHS);
      for (int i = 0; i < lengths.length; i++) {
        list.add(new InputSplitForTest(i + 1, lengths[i]));
      }
      return list;
    }
  }

  /**
   * Writable split carrying an identifier and a length; the identifier lets
   * the test recognize a split after it has been serialized into an event
   * and decoded again.
   */
  @Private
  public static class InputSplitForTest extends InputSplit
      implements Writable {

    private int identifier;
    private int length;

    @SuppressWarnings("unused")
    public InputSplitForTest() {
      // For writable
    }

    public InputSplitForTest(int identifier, int length) {
      this.identifier = identifier;
      this.length = length;
    }

    /** Returns the identifier assigned when the split was created. */
    public int getIdentifier() {
      return this.identifier;
    }

    /** Writes the identifier followed by the length. */
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(identifier);
      out.writeInt(length);
    }

    /** Reads the fields in the order {@link #write(DataOutput)} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
      identifier = in.readInt();
      length = in.readInt();
    }

    @Override
    public long getLength() throws IOException {
      return length;
    }

    @Override
    public String[] getLocations() throws IOException {
      return new String[] {"localhost"};
    }
  }
}
