[SYSTEMML-2418] New paramserv distributed spark data partitioners

Closes #793.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8def2040
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8def2040
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8def2040

Branch: refs/heads/master
Commit: 8def2040be8e9704c8ee8083e8b949ffc0d74927
Parents: 2f4bce9
Author: EdgarLGB <[email protected]>
Authored: Thu Jul 12 22:25:44 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Jul 12 23:48:38 2018 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          |   9 +-
 .../controlprogram/paramserv/DCScheme.java      |  61 ++++++
 .../controlprogram/paramserv/DRRScheme.java     |  57 ++++++
 .../controlprogram/paramserv/DRScheme.java      |  62 ++++++
 .../paramserv/DataPartitionScheme.java          |  40 ++++
 .../paramserv/DataPartitioner.java              |  39 ++--
 .../paramserv/DataPartitionerDC.java            |  55 ------
 .../paramserv/DataPartitionerDR.java            |  64 -------
 .../paramserv/DataPartitionerDRR.java           |  61 ------
 .../paramserv/DataPartitionerOR.java            |  62 ------
 .../controlprogram/paramserv/LocalPSWorker.java |   1 -
 .../controlprogram/paramserv/ORScheme.java      |  61 ++++++
 .../paramserv/ParamservUtils.java               | 141 ++++++++++++--
 .../paramserv/spark/DCSparkScheme.java          |  47 +++++
 .../paramserv/spark/DRRSparkScheme.java         |  45 +++++
 .../paramserv/spark/DRSparkScheme.java          |  69 +++++++
 .../spark/DataPartitionSparkScheme.java         |  76 ++++++++
 .../spark/DataPartitionerSparkAggregator.java   |  66 +++++++
 .../spark/DataPartitionerSparkMapper.java       |  70 +++++++
 .../paramserv/spark/ORSparkScheme.java          |  60 ++++++
 .../paramserv/spark/SparkDataPartitioner.java   | 106 ++++++++++
 .../paramserv/spark/SparkPSWorker.java          |  48 +++++
 .../cp/ParamservBuiltinCPInstruction.java       | 101 +++++-----
 .../paramserv/BaseDataPartitionerTest.java      |  80 ++++++++
 .../paramserv/DataPartitionerTest.java          | 192 -------------------
 .../paramserv/LocalDataPartitionerTest.java     | 135 +++++++++++++
 .../paramserv/SparkDataPartitionerTest.java     |  97 ++++++++++
 .../functions/paramserv/ZPackageSuite.java      |   3 +-
 28 files changed, 1396 insertions(+), 512 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index a3c7677..f8d2345 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -546,7 +546,7 @@ public class SparkExecutionContext extends ExecutionContext
        }
 
        @SuppressWarnings("unchecked")
-       public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable(String 
varname) {
+       public PartitionedBroadcast<MatrixBlock> 
getBroadcastForMatrixObject(MatrixObject mo) {
                //NOTE: The memory consumption of this method is the in-memory 
size of the 
                //matrix object plus the partitioned size in 1k-1k blocks. 
Since the call
                //to broadcast happens after the matrix object has been 
released, the memory
@@ -557,8 +557,6 @@ public class SparkExecutionContext extends ExecutionContext
                
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
-               MatrixObject mo = getMatrixObject(varname);
-
                PartitionedBroadcast<MatrixBlock> bret = null;
 
                //reuse existing broadcast handle
@@ -613,6 +611,11 @@ public class SparkExecutionContext extends ExecutionContext
                return bret;
        }
 
+       public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable(String 
varname) {
+               MatrixObject mo = getMatrixObject(varname);
+               return getBroadcastForMatrixObject(mo);
+       }
+
        @SuppressWarnings("unchecked")
        public PartitionedBroadcast<FrameBlock> 
getBroadcastForFrameVariable(String varname) {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DCScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DCScheme.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DCScheme.java
new file mode 100644
index 0000000..00aaa21
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DCScheme.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Disjoint_Contiguous data partitioner:
+ *
+ * for each worker, use a right indexing
+ * operation X[beg:end,] to obtain contiguous,
+ * non-overlapping partitions of rows.
+ */
+public class DCScheme extends DataPartitionScheme {
+
+       public static List<MatrixBlock> partition(int k, MatrixBlock mb) {
+               List<MatrixBlock> list = new ArrayList<>();
+               long stepSize = (long) Math.ceil((double) mb.getNumRows() / k);
+               long begin = 1;
+               while (begin < mb.getNumRows()) {
+                       long end = Math.min(begin - 1 + stepSize, 
mb.getNumRows());
+                       MatrixBlock pmo = ParamservUtils.sliceMatrixBlock(mb, 
begin, end);
+                       list.add(pmo);
+                       begin = end + 1;
+               }
+               return list;
+       }
+
+       private List<MatrixObject> doPartitioning(int k, MatrixBlock mb) {
+               return partition(k, 
mb).stream().map(ParamservUtils::newMatrixObject).collect(Collectors.toList());
+       }
+
+       @Override
+       public Result doPartitioning(int workersNum, MatrixBlock features, 
MatrixBlock labels) {
+               List<MatrixObject> pfs = doPartitioning(workersNum, features);
+               List<MatrixObject> pls = doPartitioning(workersNum, labels);
+               return new Result(pfs, pls);
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRRScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRRScheme.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRRScheme.java
new file mode 100644
index 0000000..90c62d6
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRRScheme.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+
+/**
+ * Disjoint_Round_Robin data partitioner:
+ * for each worker, use a permutation multiply
+ * or, more simply, a removeEmpty operation such as removeEmpty
+ * (target=X, margin=rows, select=(seq(1,nrow(X))%%k)==id)
+ */
+public class DRRScheme extends DataPartitionScheme {
+
+       public static MatrixBlock removeEmpty(MatrixBlock mb, int k, int 
workerId) {
+               double[] data = LongStream.range(0, 
mb.getNumRows()).mapToDouble(l -> l % k == workerId ? 1 : 0).toArray();
+               MatrixBlock select = DataConverter.convertToMatrixBlock(data, 
true);
+               return mb.removeEmptyOperations(new MatrixBlock(), true, true, 
select);
+       }
+
+       private MatrixObject internalRemoveEmpty(MatrixBlock mb, int k, int 
workerId) {
+               MatrixObject result = 
ParamservUtils.newMatrixObject(removeEmpty(mb, k, workerId));
+               result.enableCleanup(false);
+               return result;
+       }
+
+       @Override
+       public Result doPartitioning(int workersNum, MatrixBlock features, 
MatrixBlock labels) {
+               List<MatrixObject> pfs = IntStream.range(0, 
workersNum).mapToObj(i -> internalRemoveEmpty(features, workersNum, 
i)).collect(Collectors.toList());
+               List<MatrixObject> pls = IntStream.range(0, 
workersNum).mapToObj(i -> internalRemoveEmpty(labels, workersNum, 
i)).collect(Collectors.toList());
+               return new Result(pfs, pls);
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRScheme.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRScheme.java
new file mode 100644
index 0000000..062a7ab
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRScheme.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv;
+
+import static 
org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils.SEED;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Data partitioner Disjoint_Random:
+ * for each worker, use a permutation multiply P[beg:end,] %*% X,
+ * where P is constructed for example with 
P=table(seq(1,nrow(X)),sample(nrow(X), nrow(X))),
+ * i.e., sampling without replacement to ensure disjointness.
+ */
+public class DRScheme extends DataPartitionScheme {
+
+       private List<MatrixBlock> partition(int k, MatrixBlock mb, MatrixBlock 
permutation) {
+               int batchSize = (int) Math.ceil((double) mb.getNumRows() / k);
+               return IntStream.range(0, k).mapToObj(i -> {
+                       int begin = i * batchSize;
+                       int end = Math.min((i + 1) * batchSize, 
mb.getNumRows());
+                       MatrixBlock slicedPerm = permutation.slice(begin, end - 
1);
+                       return slicedPerm.aggregateBinaryOperations(slicedPerm, 
mb, new MatrixBlock(), InstructionUtils.getMatMultOperator(k));
+               }).collect(Collectors.toList());
+       }
+
+       private List<MatrixObject> internalDoPartitioning(int k, MatrixBlock 
mb, MatrixBlock permutation) {
+               return partition(k, mb, 
permutation).stream().map(ParamservUtils::newMatrixObject).collect(Collectors.toList());
+       }
+
+       @Override
+       public Result doPartitioning(int workersNum, MatrixBlock features, 
MatrixBlock labels) {
+               // Generate a single permutation matrix (workers use slices)
+               MatrixBlock permutation = 
ParamservUtils.generatePermutation(features.getNumRows(), SEED);
+               List<MatrixObject> pfs = internalDoPartitioning(workersNum, 
features, permutation);
+               List<MatrixObject> pls = internalDoPartitioning(workersNum, 
labels, permutation);
+               return new Result(pfs, pls);
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionScheme.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionScheme.java
new file mode 100644
index 0000000..f2ea0aa
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionScheme.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv;
+
+import java.util.List;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+public abstract class DataPartitionScheme {
+
+       public final class Result {
+               public final List<MatrixObject> pFeatures;
+               public final List<MatrixObject> pLabels;
+
+               public Result(List<MatrixObject> pFeatures, List<MatrixObject> 
pLabels) {
+                       this.pFeatures = pFeatures;
+                       this.pLabels = pLabels;
+               }
+       }
+
+       public abstract Result doPartitioning(int workersNum, MatrixBlock 
features, MatrixBlock labels);
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java
index fce6d35..3f28cd1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java
@@ -19,22 +19,31 @@
 
 package org.apache.sysml.runtime.controlprogram.paramserv;
 
-import java.util.List;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-
-public abstract class DataPartitioner {
-
-       public final class Result {
-               public final List<MatrixObject> pFeatures;
-               public final List<MatrixObject> pLabels;
-
-               public Result(List<MatrixObject> pFeatures, List<MatrixObject> 
pLabels) {
-                       this.pFeatures = pFeatures;
-                       this.pLabels = pLabels;
+import org.apache.sysml.parser.Statement;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+public class DataPartitioner {
+
+       private DataPartitionScheme _scheme;
+
+       public DataPartitioner(Statement.PSScheme scheme) {
+               switch (scheme) {
+                       case DISJOINT_CONTIGUOUS:
+                               _scheme = new DCScheme();
+                               break;
+                       case DISJOINT_ROUND_ROBIN:
+                               _scheme = new DRRScheme();
+                               break;
+                       case DISJOINT_RANDOM:
+                               _scheme = new DRScheme();
+                               break;
+                       case OVERLAP_RESHUFFLE:
+                               _scheme = new ORScheme();
+                               break;
                }
        }
 
-       public abstract Result doPartitioning(int workersNum, MatrixObject 
features, MatrixObject labels);
-
+       public DataPartitionScheme.Result doPartitioning(int workersNum, 
MatrixBlock features, MatrixBlock labels) {
+               return _scheme.doPartitioning(workersNum, features, labels);
+       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDC.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDC.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDC.java
deleted file mode 100644
index 4810541..0000000
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDC.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.paramserv;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-
-/**
- * Disjoint_Contiguous data partitioner:
- *
- * for each worker, use a right indexing
- * operation X[beg:end,] to obtain contiguous,
- * non-overlapping partitions of rows.
- */
-public class DataPartitionerDC extends DataPartitioner {
-
-       private List<MatrixObject> doPartitioning(int k, MatrixObject mo) {
-               List<MatrixObject> list = new ArrayList<>();
-               long stepSize = (long) Math.ceil((double) mo.getNumRows() / k);
-               long begin = 1;
-               while (begin < mo.getNumRows()) {
-                       long end = Math.min(begin - 1 + stepSize, 
mo.getNumRows());
-                       MatrixObject pmo = ParamservUtils.sliceMatrix(mo, 
begin, end);
-                       list.add(pmo);
-                       begin = end + 1;
-               }
-               return list;
-       }
-
-       @Override
-       public Result doPartitioning(int workersNum, MatrixObject features, 
MatrixObject labels) {
-               List<MatrixObject> pfs = doPartitioning(workersNum, features);
-               List<MatrixObject> pls = doPartitioning(workersNum, labels);
-               return new Result(pfs, pls);
-       }
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDR.java
deleted file mode 100644
index adc6f60..0000000
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDR.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.paramserv;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-
-/**
- * Data partitioner Disjoint_Random:
- * for each worker, use a permutation multiply P[beg:end,] %*% X,
- * where P is constructed for example with 
P=table(seq(1,nrow(X)),sample(nrow(X), nrow(X))),
- * i.e., sampling without replacement to ensure disjointness.
- */
-public class DataPartitionerDR extends DataPartitioner {
-
-       private List<MatrixObject> doPartitioning(int k, MatrixObject mo, 
MatrixBlock permutation) {
-               MatrixBlock data = mo.acquireRead();
-               int batchSize = (int) Math.ceil((double) mo.getNumRows() / k);
-               List<MatrixObject> pMatrices = IntStream.range(0, k).mapToObj(i 
-> {
-                       int begin = i * batchSize;
-                       int end = (int) Math.min((i + 1) * batchSize, 
mo.getNumRows());
-                       MatrixBlock slicedPerm = permutation.slice(begin, end - 
1);
-                       MatrixBlock output = 
slicedPerm.aggregateBinaryOperations(slicedPerm,
-                               data, new MatrixBlock(), 
InstructionUtils.getMatMultOperator(k));
-                       MatrixObject result = ParamservUtils.newMatrixObject();
-                       result.acquireModify(output);
-                       result.release();
-                       return result;
-               }).collect(Collectors.toList());
-               mo.release();
-               return pMatrices;
-       }
-
-       @Override
-       public Result doPartitioning(int workersNum, MatrixObject features, 
MatrixObject labels) {
-               // Generate a single permutation matrix (workers use slices)
-               MatrixBlock permutation = 
ParamservUtils.generatePermutation((int)features.getNumRows());
-               List<MatrixObject> pfs = doPartitioning(workersNum, features, 
permutation);
-               List<MatrixObject> pls = doPartitioning(workersNum, labels, 
permutation);
-               return new Result(pfs, pls);
-       }
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDRR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDRR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDRR.java
deleted file mode 100644
index a2ff5f9..0000000
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDRR.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.paramserv;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.LongStream;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.util.DataConverter;
-
-/**
- * Disjoint_Round_Robin data partitioner:
- * for each worker, use a permutation multiply
- * or simpler a removeEmpty such as removeEmpty
- * (target=X, margin=rows, select=(seq(1,nrow(X))%%k)==id)
- */
-public class DataPartitionerDRR extends DataPartitioner {
-
-       private MatrixObject removeEmpty(MatrixObject mo, int k, int workerId) {
-               MatrixObject result = ParamservUtils.newMatrixObject();
-               MatrixBlock tmp = mo.acquireRead();
-               double[] data = LongStream.range(1, mo.getNumRows() + 1)
-                       .mapToDouble(l -> l % k == workerId ? 1 : 0).toArray();
-               MatrixBlock select = DataConverter.convertToMatrixBlock(data, 
true);
-               MatrixBlock resultMB = tmp.removeEmptyOperations(new 
MatrixBlock(), true, true, select);
-               mo.release();
-               result.acquireModify(resultMB);
-               result.release();
-               result.enableCleanup(false);
-               return result;
-       }
-
-       @Override
-       public Result doPartitioning(int workersNum, MatrixObject features, 
MatrixObject labels) {
-               List<MatrixObject> pfs = IntStream.range(0, workersNum)
-                       .mapToObj(i -> removeEmpty(features, workersNum, 
i)).collect(Collectors.toList());
-               List<MatrixObject> pls = IntStream.range(0, workersNum)
-                   .mapToObj(i -> removeEmpty(labels, workersNum, 
i)).collect(Collectors.toList());
-               return new Result(pfs, pls);
-       }
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerOR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerOR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerOR.java
deleted file mode 100644
index 0bfb4b7..0000000
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerOR.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.controlprogram.paramserv;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysml.runtime.instructions.InstructionUtils;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-
-/**
- * Data partitioner Overlap_Reshuffle:
- * for each worker, use a new permutation multiply P %*% X,
- * where P is constructed for example with 
P=table(seq(1,nrow(X),sample(nrow(X), nrow(X))))
- */
-public class DataPartitionerOR extends DataPartitioner {
-
-       private List<MatrixObject> doPartitioning(int k, MatrixObject mo, 
List<MatrixBlock> permutations) {
-               MatrixBlock data = mo.acquireRead();
-               List<MatrixObject> pMatrices = IntStream.range(0, k).mapToObj(i 
-> {
-                       MatrixBlock permutation = permutations.get(i);
-                       MatrixBlock output = 
permutation.aggregateBinaryOperations(permutation,
-                               data, new MatrixBlock(), 
InstructionUtils.getMatMultOperator(k));
-                       MatrixObject result = ParamservUtils.newMatrixObject();
-                       result.acquireModify(output);
-                       result.release();
-                       return result;
-               }).collect(Collectors.toList());
-               mo.release();
-               return pMatrices;
-       }
-
-       @Override
-       public Result doPartitioning(int workersNum, MatrixObject features, 
MatrixObject labels) {
-               // Generate a different permutation matrix for each worker
-               List<MatrixBlock> permutations = IntStream.range(0, workersNum)
-                       .mapToObj(i -> 
ParamservUtils.generatePermutation((int)features.getNumRows()))
-               .collect(Collectors.toList());
-               List<MatrixObject> pfs = doPartitioning(workersNum, features, 
permutations);
-               List<MatrixObject> pls = doPartitioning(workersNum, labels, 
permutations);
-               return new Result(pfs, pls);
-       }
-}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java
index 366284c..307669e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java
@@ -187,5 +187,4 @@ public class LocalPSWorker extends PSWorker implements 
Callable<Void> {
                ParamservUtils.cleanupData(bLabels);
                return gradients;
        }
-
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ORScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ORScheme.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ORScheme.java
new file mode 100644
index 0000000..2692efa
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ORScheme.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv;
+
+import static 
org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils.SEED;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.instructions.InstructionUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
/**
 * Data partitioner Overlap_Reshuffle:
 * each worker receives a reshuffled copy of the WHOLE dataset,
 * computed as P %*% X with a per-worker permutation matrix P,
 * where P is constructed for example with
 * P = table(seq(1, nrow(X)), sample(nrow(X), nrow(X)))
 */
public class ORScheme extends DataPartitionScheme {

	/**
	 * Applies each of the k permutation matrices to the input block,
	 * i.e., produces one reshuffled full copy of {@code mb} per worker.
	 *
	 * @param k number of workers
	 * @param mb input matrix (features or labels)
	 * @param permutations one permutation matrix per worker (size k)
	 * @return list of k permuted matrices, one per worker
	 */
	public static List<MatrixBlock> partition(int k, MatrixBlock mb, List<MatrixBlock> permutations) {
		return IntStream.range(0, k).mapToObj(i -> {
			MatrixBlock permutation = permutations.get(i);
			// matrix multiply P %*% mb (k also serves as the parallelism degree of the matmult)
			return permutation.aggregateBinaryOperations(permutation, mb, new MatrixBlock(),
					InstructionUtils.getMatMultOperator(k));
		}).collect(Collectors.toList());
	}

	// Wraps the partitioned blocks into cacheable matrix objects
	private List<MatrixObject> doPartitioning(int k, MatrixBlock mb, List<MatrixBlock> permutations) {
		return partition(k, mb, permutations).stream().map(ParamservUtils::newMatrixObject).collect(Collectors.toList());
	}

	@Override
	public Result doPartitioning(int workersNum, MatrixBlock features, MatrixBlock labels) {
		// Generate a different permutation matrix for each worker
		// (SEED+i makes the permutations distinct across workers yet reproducible)
		List<MatrixBlock> permutations = IntStream.range(0, workersNum)
			.mapToObj(i -> ParamservUtils.generatePermutation(features.getNumRows(), SEED+i))
			.collect(Collectors.toList());
		// Apply the same permutation to features and labels to keep rows aligned
		List<MatrixObject> pfs = doPartitioning(workersNum, features, permutations);
		List<MatrixObject> pls = doPartitioning(workersNum, labels, permutations);
		return new Result(pfs, pls);
	}
}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
index 3aee170..ec6b6b2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -22,11 +22,15 @@ package org.apache.sysml.runtime.controlprogram.paramserv;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.Hop;
 import org.apache.sysml.hops.MultiThreadedHop;
 import org.apache.sysml.hops.OptimizerUtils;
@@ -34,6 +38,7 @@ import org.apache.sysml.hops.recompile.Recompiler;
 import org.apache.sysml.parser.DMLProgram;
 import org.apache.sysml.parser.DMLTranslator;
 import org.apache.sysml.parser.Expression;
+import org.apache.sysml.parser.Statement;
 import org.apache.sysml.parser.StatementBlock;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.ForProgramBlock;
@@ -49,6 +54,9 @@ import 
org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import 
org.apache.sysml.runtime.controlprogram.paramserv.spark.DataPartitionerSparkAggregator;
+import 
org.apache.sysml.runtime.controlprogram.paramserv.spark.DataPartitionerSparkMapper;
 import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter;
 import org.apache.sysml.runtime.functionobjects.Plus;
 import org.apache.sysml.runtime.instructions.cp.Data;
@@ -57,13 +65,18 @@ import 
org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MetaDataFormat;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
 
+import scala.Tuple2;
+
 public class ParamservUtils {
 
        public static final String PS_FUNC_PREFIX = "_ps_";
 
+       public static long SEED = -1; // Used for generating permutation
+
        /**
         * Deep copy the list object
         *
@@ -105,8 +118,13 @@ public class ParamservUtils {
                cd.clearData();
        }
 
-       public static MatrixObject newMatrixObject() {
-               return new MatrixObject(Expression.ValueType.DOUBLE, 
OptimizerUtils.getUniqueTempFileName(), new MetaDataFormat(new 
MatrixCharacteristics(-1, -1, -1, -1), OutputInfo.BinaryBlockOutputInfo, 
InputInfo.BinaryBlockInputInfo));
/**
 * Creates a new temp-file-backed matrix object holding the given matrix block.
 * Dimensions are left unknown (-1) in the meta data; only the configured
 * block size is set for the binary-block format.
 *
 * @param mb matrix block to wrap
 * @return new matrix object containing {@code mb}
 */
public static MatrixObject newMatrixObject(MatrixBlock mb) {
	MatrixObject result = new MatrixObject(Expression.ValueType.DOUBLE, OptimizerUtils.getUniqueTempFileName(),
		new MetaDataFormat(new MatrixCharacteristics(-1, -1, ConfigurationManager.getBlocksize(),
		ConfigurationManager.getBlocksize()), OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo));
	// acquireModify/release registers the in-memory block with the caching layer
	result.acquireModify(mb);
	result.release();
	return result;
}
 
        /**
@@ -118,20 +136,31 @@ public class ParamservUtils {
         * @return new sliced matrix
         */
public static MatrixObject sliceMatrix(MatrixObject mo, long rl, long rh) {
	MatrixBlock mb = mo.acquireRead();
	MatrixObject result = newMatrixObject(sliceMatrixBlock(mb, rl, rh));
	// NOTE(review): cleanup is disabled, presumably so the sliced partition
	// is not evicted/deleted while workers still use it — TODO confirm owner
	// of the eventual cleanup
	result.enableCleanup(false);
	mo.release();
	return result;
}
 
-       public static MatrixBlock generatePermutation(int numEntries) {
+       /**
+        * Slice the matrix block and return a matrix block
+        * (used in spark)
+        *
+        * @param mb input matrix
+        * @param rl low boundary
+        * @param rh high boundary
+        * @return new sliced matrix block
+        */
+       public static MatrixBlock sliceMatrixBlock(MatrixBlock mb, long rl, 
long rh) {
+               return mb.slice((int) rl - 1, (int) rh - 1);
+       }
+
+       public static MatrixBlock generatePermutation(int numEntries, long 
seed) {
                // Create a sequence and sample w/o replacement
                // (no need to materialize the sequence because ctable only 
uses its meta data)
                MatrixBlock seq = new MatrixBlock(numEntries, 1, false);
-               MatrixBlock sample = MatrixBlock.sampleOperations(numEntries, 
numEntries, false, -1);
+               MatrixBlock sample = MatrixBlock.sampleOperations(numEntries, 
numEntries, false, seed);
 
                // Combine the sequence and sample as a table
                return seq.ctableSeqOperations(sample, 1.0,
@@ -257,14 +286,104 @@ public class ParamservUtils {
                return recompiled;
        }
 
-
        private static FunctionProgramBlock getFunctionBlock(ExecutionContext 
ec, String funcName) {
                String[] cfn = getCompleteFuncName(funcName, null);
                String ns = cfn[0];
                String fname = cfn[1];
                return ec.getProgram().getFunctionProgramBlock(ns, fname);
        }
-       
+
/**
 * Column-binds two matrix blocks, i.e., cbind(left, right).
 *
 * @param left left-hand side matrix
 * @param right matrix appended column-wise to the right
 * @return new combined matrix block
 */
public static MatrixBlock cbindMatrix(MatrixBlock left, MatrixBlock right) {
	return left.append(right, new MatrixBlock());
}
+
/**
 * Assembles features and labels into a single pair RDD keyed by row-block ID.
 *
 * @param numRows number of rows of the data
 * @param featuresRDD indexed features matrix blocks
 * @param labelsRDD indexed labels matrix blocks
 * @return RDD keyed by row-block ID with the (features, labels) row-block pair as value
 */
public static JavaPairRDD<Long, Tuple2<MatrixBlock, MatrixBlock>> assembleTrainingData(long numRows, JavaPairRDD<MatrixIndexes, MatrixBlock> featuresRDD, JavaPairRDD<MatrixIndexes, MatrixBlock> labelsRDD) {
	// Per side, cbind all column blocks of a row-block into one block keyed by row-block ID
	JavaPairRDD<Long, MatrixBlock> fRDD = groupMatrix(numRows, featuresRDD);
	JavaPairRDD<Long, MatrixBlock> lRDD = groupMatrix(numRows, labelsRDD);
	//TODO Add an additional physical operator which broadcasts the labels directly (broadcast join with features) if certain memory budgets are satisfied
	return fRDD.join(lRDD);
}
+
/**
 * Regroups a block-indexed matrix RDD by row-block ID, column-binding all
 * column blocks of a row-block (in ascending column order) into one block.
 *
 * @param numRows number of data rows (used to size the custom partitioner)
 * @param rdd block-indexed input matrix
 * @return RDD of (row-block ID, full-width row-block matrix)
 */
private static JavaPairRDD<Long, MatrixBlock> groupMatrix(long numRows, JavaPairRDD<MatrixIndexes, MatrixBlock> rdd) {
	//TODO could use join and aggregation to avoid unnecessary shuffle introduced by reduceByKey
	// Re-key by row-block index, keeping the column index alongside the block
	return rdd.mapToPair(input -> new Tuple2<>(input._1.getRowIndex(), new Tuple2<>(input._1.getColumnIndex(), input._2)))
		.aggregateByKey(new LinkedList<Tuple2<Long, MatrixBlock>>(),
			new Partitioner() {
				private static final long serialVersionUID = -7032660778344579236L;
				@Override
				public int getPartition(Object rblkID) {
					// NOTE(review): keys are 1-based row-block IDs while Spark
					// partition indices are 0-based (0..numPartitions-1) — verify
					// the upper boundary for blocksize-1 inputs
					return Math.toIntExact((Long) rblkID);
				}
				@Override
				public int numPartitions() {
					// NOTE(review): one partition per data ROW (not per row block);
					// likely over-partitions large inputs, and toIntExact throws
					// for numRows > Integer.MAX_VALUE — confirm intended sizing
					return Math.toIntExact(numRows);
				}
			},
			(list, input) -> {
				list.add(input); 
				return list;
			}, 
			(l1, l2) -> {
				// Merge and keep column blocks sorted by column index
				l1.addAll(l2);
				l1.sort((o1, o2) -> o1._1.compareTo(o2._1));
				return l1;
			})
		.mapToPair(input -> {
			// cbind the ordered column blocks into one full-width block
			// (NOTE(review): repeated append copies — O(k^2) in #column blocks)
			LinkedList<Tuple2<Long, MatrixBlock>> list = input._2;
			MatrixBlock result = list.get(0)._2;
			for (int i = 1; i < list.size(); i++) {
				result = ParamservUtils.cbindMatrix(result, list.get(i)._2);
			}
			return new Tuple2<>(input._1, result);
		});
}
+
/**
 * Runs the distributed data-partitioning pipeline on spark:
 * assemble (features, labels) per row block, scatter rows to workers
 * according to the given scheme, then aggregate each worker's rows into
 * a single (features, labels) matrix pair.
 *
 * @param sec spark execution context
 * @param features features matrix object
 * @param labels labels matrix object
 * @param scheme data partition scheme (DC, DR, DRR, OR)
 * @param workerNum number of workers
 * @return RDD of (workerID, (features matrix, labels matrix))
 */
@SuppressWarnings("unchecked")
public static JavaPairRDD<Integer, Tuple2<MatrixBlock, MatrixBlock>> doPartitionOnSpark(SparkExecutionContext sec, MatrixObject features, MatrixObject labels, Statement.PSScheme scheme, int workerNum) {
	// Get input RDD
	JavaPairRDD<MatrixIndexes, MatrixBlock> featuresRDD = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
			sec.getRDDHandleForMatrixObject(features, InputInfo.BinaryBlockInputInfo);
	JavaPairRDD<MatrixIndexes, MatrixBlock> labelsRDD = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
			sec.getRDDHandleForMatrixObject(labels, InputInfo.BinaryBlockInputInfo);

	DataPartitionerSparkMapper mapper = new DataPartitionerSparkMapper(scheme, workerNum, sec, (int) features.getNumRows());
	return ParamservUtils.assembleTrainingData(features.getNumRows(), featuresRDD, labelsRDD) // Combine features and labels into a pair (rowBlockID => (features, labels))
		.flatMapToPair(mapper) // Do the data partitioning on spark (workerID => (rowBlockID, (single row features, single row labels))
		// Aggregate the partitioned matrix according to rowID for each worker
		// i.e. (workerID => ordered list[(rowBlockID, (single row features, single row labels)]
		.aggregateByKey(new LinkedList<Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>>(),
			new Partitioner() {
				private static final long serialVersionUID = -7937781374718031224L;
				@Override
				public int getPartition(Object workerID) {
					// worker IDs are 0-based, so they map directly to partition indices
					return (int) workerID;
				}
				@Override
				public int numPartitions() {
					// one partition per worker
					return workerNum;
				}
			}, 
			(list, input) -> {
				list.add(input);
				return list;
			},
			(l1, l2) -> {
				// merge and order rows by their (global) row ID
				l1.addAll(l2);
				l1.sort((o1, o2) -> o1._1.compareTo(o2._1));
				return l1;
			})
		// rbind each worker's ordered rows into one features and one labels matrix
		.mapToPair(new DataPartitionerSparkAggregator(
			features.getNumColumns(), labels.getNumColumns()));
}
+
        public static ListObject accrueGradients(ListObject accGradients, 
ListObject gradients) {
                return accrueGradients(accGradients, gradients, false);
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DCSparkScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DCSparkScheme.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DCSparkScheme.java
new file mode 100644
index 0000000..666b891
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DCSparkScheme.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import java.util.List;
+
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+import scala.Tuple2;
+
+/**
+ * Spark Disjoint_Contiguous data partitioner:
+ * <p>
+ * For each row, find out the shifted place according to the workerID indicator
+ */
+public class DCSparkScheme extends DataPartitionSparkScheme {
+
+       private static final long serialVersionUID = -2786906947020788787L;
+
+       protected DCSparkScheme() {
+               // No-args constructor used for deserialization
+       }
+
+       @Override
+       public Result doPartitioning(int numWorkers, int rblkID, MatrixBlock 
features, MatrixBlock labels) {
+               List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pfs = 
nonShuffledPartition(rblkID, features);
+               List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pls = 
nonShuffledPartition(rblkID, labels);
+               return new Result(pfs, pls);
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DRRSparkScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DRRSparkScheme.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DRRSparkScheme.java
new file mode 100644
index 0000000..7683251
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DRRSparkScheme.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import java.util.List;
+
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+import scala.Tuple2;
+
+/**
+ * Spark Disjoint_Round_Robin data partitioner:
+ */
+public class DRRSparkScheme extends DataPartitionSparkScheme {
+
+       private static final long serialVersionUID = -3130831851505549672L;
+
+       protected DRRSparkScheme() {
+               // No-args constructor used for deserialization
+       }
+
+       @Override
+       public Result doPartitioning(int numWorkers, int rblkID, MatrixBlock 
features, MatrixBlock labels) {
+               List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pfs = 
nonShuffledPartition(rblkID, features);
+               List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pls = 
nonShuffledPartition(rblkID, labels);
+               return new Result(pfs, pls);
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DRSparkScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DRSparkScheme.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DRSparkScheme.java
new file mode 100644
index 0000000..51cc523
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DRSparkScheme.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+import scala.Tuple2;
+
/**
 * Spark data partitioner Disjoint_Random:
 *
 * For the current row block, find the shifted (permuted) position of each
 * row and route it to the worker owning that position
 * (WorkerID => (row block ID, matrix)).
 */
public class DRSparkScheme extends DataPartitionSparkScheme {

	private static final long serialVersionUID = -7655310624144544544L;

	protected DRSparkScheme() {
		// No-args constructor used for deserialization
	}

	@Override
	public Result doPartitioning(int numWorkers, int rblkID, MatrixBlock features, MatrixBlock labels) {
		// Same permutation is applied to features and labels to keep rows aligned
		List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pfs = partition(rblkID, features);
		List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pls = partition(rblkID, labels);
		return new Result(pfs, pls);
	}

	// Routes each row of the block to its permuted target position/worker
	private List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> partition(int rblkID, MatrixBlock mb) {
		// Partial permutation column for this row block (global permutation, broadcast)
		MatrixBlock partialPerm = _globalPerms.get(0).getBlock(rblkID, 1);

		// For each row, find out the shifted place
		return IntStream.range(0, mb.getNumRows()).mapToObj(r -> {
			MatrixBlock rowMB = ParamservUtils.sliceMatrixBlock(mb, r + 1, r + 1);
			long shiftedPosition = (long) partialPerm.getValue(r, 0);

			// Get the shifted block and position
			int shiftedBlkID = (int) (shiftedPosition / OptimizerUtils.DEFAULT_BLOCKSIZE + 1);

			MatrixBlock indicator = _workerIndicator.getBlock(shiftedBlkID, 1);
			// NOTE(review): the row index into the indicator block looks like it
			// should be the offset WITHIN the block (shiftedPosition %
			// DEFAULT_BLOCKSIZE), not shiftedPosition / DEFAULT_BLOCKSIZE —
			// verify for inputs larger than one block
			int workerID = (int) indicator.getValue((int) shiftedPosition / OptimizerUtils.DEFAULT_BLOCKSIZE, 0);
			return new Tuple2<>(workerID, new Tuple2<>(shiftedPosition, rowMB));
		}).collect(Collectors.toList());
	}

}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionSparkScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionSparkScheme.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionSparkScheme.java
new file mode 100644
index 0000000..9875dd2
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionSparkScheme.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+import scala.Tuple2;
+
/**
 * Base class of the spark data-partition schemes: holds the broadcast
 * permutation/indicator state and the shared non-shuffled routing logic.
 */
public abstract class DataPartitionSparkScheme implements Serializable {

	// Per-block partition result: rows of features and labels routed to workers
	protected final class Result {
		protected final List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pFeatures; // WorkerID => (rowID, matrix)
		protected final List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pLabels;

		protected Result(List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pFeatures, List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pLabels) {
			this.pFeatures = pFeatures;
			this.pLabels = pLabels;
		}
	}

	private static final long serialVersionUID = -3462829818083371171L;

	protected List<PartitionedBroadcast<MatrixBlock>> _globalPerms; // a list of global permutations
	protected PartitionedBroadcast<MatrixBlock> _workerIndicator; // a matrix indicating to which worker the given row belongs

	protected void setGlobalPermutation(List<PartitionedBroadcast<MatrixBlock>> gps) {
		_globalPerms = gps;
	}

	protected void setWorkerIndicator(PartitionedBroadcast<MatrixBlock> wi) {
		_workerIndicator = wi;
	}

	/**
	 * Do non-reshuffled data partitioning according to worker indicator:
	 * each row keeps its global position and is routed to the worker given
	 * by the indicator vector entry for that row.
	 *
	 * @param rblkID row block ID (1-based, grounded by the (rblkID - 1) offset below)
	 * @param mb Matrix
	 * @return list of tuple (workerID, (global row position, matrix row))
	 */
	protected List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> nonShuffledPartition(int rblkID, MatrixBlock mb) {
		MatrixBlock indicator = _workerIndicator.getBlock(rblkID, 1);
		return LongStream.range(0, mb.getNumRows()).mapToObj(r -> {
			int workerID = (int) indicator.getValue((int) r, 0);
			MatrixBlock rowMB = ParamservUtils.sliceMatrixBlock(mb, r + 1, r + 1);
			// NOTE(review): uses the compile-time DEFAULT_BLOCKSIZE rather than the
			// configured runtime blocksize — confirm they always agree here; also
			// the int multiply (rblkID - 1) * DEFAULT_BLOCKSIZE could overflow
			// before widening to long for very tall matrices
			long shiftedPosition = r + (rblkID - 1) * OptimizerUtils.DEFAULT_BLOCKSIZE;
			return new Tuple2<>(workerID, new Tuple2<>(shiftedPosition, rowMB));
		}).collect(Collectors.toList());
	}

	// Scheme-specific routing of one row block of features and labels to workers
	public abstract Result doPartitioning(int numWorkers, int rblkID, MatrixBlock features, MatrixBlock labels);
}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionerSparkAggregator.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionerSparkAggregator.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionerSparkAggregator.java
new file mode 100644
index 0000000..39b8adf
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionerSparkAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+import scala.Tuple2;
+
/**
 * Final aggregation step of the spark data partitioner: rbinds each
 * worker's ordered single-row blocks into one features and one labels matrix.
 */
public class DataPartitionerSparkAggregator implements PairFunction<Tuple2<Integer,LinkedList<Tuple2<Long,Tuple2<MatrixBlock,MatrixBlock>>>>, Integer, Tuple2<MatrixBlock, MatrixBlock>>, Serializable {

	private static final long serialVersionUID = -1245300852709085117L;
	private long _fcol; // number of feature columns
	private long _lcol; // number of label columns

	public DataPartitionerSparkAggregator() {
		// No-args constructor used for deserialization
	}

	public DataPartitionerSparkAggregator(long fcol, long lcol) {
		_fcol = fcol;
		_lcol = lcol;
	}

	/**
	 * Row-wise combine the matrix: places each (features, labels) row of the
	 * ordered input list at its position in the output matrices.
	 *
	 * @param input workerID => ordered list [(rowBlockID, (features, labels))]
	 * @return workerID => (features, labels)
	 * @throws Exception if placing a row via left indexing fails
	 */
	@Override
	public Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> call(Tuple2<Integer, LinkedList<Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>>> input) throws Exception {
		MatrixBlock fmb = new MatrixBlock(input._2.size(), (int) _fcol, false);
		MatrixBlock lmb = new MatrixBlock(input._2.size(), (int) _lcol, false);

		// NOTE(review): LinkedList.get(i) in this loop is O(n^2) overall — an
		// iterator would be linear; also assumes the int-indexed
		// leftIndexingOperations overload is 0-based — TODO confirm
		for (int i = 0; i < input._2.size(); i++) {
			MatrixBlock tmpFMB = input._2.get(i)._2._1;
			MatrixBlock tmpLMB = input._2.get(i)._2._2;
			// Row-wise aggregation
			fmb = fmb.leftIndexingOperations(tmpFMB, i, i, 0, (int) _fcol - 1, fmb, MatrixObject.UpdateType.INPLACE_PINNED);
			lmb = lmb.leftIndexingOperations(tmpLMB, i, i, 0, (int) _lcol - 1, lmb, MatrixObject.UpdateType.INPLACE_PINNED);
		}
		return new Tuple2<>(input._1, new Tuple2<>(fmb, lmb));
	}
}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionerSparkMapper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionerSparkMapper.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionerSparkMapper.java
new file mode 100644
index 0000000..2a69986
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DataPartitionerSparkMapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.sysml.parser.Statement;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+import scala.Tuple2;
+
+public class DataPartitionerSparkMapper implements 
PairFlatMapFunction<Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>, Integer, 
Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>>, Serializable {
+
+       private static final long serialVersionUID = 1710721606050403296L;
+       private int _workersNum;
+
+       private SparkDataPartitioner _dp;
+
+       protected DataPartitionerSparkMapper() {
+               // No-args constructor used for deserialization
+       }
+
+       public DataPartitionerSparkMapper(Statement.PSScheme scheme, int 
workersNum, SparkExecutionContext sec, int numEntries) {
+               _workersNum = workersNum;
+               _dp = new SparkDataPartitioner(scheme, sec, numEntries, 
workersNum);
+       }
+
+       /**
+        * Do data partitioning
+        * @param input RowBlockID => (features, labels)
+        * @return WorkerID => (rowBlockID, (single row features, single row 
labels))
+        * @throws Exception Some exception
+        */
+       @Override
+       public Iterator<Tuple2<Integer, Tuple2<Long, Tuple2<MatrixBlock, 
MatrixBlock>>>> call(Tuple2<Long,Tuple2<MatrixBlock,MatrixBlock>> input)
+                       throws Exception {
+               List<Tuple2<Integer, 
Tuple2<Long,Tuple2<MatrixBlock,MatrixBlock>>>> partitions = new LinkedList<>();
+               MatrixBlock features = input._2._1;
+               MatrixBlock labels = input._2._2;
+               DataPartitionSparkScheme.Result result = 
_dp.doPartitioning(_workersNum, features, labels, input._1);
+               for (int i = 0; i < result.pFeatures.size(); i++) {
+                       Tuple2<Integer, Tuple2<Long, MatrixBlock>> ft = 
result.pFeatures.get(i);
+                       Tuple2<Integer, Tuple2<Long, MatrixBlock>> lt = 
result.pLabels.get(i);
+                       partitions.add(new Tuple2<>(ft._1, new 
Tuple2<>(ft._2._1, new Tuple2<>(ft._2._2, lt._2._2))));
+               }
+               return partitions.iterator();
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/ORSparkScheme.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/ORSparkScheme.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/ORSparkScheme.java
new file mode 100644
index 0000000..16ce516
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/ORSparkScheme.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+import scala.Tuple2;
+
+/**
+ * Spark data partitioner Overlap_Reshuffle:
+ *
+ */
+public class ORSparkScheme extends DataPartitionSparkScheme {
+
+       private static final long serialVersionUID = 6867567406403580311L;
+
+       protected ORSparkScheme() {
+               // No-args constructor used for deserialization
+       }
+
+       @Override
+       public Result doPartitioning(int numWorkers, int rblkID, MatrixBlock 
features, MatrixBlock labels) {
+               List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pfs = 
partition(numWorkers, rblkID, features);
+               List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pls = 
partition(numWorkers, rblkID, labels);
+               return new Result(pfs, pls);
+       }
+
+       private List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> partition(int 
numWorkers, int rblkID, MatrixBlock mb) {
+               return IntStream.range(0, numWorkers).mapToObj(i -> 
i).flatMap(workerID -> {
+                               MatrixBlock partialPerm = 
_globalPerms.get(workerID).getBlock(rblkID, 1);
+                               return IntStream.range(0, 
mb.getNumRows()).mapToObj(r -> {
+                                       MatrixBlock rowMB = 
ParamservUtils.sliceMatrixBlock(mb, r + 1, r + 1);
+                                       long shiftedPosition = (long) 
partialPerm.getValue(r, 0);
+                                       return new Tuple2<>(workerID, new 
Tuple2<>(shiftedPosition, rowMB));
+                               });
+                       }).collect(Collectors.toList());
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkDataPartitioner.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkDataPartitioner.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkDataPartitioner.java
new file mode 100644
index 0000000..6883d0f
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkDataPartitioner.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import static 
org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils.SEED;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.sysml.parser.Statement;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+
+public class SparkDataPartitioner implements Serializable {
+
+       private static final long serialVersionUID = 6841548626711057448L;
+       private DataPartitionSparkScheme _scheme;
+
+       protected SparkDataPartitioner(Statement.PSScheme scheme, 
SparkExecutionContext sec, int numEntries, int numWorkers) {
+               switch (scheme) {
+                       case DISJOINT_CONTIGUOUS:
+                               _scheme = new DCSparkScheme();
+                               // Create the worker id indicator
+                               createDCIndicator(sec, numWorkers, numEntries);
+                               break;
+                       case DISJOINT_ROUND_ROBIN:
+                               _scheme = new DRRSparkScheme();
+                               // Create the worker id indicator
+                               createDRIndicator(sec, numWorkers, numEntries);
+                               break;
+                       case DISJOINT_RANDOM:
+                               _scheme = new DRSparkScheme();
+                               // Create the global permutation
+                               createGlobalPermutations(sec, numEntries, 1);
+                               // Create the worker id indicator
+                               createDCIndicator(sec, numWorkers, numEntries);
+                               break;
+                       case OVERLAP_RESHUFFLE:
+                               _scheme = new ORSparkScheme();
+                               // Create the global permutation seperately for 
each worker
+                               createGlobalPermutations(sec, numEntries, 
numWorkers);
+                               break;
+               }
+       }
+
+       private void createDRIndicator(SparkExecutionContext sec, int 
numWorkers, int numEntries) {
+               double[] vector = IntStream.range(0, numEntries).mapToDouble(n 
-> n % numWorkers).toArray();
+               MatrixBlock vectorMB = 
DataConverter.convertToMatrixBlock(vector, true);
+               
_scheme.setWorkerIndicator(sec.getBroadcastForMatrixObject(ParamservUtils.newMatrixObject(vectorMB)));
+       }
+
+       private void createDCIndicator(SparkExecutionContext sec, int 
numWorkers, int numEntries) {
+               double[] vector = new double[numEntries];
+               int batchSize = (int) Math.ceil((double) numEntries / 
numWorkers);
+               for (int i = 1; i < numWorkers; i++) {
+                       int begin = batchSize * i;
+                       int end = Math.min(begin + batchSize, numEntries);
+                       Arrays.fill(vector, begin, end, i);
+               }
+               MatrixBlock vectorMB = 
DataConverter.convertToMatrixBlock(vector, true);
+               
_scheme.setWorkerIndicator(sec.getBroadcastForMatrixObject(ParamservUtils.newMatrixObject(vectorMB)));
+       }
+
+       private void createGlobalPermutations(SparkExecutionContext sec, int 
numEntries, int numPerm) {
+               List<PartitionedBroadcast<MatrixBlock>> perms = 
IntStream.range(0, numPerm).mapToObj(i -> {
+                       MatrixBlock perm = 
MatrixBlock.sampleOperations(numEntries, numEntries, false, SEED+i);
+                       // Create the source-target id vector from the 
permutation ranging from 1 to number of entries
+                       double[] vector = new double[numEntries];
+                       for (int j = 0; j < perm.getDenseBlockValues().length; 
j++) {
+                               vector[(int) perm.getDenseBlockValues()[j] - 1] 
= j;
+                       }
+                       MatrixBlock vectorMB = 
DataConverter.convertToMatrixBlock(vector, true);
+                       return 
sec.getBroadcastForMatrixObject(ParamservUtils.newMatrixObject(vectorMB));
+               }).collect(Collectors.toList());
+               _scheme.setGlobalPermutation(perms);
+       }
+
+       public DataPartitionSparkScheme.Result doPartitioning(int numWorkers, 
MatrixBlock features, MatrixBlock labels,
+                       long rowID) {
+               // Set the rowID in order to get the according permutation
+               return _scheme.doPartitioning(numWorkers, (int) rowID, 
features, labels);
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
new file mode 100644
index 0000000..69da56c
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/SparkPSWorker.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.paramserv.spark;
+
+import java.io.Serializable;
+
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.sysml.parser.Statement;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamServer;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+import scala.Tuple2;
+
// Placeholder for the remote Spark parameter-server worker: invoked once per
// worker partition (workerID => (features, labels)). Both the configuring
// constructor and call() are still no-ops in this commit.
public class SparkPSWorker implements VoidFunction<Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>>>, Serializable {

	private static final long serialVersionUID = -8674739573419648732L;

	public SparkPSWorker() {
		// No-args constructor used for deserialization
	}

	// NOTE(review): all arguments are currently ignored — presumably they will be
	// stored as fields once remote execution is implemented; confirm in follow-up.
	public SparkPSWorker(String updFunc, Statement.PSFrequency freq, int epochs, long batchSize,
			MatrixObject valFeatures, MatrixObject valLabels, ExecutionContext ec, ParamServer ps) {
	}

	// Stub: no gradient computation is performed yet.
	@Override
	public void call(Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> input) throws Exception {
	}
}

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
index 25bc113..3d61625 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java
@@ -58,15 +58,14 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionScheme;
 import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitioner;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDC;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDR;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDRR;
-import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerOR;
 import org.apache.sysml.runtime.controlprogram.paramserv.LocalPSWorker;
 import org.apache.sysml.runtime.controlprogram.paramserv.LocalParamServer;
 import org.apache.sysml.runtime.controlprogram.paramserv.ParamServer;
 import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSWorker;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysml.runtime.matrix.operators.Operator;
@@ -97,9 +96,32 @@ public class ParamservBuiltinCPInstruction extends 
ParameterizedBuiltinCPInstruc
 
        @Override
        public void processInstruction(ExecutionContext ec) {
-               Timing tSetup = DMLScript.STATISTICS ? new Timing(true) : null;
-
                PSModeType mode = getPSMode();
+               switch (mode) {
+                       case LOCAL:
+                               runLocally(ec, mode);
+                               break;
+                       case REMOTE_SPARK:
+                               runOnSpark((SparkExecutionContext) ec, mode);
+                               break;
+               }
+       }
+
+       private void runOnSpark(SparkExecutionContext sec, PSModeType mode) {
+               PSScheme scheme = getScheme();
+               int workerNum = getWorkerNum(mode);
+
+               MatrixObject features = 
sec.getMatrixObject(getParam(PS_FEATURES));
+               MatrixObject labels = sec.getMatrixObject(getParam(PS_LABELS));
+
+               SparkPSWorker worker = new SparkPSWorker();
+               ParamservUtils.doPartitionOnSpark(sec, features, labels, 
scheme, workerNum) // Do data partitioning
+                               .foreach(worker);   // Run remote workers
+
+       }
+
+       private void runLocally(ExecutionContext ec, PSModeType mode) {
+               Timing tSetup = DMLScript.STATISTICS ? new Timing(true) : null;
                int workerNum = getWorkerNum(mode);
                BasicThreadFactory factory = new BasicThreadFactory.Builder()
                        .namingPattern("workers-pool-thread-%d").build();
@@ -132,13 +154,13 @@ public class ParamservBuiltinCPInstruction extends 
ParameterizedBuiltinCPInstruc
                MatrixObject valFeatures = 
ec.getMatrixObject(getParam(PS_VAL_FEATURES));
                MatrixObject valLabels = 
ec.getMatrixObject(getParam(PS_VAL_LABELS));
                List<LocalPSWorker> workers = IntStream.range(0, workerNum)
-                       .mapToObj(i -> new LocalPSWorker(i, updFunc, freq, 
epochs, getBatchSize(), valFeatures, valLabels, workerECs.get(i), ps))
-                       .collect(Collectors.toList());
+                  .mapToObj(i -> new LocalPSWorker(i, updFunc, freq, epochs, 
getBatchSize(), valFeatures, valLabels, workerECs.get(i), ps))
+                  .collect(Collectors.toList());
 
                // Do data partition
                PSScheme scheme = getScheme();
-               doDataPartitioning(scheme, ec, workers);
-               
+               partitionLocally(scheme, ec, workers);
+
                if (DMLScript.STATISTICS)
                        Statistics.accPSSetupTime((long) tSetup.stop());
 
@@ -180,8 +202,6 @@ public class ParamservBuiltinCPInstruction extends 
ParameterizedBuiltinCPInstruc
                } catch (IllegalArgumentException e) {
                        throw new DMLRuntimeException(String.format("Paramserv 
function: not support ps execution mode '%s'", getParam(PS_MODE)));
                }
-               if( mode == PSModeType.REMOTE_SPARK )
-                       throw new DMLRuntimeException("Do not support remote 
spark.");
                return mode;
        }
 
@@ -236,12 +256,13 @@ public class ParamservBuiltinCPInstruction extends 
ParameterizedBuiltinCPInstruc
                switch (mode) {
                        case LOCAL:
                                // default worker number: available cores - 1 
(assign one process for agg service)
-                               int workerNum = getRemainingCores();
-                               if 
(getParameterMap().containsKey(PS_PARALLELISM))
-                                       workerNum = 
Integer.valueOf(getParam(PS_PARALLELISM));
-                               return workerNum;
+                               return 
getParameterMap().containsKey(PS_PARALLELISM) ?
+                                       
Integer.valueOf(getParam(PS_PARALLELISM)) : getRemainingCores();
+                       case REMOTE_SPARK:
+                               return 
getParameterMap().containsKey(PS_PARALLELISM) ?
+                                       
Integer.valueOf(getParam(PS_PARALLELISM)) : 
SparkExecutionContext.getDefaultParallelism(true);
                        default:
-                               throw new DMLRuntimeException("Unsupported 
parameter server: "+mode.name());
+                               throw new DMLRuntimeException("Unsupported 
parameter server: " + mode.name());
                }
        }
 
@@ -279,22 +300,24 @@ public class ParamservBuiltinCPInstruction extends 
ParameterizedBuiltinCPInstruc
                return hyperparams;
        }
 
-       private void doDataPartitioning(PSScheme scheme, ExecutionContext ec, 
List<LocalPSWorker> workers) {
+       private void partitionLocally(PSScheme scheme, ExecutionContext ec, 
List<LocalPSWorker> workers) {
                MatrixObject features = 
ec.getMatrixObject(getParam(PS_FEATURES));
                MatrixObject labels = ec.getMatrixObject(getParam(PS_LABELS));
-               switch (scheme) {
-                       case DISJOINT_CONTIGUOUS:
-                               doDataPartitioning(new DataPartitionerDC(), 
features, labels, workers);
-                               break;
-                       case DISJOINT_ROUND_ROBIN:
-                               doDataPartitioning(new DataPartitionerDRR(), 
features, labels, workers);
-                               break;
-                       case DISJOINT_RANDOM:
-                               doDataPartitioning(new DataPartitionerDR(), 
features, labels, workers);
-                               break;
-                       case OVERLAP_RESHUFFLE:
-                               doDataPartitioning(new DataPartitionerOR(), 
features, labels, workers);
-                               break;
+               DataPartitionScheme.Result result = new 
DataPartitioner(scheme).doPartitioning(workers.size(), features.acquireRead(), 
labels.acquireRead());
+               features.release();
+               labels.release();
+               List<MatrixObject> pfs = result.pFeatures;
+               List<MatrixObject> pls = result.pLabels;
+               if (pfs.size() < workers.size()) {
+                       if (LOG.isWarnEnabled()) {
+                               LOG.warn(String.format("There is only %d 
batches of data but has %d workers. "
+                                       + "Hence, reset the number of workers 
with %d.", pfs.size(), workers.size(), pfs.size()));
+                       }
+                       workers = workers.subList(0, pfs.size());
+               }
+               for (int i = 0; i < workers.size(); i++) {
+                       workers.get(i).setFeatures(pfs.get(i));
+                       workers.get(i).setLabels(pls.get(i));
                }
        }
 
@@ -310,20 +333,4 @@ public class ParamservBuiltinCPInstruction extends 
ParameterizedBuiltinCPInstruc
                return scheme;
        }
 
-       private void doDataPartitioning(DataPartitioner dp, MatrixObject 
features, MatrixObject labels, List<LocalPSWorker> workers) {
-               DataPartitioner.Result result = 
dp.doPartitioning(workers.size(), features, labels);
-               List<MatrixObject> pfs = result.pFeatures;
-               List<MatrixObject> pls = result.pLabels;
-               if (pfs.size() < workers.size()) {
-                       if (LOG.isWarnEnabled()) {
-                               LOG.warn(String.format("There is only %d 
batches of data but has %d workers. "
-                                       + "Hence, reset the number of workers 
with %d.", pfs.size(), workers.size(), pfs.size()));
-                       }
-                       workers = workers.subList(0, pfs.size());
-               }
-               for (int i = 0; i < workers.size(); i++) {
-                       workers.get(i).setFeatures(pfs.get(i));
-                       workers.get(i).setLabels(pls.get(i));
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test/java/org/apache/sysml/test/integration/functions/paramserv/BaseDataPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/BaseDataPartitionerTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/BaseDataPartitionerTest.java
new file mode 100644
index 0000000..0092aed
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/BaseDataPartitionerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.paramserv;
+
+import java.util.stream.IntStream;
+
+import org.apache.sysml.parser.Statement;
+import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionScheme;
+import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitioner;
+import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+
+public abstract class BaseDataPartitionerTest {
+
+       protected static final int ROW_SIZE = 4000;
+       protected static final int COL_SIZE = 2000;
+       protected static final int WORKER_NUM = 4;
+
+       protected MatrixBlock[] generateData() {
+               double[][] df = new 
double[BaseDataPartitionerTest.ROW_SIZE][BaseDataPartitionerTest.COL_SIZE];
+               for (int i = 0; i < BaseDataPartitionerTest.ROW_SIZE; i++) {
+                       for (int j = 0; j < BaseDataPartitionerTest.COL_SIZE; 
j++) {
+                               df[i][j] = i * BaseDataPartitionerTest.COL_SIZE 
+ j;
+                       }
+               }
+               double[] dl = new double[BaseDataPartitionerTest.ROW_SIZE];
+               for (int i = 0; i < BaseDataPartitionerTest.ROW_SIZE; i++) {
+                       dl[i] = i;
+               }
+               MatrixBlock fmb = DataConverter.convertToMatrixBlock(df);
+               MatrixBlock lmb = DataConverter.convertToMatrixBlock(dl, true);
+               return new MatrixBlock[] { fmb, lmb };
+       }
+
+       protected double[] generateExpectedData(int from, int to) {
+               return IntStream.range(from, to).mapToDouble(i -> (double) 
i).toArray();
+       }
+
+       protected DataPartitionScheme.Result launchLocalDataPartitionerDC() {
+               DataPartitioner dp = new 
DataPartitioner(Statement.PSScheme.DISJOINT_CONTIGUOUS);
+               MatrixBlock[] mbs = generateData();
+               return dp.doPartitioning(WORKER_NUM, mbs[0], mbs[1]);
+       }
+
+       protected DataPartitionScheme.Result 
launchLocalDataPartitionerDR(MatrixBlock[] mbs) {
+               ParamservUtils.SEED = System.nanoTime();
+               DataPartitioner dp = new 
DataPartitioner(Statement.PSScheme.DISJOINT_RANDOM);
+               return dp.doPartitioning(WORKER_NUM, mbs[0], mbs[1]);
+       }
+
+       protected DataPartitionScheme.Result launchLocalDataPartitionerDRR() {
+               DataPartitioner dp = new 
DataPartitioner(Statement.PSScheme.DISJOINT_ROUND_ROBIN);
+               MatrixBlock[] mbs = generateData();
+               return dp.doPartitioning(WORKER_NUM, mbs[0], mbs[1]);
+       }
+
+       protected DataPartitionScheme.Result launchLocalDataPartitionerOR() {
+               DataPartitioner dp = new 
DataPartitioner(Statement.PSScheme.OVERLAP_RESHUFFLE);
+               MatrixBlock[] mbs = generateData();
+               return dp.doPartitioning(WORKER_NUM, mbs[0], mbs[1]);
+       }
+}

Reply via email to