MAHOUT-1819:Set the default Parallelism for Flink execution in FlinkDistributedContext, this closes apache/mahout#206
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f0e22e28 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f0e22e28 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f0e22e28 Branch: refs/heads/master Commit: f0e22e28c8da53a0c26233afd4d363dcc035ee7d Parents: 99c32f2 Author: smarthi <[email protected]> Authored: Mon Mar 28 17:17:48 2016 -0400 Committer: smarthi <[email protected]> Committed: Mon Mar 28 17:17:48 2016 -0400 ---------------------------------------------------------------------- .../flinkbindings/FlinkDistributedContext.scala | 17 +++++++++++++++++ .../apache/mahout/flinkbindings/FlinkEngine.scala | 17 ++++++++--------- .../mahout/flinkbindings/blas/FlinkOpAx.scala | 3 +-- .../flinkbindings/drm/CheckpointedFlinkDrm.scala | 4 ++-- 4 files changed, 28 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala index cfc9209..49dc593 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala @@ -19,11 +19,28 @@ package org.apache.mahout.flinkbindings import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.configuration.GlobalConfiguration import org.apache.mahout.math.drm.DistributedContext import org.apache.mahout.math.drm.DistributedEngine class FlinkDistributedContext(val env: ExecutionEnvironment) extends DistributedContext { + val mahoutHome = getMahoutHome() + + 
GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml") + + val conf = GlobalConfiguration.getConfiguration + + var degreeOfParallelism: Int = 0 + + if (conf != null) { + degreeOfParallelism = conf.getInteger("parallelism.default", Runtime.getRuntime.availableProcessors) + } else { + degreeOfParallelism = Runtime.getRuntime.availableProcessors + } + + env.setParallelism(degreeOfParallelism) + val engine: DistributedEngine = FlinkEngine http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index dd28e9d..adff30b 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -249,13 +249,13 @@ object FlinkEngine extends DistributedEngine { override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = FlinkByteBCast.wrap(v) - /** Broadcast support */ override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = FlinkByteBCast.wrap(m) - /** Parallelize in-core matrix as flink distributed matrix, using row ordinal indices as data set keys. 
*/ + // The 'numPartitions' parameter is not honored in this call, + // as Flink sets a global parallelism in ExecutionEnvironment override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) (implicit dc: DistributedContext): CheckpointedDrm[Int] = { @@ -264,26 +264,25 @@ object FlinkEngine extends DistributedEngine { new CheckpointedFlinkDrm(ds = parallelDrm, _nrow = m.numRows(), _ncol = m.numCols()) } - + // The 'parallelismDegree' parameter is not honored in this call, + // as Flink sets a global parallelism in ExecutionEnvironment private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int) (implicit dc: DistributedContext): DrmDataSet[Int] = { val rows = (0 until m.nrow).map(i => (i, m(i, ::))) val dataSetType = TypeExtractor.getForObject(rows.head) - //TODO: Make Sure that this is the correct partitioning scheme - dc.env.fromCollection(rows) - .partitionByRange(0) - .setParallelism(parallelismDegree) - .rebalance() + dc.env.fromCollection(rows).partitionByRange(0) } /** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. 
*/ + // The 'numPartitions' parameter is not honored in this call, + // as Flink sets a global parallelism in ExecutionEnvironment override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1) (implicit dc: DistributedContext): CheckpointedDrm[String] = { val rb = m.getRowLabelBindings val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::) - new CheckpointedFlinkDrm[String](dc.env.fromCollection(p).setParallelism(numPartitions), + new CheckpointedFlinkDrm[String](dc.env.fromCollection(p), _nrow = m.nrow, _ncol = m.ncol, cacheHint = CacheHint.NONE) } http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala index fa649fb..ca43b31 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala @@ -87,8 +87,7 @@ object FlinkOpAx { // Convert back to mtx .toColMatrix - // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug - // it back as a Flink drm + // This doesn't do anything now val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1) new RowsFlinkDrm[Int](res, 1) http://git-wip-us.apache.org/repos/asf/mahout/blob/f0e22e28/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index e59e5a5..794c721 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -57,14 +57,14 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], var parallelismDeg: Int = -1 var persistanceRootDir: String = _ - // need to make sure that this is actually getting the correct propertirs for {{taskmanager.tmp.dirs}} + // need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}} val mahoutHome = getMahoutHome() // this is extra I/O for each cache call. this needs to be moved somewhere where it is called // only once. Possibly FlinkDistributedEngine. GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml") - val conf = GlobalConfiguration.getConfiguration() + val conf = GlobalConfiguration.getConfiguration if (!(conf == null )) { persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp/")
