Comment out parallelization setting in cache()
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/48a8a820 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/48a8a820 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/48a8a820 Branch: refs/heads/master Commit: 48a8a8208322ed690d5356a4e0cac7667b080bab Parents: e420485 Author: Andrew Palumbo <[email protected]> Authored: Sun Mar 27 15:16:36 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Sun Mar 27 15:16:36 2016 -0400 ---------------------------------------------------------------------- .../drm/CheckpointedFlinkDrm.scala | 21 +++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/48a8a820/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index 8424856..e59e5a5 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -105,16 +105,23 @@ class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], } val _ds = readPersistedDataSet(cacheFileName, ds) - // We may want to look more closely at this: - // since we've cached a drm, triggering a computation - // it may not make sense to keep the same parallelism degree - if (!(parallelismDeg == _ds.getParallelism)) { - _ds.setParallelism(parallelismDeg).rebalance() - } + /** Leave the parallelism degree to be set the operators + * TODO: find out a way to set the parallelism degree based on the + * final drm after computation is actually triggered + * + * // 
We may want to look more closely at this: + * // since we've cached a drm, triggering a computation + * // it may not make sense to keep the same parallelism degree + * if (!(parallelismDeg == _ds.getParallelism)) { + * _ds.setParallelism(parallelismDeg).rebalance() + * } + * + */ + datasetWrap(_ds) } - def uncache():this.type = { + def uncache(): this.type = { if (isCached) { Hadoop2HDFSUtil.delete(cacheFileName) isCached = false
