Major fixes for Flink backend merged
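In broad strokes, this commit migrates the Flink bindings from Java-style rich function classes to Flink's Scala API, threading an implicit TypeInformation for the DRM row-key type through each operator. A condensed sketch of that pattern, adapted from the colSums rewrite in the FlinkEngine.scala diff below (surrounding object and imports elided; not a verbatim excerpt):

  // Condensed from the colSums rewrite in this commit: bring the key's ClassTag
  // and a matching TypeInformation into implicit scope, then use plain Scala
  // lambdas instead of anonymous MapFunction/ReduceFunction classes.
  def colSums[K](drm: CheckpointedDrm[K]): Vector = {
    implicit val kTag: ClassTag[K] = drm.keyClassTag
    implicit val typeInformation = generateTypeInformation[K]

    drm.ds.map(tuple => tuple._2) // keep only the row vector, drop the key
      .reduce(_ + _)              // element-wise vector sum across all rows
      .collect()
      .head
  }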
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/072289a4 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/072289a4 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/072289a4 Branch: refs/heads/flink-binding Commit: 072289a46c9bd4b7297a17b621f7da30b94df1a7 Parents: 92a2f6c Author: smarthi <[email protected]> Authored: Tue Mar 15 13:57:35 2016 -0400 Committer: smarthi <[email protected]> Committed: Tue Mar 15 13:57:35 2016 -0400 ---------------------------------------------------------------------- flink/pom.xml | 20 +- .../mahout/flinkbindings/FlinkByteBCast.scala | 3 + .../flinkbindings/FlinkDistributedContext.scala | 1 + .../mahout/flinkbindings/FlinkEngine.scala | 325 ++++++++++++------- .../mahout/flinkbindings/blas/FlinkOpAewB.scala | 58 ++-- .../flinkbindings/blas/FlinkOpAewScalar.scala | 33 +- .../mahout/flinkbindings/blas/FlinkOpAt.scala | 58 ++-- .../mahout/flinkbindings/blas/FlinkOpAtA.scala | 48 ++- .../mahout/flinkbindings/blas/FlinkOpAtB.scala | 25 +- .../mahout/flinkbindings/blas/FlinkOpAx.scala | 4 +- .../flinkbindings/blas/FlinkOpCBind.scala | 136 ++++---- .../flinkbindings/blas/FlinkOpMapBlock.scala | 34 +- .../flinkbindings/blas/FlinkOpRBind.scala | 5 +- .../flinkbindings/blas/FlinkOpRowRange.scala | 20 +- .../blas/FlinkOpTimesRightMatrix.scala | 54 ++- .../mahout/flinkbindings/blas/package.scala | 60 ---- .../drm/CheckpointedFlinkDrm.scala | 92 +++++- .../drm/CheckpointedFlinkDrmOps.scala | 1 - .../mahout/flinkbindings/drm/FlinkDrm.scala | 58 ++-- .../mahout/flinkbindings/io/DrmMetadata.scala | 16 +- .../flinkbindings/io/HDFSPathSearch.scala | 6 +- .../flinkbindings/io/Hadoop1HDFSUtil.scala | 86 ----- .../flinkbindings/io/Hadoop2HDFSUtil.scala | 94 ++++++ .../apache/mahout/flinkbindings/package.scala | 22 +- .../flinkbindings/DistributedFlinkSuite.scala | 2 + .../mahout/flinkbindings/DrmLikeOpsSuite.scala | 1 - .../flinkbindings/FailingTestsSuite.scala | 272 ++++++++++++++++ .../flinkbindings/FlinkByteBCastSuite.scala | 9 +- .../mahout/flinkbindings/RLikeOpsSuite.scala | 1 - .../mahout/flinkbindings/UseCasesSuite.scala | 1 - .../mahout/flinkbindings/blas/LATestSuite.scala | 3 +- .../DistributedDecompositionsSuite.scala | 1 - .../standard/DrmLikeOpsSuite.scala | 1 - .../flinkbindings/standard/DrmLikeSuite.scala | 1 - .../standard/NaiveBayesTestSuite.scala | 11 + .../standard/RLikeDrmOpsSuite.scala | 8 - math-scala/pom.xml | 2 +- .../math/drm/logical/OpTimesLeftMatrix.scala | 2 +- .../org/apache/mahout/math/drm/package.scala | 4 +- .../apache/mahout/math/scalabindings/MMul.scala | 45 ++- .../DistributedDecompositionsSuiteBase.scala | 4 +- .../mahout/math/drm/RLikeDrmOpsSuiteBase.scala | 3 + .../mahout/drivers/MahoutSparkDriver.scala | 4 +- .../drivers/MahoutSparkOptionParser.scala | 6 +- .../mahout/drivers/RowSimilarityDriver.scala | 5 +- .../drivers/TextDelimitedReaderWriter.scala | 10 +- .../apache/mahout/drivers/TrainNBDriver.scala | 21 +- .../sparkbindings/SparkDistributedContext.scala | 2 +- .../apache/mahout/sparkbindings/blas/ABt.scala | 2 +- .../test/DistributedSparkSuite.scala | 4 +- 50 files changed, 1023 insertions(+), 661 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml index 37f1dbf..2ccb558 100644 --- a/flink/pom.xml +++ b/flink/pom.xml @@ -103,14 +103,15 
@@ </build> <dependencies> + <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> + <artifactId>flink-runtime_${scala.compat.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_2.10</artifactId> + <artifactId>flink-scala_${scala.compat.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> @@ -120,15 +121,28 @@ </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_2.10</artifactId> + <artifactId>flink-core</artifactId> <version>${flink.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.compat.version}</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> <groupId>org.apache.mahout</groupId> <artifactId>mahout-math-scala_${scala.compat.version}</artifactId> </dependency> + <!-- enforce current version of kryo as of 0.10.1--> + <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>2.24.0</version> + </dependency> + <dependency> <groupId>org.apache.mahout</groupId> <artifactId>mahout-hdfs</artifactId> http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala index 8544db0..5cdfb79 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala @@ -43,14 +43,17 @@ class FlinkByteBCast[T](private val arr: Array[Byte]) extends BCast[T] with Seri if (streamType == FlinkByteBCast.StreamTypeVector) { val writeable = new VectorWritable() writeable.readFields(stream) + // printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T]) writeable.get.asInstanceOf[T] } else if (streamType == FlinkByteBCast.StreamTypeMatrix) { val writeable = new MatrixWritable() writeable.readFields(stream) + // printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T]) writeable.get.asInstanceOf[T] } else { throw new IllegalArgumentException(s"unexpected type tag $streamType") } + } override def value: T = _value http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala index c818030..cfc9209 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala @@ -26,6 +26,7 @@ class FlinkDistributedContext(val env: ExecutionEnvironment) extends Distributed val engine: DistributedEngine = FlinkEngine + override def close() { // TODO } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index d03aef7..f848c3f 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -18,32 +18,29 @@ */ package org.apache.mahout.flinkbindings -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.hadoop.io.Writable +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.utils.DataSetUtils +import org.apache.hadoop.io.{IntWritable, LongWritable, Text} import org.apache.mahout.flinkbindings.blas._ import org.apache.mahout.flinkbindings.drm._ -import org.apache.mahout.flinkbindings.io.HDFSUtil -import org.apache.mahout.flinkbindings.io.Hadoop1HDFSUtil +import org.apache.mahout.flinkbindings.io.{HDFSUtil, Hadoop2HDFSUtil} import org.apache.mahout.math._ import org.apache.mahout.math.drm._ import org.apache.mahout.math.drm.logical._ -import org.apache.mahout.math.indexeddataset.BiDictionary -import org.apache.mahout.math.indexeddataset.IndexedDataset -import org.apache.mahout.math.indexeddataset.Schema -import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset, Schema} import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.scalabindings._ -import org.apache.flink.api.scala._ - +import scala.collection.JavaConversions._ +import scala.reflect._ object FlinkEngine extends DistributedEngine { - // By default, use Hadoop 1 utils - var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil + // By default, use Hadoop 2 utils + var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil /** * Load DRM from hdfs (as in Mahout DRM format). @@ -51,7 +48,7 @@ object FlinkEngine extends DistributedEngine { * @param path The DFS path to load from * @param parMin Minimum parallelism after load (equivalent to #par(min=...)). */ - override def drmDfsRead(path: String, parMin: Int = 0) + override def drmDfsRead(path: String, parMin: Int = 1) (implicit dc: DistributedContext): CheckpointedDrm[_] = { // Require that context is actually Flink context. 
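The hunk that follows replaces the single Writable-keyed read path with an explicit dispatch on the key type recorded in the DRM header. Condensed, the new control flow is (names as in the diff; TypeInformation plumbing elided):

  // Condensed from the drmDfsRead rewrite below: read with the concrete Writable
  // key class matching the header's keyClassTag, so Flink can derive the type
  // information it needs for the resulting DataSet[(Any, Vector)].
  val res =
    if (metadata.keyClassTag == ClassTag.Int)
      env.readSequenceFile(classOf[IntWritable], classOf[VectorWritable], path)
        .map(t => (unwrapKey(t._1), t._2.get(): Vector))
    else if (metadata.keyClassTag == ClassTag.Long)
      env.readSequenceFile(classOf[LongWritable], classOf[VectorWritable], path)
        .map(t => (unwrapKey(t._1), t._2.get(): Vector))
    else if (metadata.keyClassTag == ClassTag(classOf[String]))
      env.readSequenceFile(classOf[Text], classOf[VectorWritable], path)
        .map(t => (unwrapKey(t._1), t._2.get(): Vector))
    else
      throw new IllegalArgumentException(s"Unsupported DRM key type: ${metadata.keyClassTag}")
  datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]])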
@@ -60,19 +57,47 @@ object FlinkEngine extends DistributedEngine { // Extract the Flink Environment variable implicit val env = dc.asInstanceOf[FlinkDistributedContext].env - val metadata = hdfsUtils.readDrmHeader(path) + // set the parallelism of the env to parMin + env.setParallelism(parMin) - val unwrapKey = metadata.unwrapKeyFunction + // get the header of a SequenceFile in the path + val metadata = hdfsUtils.readDrmHeader(path + "//") - val ds = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path) + val keyClass: Class[_] = metadata.keyTypeWritable - val res = ds.map(new MapFunction[(Writable, VectorWritable), (Any, Vector)] { - def map(tuple: (Writable, VectorWritable)): (Any, Vector) = { - (unwrapKey(tuple._1), tuple._2) - } - }) + // from the header determine which function to use to unwrap the key + val unwrapKey = metadata.unwrapKeyFunction + + // Map to the correct DrmLike based on the metadata information + if (metadata.keyClassTag == ClassTag.Int) { + val ds = env.readSequenceFile(classOf[IntWritable], classOf[VectorWritable], path) + + val res = ds.map(new MapFunction[(IntWritable, VectorWritable), (Any, Vector)] { + def map(tuple: (IntWritable, VectorWritable)): (Any, Vector) = { + (unwrapKey(tuple._1), tuple._2.get()) + } + }) + datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]]) + } else if (metadata.keyClassTag == ClassTag.Long) { + val ds = env.readSequenceFile(classOf[LongWritable], classOf[VectorWritable], path) + + val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Any, Vector)] { + def map(tuple: (LongWritable, VectorWritable)): (Any, Vector) = { + (unwrapKey(tuple._1), tuple._2.get()) + } + }) + datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]]) + } else if (metadata.keyClassTag == ClassTag(classOf[String])) { + val ds = env.readSequenceFile(classOf[Text], classOf[VectorWritable], path) + + val res = ds.map(new MapFunction[(Text, VectorWritable), (Any, Vector)] { + def map(tuple: (Text, VectorWritable)): (Any, Vector) = { + (unwrapKey(tuple._1), tuple._2.get()) + } + }) + datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]]) + } else throw new IllegalArgumentException(s"Unsupported DRM key type:${keyClass.getName}") - datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]]) } override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary]) @@ -82,89 +107,114 @@ object FlinkEngine extends DistributedEngine { (implicit sc: DistributedContext): IndexedDataset = ??? + /** + * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P> + * + * A particular physical engine implementation may choose to either use or not use these rewrites + * as a useful basic rewriting rule.<P> + */ + override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action) + /** * Translates logical plan into Flink execution plan. **/ override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = { // Flink-specific Physical Plan translation. 
+ + implicit val typeInformation = generateTypeInformation[K] val drm = flinkTranslate(plan) val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol) + // newcp.ds.getExecutionEnvironment.createProgramPlan("plan") newcp.cache() } - private def flinkTranslate[K: ClassTag](oper: DrmLike[K]): FlinkDrm[K] = oper match { - case op @ OpAx(a, x) => FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)(op.classTagA)) - case op @ OpAt(a) => FlinkOpAt.sparseTrick(op, flinkTranslate(a)(op.classTagA)) - case op @ OpAtx(a, x) => - // express Atx as (A.t) %*% x - // TODO: create specific implementation of Atx, see MAHOUT-1749 - val opAt = OpAt(a) - val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA)) - val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol) - val opAx = OpAx(atCast, x) - FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA)) - case op @ OpAtB(a, b) => FlinkOpAtB.notZippable(op, flinkTranslate(a)(op.classTagA), - flinkTranslate(b)(op.classTagA)) - case op @ OpABt(a, b) => - // express ABt via AtB: let C=At and D=Bt, and calculate CtD - // TODO: create specific implementation of ABt, see MAHOUT-1750 - val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts! - val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]])) - val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol) - - val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts! - val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]])) - val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow=opBt.nrow, _ncol=opBt.ncol) - - FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)) - .asInstanceOf[FlinkDrm[K]] - case op @ OpAtA(a) => FlinkOpAtA.at_a(op, flinkTranslate(a)(op.classTagA)) - case op @ OpTimesRightMatrix(a, b) => - FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b) - case op @ OpAewUnaryFunc(a, _, _) => - FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA)) - case op @ OpAewUnaryFuncFusion(a, _) => - FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA)) - // deprecated - case op @ OpAewScalar(a, scalar, _) => - FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a)(op.classTagA), scalar) - case op @ OpAewB(a, b, _) => - FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA)) - case op @ OpCbind(a, b) => - FlinkOpCBind.cbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA)) - case op @ OpRbind(a, b) => - FlinkOpRBind.rbind(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA)) - case op @ OpCbindScalar(a, x, _) => - FlinkOpCBind.cbindScalar(op, flinkTranslate(a)(op.classTagA), x) - case op @ OpRowRange(a, _) => - FlinkOpRowRange.slice(op, flinkTranslate(a)(op.classTagA)) - case op @ OpABAnyKey(a, b) if extractRealClassTag(a) != extractRealClassTag(b) => - throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them") - case op: OpMapBlock[K, _] => - FlinkOpMapBlock.apply(flinkTranslate(op.A)(op.classTagA), op.ncol, op.bmf) - case cp: CheckpointedFlinkDrm[K] => new RowsFlinkDrm(cp.ds, cp.ncol) - case _ => throw new NotImplementedError(s"operator $oper is not implemented yet") + private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = { + implicit val kTag = oper.keyClassTag + implicit val typeInformation = 
generateTypeInformation[K] + oper match { + case OpAtAnyKey(_) ⇒ + throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.") + case op@OpAx(a, x) ⇒ + //implicit val typeInformation = generateTypeInformation[K] + FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)) + case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] + case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒ + // express Atx as (A.t) %*% x + // TODO: create specific implementation of Atx, see MAHOUT-1749 + val opAt = OpAt(a) + val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)) + val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol) + val opAx = OpAx(atCast, x) + FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)).asInstanceOf[FlinkDrm[K]] + case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a), + flinkTranslate(b)).asInstanceOf[FlinkDrm[K]] + case op@OpABt(a, b) ⇒ + // express ABt via AtB: let C=At and D=Bt, and calculate CtD + // TODO: create specific implementation of ABt, see MAHOUT-1750 + val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts! + val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]])) + val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol) + val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts! + val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]])) + val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol) + FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]] + case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] + case op@OpTimesRightMatrix(a, b) ⇒ + FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a), b) + case op@OpAewUnaryFunc(a, _, _) ⇒ + FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)) + case op@OpAewUnaryFuncFusion(a, _) ⇒ + FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)) + // deprecated + case op@OpAewScalar(a, scalar, _) ⇒ + FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a), scalar) + case op@OpAewB(a, b, _) ⇒ + FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a), flinkTranslate(b)) + case op@OpCbind(a, b) ⇒ + FlinkOpCBind.cbind(op, flinkTranslate(a), flinkTranslate(b)) + case op@OpRbind(a, b) ⇒ + FlinkOpRBind.rbind(op, flinkTranslate(a), flinkTranslate(b)) + case op@OpCbindScalar(a, x, _) ⇒ + FlinkOpCBind.cbindScalar(op, flinkTranslate(a), x) + case op@OpRowRange(a, _) ⇒ + FlinkOpRowRange.slice(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] + case op@OpABAnyKey(a, b) if a.keyClassTag != b.keyClassTag ⇒ + throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them") + case op: OpMapBlock[K, _] ⇒ + FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op).asInstanceOf[FlinkDrm[K]] + case cp: CheckpointedFlinkDrm[K] ⇒ + //implicit val ktag=cp.keyClassTag + new RowsFlinkDrm[K](cp.ds, cp.ncol) + case _ ⇒ + throw new NotImplementedError(s"operator $oper is not implemented yet") + } } /** * returns a vector that contains a column-wise sum from DRM */ - override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = { - val sum = drm.ds.map(new MapFunction[(K, Vector), Vector] { - def map(tuple: (K, Vector)): Vector = tuple._2 - }).reduce(new ReduceFunction[Vector] { - def reduce(v1: Vector, v2: Vector) = v1 + v2 -
}) + override def colSums[K](drm: CheckpointedDrm[K]): Vector = { + implicit val kTag: ClassTag[K] = drm.keyClassTag + implicit val typeInformation = generateTypeInformation[K] + + + val sum = drm.ds.map { + tuple => tuple._2 + }.reduce(_ + _) val list = sum.collect list.head } /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */ - override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = { - val result = drm.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), Vector] { - def map(tuple: (Array[K], Matrix)): Vector = { - val (_, block) = tuple + override def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = { + implicit val kTag: ClassTag[K] = drm.keyClassTag + implicit val typeInformation = generateTypeInformation[K] + + + val result = drm.asBlockified.ds.map { + tuple => + val block = tuple._2 val acc = block(0, ::).like() block.foreach { v => @@ -172,10 +222,7 @@ object FlinkEngine extends DistributedEngine { } acc - } - }).reduce(new ReduceFunction[Vector] { - def reduce(v1: Vector, v2: Vector) = v1 + v2 - }) + }.reduce(_ + _) val list = result.collect list.head @@ -184,21 +231,30 @@ object FlinkEngine extends DistributedEngine { /** * returns a vector that contains a column-wise mean from DRM */ - override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = { + override def colMeans[K](drm: CheckpointedDrm[K]): Vector = { drm.colSums() / drm.nrow } /** * Calculates the element-wise squared norm of a matrix */ - override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = { - val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] { - def map(tuple: (K, Vector)): Double = tuple match { + override def norm[K](drm: CheckpointedDrm[K]): Double = { + implicit val kTag: ClassTag[K] = drm.keyClassTag + implicit val typeInformation = generateTypeInformation[K] + + val sumOfSquares = drm.ds.map { + tuple => tuple match { case (idx, vec) => vec dot vec } - }).reduce(new ReduceFunction[Double] { - def reduce(v1: Double, v2: Double) = v1 + v2 - }) + }.reduce(_ + _) + +// val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] { +// def map(tuple: (K, Vector)): Double = tuple match { +// case (idx, vec) => vec dot vec +// } +// }).reduce(new ReduceFunction[Double] { +// def reduce(v1: Double, v2: Double) = v1 + v2 +// }) val list = sumOfSquares.collect list.head @@ -214,7 +270,7 @@ object FlinkEngine extends DistributedEngine { FlinkByteBCast.wrap(m) - /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */ + /** Parallelize in-core matrix as flink distributed matrix, using row ordinal indices as data set keys. 
*/ override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) (implicit dc: DistributedContext): CheckpointedDrm[Int] = { @@ -223,14 +279,19 @@ new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols()) } + private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int) (implicit dc: DistributedContext): DrmDataSet[Int] = { - val rows = (0 until m.nrow).map(i => (i, m(i, ::))) + val rows = (0 until m.nrow).map(i => (i, m(i, ::)))//.toSeq.sortWith((ii, jj) => ii._1 < jj._1) val dataSetType = TypeExtractor.getForObject(rows.head) - dc.env.fromCollection(rows).setParallelism(parallelismDegree) + //TODO: Make Sure that this is the correct partitioning scheme + dc.env.fromCollection(rows) + .partitionByRange(0) + .setParallelism(parallelismDegree) + .rebalance() } - /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */ + /** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. */ override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1) (implicit dc: DistributedContext): CheckpointedDrm[String] = { ??? @@ -247,7 +308,7 @@ for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) } val result = dc.env.fromCollection(nonParallelResult) - new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol) + new CheckpointedFlinkDrm[Int](ds=result, _nrow=nrow, _ncol=ncol) } /** Creates empty DRM with non-trivial height */ @@ -259,29 +320,53 @@ * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix. */ - def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): + def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = ??? /** - * (Optional) Sampling operation. Consistent with Spark semantics of the same. + * (Optional) Sampling operation. */ - def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = ??? - - def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ??? - -// def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = { -// -// val ncol = drmX match { -// case cp: CheckpointedFlinkDrm[K] ⇒ cp.ncol -// case _ ⇒ -1 -// } -// -// val sample = DataSetUtils.sampleWithSize(drmX.dataset, replacement, numSamples) -// -// } + def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = { + implicit val kTag: ClassTag[K] = drmX.keyClassTag + implicit val typeInformation = generateTypeInformation[K] + + val sample = DataSetUtils(drmX.dataset).sample(replacement, fraction) + new CheckpointedFlinkDrm[K](sample) + } + + def drmSampleKRows[K](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = { + implicit val kTag: ClassTag[K] = drmX.keyClassTag + implicit val typeInformation = generateTypeInformation[K] + + val sample = DataSetUtils(drmX.dataset).sampleWithSize(replacement, numSamples) + new CheckpointedFlinkDrm[K](sample) + } /** Optional engine-specific all reduce tensor operation.
*/ - def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = + def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink") - + +// private def generateTypeInformation[K]: TypeInformation[K] = { +// val tag = implicitly[K].asInstanceOf[ClassTag[K]] +// generateTypeInformationFromTag(tag) +// } + private def generateTypeInformation[K: ClassTag]: TypeInformation[K] = { + val tag = implicitly[ClassTag[K]] + + generateTypeInformationFromTag(tag) + } + + private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = { + if (tag.runtimeClass.equals(classOf[Int])) { + createTypeInformation[Int].asInstanceOf[TypeInformation[K]] + } else if (tag.runtimeClass.equals(classOf[Long])) { + createTypeInformation[Long].asInstanceOf[TypeInformation[K]] + } else if (tag.runtimeClass.equals(classOf[String])) { + createTypeInformation[String].asInstanceOf[TypeInformation[K]] +// } else if (tag.runtimeClass.equals(classOf[Any])) { +// createTypeInformation[Any].asInstanceOf[TypeInformation[K]] + } else { + throw new IllegalArgumentException(s"index type $tag is not supported") + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala index f879e86..c61074b 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala @@ -1,56 +1,42 @@ package org.apache.mahout.flinkbindings.blas -import java.lang.Iterable -import scala.collection.JavaConverters._ -import scala.reflect.ClassTag - -import org.apache.flink.api.common.functions.CoGroupFunction -import org.apache.flink.api.scala.DataSet +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ import org.apache.flink.util.Collector -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm +import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm} import org.apache.mahout.math.Vector import org.apache.mahout.math.drm.logical.OpAewB import org.apache.mahout.math.scalabindings.RLikeOps._ -import com.google.common.collect.Lists - /** * Implementation is inspired by Spark-binding's OpAewB * (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala) */ object FlinkOpAewB { - def rowWiseJoinNoSideEffect[K: ClassTag](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { + def rowWiseJoinNoSideEffect[K: TypeInformation](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { val function = AewBOpsCloning.strToFunction(op.op) - val classTag = extractRealClassTag(op.A) - val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) - - val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] - val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] - - val res: DataSet[(Any, Vector)] = - rowsA.coGroup(rowsB).where(joiner).equalTo(joiner) - .`with`(new CoGroupFunction[(_, 
Vector), (_, Vector), (_, Vector)] { - def coGroup(it1java: Iterable[(_, Vector)], it2java: Iterable[(_, Vector)], - out: Collector[(_, Vector)]): Unit = { - val it1 = Lists.newArrayList(it1java).asScala - val it2 = Lists.newArrayList(it2java).asScala - - if (it1.nonEmpty && it2.nonEmpty) { - val (idx, a) = it1.head - val (_, b) = it2.head - out.collect((idx, function(a, b))) - } else if (it1.isEmpty && it2.nonEmpty) { - out.collect(it2.head) - } else if (it1.nonEmpty && it2.isEmpty) { - out.collect(it1.head) - } + val rowsA = A.asRowWise.ds + val rowsB = B.asRowWise.ds + implicit val kTag = op.keyClassTag + + val res: DataSet[(K, Vector)] = + rowsA + .coGroup(rowsB) + .where(0) + .equalTo(0) { + (left, right, out: Collector[(K, Vector)]) => + (left.toIterable.headOption, right.toIterable.headOption) match { + case (Some((idx, a)), Some((_, b))) => out.collect((idx, function(a, b))) + case (None, Some(b)) => out.collect(b) + case (Some(a), None) => out.collect(a) + case (None, None) => throw new RuntimeException("At least one side of the co group " + + "must be non-empty.") + } } - }) + new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol) } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala index 67d710b..56e7deb 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala @@ -19,6 +19,7 @@ package org.apache.mahout.flinkbindings.blas import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} import org.apache.mahout.math.Matrix import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewScalar, TEwFunc} @@ -39,43 +40,45 @@ object FlinkOpAewScalar { private def isInplace = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean @Deprecated - def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = { + def opScalarNoSideEffect[K: TypeInformation](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = { val function = EWOpsCloning.strToFunction(op.op) + implicit val kTag = op.keyClassTag - val res = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { - def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match { - case (keys, mat) => (keys, function(mat, scalar)) - } - }) + + val res = A.asBlockified.ds.map{ + tuple => (tuple._1, function(tuple._2, scalar)) + } new BlockifiedFlinkDrm(res, op.ncol) } - def opUnaryFunction[K: ClassTag](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = { + def opUnaryFunction[K: TypeInformation](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = { val f = op.f val inplace = isInplace + + implicit val kTag = op.keyClassTag + val res = if (op.evalZeros) { - A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { - def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = { + A.asBlockified.ds.map{ + tuple => val (keys, block) = tuple val newBlock = if (inplace) block else block.cloned newBlock := 
((_, _, x) => f(x)) (keys, newBlock) - } - }) + } } else { - A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { - def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = { + A.asBlockified.ds.map{ + tuple => val (keys, block) = tuple val newBlock = if (inplace) block else block.cloned for (row <- newBlock; el <- row.nonZeroes) el := f(el.get) (keys, newBlock) - } - }) + } } new BlockifiedFlinkDrm(res, op.ncol) + } } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala index e515b34..6e320af 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala @@ -18,25 +18,13 @@ */ package org.apache.mahout.flinkbindings.blas -import java.lang.Iterable - -import scala.Array.canBuildFrom -import scala.collection.JavaConverters.asScalaBufferConverter - -import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.shaded.com.google.common.collect.Lists -import org.apache.flink.util.Collector -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.SequentialAccessSparseVector -import org.apache.mahout.math.Vector -import org.apache.mahout.math.drm.DrmTuple +import org.apache.flink.api.scala._ +import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm} +import org.apache.mahout.math.{SequentialAccessSparseVector, Vector} import org.apache.mahout.math.drm.logical.OpAt import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.flink.api.scala._ +import scala.Array.canBuildFrom /** * Implementation is taken from Spark's At @@ -52,34 +40,34 @@ object FlinkOpAt { def sparseTrick(op: OpAt, A: FlinkDrm[Int]): FlinkDrm[Int] = { val ncol = op.ncol // # of rows of A, i.e. 
# of columns of A^T - val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] { - def flatMap(typle: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = typle match { - case (keys, block) => - (0 until block.ncol).map(columnIdx => { + val sparseParts = A.asBlockified.ds.flatMap { + blockifiedTuple => + val keys = blockifiedTuple._1 + val block = blockifiedTuple._2 + + (0 until block.ncol).map { + columnIndex => val columnVector: Vector = new SequentialAccessSparseVector(ncol) - keys.zipWithIndex.foreach { case (key, idx) => - columnVector(key) = block(idx, columnIdx) + keys.zipWithIndex.foreach { + case (key, idx) => columnVector(key) = block(idx, columnIndex) } - out.collect((columnIdx, columnVector)) - }) - } - }) + (columnIndex, columnVector) + } + } - val regrouped = sparseParts.groupBy(selector[Vector, Int]) + val regrouped = sparseParts.groupBy(0) - val sparseTotal = regrouped.reduceGroup(new GroupReduceFunction[(Int, Vector), DrmTuple[Int]] { - def reduce(values: Iterable[(Int, Vector)], out: Collector[DrmTuple[Int]]): Unit = { - val it = Lists.newArrayList(values).asScala - val (idx, _) = it.head - val vector = (it map { case (idx, vec) => vec }).sum - out.collect((idx, vector)) - } - }) + val sparseTotal = regrouped.reduce{ + (left, right) => + (left._1, left._2 + right._2) + } // TODO: densify or not? new RowsFlinkDrm(sparseTotal, ncol) } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala index 629857a..bdb0e5e 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala @@ -2,22 +2,20 @@ package org.apache.mahout.flinkbindings.blas import java.lang.Iterable -import scala.collection.JavaConverters._ - import org.apache.flink.api.common.functions._ -import org.apache.flink.api.scala.DataSet +import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.shaded.com.google.common.collect.Lists import org.apache.flink.util.Collector import org.apache.mahout.flinkbindings._ import org.apache.mahout.flinkbindings.drm._ import org.apache.mahout.math.Matrix -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.BlockifiedDrmTuple +import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _} import org.apache.mahout.math.drm.logical.OpAtA -import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.scalabindings._ +import scala.collection.JavaConverters._ /** * Inspired by Spark's implementation from @@ -29,61 +27,59 @@ object FlinkOpAtA { final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol" final val PROPERTY_ATA_MAXINMEMNCOL_DEFAULT = "200" - def at_a(op: OpAtA[_], A: FlinkDrm[_]): FlinkDrm[Int] = { + def at_a[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = { val maxInMemStr = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, PROPERTY_ATA_MAXINMEMNCOL_DEFAULT) val maxInMemNCol = maxInMemStr.toInt maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer") + implicit val kTag = A.classTag + if (op.ncol <= maxInMemNCol) 
{ implicit val ctx = A.context val inCoreAtA = slim(op, A) val result = drmParallelize(inCoreAtA, numPartitions = 1) result } else { - fat(op.asInstanceOf[OpAtA[Any]], A.asInstanceOf[FlinkDrm[Any]]) + fat(op.asInstanceOf[OpAtA[K]], A.asInstanceOf[FlinkDrm[K]]) } } - def slim(op: OpAtA[_], A: FlinkDrm[_]): Matrix = { - val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[Any], Matrix)]] + def slim[K](op: OpAtA[K], A: FlinkDrm[K]): Matrix = { + val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[K], Matrix)]] - val res = ds.map(new MapFunction[(Array[Any], Matrix), Matrix] { + val res = ds.map { // TODO: optimize it: use upper-triangle matrices like in Spark - def map(block: (Array[Any], Matrix)): Matrix = block match { - case (idx, m) => m.t %*% m - } - }).reduce(new ReduceFunction[Matrix] { - def reduce(m1: Matrix, m2: Matrix) = m1 + m2 - }).collect() + block => block._2.t %*% block._2 + }.reduce(_ + _).collect() res.head } - def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = { + def fat[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = { val nrow = op.A.nrow val ncol = op.A.ncol val ds = A.asBlockified.ds - val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[Any], Matrix), Int] { - def map(a: (Array[Any], Matrix)): Int = 1 + val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[K], Matrix), Int] { + def map(a: (Array[K], Matrix)): Int = 1 }).reduce(new ReduceFunction[Int] { def reduce(a: Int, b: Int): Int = a + b }) - val subresults: DataSet[(Int, Matrix)] = - ds.flatMap(new RichFlatMapFunction[(Array[Any], Matrix), (Int, Matrix)] { + val subresults: DataSet[(Int, Matrix)] = + ds.flatMap(new RichFlatMapFunction[(Array[K], Matrix), (Int, Matrix)] { var ranges: Array[Range] = null override def open(params: Configuration): Unit = { - val runtime = this.getRuntimeContext() + val runtime = this.getRuntimeContext val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions") val parts = dsX.get(0) val numParts = estimatePartitions(nrow, ncol, parts) ranges = computeEvenSplits(ncol, numParts) } - def flatMap(tuple: (Array[Any], Matrix), out: Collector[(Int, Matrix)]): Unit = { + def flatMap(tuple: (Array[K], Matrix), out: Collector[(Int, Matrix)]): Unit = { val block = tuple._2 ranges.zipWithIndex.foreach { case (range, idx) => @@ -93,13 +89,13 @@ object FlinkOpAtA { }).withBroadcastSet(numberOfPartitions, "numberOfPartitions") - val res = subresults.groupBy(selector[Matrix, Int]) + val res = subresults.groupBy(0) .reduceGroup(new RichGroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] { var ranges: Array[Range] = null override def open(params: Configuration): Unit = { - val runtime = this.getRuntimeContext() + val runtime = this.getRuntimeContext val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions") val parts = dsX.get(0) val numParts = estimatePartitions(nrow, ncol, parts) http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala index b514868..6a081ba 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala @@ -46,25 +46,22 @@ import org.apache.flink.api.scala._ */ object 
FlinkOpAtB { - def notZippable[K: ClassTag](op: OpAtB[K], At: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[Int] = { - val classTag = extractRealClassTag(op.A) - val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) + def notZippable[A](op: OpAtB[A], At: FlinkDrm[A], B: FlinkDrm[A]): FlinkDrm[Int] = { - val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] - val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] - val joined = rowsAt.join(rowsB).where(joiner).equalTo(joiner) + val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[A]] + val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[A]] + val joined = rowsAt.join(rowsB).where(0).equalTo(0) val ncol = op.ncol val nrow = op.nrow.toInt val blockHeight = 10 val blockCount = safeToNonNegInt((nrow - 1) / blockHeight + 1) - val preProduct: DataSet[(Int, Matrix)] = - joined.flatMap(new FlatMapFunction[Tuple2[(_, Vector), (_, Vector)], (Int, Matrix)] { - def flatMap(in: Tuple2[(_, Vector), (_, Vector)], - out: Collector[(Int, Matrix)]): Unit = { + val preProduct: DataSet[(Int, Matrix)] = + joined.flatMap(new FlatMapFunction[((A, Vector), (A, Vector)), (Int, Matrix)] { + def flatMap(in: ((A, Vector), (A, Vector)), out: Collector[(Int, Matrix)]): Unit = { val avec = in._1._2 - val bvec = in._1._2 + val bvec = in._2._2 0.until(blockCount) map { blockKey => val blockStart = blockKey * blockHeight @@ -72,13 +69,13 @@ object FlinkOpAtB { val outer = avec(blockStart until blockEnd) cross bvec out.collect(blockKey -> outer) + out } } }) val res: BlockifiedDrmDataSet[Int] = - preProduct.groupBy(selector[Matrix, Int]) - .reduceGroup(new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] { + preProduct.groupBy(0).reduceGroup(new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] { def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit = { val it = Lists.newArrayList(values).asScala val (idx, _) = it.head @@ -90,7 +87,7 @@ object FlinkOpAtB { } }) - new BlockifiedFlinkDrm(res, ncol) + new BlockifiedFlinkDrm[Int](res, ncol) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala index 4302457..79f5fe8 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala @@ -21,6 +21,7 @@ package org.apache.mahout.flinkbindings.blas import java.util.List import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.configuration.Configuration import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} import org.apache.mahout.math.{Matrix, Vector} @@ -38,8 +39,9 @@ import org.apache.flink.api.scala._ */ object FlinkOpAx { - def blockifiedBroadcastAx[K: ClassTag](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = { + def blockifiedBroadcastAx[K: TypeInformation](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = { implicit val ctx = A.context + implicit val kTag = op.keyClassTag val singletonDataSetX = ctx.env.fromElements(op.x) http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala 
---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala index 6cf5e5c..65b2a25 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala @@ -19,12 +19,14 @@ package org.apache.mahout.flinkbindings.blas import java.lang.Iterable +import org.apache.flink.api.common.typeinfo.TypeInformation + import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ import scala.reflect.ClassTag import org.apache.flink.api.common.functions.CoGroupFunction import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.scala.DataSet +import org.apache.flink.api.scala._ import org.apache.flink.util.Collector import org.apache.mahout.flinkbindings._ import org.apache.mahout.flinkbindings.drm._ @@ -43,94 +45,86 @@ import org.apache.mahout.math.scalabindings._ */ object FlinkOpCBind { - def cbind[K: ClassTag](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { + def cbind[K: TypeInformation](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { val n = op.ncol val n1 = op.A.ncol val n2 = op.B.ncol - val classTag = extractRealClassTag(op.A) - val joiner = selector[Vector, Any](classTag.asInstanceOf[ClassTag[Any]]) - - val rowsA = A.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] - val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[Any]] - - val res: DataSet[(Any, Vector)] = - rowsA.coGroup(rowsB).where(joiner).equalTo(joiner) - .`with`(new CoGroupFunction[(_, Vector), (_, Vector), (_, Vector)] { - def coGroup(it1java: Iterable[(_, Vector)], it2java: Iterable[(_, Vector)], - out: Collector[(_, Vector)]): Unit = { - val it1 = Lists.newArrayList(it1java).asScala - val it2 = Lists.newArrayList(it2java).asScala - - if (it1.nonEmpty && it2.nonEmpty) { - val (idx, a) = it1.head - val (_, b) = it2.head - - val result: Vector = if (a.isDense && b.isDense) { - new DenseVector(n) - } else { - new SequentialAccessSparseVector(n) - } - - result(0 until n1) := a - result(n1 until n) := b - - out.collect((idx, result)) - } else if (it1.isEmpty && it2.nonEmpty) { - val (idx, b) = it2.head - val result: Vector = if (b.isDense) { - new DenseVector(n) - } else { - new SequentialAccessSparseVector(n) - } - result(n1 until n) := b - out.collect((idx, result)) - } else if (it1.nonEmpty && it2.isEmpty) { - val (idx, a) = it1.head - val result: Vector = if (a.isDense) { - new DenseVector(n) - } else { - new SequentialAccessSparseVector(n) + implicit val classTag = op.A.keyClassTag + + val rowsA = A.asRowWise.ds + val rowsB = B.asRowWise.ds + + val res: DataSet[(K, Vector)] = + rowsA.coGroup(rowsB).where(0).equalTo(0) { + (left, right) => + (left.toIterable.headOption, right.toIterable.headOption) match { + case (Some((idx, a)), Some((_, b))) => + val result = if (a.isDense && b.isDense) { + new DenseVector(n) + } else { + new SequentialAccessSparseVector(n) + } + + result(0 until n1) := a + result(n1 until n) := b + + (idx, result) + case (Some((idx, a)), None) => + val result: Vector = if (a.isDense) { + new DenseVector(n) + } else { + new SequentialAccessSparseVector(n) + } + result(n1 until n) := a + + (idx, result) + case (None, Some((idx, b))) => + val result: Vector = if (b.isDense) { + new DenseVector(n) + } else { + new SequentialAccessSparseVector(n) + } + result(n1 until 
n) := b + + (idx, result) + case (None, None) => + throw new RuntimeException("CoGroup should have at least one non-empty input.") } - result(n1 until n) := a - out.collect((idx, result)) - } } - }) new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol) } - def cbindScalar[K: ClassTag](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = { + def cbindScalar[K: TypeInformation](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = { val left = op.leftBind val ds = A.asBlockified.ds - val out = A.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { - def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match { - case (keys, mat) => (keys, cbind(mat, x, left)) - } + implicit val kTag= op.keyClassTag - def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = { - val ncol = mat.ncol - val newMat = mat.like(mat.nrow, ncol + 1) + def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = { + val ncol = mat.ncol + val newMat = mat.like(mat.nrow, ncol + 1) - if (left) { - newMat.zip(mat).foreach { case (newVec, origVec) => - newVec(0) = x - newVec(1 to ncol) := origVec - } - } else { - newMat.zip(mat).foreach { case (newVec, origVec) => - newVec(ncol) = x - newVec(0 to (ncol - 1)) := origVec - } + if (left) { + newMat.zip(mat).foreach { case (newVec, origVec) => + newVec(0) = x + newVec(1 to ncol) := origVec + } + } else { + newMat.zip(mat).foreach { case (newVec, origVec) => + newVec(ncol) = x + newVec(0 to (ncol - 1)) := origVec } - - newMat } - }) + + newMat + } + + val out = A.asBlockified.ds.map { + tuple => (tuple._1, cbind(tuple._2, x, left)) + } new BlockifiedFlinkDrm(out, op.ncol) } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala index 9530d43..c3918a5 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala @@ -18,13 +18,10 @@ */ package org.apache.mahout.flinkbindings.blas -import scala.reflect.ClassTag - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.drm.BlockMapFunc +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} +import org.apache.mahout.math.drm.logical.OpMapBlock import org.apache.mahout.math.scalabindings.RLikeOps._ /** @@ -33,16 +30,19 @@ import org.apache.mahout.math.scalabindings.RLikeOps._ */ object FlinkOpMapBlock { - def apply[S, R: ClassTag](src: FlinkDrm[S], ncol: Int, function: BlockMapFunc[S, R]): FlinkDrm[R] = { - val res = src.asBlockified.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] { - def map(block: (Array[S], Matrix)): (Array[R], Matrix) = { - val out = function(block) - assert(out._2.nrow == block._2.nrow, "block mapping must return same number of rows.") - assert(out._2.ncol == ncol, s"block map must return $ncol number of columns.") - out - } - }) + def apply[S, R: 
TypeInformation](src: FlinkDrm[S], ncol: Int, operator: OpMapBlock[S,R]): FlinkDrm[R] = { + implicit val rtag = operator.keyClassTag + val bmf = operator.bmf + val ncol = operator.ncol + val res = src.asBlockified.ds.map { + block => + val result = bmf(block) + assert(result._2.nrow == block._2.nrow, "block mapping must return same number of rows.") + assert(result._2.ncol == ncol, s"block map must return $ncol number of columns.") + // printf("Block partition: \n%s\n", block._2) + result + } - new BlockifiedFlinkDrm(res, ncol) + new BlockifiedFlinkDrm[R](res, ncol) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala index 83beaa1..4fa2eaa 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala @@ -18,6 +18,8 @@ */ package org.apache.mahout.flinkbindings.blas +import org.apache.flink.api.common.typeinfo.TypeInformation + import scala.reflect.ClassTag import org.apache.flink.api.scala.DataSet @@ -28,8 +30,9 @@ import org.apache.mahout.math.drm.logical.OpRbind object FlinkOpRBind { - def rbind[K: ClassTag](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { + def rbind[K: TypeInformation](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { // note that indexes of B are already re-arranged prior to executing this code + implicit val kTag = op.keyClassTag val res = A.asRowWise.ds.union(B.asRowWise.ds) new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol) } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala index 6e11892..39f4ceb 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala @@ -18,11 +18,9 @@ */ package org.apache.mahout.flinkbindings.blas -import org.apache.flink.api.common.functions.FilterFunction -import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.scala._ import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.mahout.math.Vector import org.apache.mahout.math.drm.logical.OpRowRange /** @@ -35,17 +33,13 @@ object FlinkOpRowRange { val rowRange = op.rowRange val firstIdx = rowRange.head - val filtered = A.asRowWise.ds.filter(new FilterFunction[(Int, Vector)] { - def filter(tuple: (Int, Vector)): Boolean = tuple match { - case (idx, vec) => rowRange.contains(idx) - } - }) + val filtered = A.asRowWise.ds.filter { + tuple => rowRange.contains(tuple._1) + } - val res = filtered.map(new MapFunction[(Int, Vector), (Int, Vector)] { - def map(tuple: (Int, Vector)): (Int, Vector) = tuple match { - case (idx, vec) => (idx - firstIdx, vec) - } - }) + val res = filtered.map { + tuple => (tuple._1 - firstIdx, tuple._2) + } new 
RowsFlinkDrm(res, op.ncol) } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala index 989fad1..4e5b1a7 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala @@ -18,17 +18,16 @@ */ package org.apache.mahout.flinkbindings.blas -import scala.reflect.ClassTag - import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.math.Matrix +import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} import org.apache.mahout.math.drm.logical.OpTimesRightMatrix import org.apache.mahout.math.scalabindings.RLikeOps._ - -import org.apache.flink.api.scala._ +import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math.{DenseMatrix, Matrix} /** * Implementation is taken from Spark's OpTimesRightMatrix: @@ -36,20 +35,51 @@ import org.apache.flink.api.scala._ */ object FlinkOpTimesRightMatrix { - def drmTimesInCore[K: ClassTag](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = { + def drmTimesInCore[K: TypeInformation](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = { implicit val ctx = A.context + implicit val kTag = op.keyClassTag + + - val singletonDataSetB = ctx.env.fromElements(inCoreB) + /* HACK: broadcasting the matrix using Flink's .withBroadcastSet(singletonDataSetB) on a matrix causes a backend Kryo + * issue resulting in a stackOverflow error. + * + * Quick fix is to instead break the matrix down into a list of rows and then rebuild it on the back end + * + * TODO: this is obviously very inefficient... need to use the correct broadcast on the matrix itself.
+ */ + + // val inCoreBcastB = FlinkEngine.drmBroadcast(inCoreB) + // val singletonDataSetB = ctx.env.fromElements(inCoreB) + + val rows = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::))) + val dataSetType = TypeExtractor.getForObject(rows.head) + val singletonDataSetB = ctx.env.fromCollection(rows) val res = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] { var inCoreB: Matrix = null override def open(params: Configuration): Unit = { val runtime = this.getRuntimeContext - val dsB: java.util.List[Matrix] = runtime.getBroadcastVariable("matrix") - inCoreB = dsB.get(0) - } + val dsB: java.util.List[(Int, org.apache.mahout.math.Vector)] = runtime.getBroadcastVariable("matrix") + val m = dsB.size() + val n = dsB.get(0)._2.size + val isDense = dsB.get(0)._2.isDense + // allocate a dense or sparse target depending on the broadcast rows + inCoreB = if (isDense) new DenseMatrix(m, n) else new SparseRowMatrix(m, n) + for (i <- 0 until m) { + inCoreB(i, ::) := dsB.get(i)._2 + } + + } + + override def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match { case (keys, block_A) => (keys, block_A %*% inCoreB) }
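The rebuild inside open() is the mirror image of the row decomposition done on the front end. A minimal standalone sketch of that round trip using only the Mahout math API (hypothetical 2x2 data, not part of the patch):

    import org.apache.mahout.math.{DenseMatrix, Matrix}
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    // front end: decompose the matrix into (index, row) pairs -- the shape that is broadcast
    val b: Matrix = dense((1, 2), (3, 4))
    val rows = (0 until b.nrow).map(i => (i, b(i, ::)))

    // back end: reassemble the matrix from the broadcast row list
    val rebuilt: Matrix = new DenseMatrix(b.nrow, b.ncol)
    rows.foreach { case (i, vec) => rebuilt(i, ::) := vec }
    // rebuilt now equals b, row for row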
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala deleted file mode 100644 index 27f552c..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings - -import scala.reflect.ClassTag -import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.api.java.typeutils.ResultTypeQueryable -import org.apache.flink.api.scala.createTypeInformation -import org.apache.flink.api.common.typeinfo.TypeInformation - -package object blas { - - // TODO: remove it once figure out how to make Flink accept interfaces (Vector here) - def selector[V, K: ClassTag]: KeySelector[(K, V), K] = { - val tag = implicitly[ClassTag[K]] - if (tag.runtimeClass.equals(classOf[Int])) { - tuple_1_int.asInstanceOf[KeySelector[(K, V), K]] - } else if (tag.runtimeClass.equals(classOf[Long])) { - tuple_1_long.asInstanceOf[KeySelector[(K, V), K]] - } else if (tag.runtimeClass.equals(classOf[String])) { - tuple_1_string.asInstanceOf[KeySelector[(K, V), K]] - } else { - throw new IllegalArgumentException(s"index type $tag is not supported") - } - } - - private def tuple_1_int[K: ClassTag] = new KeySelector[(Int, _), Int] - with ResultTypeQueryable[Int] { - def getKey(tuple: Tuple2[Int, _]): Int = tuple._1 - def getProducedType: TypeInformation[Int] = createTypeInformation[Int] - } - - private def tuple_1_long[K: ClassTag] = new KeySelector[(Long, _), Long] - with ResultTypeQueryable[Long] { - def getKey(tuple: Tuple2[Long, _]): Long = tuple._1 - def getProducedType: TypeInformation[Long] = createTypeInformation[Long] - } - - private def tuple_1_string[K: ClassTag] = new KeySelector[(String, _), String] - with ResultTypeQueryable[String] { - def getKey(tuple: Tuple2[String, _]): String = tuple._1 - def getProducedType: TypeInformation[String] = createTypeInformation[String] - } -} \ No newline at end of file
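With TypeInformation context bounds now threaded through the operators, Flink's Scala API can key tuple datasets directly, which is presumably why the hand-rolled selector above could be removed. A hedged sketch of the replacement pattern (hypothetical dataset, not part of this commit):

    import org.apache.flink.api.scala._

    val env = ExecutionEnvironment.getExecutionEnvironment
    val ds: DataSet[(Int, String)] = env.fromElements(1 -> "a", 1 -> "b", 2 -> "c")

    // keying on the first tuple field directly; no KeySelector boilerplate needed
    val grouped = ds.groupBy(0).reduceGroup(group => group.map(_._2).mkString(","))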
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index 96d57d2..84b327a 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -19,13 +19,13 @@ package org.apache.mahout.flinkbindings.drm import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction} -import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.api.scala._ import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable} import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat} import org.apache.mahout.flinkbindings.{DrmDataSet, _} -import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix, Vector, VectorWritable} +import org.apache.mahout.math._ +import org.apache.mahout.math.drm.CacheHint._ import org.apache.mahout.math.drm.{CacheHint, CheckpointedDrm, DistributedContext, DrmTuple, _} import org.apache.mahout.math.scalabindings.RLikeOps._ import org.apache.mahout.math.scalabindings._ @@ -37,6 +37,7 @@ import scala.util.Random class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN, private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN, + override val cacheHint: CacheHint = CacheHint.NONE, override protected[mahout] val partitioningTag: Long = Random.nextLong(), private var _canHaveMissingRows: Boolean = false ) extends CheckpointedDrm[K] { @@ -79,7 +80,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows - def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this + def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = { + + this + } def collect: Matrix = { val data = ds.collect() @@ -123,21 +127,76 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], def dfsWrite(path: String): Unit = { val env = ds.getExecutionEnvironment + // ds.map does not pick up the correct runtime type of tuple._1: + // WritableTypeInfo throws an exception unless the key is a concrete + // Writable subclass rather than the Writable base class - val keyTag = implicitly[ClassTag[K]] - val convertKey = keyToWritableFunc(keyTag) - val writableDataset = ds.map(new MapFunction[(K, Vector), (Writable, VectorWritable)] { - def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match { - case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec)) - } - }) +// val keyTag = implicitly[ClassTag[K]] +// def convertKey = keyToWritableFunc(keyTag) +// val writableDataset = ds.map { +// tuple => (convertKey(tuple._1), new VectorWritable(tuple._2)) +// } + + + // test output with an IntWritable key; VectorWritable is not a problem +// val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] { +// def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) = +// (new IntWritable(1), new VectorWritable(tuple._2)) +// }) + + + val keyTag = implicitly[ClassTag[K]] val job = new JobConf - val sequenceFormat = new SequenceFileOutputFormat[Writable, VectorWritable] FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path)) - val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job) - writableDataset.output(hadoopOutput) + // explicitly define all Writable subclasses for ds.map() keys + // as well as the SequenceFileOutputFormat parameters + if (keyTag.runtimeClass == classOf[Int]) { + // explicitly map into Int keys + implicit val typeInformation = createTypeInformation[(IntWritable,VectorWritable)] + val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] { + def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) = + (new IntWritable(tuple._1.asInstanceOf[Int]), new VectorWritable(tuple._2)) + }) + + // setup sink for IntWritable + job.setOutputKeyClass(classOf[IntWritable]) + job.setOutputValueClass(classOf[VectorWritable]) + val sequenceFormat = new SequenceFileOutputFormat[IntWritable, VectorWritable] + val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job) + writableDataset.output(hadoopOutput) + + } else if (keyTag.runtimeClass == classOf[String]) { + // explicitly map into Text keys + val writableDataset = ds.map(new MapFunction[DrmTuple[K], (Text, VectorWritable)] { + def map(tuple: DrmTuple[K]): (Text, VectorWritable) = + (new Text(tuple._1.asInstanceOf[String]), new VectorWritable(tuple._2)) + }) + + // setup sink for Text + job.setOutputKeyClass(classOf[Text]) + job.setOutputValueClass(classOf[VectorWritable]) + val sequenceFormat = new SequenceFileOutputFormat[Text, VectorWritable] + val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job) + writableDataset.output(hadoopOutput) + + } else if (keyTag.runtimeClass == classOf[Long]) { + // explicitly map into Long keys + val writableDataset = ds.map(new MapFunction[DrmTuple[K], (LongWritable, VectorWritable)] { + def map(tuple: DrmTuple[K]): (LongWritable, VectorWritable) = + (new LongWritable(tuple._1.asInstanceOf[Long]), new VectorWritable(tuple._2)) + }) + + // setup sink for LongWritable + job.setOutputKeyClass(classOf[LongWritable]) + job.setOutputValueClass(classOf[VectorWritable]) + val sequenceFormat = new SequenceFileOutputFormat[LongWritable, VectorWritable] + val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job) + writableDataset.output(hadoopOutput) + + } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag)) env.execute(s"dfsWrite($path)") }
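From the caller's perspective dfsWrite is unchanged; the key's ClassTag selects the branch. A short usage sketch (hypothetical output path, assuming an implicit Flink DistributedContext is in scope):

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.scalabindings._

    // an Int-keyed DRM: dfsWrite maps keys to IntWritable and
    // emits an (IntWritable, VectorWritable) sequence file
    val drmA = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)
    drmA.dfsWrite("/tmp/drmA")  // hypothetical path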
@@ -148,9 +207,10 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], } else if (keyTag.runtimeClass == classOf[String]) { (x: K) => new Text(x.asInstanceOf[String]) } else if (keyTag.runtimeClass == classOf[Long]) { - (x: K) => new LongWritable(x.asInstanceOf[Long]) - } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) { - (x: K) => x.asInstanceOf[Writable] + (x: K) => new LongWritable(x.asInstanceOf[Long]) + // WritableTypeInfo will reject the base Writable class +// } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) { +// (x: K) => x.asInstanceOf[Writable] } else { throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag)) } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala index a037d44..e65c43d 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala @@ -31,5 +31,4 @@ class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) { /** Flink matrix customization exposure */ def dataset = flinkDrm.ds - } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala index c9c1b2c..aea62fa 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala @@ -18,18 +18,13 @@ */ package org.apache.mahout.flinkbindings.drm -import java.lang.Iterable - -import org.apache.flink.api.common.functions.{FlatMapFunction, MapPartitionFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ -import org.apache.flink.util.Collector import org.apache.mahout.flinkbindings.{BlockifiedDrmDataSet, DrmDataSet, FlinkDistributedContext, wrapContext} -import org.apache.mahout.math.drm.DrmTuple import org.apache.mahout.math.scalabindings.RLikeOps._ import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix} -import scala.collection.JavaConverters.iterableAsScalaIterableConverter import scala.reflect.ClassTag trait FlinkDrm[K] { @@ -43,7 +38,7 @@ trait FlinkDrm[K] { def classTag: ClassTag[K] } -class
RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] { +class RowsFlinkDrm[K: TypeInformation: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] { def executionEnvironment = ds.getExecutionEnvironment def context: FlinkDistributedContext = ds.getExecutionEnvironment @@ -54,27 +49,27 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl val ncolLocal = ncol val classTag = implicitly[ClassTag[K]] - val parts = ds.mapPartition(new MapPartitionFunction[DrmTuple[K], (Array[K], Matrix)] { - def mapPartition(values: Iterable[DrmTuple[K]], out: Collector[(Array[K], Matrix)]): Unit = { - val it = values.asScala.seq + val parts = ds.mapPartition { + values => + val (keys, vectors) = values.toIterable.unzip - val (keys, vectors) = it.unzip if (vectors.nonEmpty) { - val isDense = vectors.head.isDense - - if (isDense) { + val vector = vectors.head + val matrix: Matrix = if (vector.isDense) { val matrix = new DenseMatrix(vectors.size, ncolLocal) vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec } - out.collect((keys.toArray(classTag), matrix)) + matrix } else { - val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray) - out.collect((keys.toArray(classTag), matrix)) + new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray) } + + Seq((keys.toArray(classTag), matrix)) + } else { + Seq() } - } - }) + } - new BlockifiedFlinkDrm(parts, ncol) + new BlockifiedFlinkDrm[K](parts, ncol) } def asRowWise = this @@ -83,26 +78,31 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl } -class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] { +class BlockifiedFlinkDrm[K: TypeInformation: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] { + def executionEnvironment = ds.getExecutionEnvironment def context: FlinkDistributedContext = ds.getExecutionEnvironment + def isBlockified = true def asBlockified = this def asRowWise = { - val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] { - def flatMap(tuple: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = tuple match { - case (keys, block) => keys.view.zipWithIndex.foreach { - case (key, idx) => - out.collect((key, block(idx, ::))) + val out = ds.flatMap { + tuple => + val keys = tuple._1 + val block = tuple._2 + + keys.view.zipWithIndex.map { + case (key, idx) => (key, block(idx, ::)) } - } - }) - new RowsFlinkDrm(out, ncol) + } + + new RowsFlinkDrm[K](out, ncol) } def classTag = implicitly[ClassTag[K]] + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala index 24f298d..83ede9a 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.io._ import java.util.Arrays /** - * Copied from /spark/src/main/scala/org/apache/mahout/common + * Flink DRM Metadata */ class DrmMetadata( @@ -40,13 +40,13 @@ class DrmMetadata( * @param keyW2ValFunc: Conversion from Writable to value type of the DRM key */ val (keyClassTag: ClassTag[_], 
unwrapKeyFunction: ((Writable) => Any)) = keyTypeWritable match { - case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _ - case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _ - case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _ - case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _ - case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _ - case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _ - case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _ + case cz if cz == classOf[IntWritable] => ClassTag.Int -> w2int _ + case cz if cz == classOf[LongWritable] => ClassTag.Long -> w2long _ + case cz if cz == classOf[DoubleWritable] => ClassTag.Double -> w2double _ + case cz if cz == classOf[FloatWritable] => ClassTag.Float -> w2float _ + case cz if cz == classOf[Text] => ClassTag(classOf[String]) -> w2string _ + case cz if cz == classOf[BooleanWritable] => ClassTag(classOf[Boolean]) -> w2bool _ + case cz if cz == classOf[BytesWritable] => ClassTag(classOf[Array[Byte]]) -> w2bytes _ case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}") } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala index e77143e..b9d9f1b 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala @@ -62,17 +62,17 @@ case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: val seed = fs.getFileStatus(new Path(dir)) var f: String = files - if (seed.isDir) { + if (seed.isDirectory) { val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir)) for (fileStatus <- fileStatuses) { if (fileStatus.getPath().getName().matches(filePattern) - && !fileStatus.isDir) { + && !fileStatus.isDirectory) { // found a file if (fileStatus.getLen() != 0) { // file is not empty f = f + fileStatus.getPath.toUri.toString + "," } - } else if (fileStatus.isDir && recursive) { + } else if (fileStatus.isDirectory && recursive) { f = findFiles(fileStatus.getPath.toString, filePattern, f) } } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala deleted file mode 100644 index 6581721..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.io - -import org.apache.hadoop.io.{ Writable, SequenceFile } -import org.apache.hadoop.fs.{ FileSystem, Path } -import org.apache.hadoop.conf.Configuration -import collection._ -import JavaConversions._ - -/** - * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work - * with Hadoop 2.0 - * - * Copied from /spark/src/main/scala/org/apache/mahout/common - */ -object Hadoop1HDFSUtil extends HDFSUtil { - - /** - * Read the header of a sequence file and determine the Key and Value type - * @param path - * @return - */ - def readDrmHeader(path: String): DrmMetadata = { - val dfsPath = new Path(path) - val fs = dfsPath.getFileSystem(new Configuration()) - - val partFilePath: Path = fs.listStatus(dfsPath) - - // Filter out anything starting with . - .filter { s => - !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir - } - - // Take path - .map(_.getPath) - - // Take only one, if any - .headOption - - // Require there's at least one partition file found. - .getOrElse { - throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") - } - - val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) - try { - new DrmMetadata( - keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), - valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])) - } finally { - reader.close() - } - - } - - /** - * Delete a path from the filesystem - * @param path - */ - def delete(path: String) { - val dfsPath = new Path(path) - val fs = dfsPath.getFileSystem(new Configuration()) - - if (fs.exists(dfsPath)) { - fs.delete(dfsPath, true) - } - } - -}
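The Hadoop 2 replacement, Hadoop2HDFSUtil, is added by this commit but its body is not shown in this excerpt; presumably it keeps the same readDrmHeader shape while moving off the deprecated API (isDirectory instead of isDir, option-based SequenceFile.Reader construction). A hedged sketch under that assumption:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{SequenceFile, Writable}

    def readDrmHeader(path: String): DrmMetadata = {
      val dfsPath = new Path(path)
      val fs = dfsPath.getFileSystem(new Configuration())

      // skip hidden and meta files; isDirectory replaces the deprecated isDir
      val partFilePath = fs.listStatus(dfsPath)
        .filter(s => !s.getPath.getName.startsWith(".") && !s.getPath.getName.startsWith("_") && !s.isDirectory)
        .map(_.getPath)
        .headOption
        .getOrElse(throw new IllegalArgumentException(s"No partition files found in $dfsPath."))

      // Hadoop 2 reader construction: an options object instead of (fs, path, conf)
      val reader = new SequenceFile.Reader(fs.getConf, SequenceFile.Reader.file(partFilePath))
      try {
        new DrmMetadata(
          keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
          valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]))
      } finally {
        reader.close()
      }
    }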
