Repository: incubator-reef
Updated Branches:
  refs/heads/master f78c88c11 -> 33812fb01


[REEF-469] Assign data splits to specific Evaluators

This commit makes it possible to assign data splits to specific evaluators
using the DataLoadingService. Several changes were made. The main one is to
inject a strategy into InputFormatLoadingService; that strategy is in charge
of assigning splits to evaluators. New data types needed to be created. For
example, DistributedDataSetPartition, which is just a folder but has a
location field where the user can specify where she would like that data to
be loaded (a certain node or rack). We now have LocationAwareJobConfs, which
bind the typical Hadoop JobConf and the DataPartition objects. The
EvaluatorToPartitionMapper class has been removed; most of its logic has been
placed in AbstractEvaluatorToPartitionStrategy.

JIRA:
  [REEF-469](https://issues.apache.org/jira/browse/REEF-469)

Pull Request:
  This closes #290


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/33812fb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/33812fb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/33812fb0

Branch: refs/heads/master
Commit: 33812fb0117ba65c92ca96b18f2eeea5d38101d2
Parents: f78c88c
Author: Ignacio Cano <[email protected]>
Authored: Fri Jul 10 14:39:26 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Jul 14 19:10:56 2015 -0700

----------------------------------------------------------------------
 .../loading/api/DataLoadingRequestBuilder.java  |  86 +++++--
 .../io/data/loading/api/DistributedDataSet.java | 110 ++++++++
 .../api/EvaluatorToPartitionStrategy.java       |  59 +++++
 .../AbstractEvaluatorToPartitionStrategy.java   | 254 +++++++++++++++++++
 .../impl/DistributedDataSetPartition.java       | 179 +++++++++++++
 .../DistributedDataSetPartitionSerializer.java  |  66 +++++
 .../impl/EvaluatorToPartitionMapper.java        | 153 -----------
 .../loading/impl/InputFormatLoadingService.java |  65 ++---
 ...iDataCenterEvaluatorToPartitionStrategy.java | 152 +++++++++++
 .../io/data/loading/impl/NumberedSplit.java     |  26 +-
 ...eDataCenterEvaluatorToPartitionStrategy.java |  75 ++++++
 11 files changed, 1018 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
index 53e8433..f227374 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
@@ -20,14 +20,16 @@ package org.apache.reef.io.data.loading.api;
 
 import org.apache.commons.lang.Validate;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.reef.client.DriverConfiguration;
 import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import 
org.apache.reef.io.data.loading.impl.DistributedDataSetPartitionSerializer;
 import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer;
-import org.apache.reef.io.data.loading.impl.InputFormatExternalConstructor;
+import 
org.apache.reef.io.data.loading.impl.SingleDataCenterEvaluatorToPartitionStrategy;
+import org.apache.reef.io.data.loading.impl.DistributedDataSetPartition;
 import org.apache.reef.io.data.loading.impl.InputFormatLoadingService;
 import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
+import 
org.apache.reef.io.data.loading.impl.MultiDataCenterEvaluatorToPartitionStrategy;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.JavaConfigurationBuilder;
 import org.apache.reef.tang.Tang;
@@ -38,6 +40,7 @@ import org.apache.reef.tang.formats.ConfigurationModule;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -69,7 +72,16 @@ public final class DataLoadingRequestBuilder
   private boolean renewFailedEvaluators = true;
   private ConfigurationModule driverConfigurationModule = null;
   private String inputFormatClass;
-  private String inputPath;
+  /**
+   * Single data center loading strategy flag. Specifies whether the data
+   * will be loaded into machines of a single data center or not. By
+   * default, it is set to true.
+   */
+  private boolean singleDataCenterStrategy = true;
+  /**
+   * Distributed dataset that can contain many distributed partitions.
+   */
+  private DistributedDataSet distributedDataSet;
 
   public DataLoadingRequestBuilder setNumberOfDesiredSplits(final int 
numberOfDesiredSplits) {
     this.numberOfDesiredSplits = numberOfDesiredSplits;
@@ -188,8 +200,38 @@ public final class DataLoadingRequestBuilder
     return this;
   }
 
+  /**
+   * Sets the path of the folder where the data is. Internally it constructs a
+   * distributed data set with one partition, no splits and the data can be
+   * loaded from anywhere.
+   *
+   * @deprecated since 0.12. Should use instead
+   *             {@link 
DataLoadingRequestBuilder#setDistributedDataSet(DistributedDataSet)}
+   * @param inputPath
+   *          the input path
+   * @return this
+   */
+  @Deprecated
   public DataLoadingRequestBuilder setInputPath(final String inputPath) {
-    this.inputPath = inputPath;
+    final DistributedDataSet dds = new DistributedDataSet();
+    
dds.addPartition(DistributedDataSetPartition.newBuilder().setPath(inputPath)
+        .setLocation(DistributedDataSetPartition.LOAD_INTO_ANY_LOCATION)
+        
.setDesiredSplits(Integer.valueOf(NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)).build());
+    this.singleDataCenterStrategy = true;
+    this.distributedDataSet = dds;
+    return this;
+  }
+
+  /**
+   * Sets the distributed data set.
+   *
+   * @param distributedDataSet
+   *          the distributed data set
+   * @return this
+   */
+  public DataLoadingRequestBuilder setDistributedDataSet(final 
DistributedDataSet distributedDataSet) {
+    this.distributedDataSet = distributedDataSet;
+    this.singleDataCenterStrategy = false;
     return this;
   }
 
@@ -199,8 +241,8 @@ public final class DataLoadingRequestBuilder
       throw new BindException("Driver Configuration Module is a required 
parameter.");
     }
 
-    if (this.inputPath == null) {
-      throw new BindException("InputPath is a required parameter.");
+    if (this.distributedDataSet == null || this.distributedDataSet.isEmpty()) {
+      throw new BindException("Distributed Data Set is a required parameter.");
     }
 
     if (this.inputFormatClass == null) {
@@ -258,18 +300,32 @@ public final class DataLoadingRequestBuilder
       }
     }
 
-    return jcb
-        .bindNamedParameter(LoadDataIntoMemory.class, 
Boolean.toString(this.inMemory))
-        .bindConstructor(InputFormat.class, 
InputFormatExternalConstructor.class)
-        .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
-        .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, 
inputFormatClass)
-        .bindNamedParameter(JobConfExternalConstructor.InputPath.class, 
inputPath)
-        .bindImplementation(DataLoadingService.class, 
InputFormatLoadingService.class)
-        .build();
+    jcb.bindNamedParameter(LoadDataIntoMemory.class, 
Boolean.toString(this.inMemory))
+       .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, 
inputFormatClass);
+
+    final Iterator<DistributedDataSetPartition> partitions = 
this.distributedDataSet.iterator();
+    while (partitions.hasNext()) {
+      jcb.bindSetEntry(
+          
DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class,
+          DistributedDataSetPartitionSerializer.serialize(partitions.next()));
+    }
+
+    // we do this check for backwards compatibility: if the single data
+    // center loading strategy was selected, we bind that implementation.
+    if (this.singleDataCenterStrategy) {
+      jcb.bindImplementation(EvaluatorToPartitionStrategy.class, 
SingleDataCenterEvaluatorToPartitionStrategy.class);
+    } else {
+      // otherwise, we bind the strategy that will allow the user to specify
+      // which evaluators can load the different partitions in a multi data 
center network topology
+      jcb.bindImplementation(EvaluatorToPartitionStrategy.class, 
MultiDataCenterEvaluatorToPartitionStrategy.class);
+    }
+
+    return jcb.bindImplementation(DataLoadingService.class, 
InputFormatLoadingService.class).build();
   }
 
-  @NamedParameter(short_name = "num_splits", default_value = "0")
+  @NamedParameter(short_name = "num_splits", default_value = 
NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)
   public static final class NumberOfDesiredSplits implements Name<Integer> {
+    static final String DEFAULT_DESIRED_SPLITS = "0";
   }
 
   @NamedParameter(short_name = "dataLoadingEvaluatorMemoryMB",

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DistributedDataSet.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DistributedDataSet.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DistributedDataSet.java
new file mode 100644
index 0000000..e95ca3e
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DistributedDataSet.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.io.data.loading.impl.DistributedDataSetPartition;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Represents a distributed data set that is split across data centers.
+ * It contains a set of {@link DistributedDataSetPartition}s, each naming
+ * the location its data is to be loaded into.
+ *
+ */
+@Unstable
+public final class DistributedDataSet implements 
Iterable<DistributedDataSetPartition> {
+
+
+  /**
+   * The set of distributed data set partitions.
+   */
+  private final Set<DistributedDataSetPartition> partitions = new HashSet<>();
+
+  /**
+   * Adds the given partition to the set.
+   *
+   * @param partition
+   *          the partition to add
+   */
+  public void addPartition(final DistributedDataSetPartition partition) {
+    this.partitions.add(partition);
+  }
+
+  /**
+   * Adds the given partitions to the set.
+   *
+   * @param partitions
+   *          the partitions to add
+   */
+  public void addPartitions(final Collection<DistributedDataSetPartition> 
partitions) {
+    this.partitions.addAll(partitions);
+  }
+
+  /**
+   * Returns true if it does not contain any partition.
+   *
+   * @return a boolean indicating whether it contains partitions or not
+   */
+  public boolean isEmpty() {
+    return this.partitions.isEmpty();
+  }
+
+  @Override
+  public Iterator<DistributedDataSetPartition> iterator() {
+    return new DistributedDataSetIterator(partitions);
+  }
+
+  static final class DistributedDataSetIterator implements 
Iterator<DistributedDataSetPartition> {
+
+    private final List<DistributedDataSetPartition> partitions;
+    private int position;
+
+    public DistributedDataSetIterator(
+        final Collection<DistributedDataSetPartition> partitions) {
+      this.partitions = new 
LinkedList<DistributedDataSetPartition>(partitions);
+      position = 0;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return position < partitions.size();
+    }
+
+    @Override
+    public DistributedDataSetPartition next() {
+      final DistributedDataSetPartition partition = partitions
+          .get(position);
+      position++;
+      return partition;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException(
+          "Remove method has not been implemented in this iterator");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/EvaluatorToPartitionStrategy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/EvaluatorToPartitionStrategy.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/EvaluatorToPartitionStrategy.java
new file mode 100644
index 0000000..d10b4d4
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/EvaluatorToPartitionStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.api;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.io.data.loading.impl.NumberedSplit;
+
+/**
+ * Interface that tracks the mapping between evaluators & the data partitions
+ * assigned to those evaluators. It is part of the implementation of a
+ * {@link org.apache.reef.io.data.loading.api.DataLoadingService} that uses the
+ * Hadoop {@link org.apache.hadoop.mapred.InputFormat} to partition the data 
and
+ * request resources accordingly
+ *
+ * @param <V>
+ */
+@DriverSide
+@Unstable
+public interface EvaluatorToPartitionStrategy<V extends InputSplit> {
+
+  /**
+   * Returns an input split for the given evaluator.
+   * @param nodeDescriptor
+   *      the node descriptor where the evaluator is running on
+   * @param evalId
+   *      the evaluator id
+   * @return
+   *      the numberedSplit
+   * @throws RuntimeException if no split could be allocated
+   */
+  NumberedSplit<V> getInputSplit(NodeDescriptor nodeDescriptor, String evalId);
+
+  /**
+   * Returns the total number of splits computed in this strategy.
+   * @return
+   *  the number of splits
+   */
+  int getNumberOfSplits();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/AbstractEvaluatorToPartitionStrategy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/AbstractEvaluatorToPartitionStrategy.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/AbstractEvaluatorToPartitionStrategy.java
new file mode 100644
index 0000000..f6ea0dd
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/AbstractEvaluatorToPartitionStrategy.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.lang.Validate;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy;
+import org.apache.reef.tang.ExternalConstructor;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is an abstract class useful for {@link EvaluatorToPartitionStrategy}
+ * implementations. Contains a template implementation of
+ * {@link EvaluatorToPartitionStrategy#getInputSplit(NodeDescriptor, String)}
+ * that call abstract methods implemented by subclasses. If your implementation
+ * does not need this logic, you should just implement the
+ * {@link EvaluatorToPartitionStrategy} interface and do not extend this class.
+ */
+@DriverSide
+@Unstable
+public abstract class AbstractEvaluatorToPartitionStrategy implements 
EvaluatorToPartitionStrategy<InputSplit> {
+  private static final Logger LOG = 
Logger.getLogger(AbstractEvaluatorToPartitionStrategy.class.getName());
+
+  protected final ConcurrentMap<String, 
BlockingQueue<NumberedSplit<InputSplit>>> locationToSplits;;
+  protected final ConcurrentMap<String, NumberedSplit<InputSplit>> 
evaluatorToSplits;
+  protected final BlockingQueue<NumberedSplit<InputSplit>> unallocatedSplits;
+
+  private int totalNumberOfSplits;
+
+  @SuppressWarnings("rawtypes")
+  AbstractEvaluatorToPartitionStrategy(
+      final String inputFormatClassName, final Set<String> 
serializedDataPartitions) {
+    LOG.fine("AbstractEvaluatorToPartitionStrategy injected");
+    Validate.notEmpty(inputFormatClassName);
+    Validate.notEmpty(serializedDataPartitions);
+
+    locationToSplits = new ConcurrentHashMap<>();
+    evaluatorToSplits = new ConcurrentHashMap<>();
+    unallocatedSplits = new LinkedBlockingQueue<>();
+    setUp();
+
+    final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition = 
new HashMap<>();
+    for (final String serializedDataPartition : serializedDataPartitions) {
+      final DistributedDataSetPartition dp = 
DistributedDataSetPartitionSerializer.deserialize(serializedDataPartition);
+      final ExternalConstructor<JobConf> jobConfExternalConstructor = new 
JobConfExternalConstructor(
+          inputFormatClassName, dp.getPath());
+      try {
+        final JobConf jobConf = jobConfExternalConstructor.newInstance();
+        final InputFormat inputFormat = jobConf.getInputFormat();
+        final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 
dp.getDesiredSplits());
+        if (LOG.isLoggable(Level.FINEST)) {
+          LOG.log(Level.FINEST, "Splits for partition: {0} {1}", new Object[] 
{dp, Arrays.toString(inputSplits)});
+        }
+        this.totalNumberOfSplits += inputSplits.length;
+        splitsPerPartition.put(dp, inputSplits);
+      } catch (final IOException e) {
+        throw new RuntimeException("Unable to get InputSplits using the 
specified InputFormat", e);
+      }
+    }
+    init(splitsPerPartition);
+    LOG.log(Level.FINE, "Total Number of splits: {0}", 
this.totalNumberOfSplits);
+  }
+
+  /**
+   * Initializes the locations of the splits where we'd like to be loaded into.
+   * Sets all the splits to unallocated
+   *
+   * @param splitsPerPartition
+   *          a map containing the input splits per data partition
+   */
+  private void init(final Map<DistributedDataSetPartition, InputSplit[]> 
splitsPerPartition) {
+    final Pair<InputSplit[], DistributedDataSetPartition[]>
+                                      splitsAndPartitions = 
getSplitsAndPartitions(splitsPerPartition);
+    final InputSplit[] splits = splitsAndPartitions.getFirst();
+    final DistributedDataSetPartition[] partitions = 
splitsAndPartitions.getSecond();
+    Validate.isTrue(splits.length == partitions.length);
+    for (int splitNum = 0; splitNum < splits.length; splitNum++) {
+      LOG.log(Level.FINE, "Processing split: " + splitNum);
+      final InputSplit split = splits[splitNum];
+      final NumberedSplit<InputSplit> numberedSplit = new 
NumberedSplit<InputSplit>(split, splitNum,
+          partitions[splitNum]);
+      unallocatedSplits.add(numberedSplit);
+      updateLocations(numberedSplit);
+    }
+    if (LOG.isLoggable(Level.FINE)) {
+      for (final Map.Entry<String, BlockingQueue<NumberedSplit<InputSplit>>> 
locSplit : locationToSplits.entrySet()) {
+        LOG.log(Level.FINE, locSplit.getKey() + ": " + 
locSplit.getValue().toString());
+      }
+    }
+  }
+
+  /**
+   * Each strategy should update the locations where they want the split to be
+   * loaded into. For example, the split physical location, certain node,
+   * certain rack
+   *
+   * @param numberedSplit
+   *          the numberedSplit
+   */
+  protected abstract void updateLocations(final NumberedSplit<InputSplit> 
numberedSplit);
+
+  /**
+   * Tries to allocate a split in an evaluator based on some particular rule.
+   * For example, based on the rack name, randomly, etc.
+   *
+   * @param nodeDescriptor
+   *          the node descriptor to extract information from
+   * @param evaluatorId
+   *          the evaluator id where we want to allocate the numberedSplit
+   * @return a numberedSplit or null if couldn't allocate one
+   */
+  protected abstract NumberedSplit<InputSplit> tryAllocate(NodeDescriptor 
nodeDescriptor, String evaluatorId);
+
+  /**
+   * Called in the constructor. Allows children to setUp the objects they will
+   * need in
+   * {@link AbstractEvaluatorToPartitionStrategy#updateLocations(NumberedSplit)}
+   * and
+   * {@link AbstractEvaluatorToPartitionStrategy#tryAllocate(NodeDescriptor, 
String)}
+   * methods.
+   * By default we provide an empty implementation.
+   */
+  protected void setUp() {
+    // empty implementation by default
+  }
+
+  /**
+   * Get an input split to be assigned to this evaluator.
+   * <p/>
+   * Allocates one if it is not already allocated
+   *
+   * @param evaluatorId
+   * @return a numberedSplit
+   * @throws RuntimeException
+   *           if couldn't find any split
+   */
+  @Override
+  public NumberedSplit<InputSplit> getInputSplit(final NodeDescriptor 
nodeDescriptor, final String evaluatorId) {
+    synchronized (evaluatorToSplits) {
+      if (evaluatorToSplits.containsKey(evaluatorId)) {
+        LOG.log(Level.FINE, "Found an already allocated split, {0}", 
evaluatorToSplits.toString());
+        return evaluatorToSplits.get(evaluatorId);
+      }
+    }
+    // always first try to allocate based on the hostName
+    final String hostName = nodeDescriptor.getName();
+    LOG.log(Level.FINE, "Allocated split not found, trying on {0}", hostName);
+    if (locationToSplits.containsKey(hostName)) {
+      LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1}", new 
Object[] {evaluatorId, hostName});
+      final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId, 
locationToSplits.get(hostName));
+      if (split != null) {
+        return split;
+      }
+    }
+    LOG.log(Level.FINE, "{0} does not host any splits or someone else took 
splits hosted here. Picking other ones",
+        hostName);
+    final NumberedSplit<InputSplit> split = tryAllocate(nodeDescriptor, 
evaluatorId);
+    if (split == null) {
+      throw new RuntimeException("Unable to find an input split to evaluator " 
+ evaluatorId);
+    } else {
+      LOG.log(Level.FINE, evaluatorToSplits.toString());
+    }
+    return split;
+  }
+
+  @Override
+  public int getNumberOfSplits() {
+    return this.totalNumberOfSplits;
+  }
+
+  private Pair<InputSplit[], DistributedDataSetPartition[]> 
getSplitsAndPartitions(
+      final Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition) 
{
+    final List<InputSplit> inputSplits = new ArrayList<>();
+    final List<DistributedDataSetPartition> partitions = new ArrayList<>();
+    for (final Entry<DistributedDataSetPartition, InputSplit[]> entry : 
splitsPerPartition.entrySet()) {
+      final DistributedDataSetPartition partition = entry.getKey();
+      final InputSplit[] splits = entry.getValue();
+      for (final InputSplit split : splits) {
+        inputSplits.add(split);
+        partitions.add(partition);
+      }
+    }
+    return new Pair<>(inputSplits.toArray(new InputSplit[inputSplits.size()]),
+        partitions.toArray(new 
DistributedDataSetPartition[partitions.size()]));
+  }
+
+  /**
+   * Allocates the first available split into the evaluator.
+   *
+   * @param evaluatorId
+   *          the evaluator id
+   * @param value
+   *          the queue of splits
+   * @return a numberedSplit or null if it cannot find one
+   */
+  protected NumberedSplit<InputSplit> allocateSplit(final String evaluatorId,
+      final BlockingQueue<NumberedSplit<InputSplit>> value) {
+    if (value == null) {
+      LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
+      return null;
+    }
+    while (true) {
+      final NumberedSplit<InputSplit> split = value.poll();
+      if (split == null) {
+        return null;
+      }
+      if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
+        LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the 
queue");
+        final NumberedSplit<InputSplit> old = 
evaluatorToSplits.putIfAbsent(evaluatorId, split);
+        if (old != null) {
+          throw new RuntimeException("Trying to assign different splits to the 
same evaluator is not supported");
+        } else {
+          LOG.log(Level.FINE, "Returning " + split.getIndex());
+          return split;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
new file mode 100644
index 0000000..7cc52c1
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * Immutable description of one partition of a distributed data set: the path
+ * where the partition's data files are located, the location the data should
+ * be loaded into, and the number of splits desired for it.
+ * <p>
+ * Use {@link #newBuilder()} to obtain a {@link Builder}.
+ */
+@Unstable
+public final class DistributedDataSetPartition {
+
+  /**
+   * Location value stating that the data partition may be loaded into any
+   * location.
+   */
+  public static final String LOAD_INTO_ANY_LOCATION = "/*";
+
+  /**
+   * The path of the distributed data set partition. If we use HDFS, it will
+   * be the hdfs path.
+   */
+  private final String path;
+
+  /**
+   * The location (either a rackName or a nodeName) the data of this partition
+   * should be loaded into. It may end in a wildcard, for example
+   * {@code /datacenter1/*}.
+   */
+  private final String location;
+
+  /**
+   * Number of desired splits for this partition.
+   */
+  private final int desiredSplits;
+
+  /**
+   * Stores the three values as given; no validation is performed here.
+   *
+   * @param path
+   *          the path of the partition's data
+   * @param location
+   *          the location the data should be loaded into
+   * @param desiredSplits
+   *          the number of desired splits
+   */
+  DistributedDataSetPartition(final String path, final String location, final int desiredSplits) {
+    this.path = path;
+    this.location = location;
+    this.desiredSplits = desiredSplits;
+  }
+
+  /**
+   * @return the path of the distributed data partition
+   */
+  String getPath() {
+    return this.path;
+  }
+
+  /**
+   * @return the location the data of this partition should be loaded into
+   */
+  String getLocation() {
+    return this.location;
+  }
+
+  /**
+   * @return the number of desired splits for this data partition
+   */
+  int getDesiredSplits() {
+    return this.desiredSplits;
+  }
+
+  /**
+   * @return a new DistributedDataSetPartition Builder.
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Two partitions are equal when their path, location and desired number of
+   * splits are all equal.
+   */
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof DistributedDataSetPartition) {
+      final DistributedDataSetPartition other = (DistributedDataSetPartition) obj;
+      final EqualsBuilder comparison = new EqualsBuilder();
+      comparison.append(this.path, other.path);
+      comparison.append(this.location, other.location);
+      comparison.append(this.desiredSplits, other.desiredSplits);
+      return comparison.isEquals();
+    }
+    return false;
+  }
+
+  /**
+   * Hashes the same three fields that {@link #equals(Object)} compares.
+   */
+  @Override
+  public int hashCode() {
+    final HashCodeBuilder hash = new HashCodeBuilder(17, 37);
+    hash.append(this.path);
+    hash.append(this.location);
+    hash.append(this.desiredSplits);
+    return hash.toHashCode();
+  }
+
+  /**
+   * @return the fields rendered as <code>{path,location,desiredSplits}</code>
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("{");
+    text.append(this.path).append(",");
+    text.append(this.location).append(",");
+    text.append(this.desiredSplits).append("}");
+    return text.toString();
+  }
+
+  /**
+   * {@link DistributedDataSetPartition}s are built using this Builder.
+   */
+  public static final class Builder implements org.apache.reef.util.Builder<DistributedDataSetPartition> {
+
+    private String path;
+    private String location;
+    private int desiredSplits;
+
+    private Builder() {
+    }
+
+    /**
+     * Sets the path of the distributed data set partition.
+     *
+     * @param path
+     *          the path to set
+     * @return this
+     */
+    public Builder setPath(final String path) {
+      this.path = path;
+      return this;
+    }
+
+    /**
+     * Sets the location the data of this partition should be loaded into.
+     *
+     * @param location
+     *          the location to set
+     * @return this
+     */
+    public Builder setLocation(final String location) {
+      this.location = location;
+      return this;
+    }
+
+    /**
+     * Sets the desired number of splits for this partition.
+     *
+     * @param desiredSplits
+     *          the number of desired splits
+     * @return this
+     */
+    public Builder setDesiredSplits(final int desiredSplits) {
+      this.desiredSplits = desiredSplits;
+      return this;
+    }
+
+    /**
+     * Builds the {@link DistributedDataSetPartition} from the values set so
+     * far; values that were never set keep their Java defaults.
+     */
+    @Override
+    public DistributedDataSetPartition build() {
+      return new DistributedDataSetPartition(path, location, desiredSplits);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartitionSerializer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartitionSerializer.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartitionSerializer.java
new file mode 100644
index 0000000..60a4191
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartitionSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.io.*;
+import java.util.Set;
+
+/**
+ * Serialize and deserialize {@link DistributedDataSetPartition} objects.
+ * <p>
+ * The wire format is the Base64 encoding of, in order: the path and the
+ * location (each written with {@link DataOutputStream#writeUTF(String)}) and
+ * the desired number of splits (written as an int).
+ */
+public final class DistributedDataSetPartitionSerializer {
+
+  /**
+   * Serializes a partition into a Base64 string.
+   *
+   * @param partition
+   *          the partition to serialize
+   * @return the Base64 encoded representation of the partition
+   * @throws RuntimeException
+   *           wrapping the IOException if the fields cannot be written
+   */
+  public static String serialize(final DistributedDataSetPartition partition) {
+    // Both streams are managed by try-with-resources so that the wrapper is
+    // always released, even if one of the writes fails.
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         DataOutputStream daos = new DataOutputStream(baos)) {
+      daos.writeUTF(partition.getPath());
+      daos.writeUTF(partition.getLocation());
+      daos.writeInt(partition.getDesiredSplits());
+      // Make sure everything reached the byte array before it is encoded.
+      daos.flush();
+      return Base64.encodeBase64String(baos.toByteArray());
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to serialize distributed data partition", e);
+    }
+  }
+
+  /**
+   * Rebuilds a partition from the string produced by
+   * {@link #serialize(DistributedDataSetPartition)}.
+   *
+   * @param serializedPartition
+   *          the Base64 encoded partition
+   * @return the decoded partition
+   * @throws RuntimeException
+   *           wrapping the IOException if the fields cannot be read
+   */
+  public static DistributedDataSetPartition deserialize(final String serializedPartition) {
+    try (ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serializedPartition));
+         DataInputStream dais = new DataInputStream(bais)) {
+      // Fields are read in exactly the order serialize() wrote them.
+      final String path = dais.readUTF();
+      final String location = dais.readUTF();
+      final int desiredSplits = dais.readInt();
+      return new DistributedDataSetPartition(path, location, desiredSplits);
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to de-serialize distributed data partition", e);
+    }
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private DistributedDataSetPartitionSerializer() {
+  }
+
+  /**
+   * Named parameter holding a set of distributed data set partitions, each
+   * one given as a string.
+   */
+  @NamedParameter(doc = "Sets of distributed data set partitions")
+  public static final class DistributedDataSetPartitions implements Name<Set<String>> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
deleted file mode 100644
index 3ac11e9..0000000
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/EvaluatorToPartitionMapper.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.io.data.loading.impl;
-
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.reef.annotations.audience.DriverSide;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Class that tracks the mapping between
- * evaluators & the data partition assigned
- * to those evaluators. Its part of the
- * implementation of a {@link 
org.apache.reef.io.data.loading.api.DataLoadingService}
- * that uses the Hadoop {@link org.apache.hadoop.mapred.InputFormat} to
- * partition the data and request resources
- * accordingly
- * <p/>
- * This is an online version which satisfies
- * requests in a greedy way.
- *
- * @param <V>
- */
-@DriverSide
-public class EvaluatorToPartitionMapper<V extends InputSplit> {
-  private static final Logger LOG = Logger
-      .getLogger(EvaluatorToPartitionMapper.class.getName());
-
-  private final ConcurrentMap<String, BlockingQueue<NumberedSplit<V>>> 
locationToSplits = new ConcurrentHashMap<>();
-  private final ConcurrentMap<String, NumberedSplit<V>> evaluatorToSplits = 
new ConcurrentHashMap<>();
-  private final BlockingQueue<NumberedSplit<V>> unallocatedSplits = new 
LinkedBlockingQueue<>();
-
-  /**
-   * Initializes the locations of splits mapping.
-   *
-   * @param splits
-   */
-  public EvaluatorToPartitionMapper(V[] splits) {
-    try {
-      for (int splitNum = 0; splitNum < splits.length; splitNum++) {
-        LOG.log(Level.FINE, "Processing split: " + splitNum);
-        final V split = splits[splitNum];
-        final String[] locations = split.getLocations();
-        final NumberedSplit<V> numberedSplit = new NumberedSplit<V>(split, 
splitNum);
-        unallocatedSplits.add(numberedSplit);
-        for (final String location : locations) {
-          BlockingQueue<NumberedSplit<V>> newSplitQue = new 
LinkedBlockingQueue<NumberedSplit<V>>();
-          final BlockingQueue<NumberedSplit<V>> splitQue = 
locationToSplits.putIfAbsent(location,
-              newSplitQue);
-          if (splitQue != null) {
-            newSplitQue = splitQue;
-          }
-          newSplitQue.add(numberedSplit);
-        }
-      }
-      for (Map.Entry<String, BlockingQueue<NumberedSplit<V>>> locSplit : 
locationToSplits.entrySet()) {
-        LOG.log(Level.FINE, locSplit.getKey() + ": " + 
locSplit.getValue().toString());
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(
-          "Unable to get InputSplits using the specified InputFormat", e);
-    }
-  }
-
-  /**
-   * Get an input split to be assigned to this.
-   * evaluator
-   * <p/>
-   * Allocates one if its not already allocated
-   *
-   * @param evaluatorId
-   * @return
-   */
-  public NumberedSplit<V> getInputSplit(final String hostName, final String 
evaluatorId) {
-    synchronized (evaluatorToSplits) {
-      if (evaluatorToSplits.containsKey(evaluatorId)) {
-        LOG.log(Level.FINE, "Found an already allocated partition");
-        LOG.log(Level.FINE, evaluatorToSplits.toString());
-        return evaluatorToSplits.get(evaluatorId);
-      }
-    }
-    LOG.log(Level.FINE, "allocated partition not found");
-    if (locationToSplits.containsKey(hostName)) {
-      LOG.log(Level.FINE, "Found partitions possibly hosted for " + 
evaluatorId + " at " + hostName);
-      final NumberedSplit<V> split = allocateSplit(evaluatorId, 
locationToSplits.get(hostName));
-      LOG.log(Level.FINE, evaluatorToSplits.toString());
-      if (split != null) {
-        return split;
-      }
-    }
-    //pick random
-    LOG.log(
-        Level.FINE,
-        hostName
-            + " does not host any partitions or someone else took partitions 
hosted here. Picking a random one");
-    final NumberedSplit<V> split = allocateSplit(evaluatorId, 
unallocatedSplits);
-    LOG.log(Level.FINE, evaluatorToSplits.toString());
-    if (split != null) {
-      return split;
-    }
-    throw new RuntimeException("Unable to find an input partition to evaluator 
" + evaluatorId);
-  }
-
-  private NumberedSplit<V> allocateSplit(final String evaluatorId,
-                                         final BlockingQueue<NumberedSplit<V>> 
value) {
-    if (value == null) {
-      LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
-      return null;
-    }
-    while (true) {
-      final NumberedSplit<V> split = value.poll();
-      if (split == null) {
-        return null;
-      }
-      if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
-        LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the 
queue");
-        final NumberedSplit<V> old = 
evaluatorToSplits.putIfAbsent(evaluatorId, split);
-        if (old != null) {
-          final String msg = "Trying to assign different partitions to the 
same evaluator " +
-              "is not supported";
-          LOG.severe(msg);
-          throw new RuntimeException(msg);
-        } else {
-          LOG.log(Level.FINE, "Returning " + split.getIndex());
-          return split;
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
index dcaafb7..40b76ce 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/InputFormatLoadingService.java
@@ -29,14 +29,16 @@ import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
 import org.apache.reef.io.data.loading.api.DataLoadingService;
 import org.apache.reef.io.data.loading.api.DataSet;
+import org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.BindException;
 
 import javax.inject.Inject;
-import java.io.IOException;
+
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Random;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -46,10 +48,10 @@ import java.util.logging.Logger;
  * that uses the Hadoop {@link InputFormat} to find
  * partitions of data & request resources.
  * <p/>
- * The InputFormat is injected using a Tang external constructor
+ * The InputFormat is taken from the job configurations
  * <p/>
- * It also tries to obtain data locality in a greedy
- * fashion using {@link EvaluatorToPartitionMapper}
+ * The {@link EvaluatorToPartitionStrategy} is injected via Tang,
+ * in order to support different ways to map evaluators to data
  */
 @DriverSide
 public class InputFormatLoadingService<K, V> implements DataLoadingService {
@@ -61,15 +63,20 @@ public class InputFormatLoadingService<K, V> implements 
DataLoadingService {
   private static final String COMPUTE_CONTEXT_PREFIX =
       "ComputeContext-" + new Random(3381).nextInt(1 << 20) + "-";
 
-  private final EvaluatorToPartitionMapper<InputSplit> 
evaluatorToPartitionMapper;
-  private final int numberOfPartitions;
+  private final EvaluatorToPartitionStrategy<InputSplit> 
evaluatorToPartitionStrategy;
 
   private final boolean inMemory;
 
   private final String inputFormatClass;
 
-  private final String inputPath;
 
+  /**
+   * @deprecated since 0.12. Should use the other constructor instead, which
+   *             allows to specify the strategy on how to assign partitions to
+   *             evaluators. This one by default uses {@link 
SingleDataCenterEvaluatorToPartitionStrategy}
+   *
+   */
+  @Deprecated
   @Inject
   public InputFormatLoadingService(
       final InputFormat<K, V> inputFormat,
@@ -78,40 +85,36 @@ public class InputFormatLoadingService<K, V> implements 
DataLoadingService {
       @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) final 
boolean inMemory,
       @Parameter(JobConfExternalConstructor.InputFormatClass.class) final 
String inputFormatClass,
       @Parameter(JobConfExternalConstructor.InputPath.class) final String 
inputPath) {
+    this(new SingleDataCenterEvaluatorToPartitionStrategy(inputFormatClass, 
new HashSet<String>(
+        Arrays.asList(DistributedDataSetPartitionSerializer.serialize(new 
DistributedDataSetPartition(inputPath,
+            DistributedDataSetPartition.LOAD_INTO_ANY_LOCATION, 
numberOfDesiredSplits))))), inMemory, inputFormatClass);
+  }
 
+  @Inject
+  public InputFormatLoadingService(
+      final EvaluatorToPartitionStrategy<InputSplit> 
evaluatorToPartitionStrategy,
+      @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) final 
boolean inMemory,
+      @Parameter(JobConfExternalConstructor.InputFormatClass.class) final 
String inputFormatClass) {
     this.inMemory = inMemory;
     this.inputFormatClass = inputFormatClass;
-    this.inputPath = inputPath;
-
-
-    try {
-
-      final InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 
numberOfDesiredSplits);
-      if (LOG.isLoggable(Level.FINEST)) {
-        LOG.log(Level.FINEST, "Splits: {0}", Arrays.toString(inputSplits));
-      }
-
-      this.numberOfPartitions = inputSplits.length;
-      LOG.log(Level.FINE, "Number of partitions: {0}", 
this.numberOfPartitions);
-
-      this.evaluatorToPartitionMapper = new 
EvaluatorToPartitionMapper<>(inputSplits);
-
-    } catch (final IOException e) {
-      throw new RuntimeException("Unable to get InputSplits using the 
specified InputFormat", e);
-    }
+    this.evaluatorToPartitionStrategy = evaluatorToPartitionStrategy;
   }
 
+  /**
+   * This method actually returns the number of splits across all partitions
+   * of the data. It should probably be renamed in the future.
+   */
   @Override
   public int getNumberOfPartitions() {
-    return this.numberOfPartitions;
+    return evaluatorToPartitionStrategy.getNumberOfSplits();
   }
 
   @Override
   public Configuration getContextConfiguration(final AllocatedEvaluator 
allocatedEvaluator) {
 
     final NumberedSplit<InputSplit> numberedSplit =
-        this.evaluatorToPartitionMapper.getInputSplit(
-            
allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
+        this.evaluatorToPartitionStrategy.getInputSplit(
+            allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor(),
             allocatedEvaluator.getId());
 
     return ContextConfiguration.CONF
@@ -125,8 +128,8 @@ public class InputFormatLoadingService<K, V> implements 
DataLoadingService {
     try {
 
       final NumberedSplit<InputSplit> numberedSplit =
-          this.evaluatorToPartitionMapper.getInputSplit(
-              
allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor().getName(),
+          this.evaluatorToPartitionStrategy.getInputSplit(
+              allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor(),
               allocatedEvaluator.getId());
 
       final Configuration serviceConfiguration = ServiceConfiguration.CONF
@@ -139,7 +142,7 @@ public class InputFormatLoadingService<K, V> implements 
DataLoadingService {
               DataSet.class,
               this.inMemory ? InMemoryInputFormatDataSet.class : 
InputFormatDataSet.class)
           
.bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, 
inputFormatClass)
-          .bindNamedParameter(JobConfExternalConstructor.InputPath.class, 
inputPath)
+          .bindNamedParameter(JobConfExternalConstructor.InputPath.class, 
numberedSplit.getPath())
           .bindNamedParameter(
               InputSplitExternalConstructor.SerializedInputSplit.class,
               WritableSerializer.serialize(numberedSplit.getEntry()))

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
new file mode 100644
index 0000000..31bda53
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.tang.annotations.Parameter;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+/**
+ * This is an online version which satisfies requests based on the locations
+ * the users ask the data to be loaded, for multiple data center network
+ * topologies.
+ */
+@DriverSide
+@Unstable
+public final class MultiDataCenterEvaluatorToPartitionStrategy extends AbstractEvaluatorToPartitionStrategy {
+  private static final Logger LOG = Logger.getLogger(MultiDataCenterEvaluatorToPartitionStrategy.class.getName());
+
+  private static final String PATH_SEPARATOR = "/";
+  private static final String ANY = "*";
+  /**
+   * Normalized locations, kept in reverse natural String order. This is
+   * intended to visit locations from most to least specific, for example:
+   * [/dc1/room1, /dc1].
+   * NOTE(review): the order is lexicographic, so a more specific location is
+   * visited first only relative to its own prefixes - confirm this is enough.
+   */
+  private Set<String> normalizedLocations;
+  /**
+   * Splits keyed by normalized location, used when the exact match on the
+   * rack name does not work.
+   */
+  private ConcurrentMap<String, BlockingQueue<NumberedSplit<InputSplit>>> partialLocationsToSplits;
+
+
+  @Inject
+  MultiDataCenterEvaluatorToPartitionStrategy(
+      @Parameter(JobConfExternalConstructor.InputFormatClass.class) final String inputFormatClassName,
+      @Parameter(DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class)
+      final Set<String> serializedDataPartitions) {
+    super(inputFormatClassName, serializedDataPartitions);
+  }
+
+  /**
+   * Creates the objects to be used in updateLocations and tryAllocate methods.
+   */
+  @Override
+  protected void setUp() {
+    normalizedLocations = new TreeSet<>(Collections.reverseOrder());
+    partialLocationsToSplits = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * {@inheritDoc}.
+   * Registers the split under its exact location in locationToSplits and
+   * under its normalized location in partialLocationsToSplits, and remembers
+   * the normalized location.
+   */
+  @Override
+  protected void updateLocations(final NumberedSplit<InputSplit> numberedSplit) {
+    final String location = numberedSplit.getLocation();
+    addLocationMapping(locationToSplits, numberedSplit, location);
+    final String normalizedLocation = normalize(location);
+    addLocationMapping(partialLocationsToSplits, numberedSplit, normalizedLocation);
+    normalizedLocations.add(normalizedLocation);
+  }
+
+  /**
+   * {@inheritDoc}. Tries to allocate on exact rack match, if it cannot, then
+   * it tries to get a partial match using the partialLocations map.
+   *
+   * @return the allocated split, or null if nothing could be allocated
+   */
+  @Override
+  protected NumberedSplit<InputSplit> tryAllocate(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
+    final String rackName = nodeDescriptor.getRackDescriptor().getName();
+    LOG.log(Level.FINE, "Trying an exact match on rack name {0}", rackName);
+    if (locationToSplits.containsKey(rackName)) {
+      LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1}", new Object[] {evaluatorId, rackName});
+      final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId, locationToSplits.get(rackName));
+      if (split != null) {
+        return split;
+      }
+    }
+    LOG.fine("No success, trying based on a partial match on locations");
+    // Candidates are visited in the set's reverse natural String order.
+    final Iterator<String> it = normalizedLocations.iterator();
+    while (it.hasNext()) {
+      final String possibleLocation = it.next();
+      LOG.log(Level.FINE, "Trying on possible location {0}", possibleLocation);
+      // NOTE(review): this is a plain prefix test, so /dc1 also matches a rack
+      // under /dc10 - confirm whether matching on whole path components is
+      // intended.
+      if (rackName.startsWith(possibleLocation)) {
+        LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1} for rack {2}", new Object[] {evaluatorId,
+            possibleLocation, rackName});
+        final NumberedSplit<InputSplit> split = allocateSplit(evaluatorId,
+            partialLocationsToSplits.get(possibleLocation));
+        if (split != null) {
+          return split;
+        }
+      }
+    }
+    LOG.fine("Nothing found");
+    return null;
+  }
+
+  /**
+   * Appends the split to the queue registered for the location, creating the
+   * queue if there is none yet.
+   */
+  private void addLocationMapping(final ConcurrentMap<String,
+      BlockingQueue<NumberedSplit<InputSplit>>> concurrentMap,
+      final NumberedSplit<InputSplit> numberedSplit, final String location) {
+    // putIfAbsent makes the create-or-reuse step a single atomic operation on
+    // the map, instead of a separate containsKey / put / get sequence.
+    final BlockingQueue<NumberedSplit<InputSplit>> newSplitQueue = new LinkedBlockingQueue<>();
+    final BlockingQueue<NumberedSplit<InputSplit>> existingQueue = concurrentMap.putIfAbsent(location, newSplitQueue);
+    if (existingQueue != null) {
+      existingQueue.add(numberedSplit);
+    } else {
+      newSplitQueue.add(numberedSplit);
+    }
+  }
+
+  /**
+   * Brings a location into the form used as key for the partial matches: it
+   * starts with a path separator and has no trailing wildcard or separator.
+   * The location that is just the separator followed by the wildcard becomes
+   * the separator alone.
+   */
+  private String normalize(final String location) {
+    String normalized = location;
+    // should start with a separator
+    if (!normalized.startsWith(PATH_SEPARATOR)) {
+      normalized = PATH_SEPARATOR + normalized;
+    }
+    // if it is just the separator plus the wildcard, return the separator
+    if (normalized.equals(PATH_SEPARATOR + ANY)) {
+      return PATH_SEPARATOR;
+    }
+    // remove the ending ANY or path separator
+    while (normalized.endsWith(ANY) || normalized.endsWith(PATH_SEPARATOR)) {
+      normalized = normalized.substring(0, normalized.length() - 1);
+    }
+    return normalized;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
index f3cbc97..a9594aa 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/NumberedSplit.java
@@ -18,24 +18,34 @@
  */
 package org.apache.reef.io.data.loading.impl;
 
+import org.apache.commons.lang.Validate;
+
 /**
  * A tuple of an object of type E and an integer index.
- * Used inside {@link EvaluatorToPartitionMapper} to
+ * Used inside {@link EvaluatorToPartitionStrategy} implementations to
  * mark the partitions associated with each {@link 
org.apache.hadoop.mapred.InputSplit}
  *
  * @param <E>
  */
-final class NumberedSplit<E> implements Comparable<NumberedSplit<E>> {
+public final class NumberedSplit<E> implements Comparable<NumberedSplit<E>> {
   private final E entry;
   private final int index;
+  private final DistributedDataSetPartition partition;
 
-  public NumberedSplit(final E entry, final int index) {
-    super();
-    if (entry == null) {
-      throw new IllegalArgumentException("Entry cannot be null");
-    }
+  public NumberedSplit(final E entry, final int index, final 
DistributedDataSetPartition partition) {
+    Validate.notNull(entry, "Entry cannot be null");
+    Validate.notNull(partition, "Partition cannot be null");
     this.entry = entry;
     this.index = index;
+    this.partition = partition;
+  }
+
+  public String getPath() {
+    return partition.getPath();
+  }
+
+  public String getLocation() {
+    return partition.getLocation();
   }
 
   public E getEntry() {
@@ -48,7 +58,7 @@ final class NumberedSplit<E> implements 
Comparable<NumberedSplit<E>> {
 
   @Override
   public String toString() {
-    return "InputSplit-" + index;
+    return "InputSplit-" + partition + "-" + index;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/33812fb0/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/SingleDataCenterEvaluatorToPartitionStrategy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/SingleDataCenterEvaluatorToPartitionStrategy.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/SingleDataCenterEvaluatorToPartitionStrategy.java
new file mode 100644
index 0000000..b0d0f6f
--- /dev/null
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/SingleDataCenterEvaluatorToPartitionStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.data.loading.impl;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.tang.annotations.Parameter;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
+
+import javax.inject.Inject;
+
+/**
+ * Online strategy for single data center network topologies: requests are
+ * satisfied in a greedy way.
+ */
+@DriverSide
+public final class SingleDataCenterEvaluatorToPartitionStrategy extends AbstractEvaluatorToPartitionStrategy {
+  private static final Logger LOG = Logger
+      .getLogger(SingleDataCenterEvaluatorToPartitionStrategy.class.getName());
+
+  @Inject
+  SingleDataCenterEvaluatorToPartitionStrategy(
+      @Parameter(JobConfExternalConstructor.InputFormatClass.class) final String inputFormatClassName,
+      @Parameter(DistributedDataSetPartitionSerializer.DistributedDataSetPartitions.class)
+      final Set<String> serializedDataPartitions) {
+    super(inputFormatClassName, serializedDataPartitions);
+  }
+
+  /**
+   * Registers the split in locationToSplits under every location reported by
+   * the underlying {@link InputSplit}.
+   */
+  @Override
+  protected void updateLocations(final NumberedSplit<InputSplit> numberedSplit) {
+    try {
+      for (final String location : numberedSplit.getEntry().getLocations()) {
+        final BlockingQueue<NumberedSplit<InputSplit>> freshQueue =
+            new LinkedBlockingQueue<NumberedSplit<InputSplit>>();
+        final BlockingQueue<NumberedSplit<InputSplit>> knownQueue =
+            locationToSplits.putIfAbsent(location, freshQueue);
+        // Use the queue that is actually registered for this location.
+        (knownQueue == null ? freshQueue : knownQueue).add(numberedSplit);
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
+    }
+  }
+
+  /**
+   * Allocates a split from the unallocated ones; the node descriptor is not
+   * consulted.
+   */
+  @Override
+  protected NumberedSplit<InputSplit> tryAllocate(final NodeDescriptor nodeDescriptor, final String evaluatorId) {
+    LOG.fine("Picking a random split from the unallocated ones");
+    return allocateSplit(evaluatorId, unallocatedSplits);
+  }
+
+}

Reply via email to