http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml deleted file mode 100644 index 4d21166..0000000 --- a/flink/pom.xml +++ /dev/null @@ -1,237 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.mahout</groupId> - <artifactId>mahout</artifactId> - <version>0.13.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>mahout-flink_${scala.compat.version}</artifactId> - <name>Mahout Flink bindings</name> - <description> - Mahout Bindings for Apache Flink - </description> - - <packaging>jar</packaging> - - <build> - <plugins> - <!-- copy jars to top directory, which is MAHOUT_HOME --> - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.4</version> - <executions> - <execution> - <id>copy</id> - <phase>package</phase> - <configuration> - <tasks> - <copy file="target/mahout-flink_${scala.compat.version}-${version}.jar" tofile="../mahout-flink_${scala.compat.version}-${version}.jar"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-javadoc-plugin</artifactId> - </plugin> - - <plugin> - <artifactId>maven-source-plugin</artifactId> - </plugin> - - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <executions> - <execution> - <id>add-scala-sources</id> - <phase>initialize</phase> - <goals> - <goal>add-source</goal> - </goals> - </execution> - <execution> - <id>scala-compile</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - </plugin> - - <!--this is what scalatest recommends to do to enable scala tests --> - - <!-- disable surefire --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <systemPropertyVariables> - <mahout.home>${project.build.directory}</mahout.home> - </systemPropertyVariables> - <skipTests>true</skipTests> - </configuration> - </plugin> - - <!-- enable scalatest --> - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <executions> - <execution> - <id>test</id> - <goals> - 
<goal>test</goal> - </goals> - </execution> - </executions> - <configuration> - <systemProperties> - <mahout.home>${project.build.directory}</mahout.home> - </systemProperties> - <argLine>-Xmx4g</argLine> - </configuration> - </plugin> - <!-- remove jars from top directory on clean --> - <plugin> - <artifactId>maven-clean-plugin</artifactId> - <version>3.0.0</version> - <configuration> - <filesets> - <fileset> - <directory>../</directory> - <includes> - <include>mahout-flink*.jar</include> - </includes> - <followSymlinks>false</followSymlinks> - </fileset> - </filesets> - </configuration> - </plugin> - </plugins> - </build> - - <dependencies> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${scala.compat.version}</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_${scala.compat.version}</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${scala.compat.version}</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_${scala.compat.version}</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests_${scala.compat.version}</artifactId> - <version>${flink.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.mahout</groupId> - <artifactId>mahout-math-scala_${scala.compat.version}</artifactId> - </dependency> - - <dependency> - <groupId>org.bytedeco</groupId> - <artifactId>javacpp</artifactId> - <version>1.2.2</version> - </dependency> - - <!-- enforce current version of kryo as of 0.10.1--> - <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> - <version>2.24.0</version> - </dependency> - - <dependency> - <groupId>org.apache.mahout</groupId> - <artifactId>mahout-hdfs</artifactId> - <exclusions> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.mahout</groupId> - <artifactId>mahout-math-scala_${scala.compat.version}</artifactId> - <classifier>tests</classifier> - <scope>test</scope> - </dependency> - - <!-- 3rd-party --> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <!-- tests --> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.compat.version}</artifactId> - </dependency> - </dependencies> -</project>
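For orientation before the per-file deletions that follow, here is a minimal usage sketch of the module being removed, based on the classes shown in this diff (FlinkDistributedContext, FlinkEngine and the blas operators). The DSL imports and the object name FlinkBindingsSketch follow the usual Mahout math-scala conventions but are illustrative assumptions, not taken from this commit.

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.mahout.flinkbindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.scalabindings._

// Hypothetical driver, for illustration only.
object FlinkBindingsSketch extends App {
  // Wrap a Flink ExecutionEnvironment in the Mahout distributed context
  // defined by FlinkDistributedContext further down in this diff.
  implicit val ctx: DistributedContext =
    new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

  // Parallelize a small in-core matrix as a DRM; FlinkEngine translates the
  // logical A' %*% A plan into the OpAtA physical operator (FlinkOpAtA below).
  val drmA = drmParallelize(dense((1, 2), (3, 4), (5, 6)), numPartitions = 2)
  val ata = (drmA.t %*% drmA).collect
  println(ata)
}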
http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink/src/main/resources/log4j.properties b/flink/src/main/resources/log4j.properties deleted file mode 100644 index 073a00d..0000000 --- a/flink/src/main/resources/log4j.properties +++ /dev/null @@ -1,8 +0,0 @@ -# Root logger option -log4j.rootLogger=info, stdout - -# Direct log messages to stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.conversionPattern=%d{ABSOLUTE} %5p %t %c{1}:%M:%L - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala deleted file mode 100644 index 5cdfb79..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings - -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.MatrixWritable -import org.apache.mahout.math.Vector -import org.apache.mahout.math.VectorWritable -import org.apache.mahout.math.drm.BCast - -import com.google.common.io.ByteStreams - -/** - * FlinkByteBCast wraps vector/matrix objects, represents them as byte arrays, and when - * it's used in UDFs, they are serialized using standard Java serialization along with - * UDFs (as a part of closure) and broadcasted to worker nodes. - * - * There should be a smarter way of doing it with some macro and then rewriting the UDF and - * appending `withBroadcastSet` to flink dataSet pipeline, but it's not implemented at the moment. 
- */ -class FlinkByteBCast[T](private val arr: Array[Byte]) extends BCast[T] with Serializable { - - private lazy val _value = { - val stream = ByteStreams.newDataInput(arr) - val streamType = stream.readInt() - - if (streamType == FlinkByteBCast.StreamTypeVector) { - val writeable = new VectorWritable() - writeable.readFields(stream) - // printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T]) - writeable.get.asInstanceOf[T] - } else if (streamType == FlinkByteBCast.StreamTypeMatrix) { - val writeable = new MatrixWritable() - writeable.readFields(stream) - // printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T]) - writeable.get.asInstanceOf[T] - } else { - throw new IllegalArgumentException(s"unexpected type tag $streamType") - } - - } - - override def value: T = _value - - override def close: Unit = { - // nothing to close - } - -} - -object FlinkByteBCast { - - val StreamTypeVector = 0x0000 - val StreamTypeMatrix = 0xFFFF - - def wrap(v: Vector): FlinkByteBCast[Vector] = { - val writeable = new VectorWritable(v) - val dataOutput = ByteStreams.newDataOutput() - dataOutput.writeInt(StreamTypeVector) - writeable.write(dataOutput) - val array = dataOutput.toByteArray() - new FlinkByteBCast[Vector](array) - } - - def wrap(m: Matrix): FlinkByteBCast[Matrix] = { - val writeable = new MatrixWritable(m) - val dataOutput = ByteStreams.newDataOutput() - dataOutput.writeInt(StreamTypeMatrix) - writeable.write(dataOutput) - val array = dataOutput.toByteArray() - new FlinkByteBCast[Matrix](array) - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala deleted file mode 100644 index 7a61ee6..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings - -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.configuration.GlobalConfiguration -import org.apache.mahout.math.drm.DistributedContext -import org.apache.mahout.math.drm.DistributedEngine - -class FlinkDistributedContext(val env: ExecutionEnvironment) extends DistributedContext { - - val mahoutHome = getMahoutHome() - - GlobalConfiguration.loadConfiguration(mahoutHome + "/src/conf/flink-config.yaml") - - val conf = GlobalConfiguration.getConfiguration - - var degreeOfParallelism: Int = 0 - - if (conf != null) { - degreeOfParallelism = conf.getInteger("parallelism.default", Runtime.getRuntime.availableProcessors) - } else { - degreeOfParallelism = Runtime.getRuntime.availableProcessors - } - - env.setParallelism(degreeOfParallelism) - - val engine: DistributedEngine = FlinkEngine - - override def close() { - // TODO - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala deleted file mode 100644 index f1d23b2..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ /dev/null @@ -1,416 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.utils.DataSetUtils -import org.apache.hadoop.io.{IntWritable, LongWritable, Text} -import org.apache.mahout.flinkbindings.blas._ -import org.apache.mahout.flinkbindings.drm._ -import org.apache.mahout.flinkbindings.io.{HDFSUtil, Hadoop2HDFSUtil} -import org.apache.mahout.math._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.logical._ -import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset, Schema} -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ - -import scala.collection.JavaConversions._ -import scala.reflect._ - -object FlinkEngine extends DistributedEngine { - - // By default, use Hadoop 2 utils - var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil - - /** - * Load DRM from hdfs (as in Mahout DRM format). - * - * @param path The DFS path to load from - * @param parMin Minimum parallelism after load (equivalent to #par(min=...)). 
- */ - override def drmDfsRead(path: String, parMin: Int = 1) - (implicit dc: DistributedContext): CheckpointedDrm[_] = { - - // Require that context is actually Flink context. - require(dc.isInstanceOf[FlinkDistributedContext], "Supplied context must be for the Flink backend.") - - // Extract the Flink Environment variable - implicit val env = dc.asInstanceOf[FlinkDistributedContext].env - - // set the parallelism of the env to parMin - env.setParallelism(parMin) - - // get the header of a SequenceFile in the path - val metadata = hdfsUtils.readDrmHeader(path + "//") - - val keyClass: Class[_] = metadata.keyTypeWritable - - // from the header determine which function to use to unwrap the key - val unwrapKey = metadata.unwrapKeyFunction - - // Map to the correct DrmLike based on the metadata information - if (metadata.keyClassTag == ClassTag.Int) { - val ds = env.readSequenceFile(classOf[IntWritable], classOf[VectorWritable], path) - - val res = ds.map(new MapFunction[(IntWritable, VectorWritable), (Int, Vector)] { - def map(tuple: (IntWritable, VectorWritable)): (Int, Vector) = { - (unwrapKey(tuple._1).asInstanceOf[Int], tuple._2.get()) - } - }) - datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Int]]) - } else if (metadata.keyClassTag == ClassTag.Long) { - val ds = env.readSequenceFile(classOf[LongWritable], classOf[VectorWritable], path) - - val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Long, Vector)] { - def map(tuple: (LongWritable, VectorWritable)): (Long, Vector) = { - (unwrapKey(tuple._1).asInstanceOf[Long], tuple._2.get()) - } - }) - datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Long]]) - } else if (metadata.keyClassTag == ClassTag(classOf[String])) { - val ds = env.readSequenceFile(classOf[Text], classOf[VectorWritable], path) - - val res = ds.map(new MapFunction[(Text, VectorWritable), (String, Vector)] { - def map(tuple: (Text, VectorWritable)): (String, Vector) = { - (unwrapKey(tuple._1).asInstanceOf[String], tuple._2.get()) - } - }) - datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[String]]) - } else throw new IllegalArgumentException(s"Unsupported DRM key type:${keyClass.getName}") - - } - - override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary]) - (implicit sc: DistributedContext): IndexedDataset = ??? - - override def indexedDatasetDFSReadElements(src: String, schema: Schema, existingRowIDs: Option[BiDictionary]) - (implicit sc: DistributedContext): IndexedDataset = ??? - - - /** - * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P> - * - * A particular physical engine implementation may choose to either use or not use these rewrites - * as a useful basic rewriting rule.<P> - */ - override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action) - - /** - * Translates logical plan into Flink execution plan. - **/ - override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = { - // Flink-specific Physical Plan translation. 
- - implicit val typeInformation = generateTypeInformation[K] - val drm = flinkTranslate(plan) - val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol) - newcp.cache() - } - - - private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = { - implicit val kTag = oper.keyClassTag - implicit val typeInformation = generateTypeInformation[K] - oper match { - case OpAtAnyKey(_) ⇒ - throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.") - case op@OpAx(a, x) ⇒ - FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)) - case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] - case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒ - FlinkOpAx.atx_with_broadcast(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] - case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a), - flinkTranslate(b)).asInstanceOf[FlinkDrm[K]] - case op@OpABt(a, b) ⇒ - // express ABt via AtB: let C=At and D=Bt, and calculate CtD - // TODO: create specific implementation of ABt, see MAHOUT-1750 - val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts! - val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]])) - val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol) - val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts! - val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]])) - val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol) - FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]] - case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] - case op@OpTimesRightMatrix(a, b) ⇒ - FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a), b) - case op@OpAewUnaryFunc(a, _, _) ⇒ - FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)) - case op@OpAewUnaryFuncFusion(a, _) ⇒ - FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)) - // deprecated - case op@OpAewScalar(a, scalar, _) ⇒ - FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a), scalar) - case op@OpAewB(a, b, _) ⇒ - FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a), flinkTranslate(b)) - case op@OpCbind(a, b) ⇒ - FlinkOpCBind.cbind(op, flinkTranslate(a), flinkTranslate(b)) - case op@OpRbind(a, b) ⇒ - FlinkOpRBind.rbind(op, flinkTranslate(a), flinkTranslate(b)) - case op@OpCbindScalar(a, x, _) ⇒ - FlinkOpCBind.cbindScalar(op, flinkTranslate(a), x) - case op@OpRowRange(a, _) ⇒ - FlinkOpRowRange.slice(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] - case op@OpABAnyKey(a, b) if a.keyClassTag != b.keyClassTag ⇒ - throw new IllegalArgumentException("DRMs A and B have different indices, cannot multiply them") - case op: OpMapBlock[_, K] ⇒ - FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op) - case cp: CheckpointedDrm[K] ⇒ cp - case _ ⇒ - throw new NotImplementedError(s"operator $oper is not implemented yet") - } - } - - /** - * returns a vector that contains a column-wise sum from DRM - */ - override def colSums[K](drm: CheckpointedDrm[K]): Vector = { - implicit val kTag: ClassTag[K] = drm.keyClassTag - implicit val typeInformation = generateTypeInformation[K] - - - val sum = drm.ds.map { - tuple => tuple._2 - }.reduce(_ + _) - - val list = sum.collect - list.head - } - - /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint.
*/ - override def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = { - implicit val kTag: ClassTag[K] = drm.keyClassTag - implicit val typeInformation = generateTypeInformation[K] - - - val result = drm.asBlockified.ds.map { - tuple => - val block = tuple._2 - val acc = block(0, ::).like() - - block.foreach { v => - v.nonZeroes().foreach { el => acc(el.index()) = acc(el.index()) + 1 } - } - - acc - }.reduce(_ + _) - - val list = result.collect - list.head - } - - /** - * returns a vector that contains a column-wise mean from DRM - */ - override def colMeans[K](drm: CheckpointedDrm[K]): Vector = { - drm.colSums() / drm.nrow - } - - /** - * Calculates the element-wise squared norm of a matrix - */ - override def norm[K](drm: CheckpointedDrm[K]): Double = { - implicit val kTag: ClassTag[K] = drm.keyClassTag - implicit val typeInformation = generateTypeInformation[K] - - val sumOfSquares = drm.ds.map { - tuple => tuple match { - case (idx, vec) => vec dot vec - } - }.reduce(_ + _) - - val list = sumOfSquares.collect - - // check on this --why is it returning a list? - math.sqrt(list.head) - } - - /** Broadcast support */ - override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = - FlinkByteBCast.wrap(v) - - /** Broadcast support */ - override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = - FlinkByteBCast.wrap(m) - - /** Parallelize in-core matrix as flink distributed matrix, using row ordinal indices as data set keys. */ - // The 'numPartitions' parameter is not honored in this call, - // as Flink sets a global parallelism in ExecutionEnvironment - override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) - (implicit dc: DistributedContext): CheckpointedDrm[Int] = { - - val parallelDrm = parallelize(m, numPartitions) - - new CheckpointedFlinkDrm(ds = parallelDrm, _nrow = m.numRows(), _ncol = m.numCols()) - } - - // The 'parallelismDegree' parameter is not honored in this call, - // as Flink sets a global parallelism in ExecutionEnvironment - private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int) - (implicit dc: DistributedContext): DrmDataSet[Int] = { - val rows = (0 until m.nrow).map(i => (i, m(i, ::))) - val dataSetType = TypeExtractor.getForObject(rows.head) - dc.env.fromCollection(rows).partitionByRange(0) - } - - /** Parallelize in-core matrix as flink distributed matrix, using row labels as data set keys. */ - // The 'numPartitions' parameter is not honored in this call, - // as Flink sets a global parallelism in ExecutionEnvironment - override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1) - (implicit dc: DistributedContext): CheckpointedDrm[String] = { - - val rb = m.getRowLabelBindings - val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::) - - new CheckpointedFlinkDrm[String](dc.env.fromCollection(p), - _nrow = m.nrow, _ncol = m.ncol, cacheHint = CacheHint.NONE) - } - - /** This creates an empty DRM with specified number of partitions and cardinality.
*/ - override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10) - (implicit dc: DistributedContext): CheckpointedDrm[Int] = { - val nonParallelResult = (0 to numPartitions).flatMap { part ⇒ - val partNRow = (nrow - 1) / numPartitions + 1 - val partStart = partNRow * part - val partEnd = Math.min(partStart + partNRow, nrow) - - for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) - } - val result = dc.env.fromCollection(nonParallelResult) - new CheckpointedFlinkDrm[Int](ds = result, _nrow = nrow, _ncol = ncol) - } - - /** Creates empty DRM with non-trivial height */ - override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10) - (implicit dc: DistributedContext): CheckpointedDrm[Long] = { - - val nonParallelResult = (0 to numPartitions).flatMap { part ⇒ - val partNRow = (nrow - 1) / numPartitions + 1 - val partStart = partNRow * part - val partEnd = Math.min(partStart + partNRow, nrow) - - for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) - } - - val result = dc.env.fromCollection(nonParallelResult) - new CheckpointedFlinkDrm[Long](ds = result, _nrow = nrow, _ncol = ncol, cacheHint = CacheHint.NONE) - } - - /** - * Convert non-int-keyed matrix to an int-keyed one, optionally computing a mapping from old keys - * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix. - */ - def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = { - implicit val ktag = drmX.keyClassTag - implicit val kTypeInformation = generateTypeInformation[K] - - if (ktag == ClassTag.Int) { - drmX.asInstanceOf[DrmLike[Int]] → None - } else { - val drmXcp = drmX.checkpoint(CacheHint.MEMORY_ONLY) - val ncol = drmXcp.asInstanceOf[CheckpointedFlinkDrm[K]].ncol - val nrow = drmXcp.asInstanceOf[CheckpointedFlinkDrm[K]].nrow - - // Compute sequential int key numbering. - val (intDataset, keyMap) = blas.rekeySeqInts(drmDataSet = drmXcp, computeMap = computeMap) - - // Convert computed key mapping to a matrix. - val mxKeyMap = keyMap.map { dataSet ⇒ - datasetWrap(dataSet.map { - tuple: (K, Int) => { - val ordinal = tuple._2 - val key = tuple._1 - key -> (dvec(ordinal): Vector) - } - }) - } - - intDataset -> mxKeyMap - } - } - - /** - * (Optional) Sampling operation.
- */ - def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = { - implicit val kTag: ClassTag[K] = drmX.keyClassTag - implicit val typeInformation = generateTypeInformation[K] - - val sample = DataSetUtils(drmX.dataset).sample(replacement, fraction) - - val res = if (kTag != ClassTag.Int) { - new CheckpointedFlinkDrm[K](sample) - } - else { - blas.rekeySeqInts(new RowsFlinkDrm[K](sample, ncol = drmX.ncol), computeMap = false)._1 - .asInstanceOf[DrmLike[K]] - } - - res - } - - def drmSampleKRows[K](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = { - implicit val kTag: ClassTag[K] = drmX.keyClassTag - implicit val typeInformation = generateTypeInformation[K] - - val sample = DataSetUtils(drmX.dataset).sampleWithSize(replacement, numSamples) - val sampleArray = sample.collect().toArray - val isSparse = sampleArray.exists { case (_, vec) ⇒ !vec.isDense } - - val vectors = sampleArray.map(_._2) - val labels = sampleArray.view.zipWithIndex - .map { case ((key, _), idx) ⇒ key.toString → (idx: Integer) }.toMap - - val mx: Matrix = if (isSparse) sparse(vectors: _*) else dense(vectors) - mx.setRowLabelBindings(labels) - - mx - } - - /** Engine-specific all reduce tensor operation. */ - def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = { - implicit val kTag: ClassTag[K] = drm.keyClassTag - implicit val typeInformation = generateTypeInformation[K] - - val res = drm.asBlockified.ds.map(par => bmf(par)).reduce(rf) - res.collect().head - } - - def generateTypeInformation[K: ClassTag]: TypeInformation[K] = { - implicit val ktag = classTag[K] - - generateTypeInformationFromTag(ktag) - } - - private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = { - if (tag.runtimeClass.equals(classOf[Int])) { - createTypeInformation[Int].asInstanceOf[TypeInformation[K]] - } else if (tag.runtimeClass.equals(classOf[Long])) { - createTypeInformation[Long].asInstanceOf[TypeInformation[K]] - } else if (tag.runtimeClass.equals(classOf[String])) { - createTypeInformation[String].asInstanceOf[TypeInformation[K]] - } else { - throw new IllegalArgumentException(s"index type $tag is not supported") - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala deleted file mode 100644 index a3be618..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala +++ /dev/null @@ -1,60 +0,0 @@ -package org.apache.mahout.flinkbindings.blas - - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.flink.util.Collector -import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm} -import org.apache.mahout.math.Vector -import org.apache.mahout.math.drm.logical.OpAewB -import org.apache.mahout.math.scalabindings.RLikeOps._ - -/** - * Implementation of Flink OpAewB - */ -object FlinkOpAewB { - - def rowWiseJoinNoSideEffect[K: TypeInformation](op: OpAewB[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { - val function = AewBOpsCloning.strToFunction(op.op) - - val rowsA = A.asRowWise.ds - val rowsB = B.asRowWise.ds - implicit val kTag =
op.keyClassTag - - val res: DataSet[(K, Vector)] = - rowsA - .coGroup(rowsB) - .where(0) - .equalTo(0) { - (left, right, out: Collector[(K, Vector)]) => - (left.toIterable.headOption, right.toIterable.headOption) match { - case (Some((idx, a)), Some((_, b))) => out.collect((idx, function(a, b))) - case (None, Some(b)) => out.collect(b) - case (Some(a), None) => out.collect(a) - case (None, None) => throw new RuntimeException("At least one side of the co group " + - "must be non-empty.") - } - } - - - new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol) - } -} - - -object AewBOpsCloning { - type VectorVectorFunc = (Vector, Vector) => Vector - - def strToFunction(op: String): VectorVectorFunc = op match { - case "+" => plus - case "-" => minus - case "*" => times - case "/" => div - case _ => throw new IllegalArgumentException(s"Unsupported elementwise operator: $op") - } - - val plus: VectorVectorFunc = (a, b) => a + b - val minus: VectorVectorFunc = (a, b) => a - b - val times: VectorVectorFunc = (a, b) => a * b - val div: VectorVectorFunc = (a, b) => a / b -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala deleted file mode 100644 index 6b034b8..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.blas - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewScalar, TEwFunc} -import org.apache.mahout.math.scalabindings.RLikeOps._ - -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag - -import org.apache.flink.api.scala._ - -/** - * Implementation of Flink OpAewScalar - */ -object FlinkOpAewScalar { - - final val PROPERTY_AEWB_INPLACE = "mahout.math.AewB.inplace" - private def isInplace = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean - - @Deprecated - def opScalarNoSideEffect[K: TypeInformation](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = { - val function = EWOpsCloning.strToFunction(op.op) - implicit val kTag = op.keyClassTag - - - val res = A.asBlockified.ds.map{ - tuple => (tuple._1, function(tuple._2, scalar)) - } - - new BlockifiedFlinkDrm(res, op.ncol) - } - - def opUnaryFunction[K: TypeInformation](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = { - val f = op.f - val inplace = isInplace - - - implicit val kTag = op.keyClassTag - - val res = if (op.evalZeros) { - A.asBlockified.ds.map{ - tuple => - val (keys, block) = tuple - val newBlock = if (inplace) block else block.cloned - newBlock := ((_, _, x) => f(x)) - (keys, newBlock) - } - } else { - A.asBlockified.ds.map{ - tuple => - val (keys, block) = tuple - val newBlock = if (inplace) block else block.cloned - for (row <- newBlock; el <- row.nonZeroes) el := f(el.get) - (keys, newBlock) - } - } - - new BlockifiedFlinkDrm(res, op.ncol) - - } - -} - -@Deprecated -object EWOpsCloning { - - type MatrixScalarFunc = (Matrix, Double) => Matrix - - def strToFunction(op: String): MatrixScalarFunc = op match { - case "+" => plusScalar - case "-" => minusScalar - case "*" => timesScalar - case "/" => divScalar - case "-:" => scalarMinus - case "/:" => scalarDiv - case _ => throw new IllegalArgumentException(s"Unsupported elementwise operator: $op") - } - - val plusScalar: MatrixScalarFunc = (A, s) => A + s - val minusScalar: MatrixScalarFunc = (A, s) => A - s - val scalarMinus: MatrixScalarFunc = (A, s) => s -: A - val timesScalar: MatrixScalarFunc = (A, s) => A * s - val divScalar: MatrixScalarFunc = (A, s) => A / s - val scalarDiv: MatrixScalarFunc = (A, s) => s /: A -} - http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala deleted file mode 100644 index 5093216..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.blas - -import org.apache.flink.api.scala._ -import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm} -import org.apache.mahout.math.{SequentialAccessSparseVector, Vector} -import org.apache.mahout.math.drm.logical.OpAt -import org.apache.mahout.math.scalabindings.RLikeOps._ - -import scala.Array.canBuildFrom - -/** - * Implementation of Flink At - */ -object FlinkOpAt { - - /** - * The idea here is simple: compile vertical column vectors of every partition block as sparse - * vectors of the <code>A.nrow</code> length; then group them by their column index and sum the - * groups into final rows of the transposed matrix. - */ - def sparseTrick(op: OpAt, A: FlinkDrm[Int]): FlinkDrm[Int] = { - val ncol = op.ncol // # of rows of A, i.e. # of columns of A^T - - val sparseParts = A.asBlockified.ds.flatMap { - blockifiedTuple => - val keys = blockifiedTuple._1 - val block = blockifiedTuple._2 - - (0 until block.ncol).map { - columnIndex => - val columnVector: Vector = new SequentialAccessSparseVector(ncol) - - keys.zipWithIndex.foreach { - case (key, idx) => columnVector(key) += block(idx, columnIndex) - } - - (columnIndex, columnVector) - } - } - - val regrouped = sparseParts.groupBy(0) - - val sparseTotal = regrouped.reduce{ - (left, right) => - (left._1, left._2 + right._2) - } - - // TODO: densify or not? - new RowsFlinkDrm(sparseTotal, ncol) - } - - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala deleted file mode 100644 index 10f1b92..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.blas - -import java.lang.Iterable - -import org.apache.flink.api.common.functions._ -import org.apache.flink.api.scala._ -import org.apache.flink.configuration.Configuration -import org.apache.flink.shaded.com.google.common.collect.Lists -import org.apache.flink.util.Collector -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.drm._ -import org.apache.mahout.math.drm.logical.OpAtA -import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _} -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.{Matrix, UpperTriangular, _} - -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ -import scala.collection._ - -/** - * Implementation of Flink A' * A - * - */ -object FlinkOpAtA { - - final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol" - final val PROPERTY_ATA_MAXINMEMNCOL_DEFAULT = "200" - - def at_a[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = { - val maxInMemStr = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, PROPERTY_ATA_MAXINMEMNCOL_DEFAULT) - val maxInMemNCol = maxInMemStr.toInt - maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer") - - implicit val kTag = A.classTag - - if (op.ncol <= maxInMemNCol) { - implicit val ctx = A.context - val inCoreAtA = slim(op, A) - val result = drmParallelize(inCoreAtA, numPartitions = 1) - result - } else { - fat(op.asInstanceOf[OpAtA[K]], A.asInstanceOf[FlinkDrm[K]]) - } - } - - def slim[K](op: OpAtA[K], A: FlinkDrm[K]): Matrix = { - val ds = A.asRowWise.ds - val ncol = op.ncol - - // Compute backing vector of tiny-upper-triangular accumulator across all the data. - val res = ds.mapPartition(pIter => { - - val ut = new UpperTriangular(ncol) - - // Strategy is to add the outer product of each row to the upper triangular accumulator. - pIter.foreach({ case (k, v) => - - // Use slightly different traversal strategies over dense vs. sparse source. - if (v.isDense) { - - // Update upper-triangular pattern only (due to symmetry). - // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is - // such a spectacular case they were designed for. Yes, I do observe some 20% difference - // compared to while loops with no other payload, but the other payload is usually much - // heavier than this overhead, so... I am keeping this as is for the time being. - - for (row <- 0 until v.length; col <- row until v.length) - ut(row, col) = ut(row, col) + v(row) * v(col) - - } else { - - // Sparse source. - v.nonZeroes().view - - // Outer iterator iterates over rows of outer product. - .foreach(elrow => { - - // Inner loop for columns of outer product. - v.nonZeroes().view - - // Filter out non-upper nonzero elements from the double loop. - .filter(_.index >= elrow.index) - - // Incrementally update outer product value in the upper triangular accumulator.
- .foreach(elcol => { - - val row = elrow.index - val col = elcol.index - ut(row, col) = ut(row, col) + elrow.get() * elcol.get() - - }) - }) - - } - }) - - Iterator(dvec(ddata = ut.getData).asInstanceOf[Vector]: Vector) - }).reduce(_ + _).collect() - - new DenseSymmetricMatrix(res.head) - } - - def fat[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = { - val nrow = op.A.nrow - val ncol = op.A.ncol - val ds = A.asBlockified.ds - - val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[K], Matrix), Int] { - def map(a: (Array[K], Matrix)): Int = 1 - }).reduce(new ReduceFunction[Int] { - def reduce(a: Int, b: Int): Int = a + b - }) - - val subresults: DataSet[(Int, Matrix)] = - ds.flatMap(new RichFlatMapFunction[(Array[K], Matrix), (Int, Matrix)] { - - var ranges: Array[Range] = _ - - override def open(params: Configuration): Unit = { - val runtime = this.getRuntimeContext - val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions") - val parts = dsX.get(0) - val numParts = estimatePartitions(nrow, ncol, parts) - ranges = computeEvenSplits(ncol, numParts) - } - - def flatMap(tuple: (Array[K], Matrix), out: Collector[(Int, Matrix)]): Unit = { - val block = tuple._2 - - ranges.zipWithIndex.foreach { case (range, idx) => - out.collect(idx -> block(::, range).t %*% block) - } - } - - }).withBroadcastSet(numberOfPartitions, "numberOfPartitions") - - val res = subresults.groupBy(0) - .reduceGroup(new RichGroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] { - - var ranges: Array[Range] = _ - - override def open(params: Configuration): Unit = { - val runtime = this.getRuntimeContext - val dsX: java.util.List[Int] = runtime.getBroadcastVariable("numberOfPartitions") - val parts = dsX.get(0) - val numParts = estimatePartitions(nrow, ncol, parts) - ranges = computeEvenSplits(ncol, numParts) - } - - def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit = { - val it = Lists.newArrayList(values).asScala - val (blockKey, _) = it.head - - val block = it.map { _._2 }.reduce { (m1, m2) => m1 + m2 } - val blockStart = ranges(blockKey).start - val rowKeys = Array.tabulate(block.nrow)(blockStart + _) - - out.collect(rowKeys -> block) - } - }).withBroadcastSet(numberOfPartitions, "numberOfPartitions") - - new BlockifiedFlinkDrm(res, ncol) - } - - def estimatePartitions(nrow: Long, ncol: Int, parts:Int): Int = { - // per-partition element density - val epp = nrow.toDouble * ncol / parts - - // product partitions - val prodParts = nrow * ncol / epp - - val nparts = math.round(prodParts).toInt max 1 - - // Constrain nparts to a maximum of nrow to prevent guaranteed empty partitions. - if (nparts > nrow) { - nrow.toInt - } else { - nparts - } - } - - def computeEvenSplits(nrow: Long, numSplits: Int): Array[Range] = { - require(numSplits <= nrow, "Requested amount of splits greater than number of data points.") - require(nrow >= 1) - require(numSplits >= 1) - - // Base split -- what is our base split size? - val baseSplit = safeToNonNegInt(nrow / numSplits) - - // Slack -- how many splits will have to be incremented by 1 though? - val slack = safeToNonNegInt(nrow % numSplits) - - // Compute ranges. We need to set ranges so that numSplits - slack splits have size of baseSplit; - // and `slack` splits have size baseSplit + 1.
Here is how we do it: First, we compute the range - // offsets: - val offsets = (0 to numSplits).map(i => i * (baseSplit + 1) - (0 max i - slack)) - // And then we connect the ranges using gaps between offsets: - - val ranges = offsets.sliding(2).map { offs => offs(0) until offs(1) } - ranges.toArray - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala deleted file mode 100644 index c10c2e4..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.blas - -import java.lang.Iterable - -import com.google.common.collect.Lists -import org.apache.flink.api.common.functions.{FlatMapFunction, GroupReduceFunction} -import org.apache.flink.api.scala._ -import org.apache.flink.util.Collector -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} -import org.apache.mahout.math.{Matrix, Vector} -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.logical.OpAtB -import org.apache.mahout.math.scalabindings.RLikeOps._ - -import scala.collection.JavaConverters.asScalaBufferConverter - -/** - * Implementation of Flink A' * B - */ -object FlinkOpAtB { - - def notZippable[A](op: OpAtB[A], At: FlinkDrm[A], B: FlinkDrm[A]): FlinkDrm[Int] = { - val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[A]] - val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[A]] - val joined = rowsAt.join(rowsB).where(0).equalTo(0) - - val ncol = op.ncol - val nrow = op.nrow.toInt - val blockHeight = 10 - val blockCount = safeToNonNegInt((nrow - 1) / blockHeight + 1) - - val preProduct: DataSet[(Int, Matrix)] = - joined.flatMap(new FlatMapFunction[((A, Vector), (A, Vector)), (Int, Matrix)] { - def flatMap(in: ((A, Vector), (A, Vector)), out: Collector[(Int, Matrix)]): Unit = { - val avec = in._1._2 - val bvec = in._2._2 - - 0.until(blockCount) map { blockKey => - val blockStart = blockKey * blockHeight - val blockEnd = Math.min(nrow, blockStart + blockHeight) - - val outer = avec(blockStart until blockEnd) cross bvec - out.collect(blockKey -> outer) - out - } - } - }) - - val res: BlockifiedDrmDataSet[Int] = - preProduct.groupBy(0).reduceGroup( - new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] { - def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit 
= { - val it = Lists.newArrayList(values).asScala - val (idx, _) = it.head - - val block = it.map { t => t._2 }.reduce { (m1, m2) => m1 + m2 } - - val blockStart = idx * blockHeight - val keys = Array.tabulate(block.nrow)(blockStart + _) - - out.collect(keys -> block) - } - } - ) - - new BlockifiedFlinkDrm[Int](res, ncol) - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala deleted file mode 100644 index 18a3c4b..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.blas - -import java.util - -import org.apache.flink.api.common.functions.RichMapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.flink.configuration.Configuration -import org.apache.mahout.flinkbindings.FlinkEngine -import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm, RowsFlinkDrm} -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.logical.{OpAtx, OpAx} -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.{Matrix, Vector} - -/** - * Implementation of Flink Ax - */ -object FlinkOpAx { - - def blockifiedBroadcastAx[K: TypeInformation](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = { - implicit val ctx = A.context - implicit val kTag = op.keyClassTag - - val singletonDataSetX = ctx.env.fromElements(op.x) - - val out = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] { - var x: Vector = null - - override def open(params: Configuration): Unit = { - val runtime = this.getRuntimeContext - val dsX: util.List[Vector] = runtime.getBroadcastVariable("vector") - x = dsX.get(0) - } - - override def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match { - case (keys, mat) => (keys, (mat %*% x).toColMatrix) - } - }).withBroadcastSet(singletonDataSetX, "vector") - - new BlockifiedFlinkDrm(out, op.nrow.toInt) - } - - - def atx_with_broadcast(op: OpAtx, srcA: FlinkDrm[Int]): FlinkDrm[Int] = { - implicit val ctx = srcA.context - - val dataSetA = srcA.asBlockified.ds - - // broadcast the vector x to the back end - val bcastX = drmBroadcast(op.x) - - implicit val typeInformation = createTypeInformation[(Array[Int],Matrix)] - val inCoreM = dataSetA.map { - 
tuple => - tuple._1.zipWithIndex.map { - case (key, idx) => tuple._2(idx, ::) * bcastX.value(key) - } - .reduce(_ += _) - } - // All-reduce - .reduce(_ += _) - - // collect result - .collect().head - - // Convert back to mtx - .toColMatrix - - // This doesn't do anything now - val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1) - - new RowsFlinkDrm[Int](res, 1) - - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala deleted file mode 100644 index 0e16fa8..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.blas - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.mahout.flinkbindings.drm._ -import org.apache.mahout.math._ -import org.apache.mahout.math.drm.logical.{OpCbind, OpCbindScalar} -import org.apache.mahout.math.scalabindings.RLikeOps._ - -import scala.collection.JavaConversions._ - -/** - * Implementation of Flink's cbind - */ -object FlinkOpCBind { - - def cbind[K: TypeInformation](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { - val n = op.ncol - val n1 = op.A.ncol - val n2 = op.B.ncol - - implicit val classTag = op.A.keyClassTag - - val rowsA = A.asRowWise.ds - val rowsB = B.asRowWise.ds - - val res: DataSet[(K, Vector)] = - rowsA.coGroup(rowsB).where(0).equalTo(0) { - (left, right) => - (left.toIterable.headOption, right.toIterable.headOption) match { - case (Some((idx, a)), Some((_, b))) => - val result = if (a.isDense && b.isDense) { - new DenseVector(n) - } else { - new SequentialAccessSparseVector(n) - } - - result(0 until n1) := a - result(n1 until n) := b - - (idx, result) - case (Some((idx, a)), None) => - val result: Vector = if (a.isDense) { - new DenseVector(n) - } else { - new SequentialAccessSparseVector(n) - } - result(n1 until n) := a - - (idx, result) - case (None, Some((idx, b))) => - val result: Vector = if (b.isDense) { - new DenseVector(n) - } else { - new SequentialAccessSparseVector(n) - } - result(n1 until n) := b - - (idx, result) - case (None, None) => - throw new RuntimeException("CoGroup should have at least one non-empty input.") - } - } - - new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol) - } - - def cbindScalar[K: TypeInformation](op: OpCbindScalar[K], A: 
FlinkDrm[K], x: Double): FlinkDrm[K] = { - val left = op.leftBind - val ds = A.asBlockified.ds - - implicit val kTag= op.keyClassTag - - def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = { - val ncol = mat.ncol - val newMat = mat.like(mat.nrow, ncol + 1) - - if (left) { - newMat.zip(mat).foreach { case (newVec, origVec) => - newVec(0) = x - newVec(1 to ncol) := origVec - } - } else { - newMat.zip(mat).foreach { case (newVec, origVec) => - newVec(ncol) = x - newVec(0 until ncol) := origVec - } - } - - newMat - } - - val out = A.asBlockified.ds.map { - tuple => (tuple._1, cbind(tuple._2, x, left)) - } - - new BlockifiedFlinkDrm(out, op.ncol) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala deleted file mode 100644 index c22fa9a..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.blas - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} -import org.apache.mahout.math.drm.logical.OpMapBlock -import org.apache.mahout.math.scalabindings.RLikeOps._ - -/** - * Implementation of Flink's MapBlock - */ -object FlinkOpMapBlock { - - def apply[S, R: TypeInformation](src: FlinkDrm[S], ncol: Int, operator: OpMapBlock[S,R]): FlinkDrm[R] = { - - implicit val rtag = operator.keyClassTag - val bmf = operator.bmf - val ncol = operator.ncol - val res = src.asBlockified.ds.map { - block => - val result = bmf(block) - assert(result._2.nrow == block._2.nrow, "block mapping must return same number of rows.") - assert(result._2.ncol == ncol, s"block map must return $ncol number of columns.") - result - } - - new BlockifiedFlinkDrm[R](res, ncol) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala deleted file mode 100644 index 685486c..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.blas - -import org.apache.flink.api.common.typeinfo.TypeInformation - -import org.apache.flink.api.scala.DataSet -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.mahout.math.Vector -import org.apache.mahout.math.drm.logical.OpRbind - -/** - * Implementation of RBind - */ -object FlinkOpRBind { - - def rbind[K: TypeInformation](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = { - // note that indexes of B are already re-arranged prior to executing this code - implicit val kTag = op.keyClassTag - val res = A.asRowWise.ds.union(B.asRowWise.ds) - new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol) - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala deleted file mode 100644 index c002002..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.blas - -import org.apache.flink.api.scala._ -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.mahout.math.drm.logical.OpRowRange - -/** - * Implementation of Flink OpRowRange - */ -object FlinkOpRowRange { - - def slice(op: OpRowRange, A: FlinkDrm[Int]): FlinkDrm[Int] = { - val rowRange = op.rowRange - val firstIdx = rowRange.head - - val filtered = A.asRowWise.ds.filter { - tuple => rowRange.contains(tuple._1) - } - - val res = filtered.map { - tuple => (tuple._1 - firstIdx, tuple._2) - } - - new RowsFlinkDrm(res, op.ncol) - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala deleted file mode 100644 index a4c5373..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.blas - -import org.apache.flink.api.common.functions.RichMapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.scala._ -import org.apache.flink.configuration.Configuration -import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} -import org.apache.mahout.math.drm.logical.OpTimesRightMatrix -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.{DenseMatrix, Matrix} - -/** - * Implementation of OpTimesRightMatrix: - */ -object FlinkOpTimesRightMatrix { - - def drmTimesInCore[K: TypeInformation](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = { - implicit val ctx = A.context - implicit val kTag = op.keyClassTag - - /* HACK: broadcasting the matrix using Flink's .withBroadcastSet(singletonDataSetB) on a matrix causes a backend Kryo - * Issue resulting in a stackOverflow error. - * - * Quick fix is to instead break the matrix down into a list of rows and then rebuild it on the back end - * - * TODO: this is obviously very inefficient... need to use the correct broadcast on the matrix itself. 
- */ - val rows = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::))) - val dataSetType = TypeExtractor.getForObject(rows.head) - val singletonDataSetB = ctx.env.fromCollection(rows) - - val res = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] { - var inCoreB: Matrix = _ - - override def open(params: Configuration): Unit = { - val runtime = this.getRuntimeContext - val dsB: java.util.List[(Int, org.apache.mahout.math.Vector)] = runtime.getBroadcastVariable("matrix") - val m = dsB.size() - val n = dsB.get(0)._2.size - val isDense = dsB.get(0)._2.isDense - - inCoreB = isDense match { - case true => new DenseMatrix(m, n) - case false => new DenseMatrix(m, n) - } - for (i <- 0 until m) { - inCoreB(i, ::) := dsB.get(i)._2 - } - - } - - override def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match { - case (keys, block_A) => (keys, block_A %*% inCoreB) - } - - }).withBroadcastSet(singletonDataSetB, "matrix") - - new BlockifiedFlinkDrm(res, op.ncol) - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala deleted file mode 100644 index 32a8cac..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings - -import java.lang.Iterable - -import org.apache.flink.api.common.functions.RichMapPartitionFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.utils._ -import org.apache.flink.configuration.Configuration -import org.apache.flink.util.Collector -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.math.drm.DrmLike -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.{RandomAccessSparseVector, Vector} - -import scala.reflect.ClassTag - -package object blas { - - /** - * Rekey matrix dataset keys to consecutive int keys. 
- * @param drmDataSet incoming matrix row-wise dataset - * @param computeMap if true, also compute mapping between old and new keys - * @tparam K existing key parameter - * @return - */ - private[flinkbindings] def rekeySeqInts[K: ClassTag: TypeInformation](drmDataSet: FlinkDrm[K], - computeMap: Boolean = true): (DrmLike[Int], - Option[DataSet[(K, Int)]]) = { - - implicit val dc = drmDataSet.context - - val datasetA = drmDataSet.asRowWise.ds - - val ncols = drmDataSet.asRowWise.ncol - - // Flink environment - val env = datasetA.getExecutionEnvironment - - // First, compute partition sizes. - val partSizes = DataSetUtils(datasetA).countElementsPerPartition.collect().toList - - // Starting indices - var startInd = new Array[Int](datasetA.getParallelism) - - // Save counts - for (pc <- partSizes) startInd(pc._1) = pc._2.toInt - - // compute cumulative sum - val cumulativeSum = startInd.scanLeft(0)(_ + _).init - - val vector: Vector = new RandomAccessSparseVector(cumulativeSum.length) - - cumulativeSum.indices.foreach { i => vector(i) = cumulativeSum(i).toDouble } - - val bCast = FlinkEngine.drmBroadcast(vector) - - implicit val typeInformation = createTypeInformation[(K, Int)] - - // Compute key -> int index map: - val keyMap = if (computeMap) { - Some( - datasetA.mapPartition(new RichMapPartitionFunction[(K, Vector), (K, Int)] { - - // partition number - var part: Int = 0 - - // get the index of the partition - override def open(params: Configuration): Unit = { - part = getRuntimeContext.getIndexOfThisSubtask - } - - override def mapPartition(iterable: Iterable[(K, Vector)], collector: Collector[(K, Int)]): Unit = { - val k = iterable.iterator().next._1 - val si = bCast.value.get(part) - collector.collect(k -> (part + si).toInt) - } - })) - } else { - None - } - - // Finally, do the transform - val intDataSet = datasetA - - // Re-number each partition - .mapPartition(new RichMapPartitionFunction[(K, Vector), (Int, Vector)] { - - // partition number - var part: Int = 0 - - // get the index of the partition - override def open(params: Configuration): Unit = { - part = getRuntimeContext.getIndexOfThisSubtask - } - - override def mapPartition(iterable: Iterable[(K, Vector)], collector: Collector[(Int, Vector)]): Unit = { - val k = iterable.iterator().next._2 - val si = bCast.value.get(part) - collector.collect((part + si).toInt -> k) - } - }) - - // Finally, return drm -> keymap result - datasetWrap(intDataSet) -> keyMap - } -} \ No newline at end of file
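The FlinkOpTimesRightMatrix code removed above works around a Kryo stack-overflow seen when broadcasting a whole in-core matrix: it ships the matrix as a DataSet of (rowIndex, row) pairs via withBroadcastSet and rebuilds it inside RichMapFunction.open() on every task. The following is a minimal, self-contained sketch of that broadcast-variable pattern, not Mahout code: the object name BroadcastRowsExample is hypothetical, and plain Array[Double] stands in for Mahout's Vector and Matrix types.

import scala.collection.JavaConverters._

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Hypothetical stand-alone example of the withBroadcastSet / open() pattern.
object BroadcastRowsExample {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // The in-core "matrix" to broadcast, decomposed into (rowIndex, row) pairs,
    // mirroring the row-by-row workaround used in FlinkOpTimesRightMatrix above.
    val bRows = env.fromCollection(Seq(
      0 -> Array(1.0, 2.0),
      1 -> Array(3.0, 4.0)))

    // Distributed "row vectors" to be multiplied by the broadcast matrix.
    val rowsA = env.fromElements(Array(1.0, 1.0), Array(2.0, 0.5))

    val res = rowsA.map(new RichMapFunction[Array[Double], Array[Double]] {
      private var b: Array[Array[Double]] = _

      // Rebuild the broadcast matrix once per task from the list of rows.
      override def open(params: Configuration): Unit = {
        val rows = getRuntimeContext
          .getBroadcastVariable[(Int, Array[Double])]("matrix")
        b = rows.asScala.sortBy(_._1).map(_._2).toArray
      }

      // Naive dense vector-times-matrix: out(j) = sum_i v(i) * b(i)(j).
      override def map(v: Array[Double]): Array[Double] = {
        val n = b(0).length
        val out = new Array[Double](n)
        for (i <- b.indices; j <- 0 until n) out(j) += v(i) * b(i)(j)
        out
      }
    }).withBroadcastSet(bRows, "matrix")

    res.print()
  }
}

The same open()/withBroadcastSet pattern appears in blockifiedBroadcastAx in the deleted FlinkOpAx.scala, where the broadcast value is the operand vector x rather than a matrix.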
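The rekeySeqInts helper in the deleted blas package object converts arbitrary row keys to consecutive integer keys by counting elements per partition, taking a prefix sum of those counts to get each partition's starting offset, and then renumbering rows within each partition. Below is a minimal sketch of that idea under simplified assumptions: RekeyExample and Rekey are hypothetical names, the offsets are passed through the function's constructor rather than via Mahout's drmBroadcast, every element receives its own consecutive index, and, like the original helper, it assumes the data is partitioned the same way in the counting pass and the renumbering pass.

import java.lang.{Iterable => JIterable}

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils.DataSetUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical helper: renumbers a partition's rows starting at that partition's offset.
class Rekey(starts: Seq[Long])
  extends RichMapPartitionFunction[(String, Double), (Long, Double)] {

  private var offset = 0L

  override def open(params: Configuration): Unit =
    offset = starts(getRuntimeContext.getIndexOfThisSubtask)

  override def mapPartition(values: JIterable[(String, Double)],
                            out: Collector[(Long, Double)]): Unit = {
    var i = 0L
    val it = values.iterator()
    while (it.hasNext) {
      out.collect((offset + i) -> it.next()._2)
      i += 1
    }
  }
}

object RekeyExample {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Rows keyed by an arbitrary type (String here); goal: consecutive Long keys.
    val rows = env.fromElements("a" -> 1.0, "b" -> 2.0, "c" -> 3.0, "d" -> 4.0)

    // 1) Count elements in each partition (runs a first job, like the original helper).
    val counts = DataSetUtils(rows).countElementsPerPartition.collect().sortBy(_._1)

    // 2) Exclusive prefix sum of the counts = starting index of each partition.
    //    Assumes the second pass sees the same partitioning as the counting pass.
    val starts = counts.map(_._2).scanLeft(0L)(_ + _).init

    // 3) Renumber within each partition, offset by that partition's start index.
    val rekeyed = rows.mapPartition(new Rekey(starts))

    rekeyed.print()
  }
}

The deleted helper additionally returns an optional map from old keys to new integer keys when computeMap is true; the sketch omits that second output.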
