Repository: tez Updated Branches: refs/heads/master 38b39003b -> 4de112b68
TEZ-2879. While grouping splits, allow an alternate list of preferred locations to be provided per split. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4de112b6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4de112b6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4de112b6 Branch: refs/heads/master Commit: 4de112b689d06babdbcc2fcf31d4cf008994247a Parents: 38b3900 Author: Siddharth Seth <[email protected]> Authored: Mon Nov 9 15:57:53 2015 -0800 Committer: Siddharth Seth <[email protected]> Committed: Mon Nov 9 15:57:53 2015 -0800 ---------------------------------------------------------------------- .../mapred/split/SplitLocationProvider.java | 26 ++++++ .../split/TezGroupedSplitsInputFormat.java | 14 +++- .../mapred/split/TezMapredSplitsGrouper.java | 16 +++- .../mapreduce/split/SplitLocationProvider.java | 26 ++++++ .../split/SplitLocationProviderMapReduce.java | 39 +++++++++ .../split/TezGroupedSplitsInputFormat.java | 15 +++- .../split/TezMapReduceSplitsGrouper.java | 13 ++- .../grouper/SplitLocationProviderWrapper.java | 24 ++++++ .../SplitLocationProviderWrapperMapred.java | 37 +++++++++ .../tez/mapreduce/grouper/TezSplitGrouper.java | 28 +++++-- .../hadoop/mapred/split/TestGroupedSplits.java | 86 ++++++++++++++++++++ 11 files changed, 311 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java new file mode 100644 index 0000000..f97d9ae --- /dev/null +++ 
b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/SplitLocationProvider.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.split; + +import java.io.IOException; + +import org.apache.hadoop.mapred.InputSplit; + +/** + * Provides location information for the given split + */ +public interface SplitLocationProvider { + String[] getLocations(InputSplit split) throws IOException; +} http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java index b361aec..e082e3a 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java @@ -31,7 +31,6 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.split.SplitSizeEstimator; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TezException; @@ -53,6 +52,7 @@ public class 
TezGroupedSplitsInputFormat<K, V> Configuration conf; SplitSizeEstimator estimator; + SplitLocationProvider locationProvider; public TezGroupedSplitsInputFormat() { @@ -72,6 +72,14 @@ public class TezGroupedSplitsInputFormat<K, V> LOG.debug("Split size estimator : " + estimator); } } + + public void setSplitLocationProvider(SplitLocationProvider locationProvider) { + Preconditions.checkArgument(locationProvider != null); + this.locationProvider = locationProvider; + if (LOG.isDebugEnabled()) { + LOG.debug("Split location provider : " + locationProvider); + } + } public void setDesiredNumberOfSplits(int num) { Preconditions.checkArgument(num >= 0); @@ -86,7 +94,9 @@ public class TezGroupedSplitsInputFormat<K, V> InputSplit[] originalSplits = wrappedInputFormat.getSplits(job, numSplits); TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper(); String wrappedInputFormatName = wrappedInputFormat.getClass().getName(); - return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator); + return grouper + .getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator, + locationProvider); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java index f2a8a0c..2bfccfa 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezMapredSplitsGrouper.java @@ -28,6 +28,7 @@ import com.google.common.collect.Lists; import org.apache.tez.mapreduce.grouper.GroupedSplitContainer; import org.apache.tez.mapreduce.grouper.MapredSplitContainer;
import org.apache.tez.mapreduce.grouper.SplitContainer; +import org.apache.tez.mapreduce.grouper.SplitLocationProviderWrapperMapred; import org.apache.tez.mapreduce.grouper.SplitSizeEstimatorWrapperMapred; import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.slf4j.Logger; @@ -54,8 +55,17 @@ public class TezMapredSplitsGrouper extends TezSplitGrouper { } public InputSplit[] getGroupedSplits(Configuration conf, + InputSplit[] originalSplits, int desiredNumSplits, + String wrappedInputFormatName, + SplitSizeEstimator estimator) throws IOException { + return getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, + estimator, null); + } + + + public InputSplit[] getGroupedSplits(Configuration conf, InputSplit[] originalSplits, int desiredNumSplits, - String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException { + String wrappedInputFormatName, SplitSizeEstimator estimator, SplitLocationProvider locationProvider) throws IOException { Preconditions.checkArgument(originalSplits != null, "Splits must be specified"); List<SplitContainer> originalSplitContainers = Lists.transform(Arrays.asList(originalSplits), @@ -70,7 +80,9 @@ public class TezMapredSplitsGrouper extends TezSplitGrouper { List<InputSplit> resultList = Lists.transform(super .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits, wrappedInputFormatName, estimator == null ? null : - new SplitSizeEstimatorWrapperMapred(estimator)), + new SplitSizeEstimatorWrapperMapred(estimator), + locationProvider == null ? 
null : + new SplitLocationProviderWrapperMapred(locationProvider)), new Function<GroupedSplitContainer, InputSplit>() { @Override public InputSplit apply(GroupedSplitContainer input) { http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java new file mode 100644 index 0000000..e4bada4 --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProvider.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.split; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * Provides location information for the given split + */ +public interface SplitLocationProvider { + String[] getLocations(InputSplit split) throws IOException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java new file mode 100644 index 0000000..2cf76e7 --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitLocationProviderMapReduce.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.split; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.tez.mapreduce.grouper.MapReduceSplitContainer; +import org.apache.tez.mapreduce.grouper.SplitContainer; +import org.apache.tez.mapreduce.grouper.SplitLocationProviderWrapper; + [email protected] +public class SplitLocationProviderMapReduce implements SplitLocationProviderWrapper { + + private final SplitLocationProvider locationProvider; + + public SplitLocationProviderMapReduce(SplitLocationProvider locationProvider) { + this.locationProvider = locationProvider; + } + + @Override + public String[] getPreferredLocations(SplitContainer rawContainer) throws IOException, + InterruptedException { + MapReduceSplitContainer splitContainer = (MapReduceSplitContainer) rawContainer; + return locationProvider.getLocations(splitContainer.getRawSplit()); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java index 49dc70c..5988728 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java @@ -54,6 +54,7 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V> int desiredNumSplits = 0; Configuration conf; SplitSizeEstimator estimator; + SplitLocationProvider locationProvider; public TezGroupedSplitsInputFormat() { @@ -81,14 +82,24 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V> LOG.debug("Split size estimator : " + estimator); } } - + + public void 
setSplitLocationProvider(SplitLocationProvider locationProvider) { + Preconditions.checkArgument(locationProvider != null); + this.locationProvider = locationProvider; + if (LOG.isDebugEnabled()) { + LOG.debug("Split location provider : " + locationProvider); + } + } + @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { List<InputSplit> originalSplits = wrappedInputFormat.getSplits(context); TezMapReduceSplitsGrouper grouper = new TezMapReduceSplitsGrouper(); String wrappedInputFormatName = wrappedInputFormat.getClass().getName(); - return grouper.getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator); + return grouper + .getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator, + locationProvider); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java index 87729bd..b36d11d 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezMapReduceSplitsGrouper.java @@ -145,6 +145,15 @@ public class TezMapReduceSplitsGrouper extends TezSplitGrouper { String wrappedInputFormatName, SplitSizeEstimator estimator) throws IOException, InterruptedException { + return getGroupedSplits(conf, originalSplits, desiredNumSplits, wrappedInputFormatName, estimator, null); + } + + public List<InputSplit> getGroupedSplits(Configuration conf, + List<InputSplit> originalSplits, int desiredNumSplits, + String wrappedInputFormatName, + SplitSizeEstimator estimator, + 
SplitLocationProvider locationProvider) throws IOException, + InterruptedException { Preconditions.checkArgument(originalSplits != null, "Splits must be specified"); List<SplitContainer> originalSplitContainers = Lists.transform(originalSplits, new Function<InputSplit, SplitContainer>() { @@ -158,7 +167,9 @@ public class TezMapReduceSplitsGrouper extends TezSplitGrouper { return Lists.transform(super .getGroupedSplits(conf, originalSplitContainers, desiredNumSplits, wrappedInputFormatName, estimator == null ? null : - new SplitSizeEstimatorWrapperMapReduce(estimator)), + new SplitSizeEstimatorWrapperMapReduce(estimator), + locationProvider == null ? null : + new SplitLocationProviderMapReduce(locationProvider)), new Function<GroupedSplitContainer, InputSplit>() { @Override public InputSplit apply(GroupedSplitContainer input) { http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java new file mode 100644 index 0000000..b30f174 --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapper.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce.grouper; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; + [email protected] +public interface SplitLocationProviderWrapper { + String[] getPreferredLocations(SplitContainer splitContainer) throws IOException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java new file mode 100644 index 0000000..89a15ba --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/SplitLocationProviderWrapperMapred.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.mapreduce.grouper; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.mapred.split.SplitLocationProvider; + [email protected] +public class SplitLocationProviderWrapperMapred implements SplitLocationProviderWrapper { + + private final SplitLocationProvider locationProvider; + + public SplitLocationProviderWrapperMapred(SplitLocationProvider locationProvider) { + this.locationProvider = locationProvider; + } + + @Override + public String[] getPreferredLocations(SplitContainer rawContainer) throws IOException, + InterruptedException { + MapredSplitContainer splitContainer = (MapredSplitContainer)rawContainer; + return locationProvider.getLocations(splitContainer.getRawSplit()); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java index eb616a0..848b06f 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java @@ -129,6 +129,17 @@ public abstract class TezSplitGrouper { } } + private static final SplitLocationProviderWrapper DEFAULT_SPLIT_LOCATION_PROVIDER = new DefaultSplitLocationProvider(); + + static final class DefaultSplitLocationProvider implements SplitLocationProviderWrapper { + + @Override + public String[] getPreferredLocations(SplitContainer splitContainer) throws IOException, + InterruptedException { + return splitContainer.getPreferredLocations(); + } + } + Map<String, LocationHolder> createLocationsMap(Configuration conf) { if (conf.getBoolean(TEZ_GROUPING_REPEATABLE, 
TEZ_GROUPING_REPEATABLE_DEFAULT)) { @@ -141,7 +152,8 @@ public abstract class TezSplitGrouper { List<SplitContainer> originalSplits, int desiredNumSplits, String wrappedInputFormatName, - SplitSizeEstimatorWrapper estimator) throws + SplitSizeEstimatorWrapper estimator, + SplitLocationProviderWrapper locationProvider) throws IOException, InterruptedException { LOG.info("Grouping splits in Tez"); Preconditions.checkArgument(originalSplits != null, "Splits must be specified"); @@ -156,6 +168,9 @@ public abstract class TezSplitGrouper { if (estimator == null) { estimator = DEFAULT_SPLIT_ESTIMATOR; } + if (locationProvider == null) { + locationProvider = DEFAULT_SPLIT_LOCATION_PROVIDER; + } if (! (configNumSplits > 0 || originalSplits.size() == 0)) { @@ -218,9 +233,10 @@ public abstract class TezSplitGrouper { LOG.info("Using original number of splits: " + originalSplits.size() + " desired splits: " + desiredNumSplits); groupedSplits = new ArrayList<GroupedSplitContainer>(originalSplits.size()); + // TODO TEZ-2911 null in the non null String[] handled differently here compared to when grouping happens. 
for (SplitContainer split : originalSplits) { GroupedSplitContainer newSplit = - new GroupedSplitContainer(1, wrappedInputFormatName, split.getPreferredLocations(), + new GroupedSplitContainer(1, wrappedInputFormatName, locationProvider.getPreferredLocations(split), null); newSplit.addSplit(split); groupedSplits.add(newSplit); @@ -237,7 +253,7 @@ public abstract class TezSplitGrouper { // go through splits and add them to locations for (SplitContainer split : originalSplits) { totalLength += estimator.getEstimatedSize(split); - String[] locations = split.getPreferredLocations(); + String[] locations = locationProvider.getPreferredLocations(split); if (locations == null || locations.length == 0) { locations = emptyLocations; } @@ -262,7 +278,7 @@ public abstract class TezSplitGrouper { Set<String> locSet = new HashSet<String>(); for (SplitContainer split : originalSplits) { locSet.clear(); - String[] locations = split.getPreferredLocations(); + String[] locations = locationProvider.getPreferredLocations(split); if (locations == null || locations.length == 0) { locations = emptyLocations; } @@ -352,7 +368,7 @@ public abstract class TezSplitGrouper { groupLocation = null; } else if (doingRackLocal) { for (SplitContainer splitH : group) { - String[] locations = splitH.getPreferredLocations(); + String[] locations = locationProvider.getPreferredLocations(splitH); if (locations != null) { for (String loc : locations) { if (loc != null) { @@ -436,7 +452,7 @@ public abstract class TezSplitGrouper { } numRackSplitsToGroup--; rackSet.clear(); - String[] locations = split.getPreferredLocations(); + String[] locations = locationProvider.getPreferredLocations(split); if (locations == null || locations.length == 0) { locations = emptyLocations; } http://git-wip-us.apache.org/repos/asf/tez/blob/4de112b6/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java ---------------------------------------------------------------------- diff --git 
a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java index eddcc42..43776f7 100644 --- a/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java +++ b/tez-mapreduce/src/test/java/org/apache/hadoop/mapred/split/TestGroupedSplits.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Random; import java.util.Set; +import org.apache.hadoop.conf.Configuration; import org.apache.tez.mapreduce.grouper.TezSplitGrouper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -615,4 +616,89 @@ public class TestGroupedSplits { } } + + // Splits get grouped + @Test (timeout = 10000) + public void testGroupingWithCustomLocations1() throws IOException { + + int numSplits = 3; + InputSplit[] mockSplits = new InputSplit[numSplits]; + InputSplit mockSplit1 = mock(InputSplit.class); + when(mockSplit1.getLength()).thenReturn(100*1000*1000L); + when(mockSplit1.getLocations()).thenReturn(new String[] {"location1", "location2"}); + mockSplits[0] = mockSplit1; + InputSplit mockSplit2 = mock(InputSplit.class); + when(mockSplit2.getLength()).thenReturn(100*1000*1000L); + when(mockSplit2.getLocations()).thenReturn(new String[] {"location3", "location4"}); + mockSplits[1] = mockSplit2; + InputSplit mockSplit3 = mock(InputSplit.class); + when(mockSplit3.getLength()).thenReturn(100*1000*1000L); + when(mockSplit3.getLocations()).thenReturn(new String[] {"location5", "location6"}); + mockSplits[2] = mockSplit3; + + SplitLocationProvider locationProvider = new SplitLocationProvider() { + @Override + public String[] getLocations(InputSplit split) throws IOException { + return new String[] {"customLocation"}; + } + }; + + TezMapredSplitsGrouper splitsGrouper = new TezMapredSplitsGrouper(); + InputSplit[] groupedSplits = splitsGrouper.getGroupedSplits(new Configuration(defaultConf), mockSplits, 1, + "MockInputFormat", null, locationProvider); +
// Sanity. 1 group, with 3 splits. + Assert.assertEquals(1, groupedSplits.length); + Assert.assertTrue(groupedSplits[0] instanceof TezGroupedSplit); + TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[0]; + Assert.assertEquals(3, groupedSplit.getGroupedSplits().size()); + + // Verify that the split ends up being grouped to the custom location. + Assert.assertEquals(1, groupedSplit.getLocations().length); + Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]); + } + + // Original splits returned. + @Test (timeout = 10000) + public void testGroupingWithCustomLocations2() throws IOException { + + int numSplits = 3; + InputSplit[] mockSplits = new InputSplit[numSplits]; + InputSplit mockSplit1 = mock(InputSplit.class); + when(mockSplit1.getLength()).thenReturn(100*1000*1000L); + when(mockSplit1.getLocations()).thenReturn(new String[] {"location1", "location2"}); + mockSplits[0] = mockSplit1; + InputSplit mockSplit2 = mock(InputSplit.class); + when(mockSplit2.getLength()).thenReturn(100*1000*1000L); + when(mockSplit2.getLocations()).thenReturn(new String[] {"location3", "location4"}); + mockSplits[1] = mockSplit2; + InputSplit mockSplit3 = mock(InputSplit.class); + when(mockSplit3.getLength()).thenReturn(100*1000*1000L); + when(mockSplit3.getLocations()).thenReturn(new String[] {"location5", "location6"}); + mockSplits[2] = mockSplit3; + + SplitLocationProvider locationProvider = new SplitLocationProvider() { + @Override + public String[] getLocations(InputSplit split) throws IOException { + return new String[] {"customLocation"}; + } + }; + + TezMapredSplitsGrouper splitsGrouper = new TezMapredSplitsGrouper(); + InputSplit[] groupedSplits = splitsGrouper.getGroupedSplits(new Configuration(defaultConf), mockSplits, 3, + "MockInputFormat", null, locationProvider); + + // Sanity.
3 groups, with 1 split each + Assert.assertEquals(3, groupedSplits.length); + for (int i = 0 ; i < 3 ; i++) { + Assert.assertTrue(groupedSplits[i] instanceof TezGroupedSplit); + TezGroupedSplit groupedSplit = (TezGroupedSplit)groupedSplits[i]; + Assert.assertEquals(1, groupedSplit.getGroupedSplits().size()); + + // Verify the splits have their final location set to customLocation + Assert.assertEquals(1, groupedSplit.getLocations().length); + Assert.assertEquals("customLocation", groupedSplit.getLocations()[0]); + } + } + }
