MAHOUT-1570: rebased to latest upstream
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/8de8b798 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/8de8b798 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/8de8b798 Branch: refs/heads/flink-binding Commit: 8de8b798f0ffa374c2b18e00b8130ac0a0d8e918 Parents: 08ad113 Author: Alexey Grigorev <[email protected]> Authored: Wed Jun 24 14:30:58 2015 +0200 Committer: Alexey Grigorev <[email protected]> Committed: Fri Sep 25 17:41:53 2015 +0200 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkByteBCast.scala | 4 +++ .../mahout/flinkbindings/FlinkEngine.scala | 19 ++++++++++++++ .../apache/mahout/flinkbindings/package.scala | 26 ++++++++++++-------- .../flinkbindings/DistributedFlinkSuit.scala | 10 +++++--- 4 files changed, 45 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala index 70d0545..1024452 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala @@ -55,6 +55,10 @@ class FlinkByteBCast[T](private val arr: Array[Byte]) extends BCast[T] with Seri override def value: T = _value + override def close: Unit = { + // nothing to close + } + } object FlinkByteBCast { http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index 18e17db..5039d21 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -235,4 +235,23 @@ object FlinkEngine extends DistributedEngine { /** Creates empty DRM with non-trivial height */ override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10) (implicit sc: DistributedContext): CheckpointedDrm[Long] = ??? + + + /** + * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys + * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix. + */ + def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): + (DrmLike[Int], Option[DrmLike[K]]) = ??? + + /** + * (Optional) Sampling operation. Consistent with Spark semantics of the same. + */ + def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = ??? + + def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ??? + + /** Optional engine-specific all reduce tensor operation. */ + def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = ??? + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index e46e605..955d8b1 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -18,24 +18,30 @@ */ package org.apache.mahout +import scala.Array.canBuildFrom import scala.reflect.ClassTag -import org.slf4j.LoggerFactory + +import org.apache.flink.api.common.functions.FilterFunction +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.ExecutionEnvironment -import org.apache.flink.api.common.functions.MapFunction -import org.apache.mahout.math.Vector +import org.apache.mahout.flinkbindings._ +import org.apache.mahout.flinkbindings.FlinkDistributedContext +import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm +import org.apache.mahout.flinkbindings.drm.FlinkDrm +import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm +import org.apache.mahout.math._ import org.apache.mahout.math.DenseVector import org.apache.mahout.math.Matrix import org.apache.mahout.math.MatrixWritable +import org.apache.mahout.math.Vector import org.apache.mahout.math.VectorWritable import org.apache.mahout.math.drm._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.flinkbindings.FlinkDistributedContext -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm -import org.apache.flink.api.common.functions.FilterFunction +import org.apache.mahout.math.drm.BlockifiedDrmTuple +import org.apache.mahout.math.drm.CheckpointedDrm +import org.apache.mahout.math.drm.DistributedContext +import org.apache.mahout.math.drm.DrmTuple +import org.slf4j.LoggerFactory package object flinkbindings { http://git-wip-us.apache.org/repos/asf/mahout/blob/8de8b798/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala index 2295c26..126a8f4 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuit.scala @@ -18,19 +18,21 @@ */ package org.apache.mahout.flinkbindings +import org.apache.flink.api.java.ExecutionEnvironment +import org.apache.mahout.flinkbindings._ +import org.apache.mahout.math.drm.DistributedContext import org.apache.mahout.test.DistributedMahoutSuite import org.scalatest.Suite -import org.apache.mahout.math.drm.DistributedContext -import org.apache.flink.api.java.ExecutionEnvironment + trait DistributedFlinkSuit extends DistributedMahoutSuite { this: Suite => protected implicit var mahoutCtx: DistributedContext = _ protected var env: ExecutionEnvironment = null - + def initContext() { env = ExecutionEnvironment.getExecutionEnvironment - mahoutCtx = env + mahoutCtx = wrapContext(env) } override def beforeEach() {
