http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala deleted file mode 100644 index 1cba326..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ /dev/null @@ -1,302 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.drm - -import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat} -import org.apache.flink.api.scala._ -import org.apache.flink.core.fs.FileSystem.WriteMode -import org.apache.flink.core.fs.Path -import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat -import org.apache.flink.configuration.GlobalConfiguration -import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable} -import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat} -import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil -import org.apache.mahout.flinkbindings.{DrmDataSet, _} -import org.apache.mahout.math._ -import org.apache.mahout.math.drm.CacheHint._ -import org.apache.mahout.math.drm.{CacheHint, CheckpointedDrm, DistributedContext, DrmTuple, _} -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ - -import scala.collection.JavaConverters._ -import scala.reflect.{ClassTag, classTag} -import scala.util.Random - -class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K], - private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN, - private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN, - override val cacheHint: CacheHint = CacheHint.NONE, - override protected[mahout] val partitioningTag: Long = Random.nextLong(), - private var _canHaveMissingRows: Boolean = false - ) extends CheckpointedDrm[K] { - - lazy val nrow: Long = if (_nrow >= 0) _nrow else dim._1 - lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2 - - // persistance values - var cacheFileName: String = "undefinedCacheName" - var isCached: Boolean = false - var parallelismDeg: Int = -1 - var persistanceRootDir: String = _ - - // need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}} - val mahoutHome = getMahoutHome() - - // this is extra I/O for each cache call. this needs to be moved somewhere where it is called - // only once. 
Possibly FlinkDistributedEngine. - GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml") - - val conf = GlobalConfiguration.getConfiguration - - if (!(conf == null )) { - persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp") - } else { - persistanceRootDir = "/tmp" - } - - - private lazy val dim: (Long, Int) = { - // combine computation of ncol and nrow in one pass - - val res = ds.map(new MapFunction[DrmTuple[K], (Long, Int)] { - def map(value: DrmTuple[K]): (Long, Int) = { - (1L, value._2.length) - } - }).reduce(new ReduceFunction[(Long, Int)] { - def reduce(t1: (Long, Int), t2: (Long, Int)) = { - val ((rowCnt1, colNum1), (rowCnt2, colNum2)) = (t1, t2) - (rowCnt1 + rowCnt2, Math.max(colNum1, colNum2)) - } - }) - - val list = res.collect() - list.head - } - - - override val keyClassTag: ClassTag[K] = classTag[K] - - /** Note as of Flink 1.0.0, no direct flink caching exists so we save - * the dataset to the filesystem and read it back when cache is called */ - def cache() = { - if (!isCached) { - cacheFileName = persistanceRootDir + "/" + System.nanoTime().toString - parallelismDeg = ds.getParallelism - isCached = true - persist(ds, cacheFileName) - } - val _ds = readPersistedDataSet(cacheFileName, ds) - - /** Leave the parallelism degree to be set the operators - * TODO: find out a way to set the parallelism degree based on the - * final drm after computation is actually triggered - * - * // We may want to look more closely at this: - * // since we've cached a drm, triggering a computation - * // it may not make sense to keep the same parallelism degree - * if (!(parallelismDeg == _ds.getParallelism)) { - * _ds.setParallelism(parallelismDeg).rebalance() - * } - * - */ - - datasetWrap(_ds) - } - - def uncache(): this.type = { - if (isCached) { - Hadoop2HDFSUtil.delete(cacheFileName) - isCached = false - } - this - } - - /** Writes a [[DataSet]] to the specified path and returns it as a DataSource for subsequent - * operations. - * - * @param dataset [[DataSet]] to write to disk - * @param path File path to write dataset to - * @tparam T Type of the [[DataSet]] elements - */ - def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = { - val env = dataset.getExecutionEnvironment - val outputFormat = new TypeSerializerOutputFormat[T] - val filePath = new Path(path) - - outputFormat.setOutputFilePath(filePath) - outputFormat.setWriteMode(WriteMode.OVERWRITE) - - dataset.output(outputFormat) - env.execute("FlinkTools persist") - } - - /** Read a [[DataSet]] from specified path and returns it as a DataSource for subsequent - * operations. 
- * - * @param path File path to read dataset from - * @param ds persisted ds to retrieve type information and environment forom - * @tparam T key Type of the [[DataSet]] elements - * @return [[DataSet]] the persisted dataset - */ - def readPersistedDataSet[T: ClassTag : TypeInformation] - (path: String, ds: DataSet[T]): DataSet[T] = { - - val env = ds.getExecutionEnvironment - val inputFormat = new TypeSerializerInputFormat[T](ds.getType()) - val filePath = new Path(path) - inputFormat.setFilePath(filePath) - - env.createInput(inputFormat) - } - - - // Members declared in org.apache.mahout.math.drm.DrmLike - - protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows - - def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = { - this - } - - def collect: Matrix = { - val data = ds.collect() - val isDense = data.forall(_._2.isDense) - - val cols = ncol - val rows = safeToNonNegInt(nrow) - - val m = if (isDense) { - new DenseMatrix(rows, cols) - } else { - new SparseMatrix(rows, cols) - } - - val intRowIndices = keyClassTag == implicitly[ClassTag[Int]] - - if (intRowIndices) { - data.foreach { case (t, vec) => - val idx = t.asInstanceOf[Int] - m(idx, ::) := vec - } - - println(m.ncol, m.nrow) - } else { - // assign all rows sequentially - val d = data.zipWithIndex - d.foreach { - case ((_, vec), idx) => m(idx, ::) := vec - } - - val rowBindings = d.map { - case ((t, _), idx) => (t.toString, idx: java.lang.Integer) - }.toMap.asJava - - m.setRowLabelBindings(rowBindings) - } - - m - } - - def dfsWrite(path: String): Unit = { - val env = ds.getExecutionEnvironment - - val keyTag = implicitly[ClassTag[K]] - - val job = new JobConf - FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path)) - - // explicitly define all Writable Subclasses for ds.map() keys - // as well as the SequenceFileOutputFormat paramaters - if (keyTag.runtimeClass == classOf[Int]) { - // explicitly map into Int keys - implicit val typeInformation = createTypeInformation[(IntWritable,VectorWritable)] - val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] { - def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) = - (new IntWritable(tuple._1.asInstanceOf[Int]), new VectorWritable(tuple._2)) - }) - - // setup sink for IntWritable - job.setOutputKeyClass(classOf[IntWritable]) - job.setOutputValueClass(classOf[VectorWritable]) - val sequenceFormat = new SequenceFileOutputFormat[IntWritable, VectorWritable] - val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job) - writableDataset.output(hadoopOutput) - - } else if (keyTag.runtimeClass == classOf[String]) { - // explicitly map into Text keys - val writableDataset = ds.map(new MapFunction[DrmTuple[K], (Text, VectorWritable)] { - def map(tuple: DrmTuple[K]): (Text, VectorWritable) = - (new Text(tuple._1.asInstanceOf[String]), new VectorWritable(tuple._2)) - }) - - // setup sink for Text - job.setOutputKeyClass(classOf[Text]) - job.setOutputValueClass(classOf[VectorWritable]) - val sequenceFormat = new SequenceFileOutputFormat[Text, VectorWritable] - val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job) - writableDataset.output(hadoopOutput) - - } else if (keyTag.runtimeClass == classOf[Long]) { - // explicitly map into Long keys - val writableDataset = ds.map(new MapFunction[DrmTuple[K], (LongWritable, VectorWritable)] { - def map(tuple: DrmTuple[K]): (LongWritable, VectorWritable) = - (new LongWritable(tuple._1.asInstanceOf[Long]), new VectorWritable(tuple._2)) - }) - - // setup 
sink for LongWritable - job.setOutputKeyClass(classOf[LongWritable]) - job.setOutputValueClass(classOf[VectorWritable]) - val sequenceFormat = new SequenceFileOutputFormat[LongWritable, VectorWritable] - val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job) - writableDataset.output(hadoopOutput) - - } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag)) - - env.execute(s"dfsWrite($path)") - } - - private def keyToWritableFunc[K: ClassTag](keyTag: ClassTag[K]): (K) => Writable = { - if (keyTag.runtimeClass == classOf[Int]) { - (x: K) => new IntWritable(x.asInstanceOf[Int]) - } else if (keyTag.runtimeClass == classOf[String]) { - (x: K) => new Text(x.asInstanceOf[String]) - } else if (keyTag.runtimeClass == classOf[Long]) { - (x: K) => new LongWritable(x.asInstanceOf[Long]) - } else { - throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag)) - } - } - - def newRowCardinality(n: Int): CheckpointedDrm[K] = { - assert(n > -1) - assert(n >= nrow) - new CheckpointedFlinkDrm(ds = ds, _nrow = n, _ncol = _ncol, cacheHint = cacheHint, - partitioningTag = partitioningTag, _canHaveMissingRows = _canHaveMissingRows) - } - - override val context: DistributedContext = ds.getExecutionEnvironment - -} - -object CheckpointedFlinkDrm { - val UNKNOWN = -1 - -} \ No newline at end of file
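For context on the caching logic deleted above: because Flink 1.0 had no built-in DataSet caching, CheckpointedFlinkDrm.cache() spilled the dataset to disk with a TypeSerializerOutputFormat and re-read it with a TypeSerializerInputFormat on later use. The following is a minimal, self-contained sketch of that pattern against the old Flink batch API; the helper name cacheToDisk and the /tmp path are illustrative only and were not part of the removed code.

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
  import org.apache.flink.api.scala._
  import org.apache.flink.core.fs.FileSystem.WriteMode
  import org.apache.flink.core.fs.Path

  import scala.reflect.ClassTag

  object DiskCacheSketch {

    // Hypothetical helper: write `dataset` to `path`, trigger execution so the
    // serialized files exist, then return a fresh source reading the copy back.
    def cacheToDisk[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = {
      val env = dataset.getExecutionEnvironment

      val out = new TypeSerializerOutputFormat[T]
      out.setOutputFilePath(new Path(path))
      out.setWriteMode(WriteMode.OVERWRITE)
      dataset.output(out)
      env.execute("persist dataset")          // materializes the data on disk

      val in = new TypeSerializerInputFormat[T](dataset.getType())
      in.setFilePath(new Path(path))
      env.createInput(in)                     // downstream operators read the spilled copy
    }

    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val ints = env.fromElements(1, 2, 3, 4)
      val cached = cacheToDisk(ints, "/tmp/flink-cache-" + System.nanoTime())
      println(cached.collect())               // second job, reading from the disk copy
    }
  }

As in the removed class, the price of this workaround is an extra execute() and the associated I/O on the first cache() call, which is why the deleted comment notes that the configuration loading should move somewhere it runs only once (e.g. the distributed engine setup).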
http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala deleted file mode 100644 index e65c43d..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.mahout.flinkbindings.drm - -import org.apache.mahout.math.drm.CheckpointedDrm - -import scala.reflect.ClassTag - -class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) { - assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "must be a Flink-backed matrix") - - private[flinkbindings] val flinkDrm = drm.asInstanceOf[CheckpointedFlinkDrm[K]] - - /** Flink matrix customization exposure */ - def dataset = flinkDrm.ds - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala deleted file mode 100644 index aea62fa..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.drm - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.mahout.flinkbindings.{BlockifiedDrmDataSet, DrmDataSet, FlinkDistributedContext, wrapContext} -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix} - -import scala.reflect.ClassTag - -trait FlinkDrm[K] { - def executionEnvironment: ExecutionEnvironment - def context: FlinkDistributedContext - def isBlockified: Boolean - - def asBlockified: BlockifiedFlinkDrm[K] - def asRowWise: RowsFlinkDrm[K] - - def classTag: ClassTag[K] -} - -class RowsFlinkDrm[K: TypeInformation: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] { - - def executionEnvironment = ds.getExecutionEnvironment - def context: FlinkDistributedContext = ds.getExecutionEnvironment - - def isBlockified = false - - def asBlockified : BlockifiedFlinkDrm[K] = { - val ncolLocal = ncol - val classTag = implicitly[ClassTag[K]] - - val parts = ds.mapPartition { - values => - val (keys, vectors) = values.toIterable.unzip - - if (vectors.nonEmpty) { - val vector = vectors.head - val matrix: Matrix = if (vector.isDense) { - val matrix = new DenseMatrix(vectors.size, ncolLocal) - vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec } - matrix - } else { - new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray) - } - - Seq((keys.toArray(classTag), matrix)) - } else { - Seq() - } - } - - new BlockifiedFlinkDrm[K](parts, ncol) - } - - def asRowWise = this - - def classTag = implicitly[ClassTag[K]] - -} - -class BlockifiedFlinkDrm[K: TypeInformation: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] { - - - def executionEnvironment = ds.getExecutionEnvironment - def context: FlinkDistributedContext = ds.getExecutionEnvironment - - - def isBlockified = true - - def asBlockified = this - - def asRowWise = { - val out = ds.flatMap { - tuple => - val keys = tuple._1 - val block = tuple._2 - - keys.view.zipWithIndex.map { - case (key, idx) => (key, block(idx, ::)) - } - } - - new RowsFlinkDrm[K](out, ncol) - } - - def classTag = implicitly[ClassTag[K]] - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala deleted file mode 100644 index 83ede9a..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.io - -import scala.reflect.ClassTag -import org.apache.hadoop.io._ -import java.util.Arrays - -/** - * Flink DRM Metadata - */ -class DrmMetadata( - - /** Writable key type as a sub-type of Writable */ - val keyTypeWritable: Class[_], - - /** Value writable type, as a sub-type of Writable */ - val valueTypeWritable: Class[_]) { - - import DrmMetadata._ - - /** - * @param keyClassTag: Actual drm key class tag once converted out of writable - * @param keyW2ValFunc: Conversion from Writable to value type of the DRM key - */ - val (keyClassTag: ClassTag[_], unwrapKeyFunction: ((Writable) => Any)) = keyTypeWritable match { - case cz if cz == classOf[IntWritable] => ClassTag.Int -> w2int _ - case cz if cz == classOf[LongWritable] => ClassTag.Long -> w2long _ - case cz if cz == classOf[DoubleWritable] => ClassTag.Double -> w2double _ - case cz if cz == classOf[FloatWritable] => ClassTag.Float -> w2float _ - case cz if cz == classOf[Text] => ClassTag(classOf[String]) -> w2string _ - case cz if cz == classOf[BooleanWritable] => ClassTag(classOf[Boolean]) -> w2bool _ - case cz if cz == classOf[BytesWritable] => ClassTag(classOf[Array[Byte]]) -> w2bytes _ - case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}") - } - -} - -object DrmMetadata { - - private[io] def w2int(w: Writable) = w.asInstanceOf[IntWritable].get() - - private[io] def w2long(w: Writable) = w.asInstanceOf[LongWritable].get() - - private[io] def w2double(w: Writable) = w.asInstanceOf[DoubleWritable].get() - - private[io] def w2float(w: Writable) = w.asInstanceOf[FloatWritable].get() - - private[io] def w2string(w: Writable) = w.asInstanceOf[Text].toString() - - private[io] def w2bool(w: Writable) = w.asInstanceOf[BooleanWritable].get() - - private[io] def w2bytes(w: Writable) = Arrays.copyOf(w.asInstanceOf[BytesWritable].getBytes(), - w.asInstanceOf[BytesWritable].getLength()) -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala deleted file mode 100644 index b027878..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.io - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} - - -/** - * Returns a [[java.lang.String]], which is comma delimited list of URIs discovered based on parameters - * in the constructor. - * The String is formatted to be input into [[org.apache.flink.api.scala.ExecutionEnvironment#textFile()]] - * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs - * @param filePattern regex that must match the entire filename to have the file returned - * @param recursive true traverses the filesystem recursively, default = false - * - */ -case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) { - - val conf = new Configuration() - val fs = FileSystem.get(conf) - - /** - * Returns a string of comma delimited URIs matching the filePattern - * When pattern matching dirs are never returned, only traversed. - */ - def uris: String = { - if (!filePattern.isEmpty) { // have file pattern so - val pathURIs = pathURI.split(",") - var files = "" - for (uri <- pathURIs) { - files = findFiles(uri, filePattern, files) - } - if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma - files - } else { - pathURI - } - } - - /** - * Find matching files in the dir, recursively call self when another directory is found - * Only files are matched, directories are traversed but never return a match - */ - private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = { - val seed = fs.getFileStatus(new Path(dir)) - var f: String = files - - if (seed.isDirectory) { - val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir)) - for (fileStatus <- fileStatuses) { - if (fileStatus.getPath().getName().matches(filePattern) - && !fileStatus.isDirectory) { - // found a file - if (fileStatus.getLen() != 0) { - // file is not empty - f = f + fileStatus.getPath.toUri.toString + "," - } - } else if (fileStatus.isDirectory && recursive) { - f = findFiles(fileStatus.getPath.toString, filePattern, f) - } - } - } else { f = dir } // was a filename not dir - f - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala deleted file mode 100644 index 73436f1..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.io - -/** - * High level Hadoop version-specific hdfs manipulations we need in context of our operations. - * - */ -trait HDFSUtil { - - /** - * Read DRM header information off (H)DFS. - */ - def readDrmHeader(path: String): DrmMetadata - -} - http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala deleted file mode 100644 index 211088a..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.io - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.SequenceFile.Reader -import org.apache.hadoop.io.Writable - -object Hadoop2HDFSUtil extends HDFSUtil { - - /** - * Read the header of a sequence file and determine the Key and Value type - * @param path - hdfs path of Sequence File - * @return - */ - def readDrmHeader(path: String): DrmMetadata = { - val dfsPath = new Path(path) - val conf = new Configuration() - val fs = dfsPath.getFileSystem(conf) - - fs.setConf(conf) - - val partFilePath: Path = fs.listStatus(dfsPath) - - // Filter out anything starting with . - .filter { s => - !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory - } - - // Take path - .map(_.getPath) - - // Take only one, if any - .headOption - - // Require there's at least one partition file found. 
- .getOrElse { - throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") - } - - val reader = new Reader(fs.getConf, Reader.file(partFilePath)) - - try { - new DrmMetadata( - keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), - valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])) - } finally { - reader.close() - } - - } - - /** - * Delete a path from the filesystem - * @param path - hdfs path - */ - def delete(path: String) { - val dfsPath = new Path(path) - val fs = dfsPath.getFileSystem(new Configuration()) - - if (fs.exists(dfsPath)) { - fs.delete(dfsPath, true) - } - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala deleted file mode 100644 index cf4da41..0000000 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.utils._ -import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _} -import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrm, CheckpointedFlinkDrmOps, FlinkDrm, RowsFlinkDrm} -import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _} -import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable} -import org.slf4j.LoggerFactory - -import scala.Array._ -import scala.reflect.ClassTag - -package object flinkbindings { - - private[flinkbindings] val log = LoggerFactory.getLogger("org.apache.mahout.flinkbindings") - - /** Row-wise organized DRM dataset type */ - type DrmDataSet[K] = DataSet[DrmTuple[K]] - - /** - * Blockified DRM dataset (keys of original DRM are grouped into array corresponding to rows of Matrix - * object value - */ - type BlockifiedDrmDataSet[K] = DataSet[BlockifiedDrmTuple[K]] - - implicit def wrapMahoutContext(context: DistributedContext): FlinkDistributedContext = { - assert(context.isInstanceOf[FlinkDistributedContext], "it must be FlinkDistributedContext") - context.asInstanceOf[FlinkDistributedContext] - } - - implicit def wrapContext(env: ExecutionEnvironment): FlinkDistributedContext = - new FlinkDistributedContext(env) - - implicit def unwrapContext(ctx: FlinkDistributedContext): ExecutionEnvironment = ctx.env - - private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K]) - : CheckpointedFlinkDrm[K] = { - - assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "it must be a Flink-backed matrix") - drm.asInstanceOf[CheckpointedFlinkDrm[K]] - } - - implicit def checkpointedDrmToFlinkDrm[K: TypeInformation: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = { - val flinkDrm = castCheckpointedDrm(cp) - new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol) - } - - /** Adding Flink-specific ops */ - implicit def cpDrm2cpDrmFlinkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrmOps[K] = - new CheckpointedFlinkDrmOps[K](drm) - - implicit def drm2cpDrmFlinkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedFlinkDrmOps[K] = drm: CheckpointedDrm[K] - - - private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m) - private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v) - private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get() - private[flinkbindings] implicit def unwrapFromWritable(w: VectorWritable): Vector = w.get() - - - def readCsv(file: String, delim: String = ",", comment: String = "#") - (implicit dc: DistributedContext): CheckpointedDrm[Long] = { - val vectors = dc.env.readTextFile(file) - .filter((in: String) => { - !in.startsWith(comment) - }) - .map(new MapFunction[String, Vector] { - def map(in: String): Vector = { - val array = in.split(delim).map(_.toDouble) - new DenseVector(array) - } - }) - datasetToDrm(vectors) - } - - def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Long] = { - val zipped = ds.zipWithIndex - datasetWrap(zipped) - } - - def datasetWrap[K: ClassTag](dataset: DataSet[(K, Vector)]): CheckpointedDrm[K] = { - implicit val typeInformation = FlinkEngine.generateTypeInformation[K] - new CheckpointedFlinkDrm[K](dataset) - } - - private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): 
ClassTag[_] = drm.keyClassTag - - private[flinkbindings] def getMahoutHome() = { - var mhome = System.getenv("MAHOUT_HOME") - if (mhome == null) mhome = System.getProperty("mahout.home") - require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs") - mhome - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala deleted file mode 100644 index 094c45b..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings - -import java.util.concurrent.TimeUnit - -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils} -import org.apache.mahout.math.drm.DistributedContext -import org.apache.mahout.test.DistributedMahoutSuite -import org.scalatest.{ConfigMap, Suite} - -import scala.concurrent.duration.FiniteDuration - -trait DistributedFlinkSuite extends DistributedMahoutSuite { this: Suite => - - protected implicit var mahoutCtx: DistributedContext = _ - protected var env: ExecutionEnvironment = null - - var cluster: Option[ForkableFlinkMiniCluster] = None - val parallelism = 4 - protected val DEFAULT_AKKA_ASK_TIMEOUT: Long = 1000 - protected var DEFAULT_TIMEOUT: FiniteDuration = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS) - - def initContext() { - mahoutCtx = wrapContext(env) - } - - override def beforeEach() { - initContext() - } - - override def afterEach() { - super.afterEach() - } - - override protected def afterAll(configMap: ConfigMap): Unit = { - super.afterAll(configMap) - cluster.foreach(c => TestBaseUtils.stopCluster(c, DEFAULT_TIMEOUT)) - } - - override protected def beforeAll(configMap: ConfigMap): Unit = { - super.beforeAll(configMap) - - val cl = TestBaseUtils.startCluster( - 1, - parallelism, - false, - false, - true) - - env = ExecutionEnvironment.createLocalEnvironment(parallelism) - - cluster = Some(cl) - initContext() - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala 
deleted file mode 100644 index 288561b..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings - -import org.apache.mahout.logging.info -import org.apache.mahout.math.DenseMatrix -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ -import org.scalatest.FunSuite - - -class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite { - - test("norm") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - (inCoreA.norm - A.norm) should be < 1e-6 - } - - test("colSums") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - (inCoreA.colSums - A.colSums).norm(2) should be < 1e-6 - } - - test("rowSums") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - (inCoreA.rowSums - A.rowSums).norm(2) should be < 1e-6 - } - - test("rowMeans") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - (inCoreA.rowMeans - A.rowMeans).norm(2) should be < 1e-6 - } - - test("numNonZeroElementsPerColumn") { - val A = dense((0, 2), (3, 0), (0, -30)) - val drmA = drmParallelize(A, numPartitions = 2) - - drmA.numNonZeroElementsPerColumn() should equal(A.numNonZeroElementsPerColumn()) - } - - - test("drmParallelizeEmpty") { - val emptyDrm = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2) - val expected = dense((0, 0), (0, 0)) - - (emptyDrm.collect - expected).norm should be < 1e-6 - } - - test("Aggregating transpose") { - - val mxA = new DenseMatrix(20, 10) := 1 - - val drmA = drmParallelize(mxA, numPartitions = 3) - - val reassignedA = drmA.mapBlock() { case (keys, block) ⇒ - keys.map(_ % 3) → block - } - - val mxAggrA = reassignedA.t(::, 0 until 3).collect - - info(mxAggrA.toString) - - mxAggrA(0,0) shouldBe 7 - mxAggrA(0,1) shouldBe 7 - mxAggrA(0,2) shouldBe 6 - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala deleted file mode 100644 index 4953647..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala +++
/dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings - -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ -import org.scalatest.FunSuite - -class FlinkByteBCastSuite extends FunSuite { - - test("BCast vector") { - val v = dvec(1, 2, 3) - val vBc = FlinkByteBCast.wrap(v) - assert((v - vBc.value).norm(2) <= 1e-6) - } - - test("BCast matrix") { - val m = dense((1, 2), (3, 4)) - val mBc = FlinkByteBCast.wrap(m) - assert((m - mBc.value).norm <= 1e-6) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala deleted file mode 100644 index 3e14d76..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala +++ /dev/null @@ -1,326 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings - -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.junit.runner.RunWith -import org.scalatest.FunSuite -import org.scalatest.junit.JUnitRunner -import org.slf4j.Logger -import org.slf4j.LoggerFactory - - -class RLikeOpsSuite extends FunSuite with DistributedFlinkSuite { - - val LOGGER = LoggerFactory.getLogger(getClass()) - - test("A %*% x") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val x: Vector = (0, 1, 2) - - val res = A %*% x - - val b = res.collect(::, 0) - assert(b == dvec(8, 11, 14)) - } - - test("A.t") { - val inCoreA = dense((1, 2, 3), (2, 3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val res = A.t.collect - - val expected = inCoreA.t - assert((res - expected).norm < 1e-6) - } - - test("A.t %*% x") { - val inCoreA = dense((1, 2, 3), (2, 3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val x = dvec(3, 11) - val res = (A.t %*% x).collect(::, 0) - - val expected = inCoreA.t %*% x - assert((res - expected).norm(2) < 1e-6) - } - - test("A.t %*% B") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val res = A.t %*% B - - val expected = inCoreA.t %*% inCoreB - assert((res.collect - expected).norm < 1e-6) - } - - test("A %*% B.t") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val res = A %*% B.t - - val expected = inCoreA %*% inCoreB.t - assert((res.collect - expected).norm < 1e-6) - } - - test("A.t %*% A") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val res = A.t %*% A - - val expected = inCoreA.t %*% inCoreA - assert((res.collect - expected).norm < 1e-6) - } - - test("A %*% B") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)).t - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val res = A %*% B - - val expected = inCoreA %*% inCoreB - assert((res.collect - expected).norm < 1e-6) - } - - test("A %*% B.t test 2") { - val mxA = Matrices.symmetricUniformView(10, 7, 80085) - val mxB = Matrices.symmetricUniformView(30, 7, 31337) - val A = drmParallelize(mxA, 3) - val B = drmParallelize(mxB, 4) - - val ABt = (A %*% B.t).collect - (ABt - mxA %*% mxB.t).norm should be < 1e-7 - } - - test("ABt test") { - val mxX = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8)) - val mxY = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8), - (1, 2), (2, 3), (3, 4), (5, 6), (7, 8)) - - val drmX = drmParallelize(mxX, 3) - val drmY = drmParallelize(mxY, 4) - - val XYt = (drmX %*% drmY.t).collect - val control = mxX %*% mxY.t - (XYt - control).norm should be < 1e-7 - } - - - test("A * scalar") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val res = A * 5 - assert((res.collect - inCoreA * 5).norm < 1e-6) - } - - test("A / scalar") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)).t 
- val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val res = A / 5 - assert((res.collect - (inCoreA / 5)).norm < 1e-6) - } - - test("A + scalar") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val res = A + 5 - assert((res.collect - (inCoreA + 5)).norm < 1e-6) - } - - test("A - scalar") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val res = A - 5 - assert((res.collect - (inCoreA - 5)).norm < 1e-6) - } - - test("A * B") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val res = A * B - val expected = inCoreA * inCoreB - assert((res.collect - expected).norm < 1e-6) - } - - test("A / B") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val res = A / B - val expected = inCoreA / inCoreB - assert((res.collect - expected).norm < 1e-6) - } - - test("A + B") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val res = A + B - val expected = inCoreA + inCoreB - assert((res.collect - expected).norm < 1e-6) - } - - test("A - B") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val res = A - B - val expected = inCoreA - inCoreB - assert((res.collect - expected).norm < 1e-6) - } - - test("A cbind B") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val res = A cbind B - val expected = dense((1, 2, 1, 2), (2, 3, 3, 4), (3, 4, 11, 4)) - assert((res.collect - expected).norm < 1e-6) - } - - test("1 cbind A") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val res = 1 cbind A - val expected = dense((1, 1, 2), (1, 2, 3), (1, 3, 4)) - assert((res.collect - expected).norm < 1e-6) - } - - test("A cbind 1") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val res = A cbind 1 - val expected = dense((1, 2, 1), (2, 3, 1), (3, 4, 1)) - assert((res.collect - expected).norm < 1e-6) - } - - test("A rbind B") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val res = A rbind B - val expected = dense((1, 2), (2, 3), (3, 4), (1, 2), (3, 4), (11, 4)) - assert((res.collect - expected).norm < 1e-6) - } - - test("A row slice") { - val inCoreA = dense((1, 2), (2, 3), (3, 4), (4, 4), (5, 5), (6, 7)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val res = A(2 until 5, ::) - val expected = inCoreA(2 until 5, ::) - assert((res.collect - expected).norm < 1e-6) - } - - test("A column slice") { - val inCoreA = dense((1, 2, 1, 2), (2, 3, 3, 4), (3, 4, 11, 4)) - val A = drmParallelize(m = inCoreA, 
numPartitions = 2) - - val res = A(::, 0 until 2) - val expected = inCoreA(::, 0 until 2) - assert((res.collect - expected).norm < 1e-6) - } - - test("A %*% inCoreB") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)).t - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val res = A %*% inCoreB - - val expected = inCoreA %*% inCoreB - assert((res.collect - expected).norm < 1e-6) - } - - test("drmBroadcast") { - val inCoreA = dense((1, 2), (3, 4), (11, 4)) - val x = dvec(1, 2) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val b = drmBroadcast(x) - - val res = A.mapBlock(1) { case (idx, block) => - (idx, (block %*% b).toColMatrix) - } - - val expected = inCoreA %*% x - assert((res.collect(::, 0) - expected).norm(2) < 1e-6) - } - - test("A.t %*% B with Long keys") { - val inCoreA = dense((1, 2), (3, 4), (3, 5)) - val inCoreB = dense((3, 5), (4, 6), (0, 1)) - - val A = drmParallelize(inCoreA, numPartitions = 2).mapBlock()({ - case (keys, block) => (keys.map(_.toLong), block) - }) - - val B = drmParallelize(inCoreB, numPartitions = 2).mapBlock()({ - case (keys, block) => (keys.map(_.toLong), block) - }) - - val C = A.t %*% B - val inCoreC = C.collect - val expected = inCoreA.t %*% inCoreB - - (inCoreC - expected).norm should be < 1E-10 - } - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala deleted file mode 100644 index fa49114..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings - -import org.apache.mahout.math.{Matrices, Vector} -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.function.IntIntFunction -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ -import org.scalatest.FunSuite -import org.slf4j.LoggerFactory - -import scala.util.hashing.MurmurHash3 - -class UseCasesSuite extends FunSuite with DistributedFlinkSuite { - - val LOGGER = LoggerFactory.getLogger(getClass()) - - test("use case: Power interation 1000 x 1000 matrix") { - val dim = 1000 - - // we want a symmetric matrix so we can have real eigenvalues - val inCoreA = symmtericMatrix(dim, max = 2000) - - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - var x: Vector = 1 to dim map (_ => 1.0 / Math.sqrt(dim)) - var converged = false - - var iteration = 1 - - while (!converged) { - LOGGER.info(s"iteration #$iteration...") - - val Ax = A %*% x - var x_new = Ax.collect(::, 0) - x_new = x_new / x_new.norm(2) - - val diff = (x_new - x).norm(2) - LOGGER.info(s"difference norm is $diff") - - converged = diff < 1e-6 - iteration = iteration + 1 - x = x_new - } - - LOGGER.info("converged") - // TODO: add test that it's the 1st PC - } - - def symmtericMatrix(dim: Int, max: Int, seed: Int = 0x31337) = { - Matrices.functionalMatrixView(dim, dim, new IntIntFunction { - def apply(i: Int, j: Int): Double = { - val arr = Array(i + j, i * j, i + j + 31, i / (j + 1) + j / (i + 1)) - Math.abs(MurmurHash3.arrayHash(arr, seed) % max) - } - }) - } - - test("use case: OLS Regression") { - val inCoreA = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8), (9, 10)) - val x = dvec(1, 2, 2, 3, 3, 3) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val AtA = A.t %*% A - val Atx = A.t %*% x - - val w = solve(AtA, Atx) - - val expected = solve(inCoreA.t %*% inCoreA, inCoreA.t %*% x) - assert((w(::, 0) - expected).norm(2) < 1e-6) - } - - test("use case: Ridge Regression") { - val inCoreA = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8), (9, 10)) - val x = dvec(1, 2, 2, 3, 3, 3) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val lambda = 1.0 - val reg = drmParallelize(diag(lambda, 2)) - - val w = solve(A.t %*% A + reg, A.t %*% x) - - val expected = solve(inCoreA.t %*% inCoreA + diag(lambda, 2), inCoreA.t %*% x) - assert((w(::, 0) - expected).norm(2) < 1e-6) - } - - // TODO: doesn't pass! 
- // Call to localhost/127.0.0.1:6498 failed on local exception - ignore("use case: trimmed-EVD via power iteration") { - val dim = 1000 - val k = 3 - - val inCoreA = symmtericMatrix(dim, max = 2000) - var A = drmParallelize(m = inCoreA, numPartitions = 2) - - val eigenvectors = for (i <- 0 until k) yield { - var x: Vector = 1 to dim map (_ => 1.0 / Math.sqrt(dim)) - var converged = false - - while (!converged) { - val Ax = A %*% x - var x_new = Ax.collect(::, 0) - x_new = x_new / x_new.norm(2) - - val diff = (x_new - x).norm(2) - - converged = diff < 1e-6 - x = x_new - } - - println(s"${i}th principal component found...") - // assuming 0th component of x is not zero - val evalue = (A %*% x).collect(0, 0) / x(0) - val evdComponent = drmParallelize(evalue * x cross x) - - A = A - evdComponent - - x - } - - eigenvectors.foreach(println(_)) - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala deleted file mode 100644 index 95d0969..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.blas - -import org.apache.flink.api.scala._ -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm -import org.apache.mahout.math._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.logical.{OpAx, _} -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.scalabindings._ -import org.scalatest.FunSuite - -class LATestSuite extends FunSuite with DistributedFlinkSuite { - - test("Ax blockified") { - val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val x: Vector = (0, 1, 2) - - val opAx = new OpAx(A, x) - val res = FlinkOpAx.blockifiedBroadcastAx(opAx, A) - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds) - val output = drm.collect - - val b = output(::, 0) - assert(b == dvec(8, 11, 14)) - } - - test("At sparseTrick") { - val inCoreA = dense((1, 2, 3), (2, 3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val opAt = new OpAt(A) - val res = FlinkOpAt.sparseTrick(opAt, A) - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.ncol, _ncol=inCoreA.nrow) - val output = drm.collect - - assert((output - inCoreA.t).norm < 1e-6) - } - - test("AtB notZippable") { - val inCoreAt = dense((1, 2), (2, 3), (3, 4)) - - val At = drmParallelize(m = inCoreAt, numPartitions = 2) - - val inCoreB = dense((1, 2), (3, 4), (11, 4)) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val opAtB = new OpAtB(At, B) - val res = FlinkOpAtB.notZippable(opAtB, At, B) - - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreAt.ncol, _ncol=inCoreB.ncol) - val output = drm.collect - - val expected = inCoreAt.t %*% inCoreB - assert((output - expected).norm < 1e-6) - } - - test("AewScalar opScalarNoSideEffect") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val scalar = 5.0 - - val op = new OpAewScalar(A, scalar, "*") - val res = FlinkOpAewScalar.opScalarNoSideEffect(op, A, scalar) - - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol) - val output = drm.collect - - val expected = inCoreA * scalar - assert((output - expected).norm < 1e-6) - } - - test("AewB rowWiseJoinNoSideEffect") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val op = new OpAewB(A, A, "*") - val res = FlinkOpAewB.rowWiseJoinNoSideEffect(op, A, A) - - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol) - val output = drm.collect - - assert((output - (inCoreA * inCoreA)).norm < 1e-6) - } - - test("Cbind") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val inCoreB = dense((4, 4), (5, 5), (6, 7)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val B = drmParallelize(m = inCoreB, numPartitions = 2) - - val op = new OpCbind(A, B) - val res = FlinkOpCBind.cbind(op, A, B) - - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, - _ncol= inCoreA.ncol + inCoreB.ncol) - val output = drm.collect - - val expected = dense((1, 2, 4, 4), (2, 3, 5, 5), (3, 4, 6, 7)) - assert((output - expected).norm < 1e-6) - } - - test("CbindScalar left") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val op = new OpCbindScalar(A, 1, true) - val res = FlinkOpCBind.cbindScalar(op, A, 1) - - val drm = new 
CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, - _ncol= inCoreA.ncol + 1) - val output = drm.collect - - val expected = dense((1, 1, 2), (1, 2, 3), (1, 3, 4)) - assert((output - expected).norm < 1e-6) - } - - test("CbindScalar right") { - val inCoreA = dense((1, 2), (2, 3), (3, 4)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val op = new OpCbindScalar(A, 1, false) - val res = FlinkOpCBind.cbindScalar(op, A, 1) - - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, - _ncol= inCoreA.ncol + 1) - val output = drm.collect - - val expected = dense((1, 2, 1), (2, 3, 1), (3, 4, 1)) - assert((output - expected).norm < 1e-6) - } - - test("slice") { - val inCoreA = dense((1, 2), (2, 3), (3, 4), (4, 4), (5, 5), (6, 7)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val range = 2 until 5 - val op = new OpRowRange(A, range) - val res = FlinkOpRowRange.slice(op, A) - - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow, - _ncol=inCoreA.ncol) - val output = drm.collect - - val expected = inCoreA(2 until 5, ::) - assert((output - expected).norm < 1e-6) - } - - test("A times inCoreB") { - val inCoreA = dense((1, 2, 3), (2, 3, 1), (3, 4, 4), (4, 4, 5), (5, 5, 7), (6, 7, 11)) - val inCoreB = dense((2, 1), (3, 4), (5, 11)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val op = new OpTimesRightMatrix(A, inCoreB) - val res = FlinkOpTimesRightMatrix.drmTimesInCore(op, A, inCoreB) - - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow, - _ncol=op.ncol) - val output = drm.collect - - val expected = inCoreA %*% inCoreB - assert((output - expected).norm < 1e-6) - } - - test("At A slim") { - val inCoreA = dense((1, 2, 3), (2, 3, 1), (3, 4, 4), (4, 4, 5), (5, 5, 7), (6, 7, 11)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - - val op = new OpAtA(A) - val output = FlinkOpAtA.slim(op, A) - - val expected = inCoreA.t %*% inCoreA - assert((output - expected).norm < 1e-6) - } - - test("At A fat") { - val inCoreA = dense((1, 2, 3, 2, 3, 1), (3, 4, 4, 4, 4, 5), (5, 5, 7, 6, 7, 11)) - val A = drmParallelize(m = inCoreA, numPartitions = 2) - val Aany = A.asInstanceOf[CheckpointedDrm[Any]] - - val op = new OpAtA(Aany) - - val res = FlinkOpAtA.fat(op, Aany) - val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow, _ncol=op.ncol) - val output = drm.collect - println(output) - - val expected = inCoreA.t %*% inCoreA - assert((output - expected).norm < 1e-6) - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala deleted file mode 100644 index 4e713c7..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.examples - -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.flinkbindings._ - -object ReadCsvExample { - - def main(args: Array[String]): Unit = { - val filePath = "file:///c:/tmp/data/slashdot0902/Slashdot0902.txt" - - val env = ExecutionEnvironment.getExecutionEnvironment - implicit val ctx = new FlinkDistributedContext(env) - - val drm = readCsv(filePath, delim = "\t", comment = "#") - val C = drm.t %*% drm - println(C.collect) - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/ClusteringSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/ClusteringSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/ClusteringSuite.scala deleted file mode 100644 index ea86c91..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/ClusteringSuite.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.flinkbindings.standard - -import org.apache.mahout.flinkbindings.DistributedFlinkSuite -import org.apache.mahout.math.algorithms.ClusteringSuiteBase -import org.scalatest.FunSuite - -class ClusteringSuite extends FunSuite - with DistributedFlinkSuite with ClusteringSuiteBase - http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala deleted file mode 100644 index 3752187..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.standard - -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math.drm._ -import org.scalatest.FunSuite - -class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite - with DrmLikeOpsSuiteBase { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala deleted file mode 100644 index 0a1653b..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.standard - -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math.drm._ -import org.scalatest.FunSuite - -class DrmLikeSuite extends FunSuite with DistributedFlinkSuite - with DrmLikeSuiteBase { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/FlinkDistributedDecompositionsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/FlinkDistributedDecompositionsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/FlinkDistributedDecompositionsSuite.scala deleted file mode 100644 index a1054af..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/FlinkDistributedDecompositionsSuite.scala +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.mahout.flinkbindings.standard - -import org.apache.mahout.common.RandomUtils -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math.{Matrices, SparseRowMatrix} -import org.apache.mahout.math.decompositions._ -import org.apache.mahout.math.drm.{CacheHint, _} -import org.scalatest.{FunSuite, Matchers} -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ -import RLikeDrmOps._ - -import scala.math._ - -// Exact copy of the DistributedDecompositionsSuiteBase trait with the exception of the -// matrix size in the dals test which has been lowered to 350 x 350 from 500 x 500 -// due to some Flink serialization issues. - -class FlinkDistributedDecompositionsSuite extends FunSuite with DistributedFlinkSuite - with Matchers {this:FunSuite => - - - test("thin distributed qr") { - - val inCoreA = dense( - (1, 2, 3, 4), - (2, 3, 4, 5), - (3, -4, 5, 6), - (4, 5, 6, 7), - (8, 6, 7, 8) - ) - - val drmA = drmParallelize(inCoreA, numPartitions = 2) - val (drmQ, inCoreR) = dqrThin(drmA, checkRankDeficiency = false) - - // Assert optimizer still knows Q and A are identically partitioned - drmQ.partitioningTag should equal(drmA.partitioningTag) - - // drmQ.rdd.partitions.size should be(A.rdd.partitions.size) - // - // // Should also be zippable - // drmQ.rdd.zip(other = A.rdd) - - val inCoreQ = drmQ.collect - - printf("A=\n%s\n", inCoreA) - printf("Q=\n%s\n", inCoreQ) - printf("R=\n%s\n", inCoreR) - - val (qControl, rControl) = qr(inCoreA) - printf("qControl=\n%s\n", qControl) - printf("rControl=\n%s\n", rControl) - - // Validate with Cholesky - val ch = chol(inCoreA.t %*% inCoreA) - printf("A'A=\n%s\n", inCoreA.t %*% inCoreA) - printf("L:\n%s\n", ch.getL) - - val rControl2 = (ch.getL cloned).t - val qControl2 = ch.solveRight(inCoreA) - printf("qControl2=\n%s\n", qControl2) - printf("rControl2=\n%s\n", rControl2) - - // Householder approach seems to be a little bit more stable - (rControl - inCoreR).norm should be < 1E-5 - (qControl - inCoreQ).norm should be < 1E-5 - - // Assert identicity with in-core Cholesky-based -- this should be tighter. 
- (rControl2 - inCoreR).norm should be < 1E-10 - (qControl2 - inCoreQ).norm should be < 1E-10 - - // Assert orthogonality: - // (a) Q[,j] dot Q[,j] == 1.0 for all j - // (b) Q[,i] dot Q[,j] == 0.0 for all i != j - for (col <- 0 until inCoreQ.ncol) - ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10 - for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol) - (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10 - - - } - - test("dssvd - the naive-est - q=0") { - dssvdNaive(q = 0) - } - - test("ddsvd - naive - q=1") { - dssvdNaive(q = 1) - } - - test("ddsvd - naive - q=2") { - dssvdNaive(q = 2) - } - - - def dssvdNaive(q: Int) { - val inCoreA = dense( - (1, 2, 3, 4), - (2, 3, 4, 5), - (3, -4, 5, 6), - (4, 5, 6, 7), - (8, 6, 7, 8) - ) - val drmA = drmParallelize(inCoreA, numPartitions = 2) - - val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q) - val (inCoreU, inCoreV) = (drmU.collect, drmV.collect) - - printf("U:\n%s\n", inCoreU) - printf("V:\n%s\n", inCoreV) - printf("Sigma:\n%s\n", s) - - (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5 - } - - test("dspca") { - - val rnd = RandomUtils.getRandom - - // Number of points - val m = 500 - // Length of actual spectrum - val spectrumLen = 40 - - val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3)) - printf("spectrum:%s\n", spectrum) - - val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) := - ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0)) - - // PCA Rotation matrix -- should also be orthonormal. - val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0) - - val input = (u %*%: diagv(spectrum)) %*% tr.t - val drmInput = drmParallelize(m = input, numPartitions = 2) - - // Calculate just first 10 principal factors and reduce dimensionality. - // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to - // ensure to zero stochastic error and assert only functional correctness of the method's pca- - // specific additions. - val k = 10 - - // Calculate just first 10 principal factors and reduce dimensionality. - var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1) - // Un-normalized pca data: - drmPCA = drmPCA %*% diagv(s) - - val pca = drmPCA.checkpoint(CacheHint.NONE).collect - - // Of course, once we calculated the pca, the spectrum is going to be different since our originally - // generated input was not centered. 
So here, we'd just brute-solve pca to verify - val xi = input.colMeans() - for (r <- 0 until input.nrow) input(r, ::) -= xi - var (pcaControl, _, sControl) = svd(m = input) - pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k) - - printf("pca:\n%s\n", pca(0 until 10, 0 until 10)) - printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10)) - - (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5 - - } - - test("dals") { - - val rnd = RandomUtils.getRandom - - // Number of points - val m = 350 - val n = 350 - - // Length of actual spectrum - val spectrumLen = 40 - - // Create singluar values with decay - val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3)) - printf("spectrum:%s\n", spectrum) - - // Create A as an ideal input - val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*% - qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t - val drmA = drmParallelize(inCoreA, numPartitions = 2) - - // Decompose using ALS - val (drmU, drmV, rmse) = dals(drmA = drmA, k = 20).toTuple - val inCoreU = drmU.collect - val inCoreV = drmV.collect - - val predict = inCoreU %*% inCoreV.t - - printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3)) - printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3)) - - val err = (inCoreA - predict).norm - printf("norm of residuals %f\n", err) - printf("train iteration rmses: %s\n", rmse) - - err should be < 15e-2 - - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala deleted file mode 100644 index 0f1d6bc..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.standard - -import org.apache.mahout.classifier.naivebayes.NBTestBase -import org.apache.mahout.flinkbindings._ -import org.scalatest.FunSuite - - -class NaiveBayesTestSuite extends FunSuite with DistributedFlinkSuite - with NBTestBase { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/PreprocessorSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/PreprocessorSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/PreprocessorSuite.scala deleted file mode 100644 index 5e2b4ee..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/PreprocessorSuite.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.flinkbindings.standard - - -import org.apache.mahout.flinkbindings.DistributedFlinkSuite -import org.apache.mahout.math.algorithms.PreprocessorSuiteBase -import org.scalatest.FunSuite - -class PreprocessorSuite extends FunSuite - with DistributedFlinkSuite with PreprocessorSuiteBase http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala deleted file mode 100644 index 8bb1b02..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.mahout.flinkbindings.standard - -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math.drm._ -import org.scalatest.FunSuite - -class RLikeDrmOpsSuite extends FunSuite with DistributedFlinkSuite - with RLikeDrmOpsSuiteBase { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RegressionSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RegressionSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RegressionSuite.scala deleted file mode 100644 index 5cb6183..0000000 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RegressionSuite.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.flinkbindings.standard - - -import org.apache.mahout.flinkbindings.DistributedFlinkSuite -import org.apache.mahout.math.algorithms.RegressionSuiteBase -import org.scalatest.FunSuite - -class RegressionSuite extends FunSuite - with DistributedFlinkSuite with RegressionSuiteBase -
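
For context on what the removed UseCasesSuite exercised, below is a minimal, hypothetical sketch of the same power-iteration loop written against in-core Mahout math only (no drmParallelize / Flink round trip). The object name PowerIterationSketch, the small 3x3 matrix, and the Rayleigh-quotient eigenvalue estimate are illustrative assumptions and are not part of this commit; only the normalize-and-compare loop with the 1e-6 convergence threshold mirrors the deleted test.

import org.apache.mahout.math.Vector
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._

// Hypothetical sketch: in-core power iteration, mirroring the loop from the
// removed UseCasesSuite without the distributed (Flink DRM) machinery.
object PowerIterationSketch {

  def main(args: Array[String]): Unit = {
    // Small symmetric matrix so the dominant eigenvalue is real.
    val a = dense((2, 1, 0), (1, 3, 1), (0, 1, 2))

    // Start from a unit-norm vector, as the deleted test did.
    var x: Vector = dvec(1, 1, 1) / Math.sqrt(3)
    var converged = false

    while (!converged) {
      var xNew: Vector = a %*% x        // one multiplication step
      xNew = xNew / xNew.norm(2)        // re-normalize
      converged = (xNew - x).norm(2) < 1e-6
      x = xNew
    }

    // Rayleigh quotient gives the dominant eigenvalue estimate.
    val lambda = (x dot (a %*% x)) / (x dot x)
    println(s"dominant eigenvalue ~ $lambda, eigenvector = $x")
  }
}

In the deleted suite the same loop ran on a 1000 x 1000 symmetric DRM with A %*% x collected back per iteration; the in-core version above only illustrates the convergence criterion and normalization, not the distributed data flow.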