[SYSTEMML-2418] New paramserv distributed spark data partitioners Closes #793.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8def2040 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8def2040 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8def2040 Branch: refs/heads/master Commit: 8def2040be8e9704c8ee8083e8b949ffc0d74927 Parents: 2f4bce9 Author: EdgarLGB <[email protected]> Authored: Thu Jul 12 22:25:44 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Thu Jul 12 23:48:38 2018 -0700 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 9 +- .../controlprogram/paramserv/DCScheme.java | 61 ++++++ .../controlprogram/paramserv/DRRScheme.java | 57 ++++++ .../controlprogram/paramserv/DRScheme.java | 62 ++++++ .../paramserv/DataPartitionScheme.java | 40 ++++ .../paramserv/DataPartitioner.java | 39 ++-- .../paramserv/DataPartitionerDC.java | 55 ------ .../paramserv/DataPartitionerDR.java | 64 ------- .../paramserv/DataPartitionerDRR.java | 61 ------ .../paramserv/DataPartitionerOR.java | 62 ------ .../controlprogram/paramserv/LocalPSWorker.java | 1 - .../controlprogram/paramserv/ORScheme.java | 61 ++++++ .../paramserv/ParamservUtils.java | 141 ++++++++++++-- .../paramserv/spark/DCSparkScheme.java | 47 +++++ .../paramserv/spark/DRRSparkScheme.java | 45 +++++ .../paramserv/spark/DRSparkScheme.java | 69 +++++++ .../spark/DataPartitionSparkScheme.java | 76 ++++++++ .../spark/DataPartitionerSparkAggregator.java | 66 +++++++ .../spark/DataPartitionerSparkMapper.java | 70 +++++++ .../paramserv/spark/ORSparkScheme.java | 60 ++++++ .../paramserv/spark/SparkDataPartitioner.java | 106 ++++++++++ .../paramserv/spark/SparkPSWorker.java | 48 +++++ .../cp/ParamservBuiltinCPInstruction.java | 101 +++++----- .../paramserv/BaseDataPartitionerTest.java | 80 ++++++++ .../paramserv/DataPartitionerTest.java | 192 ------------------- .../paramserv/LocalDataPartitionerTest.java | 135 +++++++++++++ 
.../paramserv/SparkDataPartitionerTest.java | 97 ++++++++++ .../functions/paramserv/ZPackageSuite.java | 3 +- 28 files changed, 1396 insertions(+), 512 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index a3c7677..f8d2345 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -546,7 +546,7 @@ public class SparkExecutionContext extends ExecutionContext } @SuppressWarnings("unchecked") - public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable(String varname) { + public PartitionedBroadcast<MatrixBlock> getBroadcastForMatrixObject(MatrixObject mo) { //NOTE: The memory consumption of this method is the in-memory size of the //matrix object plus the partitioned size in 1k-1k blocks. Since the call //to broadcast happens after the matrix object has been released, the memory @@ -557,8 +557,6 @@ public class SparkExecutionContext extends ExecutionContext long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; - MatrixObject mo = getMatrixObject(varname); - PartitionedBroadcast<MatrixBlock> bret = null; //reuse existing broadcast handle @@ -613,6 +611,11 @@ public class SparkExecutionContext extends ExecutionContext return bret; } + public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable(String varname) { + MatrixObject mo = getMatrixObject(varname); + return getBroadcastForMatrixObject(mo); + } + @SuppressWarnings("unchecked") public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable(String varname) { long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DCScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DCScheme.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DCScheme.java new file mode 100644 index 0000000..00aaa21 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DCScheme.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.controlprogram.paramserv; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; + +/** + * Disjoint_Contiguous data partitioner: + * + * for each worker, use a right indexing + * operation X[beg:end,] to obtain contiguous, + * non-overlapping partitions of rows. + */ +public class DCScheme extends DataPartitionScheme { + + public static List<MatrixBlock> partition(int k, MatrixBlock mb) { + List<MatrixBlock> list = new ArrayList<>(); + long stepSize = (long) Math.ceil((double) mb.getNumRows() / k); + long begin = 1; + while (begin < mb.getNumRows()) { + long end = Math.min(begin - 1 + stepSize, mb.getNumRows()); + MatrixBlock pmo = ParamservUtils.sliceMatrixBlock(mb, begin, end); + list.add(pmo); + begin = end + 1; + } + return list; + } + + private List<MatrixObject> doPartitioning(int k, MatrixBlock mb) { + return partition(k, mb).stream().map(ParamservUtils::newMatrixObject).collect(Collectors.toList()); + } + + @Override + public Result doPartitioning(int workersNum, MatrixBlock features, MatrixBlock labels) { + List<MatrixObject> pfs = doPartitioning(workersNum, features); + List<MatrixObject> pls = doPartitioning(workersNum, labels); + return new Result(pfs, pls); + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRRScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRRScheme.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRRScheme.java new file mode 100644 index 0000000..90c62d6 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRRScheme.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.controlprogram.paramserv; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.util.DataConverter; + +/** + * Disjoint_Round_Robin data partitioner: + * for each worker, use a permutation multiply + * or simpler a removeEmpty such as removeEmpty + * (target=X, margin=rows, select=(seq(1,nrow(X))%%k)==id) + */ +public class DRRScheme extends DataPartitionScheme { + + public static MatrixBlock removeEmpty(MatrixBlock mb, int k, int workerId) { + double[] data = LongStream.range(0, mb.getNumRows()).mapToDouble(l -> l % k == workerId ? 
1 : 0).toArray(); + MatrixBlock select = DataConverter.convertToMatrixBlock(data, true); + return mb.removeEmptyOperations(new MatrixBlock(), true, true, select); + } + + private MatrixObject internalRemoveEmpty(MatrixBlock mb, int k, int workerId) { + MatrixObject result = ParamservUtils.newMatrixObject(removeEmpty(mb, k, workerId)); + result.enableCleanup(false); + return result; + } + + @Override + public Result doPartitioning(int workersNum, MatrixBlock features, MatrixBlock labels) { + List<MatrixObject> pfs = IntStream.range(0, workersNum).mapToObj(i -> internalRemoveEmpty(features, workersNum, i)).collect(Collectors.toList()); + List<MatrixObject> pls = IntStream.range(0, workersNum).mapToObj(i -> internalRemoveEmpty(labels, workersNum, i)).collect(Collectors.toList()); + return new Result(pfs, pls); + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRScheme.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRScheme.java new file mode 100644 index 0000000..062a7ab --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DRScheme.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.controlprogram.paramserv; + +import static org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils.SEED; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.instructions.InstructionUtils; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; + +/** + * Data partitioner Disjoint_Random: + * for each worker, use a permutation multiply P[beg:end,] %*% X, + * where P is constructed for example with P=table(seq(1,nrow(X)),sample(nrow(X), nrow(X))), + * i.e., sampling without replacement to ensure disjointness. 
+ */ +public class DRScheme extends DataPartitionScheme { + + private List<MatrixBlock> partition(int k, MatrixBlock mb, MatrixBlock permutation) { + int batchSize = (int) Math.ceil((double) mb.getNumRows() / k); + return IntStream.range(0, k).mapToObj(i -> { + int begin = i * batchSize; + int end = Math.min((i + 1) * batchSize, mb.getNumRows()); + MatrixBlock slicedPerm = permutation.slice(begin, end - 1); + return slicedPerm.aggregateBinaryOperations(slicedPerm, mb, new MatrixBlock(), InstructionUtils.getMatMultOperator(k)); + }).collect(Collectors.toList()); + } + + private List<MatrixObject> internalDoPartitioning(int k, MatrixBlock mb, MatrixBlock permutation) { + return partition(k, mb, permutation).stream().map(ParamservUtils::newMatrixObject).collect(Collectors.toList()); + } + + @Override + public Result doPartitioning(int workersNum, MatrixBlock features, MatrixBlock labels) { + // Generate a single permutation matrix (workers use slices) + MatrixBlock permutation = ParamservUtils.generatePermutation(features.getNumRows(), SEED); + List<MatrixObject> pfs = internalDoPartitioning(workersNum, features, permutation); + List<MatrixObject> pls = internalDoPartitioning(workersNum, labels, permutation); + return new Result(pfs, pls); + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionScheme.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionScheme.java new file mode 100644 index 0000000..f2ea0aa --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionScheme.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.controlprogram.paramserv; + +import java.util.List; + +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; + +public abstract class DataPartitionScheme { + + public final class Result { + public final List<MatrixObject> pFeatures; + public final List<MatrixObject> pLabels; + + public Result(List<MatrixObject> pFeatures, List<MatrixObject> pLabels) { + this.pFeatures = pFeatures; + this.pLabels = pLabels; + } + } + + public abstract Result doPartitioning(int workersNum, MatrixBlock features, MatrixBlock labels); +} http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java index fce6d35..3f28cd1 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitioner.java @@ -19,22 +19,31 @@ package 
org.apache.sysml.runtime.controlprogram.paramserv; -import java.util.List; - -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; - -public abstract class DataPartitioner { - - public final class Result { - public final List<MatrixObject> pFeatures; - public final List<MatrixObject> pLabels; - - public Result(List<MatrixObject> pFeatures, List<MatrixObject> pLabels) { - this.pFeatures = pFeatures; - this.pLabels = pLabels; +import org.apache.sysml.parser.Statement; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; + +public class DataPartitioner { + + private DataPartitionScheme _scheme; + + public DataPartitioner(Statement.PSScheme scheme) { + switch (scheme) { + case DISJOINT_CONTIGUOUS: + _scheme = new DCScheme(); + break; + case DISJOINT_ROUND_ROBIN: + _scheme = new DRRScheme(); + break; + case DISJOINT_RANDOM: + _scheme = new DRScheme(); + break; + case OVERLAP_RESHUFFLE: + _scheme = new ORScheme(); + break; } } - public abstract Result doPartitioning(int workersNum, MatrixObject features, MatrixObject labels); - + public DataPartitionScheme.Result doPartitioning(int workersNum, MatrixBlock features, MatrixBlock labels) { + return _scheme.doPartitioning(workersNum, features, labels); + } } http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDC.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDC.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDC.java deleted file mode 100644 index 4810541..0000000 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDC.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.controlprogram.paramserv; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; - -/** - * Disjoint_Contiguous data partitioner: - * - * for each worker, use a right indexing - * operation X[beg:end,] to obtain contiguous, - * non-overlapping partitions of rows. 
- */ -public class DataPartitionerDC extends DataPartitioner { - - private List<MatrixObject> doPartitioning(int k, MatrixObject mo) { - List<MatrixObject> list = new ArrayList<>(); - long stepSize = (long) Math.ceil((double) mo.getNumRows() / k); - long begin = 1; - while (begin < mo.getNumRows()) { - long end = Math.min(begin - 1 + stepSize, mo.getNumRows()); - MatrixObject pmo = ParamservUtils.sliceMatrix(mo, begin, end); - list.add(pmo); - begin = end + 1; - } - return list; - } - - @Override - public Result doPartitioning(int workersNum, MatrixObject features, MatrixObject labels) { - List<MatrixObject> pfs = doPartitioning(workersNum, features); - List<MatrixObject> pls = doPartitioning(workersNum, labels); - return new Result(pfs, pls); - } -} http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDR.java deleted file mode 100644 index adc6f60..0000000 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDR.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.controlprogram.paramserv; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysml.runtime.instructions.InstructionUtils; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; - -/** - * Data partitioner Disjoint_Random: - * for each worker, use a permutation multiply P[beg:end,] %*% X, - * where P is constructed for example with P=table(seq(1,nrow(X)),sample(nrow(X), nrow(X))), - * i.e., sampling without replacement to ensure disjointness. 
- */ -public class DataPartitionerDR extends DataPartitioner { - - private List<MatrixObject> doPartitioning(int k, MatrixObject mo, MatrixBlock permutation) { - MatrixBlock data = mo.acquireRead(); - int batchSize = (int) Math.ceil((double) mo.getNumRows() / k); - List<MatrixObject> pMatrices = IntStream.range(0, k).mapToObj(i -> { - int begin = i * batchSize; - int end = (int) Math.min((i + 1) * batchSize, mo.getNumRows()); - MatrixBlock slicedPerm = permutation.slice(begin, end - 1); - MatrixBlock output = slicedPerm.aggregateBinaryOperations(slicedPerm, - data, new MatrixBlock(), InstructionUtils.getMatMultOperator(k)); - MatrixObject result = ParamservUtils.newMatrixObject(); - result.acquireModify(output); - result.release(); - return result; - }).collect(Collectors.toList()); - mo.release(); - return pMatrices; - } - - @Override - public Result doPartitioning(int workersNum, MatrixObject features, MatrixObject labels) { - // Generate a single permutation matrix (workers use slices) - MatrixBlock permutation = ParamservUtils.generatePermutation((int)features.getNumRows()); - List<MatrixObject> pfs = doPartitioning(workersNum, features, permutation); - List<MatrixObject> pls = doPartitioning(workersNum, labels, permutation); - return new Result(pfs, pls); - } -} http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDRR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDRR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDRR.java deleted file mode 100644 index a2ff5f9..0000000 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerDRR.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.controlprogram.paramserv; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.LongStream; - -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.util.DataConverter; - -/** - * Disjoint_Round_Robin data partitioner: - * for each worker, use a permutation multiply - * or simpler a removeEmpty such as removeEmpty - * (target=X, margin=rows, select=(seq(1,nrow(X))%%k)==id) - */ -public class DataPartitionerDRR extends DataPartitioner { - - private MatrixObject removeEmpty(MatrixObject mo, int k, int workerId) { - MatrixObject result = ParamservUtils.newMatrixObject(); - MatrixBlock tmp = mo.acquireRead(); - double[] data = LongStream.range(1, mo.getNumRows() + 1) - .mapToDouble(l -> l % k == workerId ? 
1 : 0).toArray(); - MatrixBlock select = DataConverter.convertToMatrixBlock(data, true); - MatrixBlock resultMB = tmp.removeEmptyOperations(new MatrixBlock(), true, true, select); - mo.release(); - result.acquireModify(resultMB); - result.release(); - result.enableCleanup(false); - return result; - } - - @Override - public Result doPartitioning(int workersNum, MatrixObject features, MatrixObject labels) { - List<MatrixObject> pfs = IntStream.range(0, workersNum) - .mapToObj(i -> removeEmpty(features, workersNum, i)).collect(Collectors.toList()); - List<MatrixObject> pls = IntStream.range(0, workersNum) - .mapToObj(i -> removeEmpty(labels, workersNum, i)).collect(Collectors.toList()); - return new Result(pfs, pls); - } -} http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerOR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerOR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerOR.java deleted file mode 100644 index 0bfb4b7..0000000 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/DataPartitionerOR.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.runtime.controlprogram.paramserv; - -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysml.runtime.instructions.InstructionUtils; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; - -/** - * Data partitioner Overlap_Reshuffle: - * for each worker, use a new permutation multiply P %*% X, - * where P is constructed for example with P=table(seq(1,nrow(X),sample(nrow(X), nrow(X)))) - */ -public class DataPartitionerOR extends DataPartitioner { - - private List<MatrixObject> doPartitioning(int k, MatrixObject mo, List<MatrixBlock> permutations) { - MatrixBlock data = mo.acquireRead(); - List<MatrixObject> pMatrices = IntStream.range(0, k).mapToObj(i -> { - MatrixBlock permutation = permutations.get(i); - MatrixBlock output = permutation.aggregateBinaryOperations(permutation, - data, new MatrixBlock(), InstructionUtils.getMatMultOperator(k)); - MatrixObject result = ParamservUtils.newMatrixObject(); - result.acquireModify(output); - result.release(); - return result; - }).collect(Collectors.toList()); - mo.release(); - return pMatrices; - } - - @Override - public Result doPartitioning(int workersNum, MatrixObject features, MatrixObject labels) { - // Generate a different permutation matrix for each worker - List<MatrixBlock> permutations = IntStream.range(0, workersNum) - .mapToObj(i -> ParamservUtils.generatePermutation((int)features.getNumRows())) - 
.collect(Collectors.toList()); - List<MatrixObject> pfs = doPartitioning(workersNum, features, permutations); - List<MatrixObject> pls = doPartitioning(workersNum, labels, permutations); - return new Result(pfs, pls); - } -} http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java index 366284c..307669e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/LocalPSWorker.java @@ -187,5 +187,4 @@ public class LocalPSWorker extends PSWorker implements Callable<Void> { ParamservUtils.cleanupData(bLabels); return gradients; } - } http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ORScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ORScheme.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ORScheme.java new file mode 100644 index 0000000..2692efa --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/ORScheme.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.sysml.runtime.controlprogram.paramserv;

import static org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils.SEED;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;

/**
 * Data partitioner Overlap_Reshuffle:
 * every worker receives a full, individually reshuffled copy of the data,
 * computed as P %*% X with a worker-specific permutation matrix P,
 * e.g. P=table(seq(1,nrow(X),sample(nrow(X), nrow(X)))).
 */
public class ORScheme extends DataPartitionScheme {

	/**
	 * Multiply each of the first k permutation matrices with the input,
	 * yielding one reshuffled copy of the data per worker.
	 *
	 * @param k number of workers (and degree of parallelism of the matmult)
	 * @param mb input matrix (features or labels)
	 * @param permutations per-worker permutation matrices
	 * @return k reshuffled copies of the input
	 */
	public static List<MatrixBlock> partition(int k, MatrixBlock mb, List<MatrixBlock> permutations) {
		return IntStream.range(0, k)
			.mapToObj(permutations::get)
			.map(perm -> perm.aggregateBinaryOperations(perm, mb,
				new MatrixBlock(), InstructionUtils.getMatMultOperator(k)))
			.collect(Collectors.toList());
	}

	// Wrap the reshuffled blocks into matrix objects for the local workers.
	private List<MatrixObject> doPartitioning(int k, MatrixBlock mb, List<MatrixBlock> permutations) {
		List<MatrixBlock> parts = partition(k, mb, permutations);
		return parts.stream()
			.map(ParamservUtils::newMatrixObject)
			.collect(Collectors.toList());
	}

	@Override
	public Result doPartitioning(int workersNum, MatrixBlock features, MatrixBlock labels) {
		// Generate a different permutation matrix for each worker
		// (seed offset by worker index for distinct but reproducible shuffles)
		List<MatrixBlock> perms = IntStream.range(0, workersNum)
			.mapToObj(w -> ParamservUtils.generatePermutation(features.getNumRows(), SEED + w))
			.collect(Collectors.toList());
		return new Result(
			doPartitioning(workersNum, features, perms),
			doPartitioning(workersNum, labels, perms));
	}
}
org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.controlprogram.paramserv.spark.DataPartitionerSparkAggregator; +import org.apache.sysml.runtime.controlprogram.paramserv.spark.DataPartitionerSparkMapper; import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; import org.apache.sysml.runtime.functionobjects.Plus; import org.apache.sysml.runtime.instructions.cp.Data; @@ -57,13 +65,18 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MetaDataFormat; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.operators.BinaryOperator; +import scala.Tuple2; + public class ParamservUtils { public static final String PS_FUNC_PREFIX = "_ps_"; + public static long SEED = -1; // Used for generating permutation + /** * Deep copy the list object * @@ -105,8 +118,13 @@ public class ParamservUtils { cd.clearData(); } - public static MatrixObject newMatrixObject() { - return new MatrixObject(Expression.ValueType.DOUBLE, OptimizerUtils.getUniqueTempFileName(), new MetaDataFormat(new MatrixCharacteristics(-1, -1, -1, -1), OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo)); + public static MatrixObject newMatrixObject(MatrixBlock mb) { + MatrixObject result = new MatrixObject(Expression.ValueType.DOUBLE, OptimizerUtils.getUniqueTempFileName(), + new MetaDataFormat(new MatrixCharacteristics(-1, -1, ConfigurationManager.getBlocksize(), + ConfigurationManager.getBlocksize()), OutputInfo.BinaryBlockOutputInfo, 
InputInfo.BinaryBlockInputInfo)); + result.acquireModify(mb); + result.release(); + return result; } /** @@ -118,20 +136,31 @@ public class ParamservUtils { * @return new sliced matrix */ public static MatrixObject sliceMatrix(MatrixObject mo, long rl, long rh) { - MatrixObject result = newMatrixObject(); - MatrixBlock tmp = mo.acquireRead(); - result.acquireModify(tmp.slice((int) rl - 1, (int) rh - 1)); - mo.release(); - result.release(); + MatrixBlock mb = mo.acquireRead(); + MatrixObject result = newMatrixObject(sliceMatrixBlock(mb, rl, rh)); result.enableCleanup(false); + mo.release(); return result; } - public static MatrixBlock generatePermutation(int numEntries) { + /** + * Slice the matrix block and return a matrix block + * (used in spark) + * + * @param mb input matrix + * @param rl low boundary + * @param rh high boundary + * @return new sliced matrix block + */ + public static MatrixBlock sliceMatrixBlock(MatrixBlock mb, long rl, long rh) { + return mb.slice((int) rl - 1, (int) rh - 1); + } + + public static MatrixBlock generatePermutation(int numEntries, long seed) { // Create a sequence and sample w/o replacement // (no need to materialize the sequence because ctable only uses its meta data) MatrixBlock seq = new MatrixBlock(numEntries, 1, false); - MatrixBlock sample = MatrixBlock.sampleOperations(numEntries, numEntries, false, -1); + MatrixBlock sample = MatrixBlock.sampleOperations(numEntries, numEntries, false, seed); // Combine the sequence and sample as a table return seq.ctableSeqOperations(sample, 1.0, @@ -257,14 +286,104 @@ public class ParamservUtils { return recompiled; } - private static FunctionProgramBlock getFunctionBlock(ExecutionContext ec, String funcName) { String[] cfn = getCompleteFuncName(funcName, null); String ns = cfn[0]; String fname = cfn[1]; return ec.getProgram().getFunctionProgramBlock(ns, fname); } - + + public static MatrixBlock cbindMatrix(MatrixBlock left, MatrixBlock right) { + return left.append(right, new 
MatrixBlock()); + } + + /** + * Assemble the matrix of features and labels according to the rowID + * + * @param numRows row size of the data + * @param featuresRDD indexed features matrix block + * @param labelsRDD indexed labels matrix block + * @return Assembled rdd with rowID as key while matrix of features and labels as value (rowID -> features, labels) + */ + public static JavaPairRDD<Long, Tuple2<MatrixBlock, MatrixBlock>> assembleTrainingData(long numRows, JavaPairRDD<MatrixIndexes, MatrixBlock> featuresRDD, JavaPairRDD<MatrixIndexes, MatrixBlock> labelsRDD) { + JavaPairRDD<Long, MatrixBlock> fRDD = groupMatrix(numRows, featuresRDD); + JavaPairRDD<Long, MatrixBlock> lRDD = groupMatrix(numRows, labelsRDD); + //TODO Add an additional physical operator which broadcasts the labels directly (broadcast join with features) if certain memory budgets are satisfied + return fRDD.join(lRDD); + } + + private static JavaPairRDD<Long, MatrixBlock> groupMatrix(long numRows, JavaPairRDD<MatrixIndexes, MatrixBlock> rdd) { + //TODO could use join and aggregation to avoid unnecessary shuffle introduced by reduceByKey + return rdd.mapToPair(input -> new Tuple2<>(input._1.getRowIndex(), new Tuple2<>(input._1.getColumnIndex(), input._2))) + .aggregateByKey(new LinkedList<Tuple2<Long, MatrixBlock>>(), + new Partitioner() { + private static final long serialVersionUID = -7032660778344579236L; + @Override + public int getPartition(Object rblkID) { + return Math.toIntExact((Long) rblkID); + } + @Override + public int numPartitions() { + return Math.toIntExact(numRows); + } + }, + (list, input) -> { + list.add(input); + return list; + }, + (l1, l2) -> { + l1.addAll(l2); + l1.sort((o1, o2) -> o1._1.compareTo(o2._1)); + return l1; + }) + .mapToPair(input -> { + LinkedList<Tuple2<Long, MatrixBlock>> list = input._2; + MatrixBlock result = list.get(0)._2; + for (int i = 1; i < list.size(); i++) { + result = ParamservUtils.cbindMatrix(result, list.get(i)._2); + } + return new 
Tuple2<>(input._1, result); + }); + } + + @SuppressWarnings("unchecked") + public static JavaPairRDD<Integer, Tuple2<MatrixBlock, MatrixBlock>> doPartitionOnSpark(SparkExecutionContext sec, MatrixObject features, MatrixObject labels, Statement.PSScheme scheme, int workerNum) { + // Get input RDD + JavaPairRDD<MatrixIndexes, MatrixBlock> featuresRDD = (JavaPairRDD<MatrixIndexes, MatrixBlock>) + sec.getRDDHandleForMatrixObject(features, InputInfo.BinaryBlockInputInfo); + JavaPairRDD<MatrixIndexes, MatrixBlock> labelsRDD = (JavaPairRDD<MatrixIndexes, MatrixBlock>) + sec.getRDDHandleForMatrixObject(labels, InputInfo.BinaryBlockInputInfo); + + DataPartitionerSparkMapper mapper = new DataPartitionerSparkMapper(scheme, workerNum, sec, (int) features.getNumRows()); + return ParamservUtils.assembleTrainingData(features.getNumRows(), featuresRDD, labelsRDD) // Combine features and labels into a pair (rowBlockID => (features, labels)) + .flatMapToPair(mapper) // Do the data partitioning on spark (workerID => (rowBlockID, (single row features, single row labels)) + // Aggregate the partitioned matrix according to rowID for each worker + // i.e. 
(workerID => ordered list[(rowBlockID, (single row features, single row labels)] + .aggregateByKey(new LinkedList<Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>>(), + new Partitioner() { + private static final long serialVersionUID = -7937781374718031224L; + @Override + public int getPartition(Object workerID) { + return (int) workerID; + } + @Override + public int numPartitions() { + return workerNum; + } + }, + (list, input) -> { + list.add(input); + return list; + }, + (l1, l2) -> { + l1.addAll(l2); + l1.sort((o1, o2) -> o1._1.compareTo(o2._1)); + return l1; + }) + .mapToPair(new DataPartitionerSparkAggregator( + features.getNumColumns(), labels.getNumColumns())); + } + public static ListObject accrueGradients(ListObject accGradients, ListObject gradients) { return accrueGradients(accGradients, gradients, false); } http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DCSparkScheme.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DCSparkScheme.java b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DCSparkScheme.java new file mode 100644 index 0000000..666b891 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/paramserv/spark/DCSparkScheme.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.sysml.runtime.controlprogram.paramserv.spark;

import java.util.List;

import org.apache.sysml.runtime.matrix.data.MatrixBlock;

import scala.Tuple2;

/**
 * Spark Disjoint_Contiguous data partitioner:
 * <p>
 * Every row of the current block is routed to the worker recorded in the
 * broadcast worker indicator (contiguous row ranges per worker).
 */
public class DCSparkScheme extends DataPartitionSparkScheme {

	private static final long serialVersionUID = -2786906947020788787L;

	protected DCSparkScheme() {
		// No-args constructor used for deserialization
	}

	@Override
	public Result doPartitioning(int numWorkers, int rblkID, MatrixBlock features, MatrixBlock labels) {
		// Features and labels share the same row-to-worker assignment,
		// so both sides use the identical indicator-driven partitioning
		List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> partFeatures = nonShuffledPartition(rblkID, features);
		List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> partLabels = nonShuffledPartition(rblkID, labels);
		return new Result(partFeatures, partLabels);
	}
}
package org.apache.sysml.runtime.controlprogram.paramserv.spark;

import java.util.List;

import org.apache.sysml.runtime.matrix.data.MatrixBlock;

import scala.Tuple2;

/**
 * Spark Disjoint_Round_Robin data partitioner:
 * rows are assigned to workers round-robin via the broadcast worker indicator.
 */
public class DRRSparkScheme extends DataPartitionSparkScheme {

	private static final long serialVersionUID = -3130831851505549672L;

	protected DRRSparkScheme() {
		// No-args constructor used for deserialization
	}

	@Override
	public Result doPartitioning(int numWorkers, int rblkID, MatrixBlock features, MatrixBlock labels) {
		// Same indicator-driven routing for features and labels,
		// so the two sides stay row-aligned per worker
		List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> partFeatures = nonShuffledPartition(rblkID, features);
		List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> partLabels = nonShuffledPartition(rblkID, labels);
		return new Result(partFeatures, partLabels);
	}
}
package org.apache.sysml.runtime.controlprogram.paramserv.spark;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;

import scala.Tuple2;

/**
 * Spark data partitioner Disjoint_Random:
 *
 * For the current row block, look up each row's shuffled global position in
 * the broadcast global permutation and route the row to the worker that owns
 * that position according to the broadcast worker indicator
 * (WorkerID => (row block ID, matrix)).
 */
public class DRSparkScheme extends DataPartitionSparkScheme {

	private static final long serialVersionUID = -7655310624144544544L;

	protected DRSparkScheme() {
		// No-args constructor used for deserialization
	}

	@Override
	public Result doPartitioning(int numWorkers, int rblkID, MatrixBlock features, MatrixBlock labels) {
		List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pfs = partition(rblkID, features);
		List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pls = partition(rblkID, labels);
		return new Result(pfs, pls);
	}

	private List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> partition(int rblkID, MatrixBlock mb) {
		// DR uses a single global permutation (index 0)
		MatrixBlock partialPerm = _globalPerms.get(0).getBlock(rblkID, 1);

		// For each row, find out the shifted place
		return IntStream.range(0, mb.getNumRows()).mapToObj(r -> {
			MatrixBlock rowMB = ParamservUtils.sliceMatrixBlock(mb, r + 1, r + 1);
			long shiftedPosition = (long) partialPerm.getValue(r, 0);

			// Get the shifted block (1-based block id) and position
			int shiftedBlkID = (int) (shiftedPosition / OptimizerUtils.DEFAULT_BLOCKSIZE + 1);

			MatrixBlock indicator = _workerIndicator.getBlock(shiftedBlkID, 1);
			// FIX: index the fetched indicator block with the LOCAL row offset
			// (position % blocksize); the previous (position / blocksize) addressed
			// the block id, which reads the wrong row (or row 0) of the indicator.
			int workerID = (int) indicator.getValue(
				(int) (shiftedPosition % OptimizerUtils.DEFAULT_BLOCKSIZE), 0);
			return new Tuple2<>(workerID, new Tuple2<>(shiftedPosition, rowMB));
		}).collect(Collectors.toList());
	}
}
package org.apache.sysml.runtime.controlprogram.paramserv.spark;

import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;

import scala.Tuple2;

/**
 * Base class of the spark data partition schemes, holding the broadcast
 * state (global permutations, worker indicator) shared by all schemes.
 */
public abstract class DataPartitionSparkScheme implements Serializable {

	protected final class Result {
		protected final List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pFeatures; // WorkerID => (rowID, matrix)
		protected final List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pLabels;

		protected Result(List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pFeatures,
				List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> pLabels) {
			this.pFeatures = pFeatures;
			this.pLabels = pLabels;
		}
	}

	private static final long serialVersionUID = -3462829818083371171L;

	protected List<PartitionedBroadcast<MatrixBlock>> _globalPerms; // a list of global permutations
	protected PartitionedBroadcast<MatrixBlock> _workerIndicator; // a matrix indicating to which worker the given row belongs

	protected void setGlobalPermutation(List<PartitionedBroadcast<MatrixBlock>> gps) {
		_globalPerms = gps;
	}

	protected void setWorkerIndicator(PartitionedBroadcast<MatrixBlock> wi) {
		_workerIndicator = wi;
	}

	/**
	 * Do non-reshuffled data partitioning according to worker indicator
	 *
	 * @param rblkID row block ID (1-based)
	 * @param mb Matrix
	 * @return list of tuple (workerID, (global row ID, matrix row))
	 */
	protected List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> nonShuffledPartition(int rblkID, MatrixBlock mb) {
		MatrixBlock indicator = _workerIndicator.getBlock(rblkID, 1);
		return LongStream.range(0, mb.getNumRows()).mapToObj(r -> {
			int workerID = (int) indicator.getValue((int) r, 0);
			MatrixBlock rowMB = ParamservUtils.sliceMatrixBlock(mb, r + 1, r + 1);
			// FIX: widen before multiplying — (rblkID - 1) * DEFAULT_BLOCKSIZE was
			// computed in int and could overflow before being added to the long r.
			long shiftedPosition = r + (rblkID - 1L) * OptimizerUtils.DEFAULT_BLOCKSIZE;
			return new Tuple2<>(workerID, new Tuple2<>(shiftedPosition, rowMB));
		}).collect(Collectors.toList());
	}

	public abstract Result doPartitioning(int numWorkers, int rblkID, MatrixBlock features, MatrixBlock labels);
}
package org.apache.sysml.runtime.controlprogram.paramserv.spark;

import java.io.Serializable;
import java.util.LinkedList;

import org.apache.spark.api.java.function.PairFunction;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;

import scala.Tuple2;

/**
 * Spark function stitching a worker's ordered list of single-row
 * (features, labels) blocks into one features matrix and one labels matrix.
 */
public class DataPartitionerSparkAggregator implements PairFunction<Tuple2<Integer,LinkedList<Tuple2<Long,Tuple2<MatrixBlock,MatrixBlock>>>>, Integer, Tuple2<MatrixBlock, MatrixBlock>>, Serializable {

	private static final long serialVersionUID = -1245300852709085117L;
	private long _fcol; // number of feature columns
	private long _lcol; // number of label columns

	public DataPartitionerSparkAggregator() {
		// No-args constructor used for deserialization
	}

	public DataPartitionerSparkAggregator(long fcol, long lcol) {
		_fcol = fcol;
		_lcol = lcol;
	}

	/**
	 * Row-wise combine the matrix
	 * @param input workerID =&gt; ordered list [(rowBlockID, (features, labels))]
	 * @return workerID =&gt; [(features, labels)]
	 * @throws Exception Some exception
	 */
	@Override
	public Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> call(Tuple2<Integer, LinkedList<Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>>> input) throws Exception {
		LinkedList<Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>> rows = input._2;
		MatrixBlock fmb = new MatrixBlock(rows.size(), (int) _fcol, false);
		MatrixBlock lmb = new MatrixBlock(rows.size(), (int) _lcol, false);

		// Left-index each single-row block into the next output row
		// (iterator traversal avoids LinkedList#get's linear lookups)
		int pos = 0;
		for (Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>> row : rows) {
			fmb = fmb.leftIndexingOperations(row._2._1, pos, pos, 0, (int) _fcol - 1,
				fmb, MatrixObject.UpdateType.INPLACE_PINNED);
			lmb = lmb.leftIndexingOperations(row._2._2, pos, pos, 0, (int) _lcol - 1,
				lmb, MatrixObject.UpdateType.INPLACE_PINNED);
			pos++;
		}
		return new Tuple2<>(input._1, new Tuple2<>(fmb, lmb));
	}
}
package org.apache.sysml.runtime.controlprogram.paramserv.spark;

import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.sysml.parser.Statement;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;

import scala.Tuple2;

/**
 * Spark function applying the configured partition scheme to one block row
 * of (features, labels) and emitting per-worker single-row pairs.
 */
public class DataPartitionerSparkMapper implements PairFlatMapFunction<Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>, Integer, Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>>, Serializable {

	private static final long serialVersionUID = 1710721606050403296L;
	private int _workersNum; // number of workers to partition for

	private SparkDataPartitioner _dp; // scheme-specific partitioner (serialized to executors)

	protected DataPartitionerSparkMapper() {
		// No-args constructor used for deserialization
	}

	public DataPartitionerSparkMapper(Statement.PSScheme scheme, int workersNum, SparkExecutionContext sec, int numEntries) {
		_workersNum = workersNum;
		_dp = new SparkDataPartitioner(scheme, sec, numEntries, workersNum);
	}

	/**
	 * Do data partitioning
	 * @param input RowBlockID =&gt; (features, labels)
	 * @return WorkerID =&gt; (rowBlockID, (single row features, single row labels))
	 * @throws Exception Some exception
	 */
	@Override
	public Iterator<Tuple2<Integer, Tuple2<Long, Tuple2<MatrixBlock, MatrixBlock>>>> call(Tuple2<Long,Tuple2<MatrixBlock,MatrixBlock>> input)
			throws Exception {
		MatrixBlock features = input._2._1;
		MatrixBlock labels = input._2._2;
		DataPartitionSparkScheme.Result result =
			_dp.doPartitioning(_workersNum, features, labels, input._1);

		// Zip partitioned features and labels back together by position:
		// both lists are produced in the same row order by the scheme
		List<Tuple2<Integer, Tuple2<Long,Tuple2<MatrixBlock,MatrixBlock>>>> out = new LinkedList<>();
		for (int i = 0; i < result.pFeatures.size(); i++) {
			Tuple2<Integer, Tuple2<Long, MatrixBlock>> f = result.pFeatures.get(i);
			Tuple2<Integer, Tuple2<Long, MatrixBlock>> l = result.pLabels.get(i);
			out.add(new Tuple2<>(f._1, new Tuple2<>(f._2._1, new Tuple2<>(f._2._2, l._2._2))));
		}
		return out.iterator();
	}
}
package org.apache.sysml.runtime.controlprogram.paramserv.spark;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;

import scala.Tuple2;

/**
 * Spark data partitioner Overlap_Reshuffle:
 * every worker receives all rows, each placed at the worker-specific
 * shuffled position taken from that worker's broadcast permutation.
 */
public class ORSparkScheme extends DataPartitionSparkScheme {

	private static final long serialVersionUID = 6867567406403580311L;

	protected ORSparkScheme() {
		// No-args constructor used for deserialization
	}

	@Override
	public Result doPartitioning(int numWorkers, int rblkID, MatrixBlock features, MatrixBlock labels) {
		return new Result(
			partition(numWorkers, rblkID, features),
			partition(numWorkers, rblkID, labels));
	}

	private List<Tuple2<Integer, Tuple2<Long, MatrixBlock>>> partition(int numWorkers, int rblkID, MatrixBlock mb) {
		// Emit every row once per worker, shifted by that worker's permutation
		return IntStream.range(0, numWorkers).boxed().flatMap(workerID -> {
			MatrixBlock workerPerm = _globalPerms.get(workerID).getBlock(rblkID, 1);
			return IntStream.range(0, mb.getNumRows()).mapToObj(r -> {
				MatrixBlock singleRow = ParamservUtils.sliceMatrixBlock(mb, r + 1, r + 1);
				long targetPosition = (long) workerPerm.getValue(r, 0);
				return new Tuple2<>(workerID, new Tuple2<>(targetPosition, singleRow));
			});
		}).collect(Collectors.toList());
	}
}
package org.apache.sysml.runtime.controlprogram.paramserv.spark;

import static org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils.SEED;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.sysml.parser.Statement;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils;
import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.util.DataConverter;

/**
 * Driver-side factory of the spark partition scheme: instantiates the
 * scheme-specific object and prepares the broadcast state it needs
 * (worker indicator and/or global permutations).
 */
public class SparkDataPartitioner implements Serializable {

	private static final long serialVersionUID = 6841548626711057448L;
	private DataPartitionSparkScheme _scheme;

	protected SparkDataPartitioner(Statement.PSScheme scheme, SparkExecutionContext sec, int numEntries, int numWorkers) {
		switch (scheme) {
			case DISJOINT_CONTIGUOUS:
				_scheme = new DCSparkScheme();
				// Create the worker id indicator
				createDCIndicator(sec, numWorkers, numEntries);
				break;
			case DISJOINT_ROUND_ROBIN:
				_scheme = new DRRSparkScheme();
				// Create the worker id indicator
				createDRIndicator(sec, numWorkers, numEntries);
				break;
			case DISJOINT_RANDOM:
				_scheme = new DRSparkScheme();
				// Create the global permutation
				createGlobalPermutations(sec, numEntries, 1);
				// Create the worker id indicator
				createDCIndicator(sec, numWorkers, numEntries);
				break;
			case OVERLAP_RESHUFFLE:
				_scheme = new ORSparkScheme();
				// Create the global permutation separately for each worker
				createGlobalPermutations(sec, numEntries, numWorkers);
				break;
			default:
				// Fail fast instead of leaving _scheme null (deferred NPE)
				throw new IllegalArgumentException("Unsupported data partition scheme: " + scheme);
		}
	}

	// Round-robin indicator: row n belongs to worker n % numWorkers.
	private void createDRIndicator(SparkExecutionContext sec, int numWorkers, int numEntries) {
		double[] vector = IntStream.range(0, numEntries).mapToDouble(n -> n % numWorkers).toArray();
		MatrixBlock vectorMB = DataConverter.convertToMatrixBlock(vector, true);
		_scheme.setWorkerIndicator(sec.getBroadcastForMatrixObject(ParamservUtils.newMatrixObject(vectorMB)));
	}

	// Contiguous indicator: rows are split into ceil(numEntries/numWorkers)-sized ranges.
	private void createDCIndicator(SparkExecutionContext sec, int numWorkers, int numEntries) {
		double[] vector = new double[numEntries];
		int batchSize = (int) Math.ceil((double) numEntries / numWorkers);
		for (int i = 1; i < numWorkers; i++) {
			int begin = batchSize * i;
			int end = Math.min(begin + batchSize, numEntries);
			Arrays.fill(vector, begin, end, i);
		}
		MatrixBlock vectorMB = DataConverter.convertToMatrixBlock(vector, true);
		_scheme.setWorkerIndicator(sec.getBroadcastForMatrixObject(ParamservUtils.newMatrixObject(vectorMB)));
	}

	// Broadcast numPerm source-to-target position vectors derived from random permutations.
	private void createGlobalPermutations(SparkExecutionContext sec, int numEntries, int numPerm) {
		List<PartitionedBroadcast<MatrixBlock>> perms = IntStream.range(0, numPerm).mapToObj(i -> {
			MatrixBlock perm = MatrixBlock.sampleOperations(numEntries, numEntries, false, SEED + i);
			// Create the source-target id vector from the permutation ranging
			// from 1 to number of entries (hoisted array avoids re-fetching
			// the dense block on every iteration)
			double[] permValues = perm.getDenseBlockValues();
			double[] vector = new double[numEntries];
			for (int j = 0; j < permValues.length; j++) {
				vector[(int) permValues[j] - 1] = j;
			}
			MatrixBlock vectorMB = DataConverter.convertToMatrixBlock(vector, true);
			return sec.getBroadcastForMatrixObject(ParamservUtils.newMatrixObject(vectorMB));
		}).collect(Collectors.toList());
		_scheme.setGlobalPermutation(perms);
	}

	public DataPartitionSparkScheme.Result doPartitioning(int numWorkers, MatrixBlock features, MatrixBlock labels,
			long rowID) {
		// Set the rowID in order to get the according permutation
		return _scheme.doPartitioning(numWorkers, (int) rowID, features, labels);
	}
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.controlprogram.paramserv.spark; + +import java.io.Serializable; + +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.sysml.parser.Statement; +import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.paramserv.ParamServer; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; + +import scala.Tuple2; + +public class SparkPSWorker implements VoidFunction<Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>>>, Serializable { + + private static final long serialVersionUID = -8674739573419648732L; + + public SparkPSWorker() { + // No-args constructor used for deserialization + } + + public SparkPSWorker(String updFunc, Statement.PSFrequency freq, int epochs, long batchSize, + MatrixObject valFeatures, MatrixObject valLabels, ExecutionContext ec, ParamServer ps) { + } + + @Override + public void call(Tuple2<Integer, Tuple2<MatrixBlock, MatrixBlock>> input) throws Exception { + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java index 25bc113..3d61625 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParamservBuiltinCPInstruction.java @@ -58,15 +58,14 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionScheme; import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitioner; -import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDC; -import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDR; -import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerDRR; -import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionerOR; import org.apache.sysml.runtime.controlprogram.paramserv.LocalPSWorker; import org.apache.sysml.runtime.controlprogram.paramserv.LocalParamServer; import org.apache.sysml.runtime.controlprogram.paramserv.ParamServer; import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils; +import org.apache.sysml.runtime.controlprogram.paramserv.spark.SparkPSWorker; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; import org.apache.sysml.runtime.matrix.operators.Operator; @@ -97,9 +96,32 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc @Override public void processInstruction(ExecutionContext ec) { - Timing tSetup = DMLScript.STATISTICS ? 
new Timing(true) : null; - PSModeType mode = getPSMode(); + switch (mode) { + case LOCAL: + runLocally(ec, mode); + break; + case REMOTE_SPARK: + runOnSpark((SparkExecutionContext) ec, mode); + break; + } + } + + private void runOnSpark(SparkExecutionContext sec, PSModeType mode) { + PSScheme scheme = getScheme(); + int workerNum = getWorkerNum(mode); + + MatrixObject features = sec.getMatrixObject(getParam(PS_FEATURES)); + MatrixObject labels = sec.getMatrixObject(getParam(PS_LABELS)); + + SparkPSWorker worker = new SparkPSWorker(); + ParamservUtils.doPartitionOnSpark(sec, features, labels, scheme, workerNum) // Do data partitioning + .foreach(worker); // Run remote workers + + } + + private void runLocally(ExecutionContext ec, PSModeType mode) { + Timing tSetup = DMLScript.STATISTICS ? new Timing(true) : null; int workerNum = getWorkerNum(mode); BasicThreadFactory factory = new BasicThreadFactory.Builder() .namingPattern("workers-pool-thread-%d").build(); @@ -132,13 +154,13 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc MatrixObject valFeatures = ec.getMatrixObject(getParam(PS_VAL_FEATURES)); MatrixObject valLabels = ec.getMatrixObject(getParam(PS_VAL_LABELS)); List<LocalPSWorker> workers = IntStream.range(0, workerNum) - .mapToObj(i -> new LocalPSWorker(i, updFunc, freq, epochs, getBatchSize(), valFeatures, valLabels, workerECs.get(i), ps)) - .collect(Collectors.toList()); + .mapToObj(i -> new LocalPSWorker(i, updFunc, freq, epochs, getBatchSize(), valFeatures, valLabels, workerECs.get(i), ps)) + .collect(Collectors.toList()); // Do data partition PSScheme scheme = getScheme(); - doDataPartitioning(scheme, ec, workers); - + partitionLocally(scheme, ec, workers); + if (DMLScript.STATISTICS) Statistics.accPSSetupTime((long) tSetup.stop()); @@ -180,8 +202,6 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc } catch (IllegalArgumentException e) { throw new 
DMLRuntimeException(String.format("Paramserv function: not support ps execution mode '%s'", getParam(PS_MODE))); } - if( mode == PSModeType.REMOTE_SPARK ) - throw new DMLRuntimeException("Do not support remote spark."); return mode; } @@ -236,12 +256,13 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc switch (mode) { case LOCAL: // default worker number: available cores - 1 (assign one process for agg service) - int workerNum = getRemainingCores(); - if (getParameterMap().containsKey(PS_PARALLELISM)) - workerNum = Integer.valueOf(getParam(PS_PARALLELISM)); - return workerNum; + return getParameterMap().containsKey(PS_PARALLELISM) ? + Integer.valueOf(getParam(PS_PARALLELISM)) : getRemainingCores(); + case REMOTE_SPARK: + return getParameterMap().containsKey(PS_PARALLELISM) ? + Integer.valueOf(getParam(PS_PARALLELISM)) : SparkExecutionContext.getDefaultParallelism(true); default: - throw new DMLRuntimeException("Unsupported parameter server: "+mode.name()); + throw new DMLRuntimeException("Unsupported parameter server: " + mode.name()); } } @@ -279,22 +300,24 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc return hyperparams; } - private void doDataPartitioning(PSScheme scheme, ExecutionContext ec, List<LocalPSWorker> workers) { + private void partitionLocally(PSScheme scheme, ExecutionContext ec, List<LocalPSWorker> workers) { MatrixObject features = ec.getMatrixObject(getParam(PS_FEATURES)); MatrixObject labels = ec.getMatrixObject(getParam(PS_LABELS)); - switch (scheme) { - case DISJOINT_CONTIGUOUS: - doDataPartitioning(new DataPartitionerDC(), features, labels, workers); - break; - case DISJOINT_ROUND_ROBIN: - doDataPartitioning(new DataPartitionerDRR(), features, labels, workers); - break; - case DISJOINT_RANDOM: - doDataPartitioning(new DataPartitionerDR(), features, labels, workers); - break; - case OVERLAP_RESHUFFLE: - doDataPartitioning(new DataPartitionerOR(), features, labels, workers); 
- break; + DataPartitionScheme.Result result = new DataPartitioner(scheme).doPartitioning(workers.size(), features.acquireRead(), labels.acquireRead()); + features.release(); + labels.release(); + List<MatrixObject> pfs = result.pFeatures; + List<MatrixObject> pls = result.pLabels; + if (pfs.size() < workers.size()) { + if (LOG.isWarnEnabled()) { + LOG.warn(String.format("There is only %d batches of data but has %d workers. " + + "Hence, reset the number of workers with %d.", pfs.size(), workers.size(), pfs.size())); + } + workers = workers.subList(0, pfs.size()); + } + for (int i = 0; i < workers.size(); i++) { + workers.get(i).setFeatures(pfs.get(i)); + workers.get(i).setLabels(pls.get(i)); } } @@ -310,20 +333,4 @@ public class ParamservBuiltinCPInstruction extends ParameterizedBuiltinCPInstruc return scheme; } - private void doDataPartitioning(DataPartitioner dp, MatrixObject features, MatrixObject labels, List<LocalPSWorker> workers) { - DataPartitioner.Result result = dp.doPartitioning(workers.size(), features, labels); - List<MatrixObject> pfs = result.pFeatures; - List<MatrixObject> pls = result.pLabels; - if (pfs.size() < workers.size()) { - if (LOG.isWarnEnabled()) { - LOG.warn(String.format("There is only %d batches of data but has %d workers. 
" - + "Hence, reset the number of workers with %d.", pfs.size(), workers.size(), pfs.size())); - } - workers = workers.subList(0, pfs.size()); - } - for (int i = 0; i < workers.size(); i++) { - workers.get(i).setFeatures(pfs.get(i)); - workers.get(i).setLabels(pls.get(i)); - } - } } http://git-wip-us.apache.org/repos/asf/systemml/blob/8def2040/src/test/java/org/apache/sysml/test/integration/functions/paramserv/BaseDataPartitionerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/paramserv/BaseDataPartitionerTest.java b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/BaseDataPartitionerTest.java new file mode 100644 index 0000000..0092aed --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/paramserv/BaseDataPartitionerTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.test.integration.functions.paramserv; + +import java.util.stream.IntStream; + +import org.apache.sysml.parser.Statement; +import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitionScheme; +import org.apache.sysml.runtime.controlprogram.paramserv.DataPartitioner; +import org.apache.sysml.runtime.controlprogram.paramserv.ParamservUtils; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.util.DataConverter; + +public abstract class BaseDataPartitionerTest { + + protected static final int ROW_SIZE = 4000; + protected static final int COL_SIZE = 2000; + protected static final int WORKER_NUM = 4; + + protected MatrixBlock[] generateData() { + double[][] df = new double[BaseDataPartitionerTest.ROW_SIZE][BaseDataPartitionerTest.COL_SIZE]; + for (int i = 0; i < BaseDataPartitionerTest.ROW_SIZE; i++) { + for (int j = 0; j < BaseDataPartitionerTest.COL_SIZE; j++) { + df[i][j] = i * BaseDataPartitionerTest.COL_SIZE + j; + } + } + double[] dl = new double[BaseDataPartitionerTest.ROW_SIZE]; + for (int i = 0; i < BaseDataPartitionerTest.ROW_SIZE; i++) { + dl[i] = i; + } + MatrixBlock fmb = DataConverter.convertToMatrixBlock(df); + MatrixBlock lmb = DataConverter.convertToMatrixBlock(dl, true); + return new MatrixBlock[] { fmb, lmb }; + } + + protected double[] generateExpectedData(int from, int to) { + return IntStream.range(from, to).mapToDouble(i -> (double) i).toArray(); + } + + protected DataPartitionScheme.Result launchLocalDataPartitionerDC() { + DataPartitioner dp = new DataPartitioner(Statement.PSScheme.DISJOINT_CONTIGUOUS); + MatrixBlock[] mbs = generateData(); + return dp.doPartitioning(WORKER_NUM, mbs[0], mbs[1]); + } + + protected DataPartitionScheme.Result launchLocalDataPartitionerDR(MatrixBlock[] mbs) { + ParamservUtils.SEED = System.nanoTime(); + DataPartitioner dp = new DataPartitioner(Statement.PSScheme.DISJOINT_RANDOM); + return dp.doPartitioning(WORKER_NUM, mbs[0], mbs[1]); + 
} + + protected DataPartitionScheme.Result launchLocalDataPartitionerDRR() { + DataPartitioner dp = new DataPartitioner(Statement.PSScheme.DISJOINT_ROUND_ROBIN); + MatrixBlock[] mbs = generateData(); + return dp.doPartitioning(WORKER_NUM, mbs[0], mbs[1]); + } + + protected DataPartitionScheme.Result launchLocalDataPartitionerOR() { + DataPartitioner dp = new DataPartitioner(Statement.PSScheme.OVERLAP_RESHUFFLE); + MatrixBlock[] mbs = generateData(); + return dp.doPartitioning(WORKER_NUM, mbs[0], mbs[1]); + } +}
