MAHOUT-1816:Implement newRowCardinality in CheckpointedFlinkDrm, this closes apache/mahout#199
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9bf5292f Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9bf5292f Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9bf5292f Branch: refs/heads/master Commit: 9bf5292fd7ae8bacf027378d4b692f4e70aa46e2 Parents: f4f42ae Author: smarthi <[email protected]> Authored: Mon Mar 21 22:59:11 2016 -0400 Committer: smarthi <[email protected]> Committed: Mon Mar 21 22:59:11 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 10 ++++++---- .../mahout/sparkbindings/drm/CheckpointedDrmSpark.scala | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/9bf5292f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index 6f1ba9f..a6b267b 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -192,15 +192,17 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], (x: K) => new Text(x.asInstanceOf[String]) } else if (keyTag.runtimeClass == classOf[Long]) { (x: K) => new LongWritable(x.asInstanceOf[Long]) - // WritableTypeInfo will reject the base Writable class -// } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) { -// (x: K) => x.asInstanceOf[Writable] } else { throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag)) } } - def newRowCardinality(n: Int): 
CheckpointedDrm[K] = ??? + def newRowCardinality(n: Int): CheckpointedDrm[K] = { + assert(n > -1) + assert(n >= nrow) + new CheckpointedFlinkDrm(ds = ds, _nrow = n, _ncol = _ncol, cacheHint = cacheHint, + partitioningTag = partitioningTag, _canHaveMissingRows = _canHaveMissingRows) + } override val context: DistributedContext = ds.getExecutionEnvironment http://git-wip-us.apache.org/repos/asf/mahout/blob/9bf5292f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 71755c5..ff150a1 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -155,7 +155,7 @@ class CheckpointedDrmSpark[K: ClassTag]( /** * Dump matrix as computed Mahout's DRM into specified (HD)FS path * - * @param path + * @param path output path to dump Matrix to */ def dfsWrite(path: String) = { val ktag = implicitly[ClassTag[K]] @@ -201,7 +201,7 @@ class CheckpointedDrmSpark[K: ClassTag]( rddInput.isBlockified match { case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached")) .map(_._2.ncol).reduce(max) - case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _)) + case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max) } }
