http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
deleted file mode 100644
index 1cba326..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.drm
-
-import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
-import org.apache.flink.api.scala._
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.core.fs.Path
-import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
-import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat}
-import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil
-import org.apache.mahout.flinkbindings.{DrmDataSet, _}
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm.CacheHint._
-import org.apache.mahout.math.drm.{CacheHint, CheckpointedDrm, DistributedContext, DrmTuple, _}
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-
-import scala.collection.JavaConverters._
-import scala.reflect.{ClassTag, classTag}
-import scala.util.Random
-
-class CheckpointedFlinkDrm[K: ClassTag:TypeInformation](val ds: DrmDataSet[K],
-      private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN,
-      private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN,
-      override val cacheHint: CacheHint = CacheHint.NONE,
-      override protected[mahout] val partitioningTag: Long = Random.nextLong(),
-      private var _canHaveMissingRows: Boolean = false
-  ) extends CheckpointedDrm[K] {
-
-  lazy val nrow: Long = if (_nrow >= 0) _nrow else dim._1
-  lazy val ncol: Int = if (_ncol >= 0) _ncol else dim._2
-
-  // persistance values
-  var cacheFileName: String = "undefinedCacheName"
-  var isCached: Boolean = false
-  var parallelismDeg: Int = -1
-  var persistanceRootDir: String = _
-
-  // need to make sure that this is actually getting the correct properties for {{taskmanager.tmp.dirs}}
-  val mahoutHome = getMahoutHome()
-
-  // this is extra I/O for each cache call.  this needs to be moved somewhere where it is called
-  // only once.  Possibly FlinkDistributedEngine.
-  GlobalConfiguration.loadConfiguration(mahoutHome + "/conf/flink-config.yaml")
-
-  val conf = GlobalConfiguration.getConfiguration
-
-  if (!(conf == null )) {
-     persistanceRootDir = conf.getString("taskmanager.tmp.dirs", "/tmp")
-  } else {
-     persistanceRootDir = "/tmp"
-  }
-
-
-  private lazy val dim: (Long, Int) = {
-    // combine computation of ncol and nrow in one pass
-
-    val res = ds.map(new MapFunction[DrmTuple[K], (Long, Int)] {
-      def map(value: DrmTuple[K]): (Long, Int) = {
-        (1L, value._2.length)
-      }
-    }).reduce(new ReduceFunction[(Long, Int)] {
-      def reduce(t1: (Long, Int), t2: (Long, Int)) = {
-        val ((rowCnt1, colNum1), (rowCnt2, colNum2)) = (t1, t2)
-        (rowCnt1 + rowCnt2, Math.max(colNum1, colNum2))
-      }
-    })
-
-    val list = res.collect()
-    list.head
-  }
-
-
-  override val keyClassTag: ClassTag[K] = classTag[K]
-
-  /** Note as of Flink 1.0.0, no direct flink caching exists so we save
-    * the dataset to the filesystem and read it back when cache is called */
-  def cache() = {
-    if (!isCached) {
-      cacheFileName = persistanceRootDir + "/" + System.nanoTime().toString
-      parallelismDeg = ds.getParallelism
-      isCached = true
-      persist(ds, cacheFileName)
-    }
-    val _ds = readPersistedDataSet(cacheFileName, ds)
-
-    /** Leave the parallelism degree to be set the operators
-      * TODO: find out a way to set the parallelism degree based on the
-      * final drm after computation is actually triggered
-      *
-      *  // We may want to look more closely at this:
-      *  // since we've cached a drm, triggering a computation
-      *  // it may not make sense to keep the same parallelism degree
-      *  if (!(parallelismDeg == _ds.getParallelism)) {
-      *    _ds.setParallelism(parallelismDeg).rebalance()
-      *  }
-      *
-      */
-
-    datasetWrap(_ds)
-  }
-
-  def uncache(): this.type = {
-    if (isCached) {
-      Hadoop2HDFSUtil.delete(cacheFileName)
-      isCached = false
-    }
-    this
-  }
-
-  /** Writes a [[DataSet]] to the specified path and returns it as a DataSource for subsequent
-    * operations.
-    *
-    * @param dataset [[DataSet]] to write to disk
-    * @param path File path to write dataset to
-    * @tparam T Type of the [[DataSet]] elements
-    */
-  def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): Unit = {
-    val env = dataset.getExecutionEnvironment
-    val outputFormat = new TypeSerializerOutputFormat[T]
-    val filePath = new Path(path)
-
-    outputFormat.setOutputFilePath(filePath)
-    outputFormat.setWriteMode(WriteMode.OVERWRITE)
-
-    dataset.output(outputFormat)
-    env.execute("FlinkTools persist")
-  }
-
-  /** Reads a [[DataSet]] from the specified path and returns it as a DataSource for subsequent
-    * operations.
-    *
-    * @param path File path to read dataset from
-    * @param ds persisted ds to retrieve type information and environment from
-    * @tparam T Type of the [[DataSet]] elements
-    * @return [[DataSet]] the persisted dataset
-    */
-  def readPersistedDataSet[T: ClassTag : TypeInformation]
-       (path: String, ds: DataSet[T]): DataSet[T] = {
-
-    val env = ds.getExecutionEnvironment
-    val inputFormat = new TypeSerializerInputFormat[T](ds.getType())
-    val filePath = new Path(path)
-    inputFormat.setFilePath(filePath)
-
-    env.createInput(inputFormat)
-  }
-
-
-  // Members declared in org.apache.mahout.math.drm.DrmLike
-
-  protected[mahout] def canHaveMissingRows: Boolean = _canHaveMissingRows
-
-  def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = {
-    this
-  }
-
-  def collect: Matrix = {
-    val data = ds.collect()
-    val isDense = data.forall(_._2.isDense)
-
-    val cols = ncol
-    val rows = safeToNonNegInt(nrow)
-
-    val m = if (isDense) {
-      new DenseMatrix(rows, cols)
-    } else {
-      new SparseMatrix(rows, cols)
-    }
-
-    val intRowIndices = keyClassTag == implicitly[ClassTag[Int]]
-
-    if (intRowIndices) {
-      data.foreach { case (t, vec) =>
-        val idx = t.asInstanceOf[Int]
-        m(idx, ::) := vec
-      }
-
-      println(m.ncol, m.nrow)
-    } else {
-      // assign all rows sequentially
-      val d = data.zipWithIndex
-      d.foreach {
-        case ((_, vec), idx) => m(idx, ::) := vec
-      }
-
-      val rowBindings = d.map {
-        case ((t, _), idx) => (t.toString, idx: java.lang.Integer) 
-      }.toMap.asJava
-
-      m.setRowLabelBindings(rowBindings)
-    }
-
-    m
-  }
-
-  def dfsWrite(path: String): Unit = {
-    val env = ds.getExecutionEnvironment
-
-    val keyTag = implicitly[ClassTag[K]]
-
-    val job = new JobConf
-    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
-
-    // explicitly define all Writable Subclasses for ds.map() keys
-    // as well as the SequenceFileOutputFormat parameters
-    if (keyTag.runtimeClass == classOf[Int]) {
-      // explicitly map into Int keys
-      implicit val typeInformation = createTypeInformation[(IntWritable,VectorWritable)]
-      val writableDataset = ds.map(new MapFunction[DrmTuple[K], (IntWritable, VectorWritable)] {
-        def map(tuple: DrmTuple[K]): (IntWritable, VectorWritable) =
-          (new IntWritable(tuple._1.asInstanceOf[Int]), new VectorWritable(tuple._2))
-      })
-
-      // setup sink for IntWritable
-      job.setOutputKeyClass(classOf[IntWritable])
-      job.setOutputValueClass(classOf[VectorWritable])
-      val sequenceFormat = new SequenceFileOutputFormat[IntWritable, VectorWritable]
-      val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
-      writableDataset.output(hadoopOutput)
-
-     } else if (keyTag.runtimeClass == classOf[String]) {
-      // explicitly map into Text keys
-      val writableDataset = ds.map(new MapFunction[DrmTuple[K], (Text, VectorWritable)] {
-        def map(tuple: DrmTuple[K]): (Text, VectorWritable) =
-          (new Text(tuple._1.asInstanceOf[String]), new VectorWritable(tuple._2))
-      })
-
-      // setup sink for Text
-      job.setOutputKeyClass(classOf[Text])
-      job.setOutputValueClass(classOf[VectorWritable])
-      val sequenceFormat = new SequenceFileOutputFormat[Text, VectorWritable]
-      val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
-      writableDataset.output(hadoopOutput)
-
-    } else if (keyTag.runtimeClass == classOf[Long]) {
-      // explicitly map into Long keys
-      val writableDataset = ds.map(new MapFunction[DrmTuple[K], (LongWritable, VectorWritable)] {
-        def map(tuple: DrmTuple[K]): (LongWritable, VectorWritable) =
-          (new LongWritable(tuple._1.asInstanceOf[Long]), new VectorWritable(tuple._2))
-      })
-
-      // setup sink for LongWritable
-      job.setOutputKeyClass(classOf[LongWritable])
-      job.setOutputValueClass(classOf[VectorWritable])
-      val sequenceFormat = new SequenceFileOutputFormat[LongWritable, VectorWritable]
-      val hadoopOutput  = new HadoopOutputFormat(sequenceFormat, job)
-      writableDataset.output(hadoopOutput)
-
-    } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
-
-    env.execute(s"dfsWrite($path)")
-  }
-
-  private def keyToWritableFunc[K: ClassTag](keyTag: ClassTag[K]): (K) => Writable = {
-    if (keyTag.runtimeClass == classOf[Int]) { 
-      (x: K) => new IntWritable(x.asInstanceOf[Int])
-    } else if (keyTag.runtimeClass == classOf[String]) {
-      (x: K) => new Text(x.asInstanceOf[String]) 
-    } else if (keyTag.runtimeClass == classOf[Long]) {
-      (x: K) => new LongWritable(x.asInstanceOf[Long])
-    } else {
-      throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag))
-    }
-  }
-
-  def newRowCardinality(n: Int): CheckpointedDrm[K] = {
-    assert(n > -1)
-    assert(n >= nrow)
-    new CheckpointedFlinkDrm(ds = ds, _nrow = n, _ncol = _ncol, cacheHint = cacheHint,
-      partitioningTag = partitioningTag, _canHaveMissingRows = _canHaveMissingRows)
-  }
-
-  override val context: DistributedContext = ds.getExecutionEnvironment
-
-}
-
-object CheckpointedFlinkDrm {
-  val UNKNOWN = -1
-
-}
\ No newline at end of file
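
For reference, the cache()/persist()/readPersistedDataSet() trio above works around the lack of a DataSet cache operator by spilling to disk and reading back. A minimal, self-contained sketch of that same round trip against the plain Flink batch API (the tuple type and the /tmp path are only illustrative assumptions, not part of the deleted code):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.java.io.{TypeSerializerInputFormat, TypeSerializerOutputFormat}
    import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.core.fs.Path

    object CacheRoundTripSketch extends App {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val data = env.fromElements((1L, 2.0), (2L, 3.0), (3L, 4.0))

      // "persist": serialize the DataSet to a file, as CheckpointedFlinkDrm.persist() does
      val path = new Path("/tmp/drm-cache-example")
      val out = new TypeSerializerOutputFormat[(Long, Double)]
      out.setOutputFilePath(path)
      out.setWriteMode(WriteMode.OVERWRITE)
      data.output(out)
      env.execute("persist sketch")

      // "cache": read the serialized file back as a new DataSource, as readPersistedDataSet() does
      val in = new TypeSerializerInputFormat[(Long, Double)](data.getType())
      in.setFilePath(path)
      val cached = env.createInput(in)
      println(cached.count())
    }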

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
deleted file mode 100644
index e65c43d..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.mahout.flinkbindings.drm
-
-import org.apache.mahout.math.drm.CheckpointedDrm
-
-import scala.reflect.ClassTag
-
-class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) {
-  assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "must be a Flink-backed matrix")
-
-  private[flinkbindings] val flinkDrm = drm.asInstanceOf[CheckpointedFlinkDrm[K]]
-
-  /** Flink matrix customization exposure */
-  def dataset = flinkDrm.ds
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
deleted file mode 100644
index aea62fa..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.drm
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.mahout.flinkbindings.{BlockifiedDrmDataSet, DrmDataSet, FlinkDistributedContext, wrapContext}
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix}
-
-import scala.reflect.ClassTag
-
-trait FlinkDrm[K] {
-  def executionEnvironment: ExecutionEnvironment
-  def context: FlinkDistributedContext
-  def isBlockified: Boolean
-
-  def asBlockified: BlockifiedFlinkDrm[K]
-  def asRowWise: RowsFlinkDrm[K]
-
-  def classTag: ClassTag[K]
-}
-
-class RowsFlinkDrm[K: TypeInformation: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
-
-  def executionEnvironment = ds.getExecutionEnvironment
-  def context: FlinkDistributedContext = ds.getExecutionEnvironment
-
-  def isBlockified = false
-
-  def asBlockified : BlockifiedFlinkDrm[K] = {
-    val ncolLocal = ncol
-    val classTag = implicitly[ClassTag[K]]
-
-    val parts = ds.mapPartition {
-      values =>
-        val (keys, vectors) = values.toIterable.unzip
-
-        if (vectors.nonEmpty) {
-          val vector = vectors.head
-          val matrix: Matrix = if (vector.isDense) {
-            val matrix = new DenseMatrix(vectors.size, ncolLocal)
-            vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec }
-            matrix
-          } else {
-            new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray)
-          }
-
-          Seq((keys.toArray(classTag), matrix))
-        } else {
-          Seq()
-        }
-    }
-
-    new BlockifiedFlinkDrm[K](parts, ncol)
-  }
-
-  def asRowWise = this
-
-  def classTag = implicitly[ClassTag[K]]
-
-}
-
-class BlockifiedFlinkDrm[K: TypeInformation: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: Int) extends FlinkDrm[K] {
-
-
-  def executionEnvironment = ds.getExecutionEnvironment
-  def context: FlinkDistributedContext = ds.getExecutionEnvironment
-
-
-  def isBlockified = true
-
-  def asBlockified = this
-
-  def asRowWise = {
-    val out = ds.flatMap {
-      tuple =>
-        val keys = tuple._1
-        val block = tuple._2
-
-        keys.view.zipWithIndex.map {
-          case (key, idx) => (key, block(idx, ::))
-        }
-    }
-
-    new RowsFlinkDrm[K](out, ncol)
-  }
-
-  def classTag = implicitly[ClassTag[K]]
-
-}
\ No newline at end of file
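
The two concrete FlinkDrm flavours above only differ in how the same rows are grouped. A hedged sketch of the row-to-block regrouping on its own, assuming Flink's Scala DataSet API plus Mahout's in-core DenseMatrix and keys simplified to Int (a standalone illustration, not the deleted class itself):

    import org.apache.flink.api.scala._
    import org.apache.mahout.math.{DenseMatrix, Matrix, Vector}
    import org.apache.mahout.math.scalabindings.RLikeOps._

    // rows: (key, row vector)  ->  blocks: (keys in the block, rows stacked into one matrix)
    def blockify(rows: DataSet[(Int, Vector)], ncol: Int): DataSet[(Array[Int], Matrix)] =
      rows.mapPartition { it =>
        val (keys, vectors) = it.toVector.unzip
        if (vectors.isEmpty) Iterator.empty
        else {
          val block = new DenseMatrix(vectors.size, ncol)
          vectors.zipWithIndex.foreach { case (v, i) => block(i, ::) := v }
          Iterator((keys.toArray, block: Matrix))
        }
      }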

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
deleted file mode 100644
index 83ede9a..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.io
-
-import scala.reflect.ClassTag
-import org.apache.hadoop.io._
-import java.util.Arrays
-
-/**
- * Flink DRM Metadata
- */
-class DrmMetadata(
-
-  /** Writable  key type as a sub-type of Writable */
-  val keyTypeWritable: Class[_],
-
-  /** Value writable type, as a sub-type of Writable */
-  val valueTypeWritable: Class[_]) {
-
-  import DrmMetadata._
-
-  /**
-   * @param keyClassTag: Actual drm key class tag once converted out of writable
-   * @param keyW2ValFunc: Conversion from Writable to value type of the DRM key
-   */
-  val (keyClassTag: ClassTag[_], unwrapKeyFunction: ((Writable) => Any)) = keyTypeWritable match {
-    case cz if cz == classOf[IntWritable] => ClassTag.Int -> w2int _
-    case cz if cz == classOf[LongWritable] => ClassTag.Long -> w2long _
-    case cz if cz == classOf[DoubleWritable] => ClassTag.Double -> w2double _
-    case cz if cz == classOf[FloatWritable] => ClassTag.Float -> w2float _
-    case cz if cz == classOf[Text] => ClassTag(classOf[String]) -> w2string _
-    case cz if cz == classOf[BooleanWritable] => ClassTag(classOf[Boolean]) -> w2bool _
-    case cz if cz == classOf[BytesWritable] => ClassTag(classOf[Array[Byte]]) -> w2bytes _
-    case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
-  }
-
-}
-
-object DrmMetadata {
-
-  private[io] def w2int(w: Writable) = w.asInstanceOf[IntWritable].get()
-
-  private[io] def w2long(w: Writable) = w.asInstanceOf[LongWritable].get()
-
-  private[io] def w2double(w: Writable) = w.asInstanceOf[DoubleWritable].get()
-
-  private[io] def w2float(w: Writable) = w.asInstanceOf[FloatWritable].get()
-
-  private[io] def w2string(w: Writable) = w.asInstanceOf[Text].toString()
-
-  private[io] def w2bool(w: Writable) = w.asInstanceOf[BooleanWritable].get()
-
-  private[io] def w2bytes(w: Writable) = Arrays.copyOf(w.asInstanceOf[BytesWritable].getBytes(),
-    w.asInstanceOf[BytesWritable].getLength())
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
deleted file mode 100644
index b027878..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.io
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-
-
-/**
- * Returns a [[java.lang.String]], which is a comma-delimited list of URIs discovered based on parameters
- * in the constructor.
- * The String is formatted to be input into [[org.apache.flink.api.scala.ExecutionEnvironment#textFile()]]
- * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs
- * @param filePattern regex that must match the entire filename to have the file returned
- * @param recursive true traverses the filesystem recursively, default = false
- * 
- */
-case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) {
-
-  val conf = new Configuration()
-  val fs = FileSystem.get(conf)
-
-  /**
-   * Returns a string of comma delimited URIs matching the filePattern
-   * When pattern matching, dirs are never returned, only traversed.
-   */
-  def uris: String = {
-    if (!filePattern.isEmpty) { // have file pattern so
-      val pathURIs = pathURI.split(",")
-      var files = ""
-      for (uri <- pathURIs) {
-        files = findFiles(uri, filePattern, files)
-      }
-      if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma
-      files
-    } else {
-      pathURI
-    }
-  }
-
-  /**
-   * Find matching files in the dir, recursively call self when another directory is found
-   * Only files are matched, directories are traversed but never return a match
-   */
-  private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = {
-    val seed = fs.getFileStatus(new Path(dir))
-    var f: String = files
-
-    if (seed.isDirectory) {
-      val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir))
-      for (fileStatus <- fileStatuses) {
-        if (fileStatus.getPath().getName().matches(filePattern)
-          && !fileStatus.isDirectory) {
-          // found a file
-          if (fileStatus.getLen() != 0) {
-            // file is not empty
-            f = f + fileStatus.getPath.toUri.toString + ","
-          }
-        } else if (fileStatus.isDirectory && recursive) {
-          f = findFiles(fileStatus.getPath.toString, filePattern, f)
-        }
-      }
-    } else { f = dir } // was a filename not dir
-    f
-  }
-
-}
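
A hedged usage sketch for the case class above (the HDFS URI and the file pattern are made-up examples):

    // collect all non-empty part files under the input directory, recursing into subdirectories
    val search = HDFSPathSearch(pathURI = "hdfs:///user/someuser/input", filePattern = "part-.*", recursive = true)
    val uris: String = search.uris    // comma-delimited, e.g. "hdfs://.../part-00000,hdfs://.../part-00001"
    val perFile = uris.split(",")     // callers can split the list before handing individual paths to Flink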

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
deleted file mode 100644
index 73436f1..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.io
-
-/**
- * High-level, Hadoop version-specific HDFS manipulations we need in the context of our operations.
- *
- */
-trait HDFSUtil {
-
-  /**
-   *  Read DRM header information off (H)DFS.
-   */
-  def readDrmHeader(path: String): DrmMetadata
-
-}
-

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
deleted file mode 100644
index 211088a..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.io
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.SequenceFile.Reader
-import org.apache.hadoop.io.Writable
-
-object Hadoop2HDFSUtil extends HDFSUtil {
-
-  /**
-   * Read the header of a sequence file and determine the Key and Value type
-   * @param path - hdfs path of Sequence File
-   * @return
-   */
-  def readDrmHeader(path: String): DrmMetadata = {
-    val dfsPath = new Path(path)
-    val conf = new Configuration()
-    val fs = dfsPath.getFileSystem(conf)
-
-    fs.setConf(conf)
-
-    val partFilePath: Path = fs.listStatus(dfsPath)
-
-      // Filter out anything starting with .
-      .filter { s =>
-        !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDirectory
-      }
-
-      // Take path
-      .map(_.getPath)
-
-      // Take only one, if any
-      .headOption
-
-      // Require there's at least one partition file found.
-      .getOrElse {
-        throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
-      }
-
-     val reader = new Reader(fs.getConf, Reader.file(partFilePath))
-
-    try {
-      new DrmMetadata(
-        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
-        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]))
-    } finally {
-      reader.close()
-    }
-
-  }
-
-  /**
-   * Delete a path from the filesystem
-   * @param path - hdfs path
-   */
-  def delete(path: String) {
-    val dfsPath = new Path(path)
-    val fs = dfsPath.getFileSystem(new Configuration())
-
-    if (fs.exists(dfsPath)) {
-      fs.delete(dfsPath, true)
-    }
-  }
-
-}
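
Putting the two pieces together, a small hedged sketch of how a caller would inspect a DRM sequence file with readDrmHeader and then unwrap keys via the resulting DrmMetadata (the path is illustrative and must already contain a DRM part file):

    import org.apache.hadoop.io.IntWritable
    import org.apache.mahout.flinkbindings.io.Hadoop2HDFSUtil

    val meta = Hadoop2HDFSUtil.readDrmHeader("hdfs:///user/someuser/drm-output")
    println(meta.keyClassTag)                                // e.g. ClassTag.Int when keys were written as IntWritable
    val key = meta.unwrapKeyFunction(new IntWritable(42))    // unwraps back to the plain value 42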

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
deleted file mode 100644
index cf4da41..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.utils._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
-import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrm, CheckpointedFlinkDrmOps, FlinkDrm, RowsFlinkDrm}
-import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _}
-import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
-import org.slf4j.LoggerFactory
-
-import scala.Array._
-import scala.reflect.ClassTag
-
-package object flinkbindings {
-
-  private[flinkbindings] val log = LoggerFactory.getLogger("org.apache.mahout.flinkbindings")
-
-  /** Row-wise organized DRM dataset type */
-  type DrmDataSet[K] = DataSet[DrmTuple[K]]
-
-  /**
-   * Blockified DRM dataset (keys of the original DRM are grouped into an array corresponding to rows of the
-   * Matrix object value)
-   */
-  type BlockifiedDrmDataSet[K] = DataSet[BlockifiedDrmTuple[K]]
-
-  implicit def wrapMahoutContext(context: DistributedContext): FlinkDistributedContext = {
-    assert(context.isInstanceOf[FlinkDistributedContext], "it must be FlinkDistributedContext")
-    context.asInstanceOf[FlinkDistributedContext]
-  }
-
-  implicit def wrapContext(env: ExecutionEnvironment): FlinkDistributedContext =
-    new FlinkDistributedContext(env)
-
-  implicit def unwrapContext(ctx: FlinkDistributedContext): ExecutionEnvironment = ctx.env
-
-  private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K])
-    : CheckpointedFlinkDrm[K] = {
-
-    assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "it must be a Flink-backed matrix")
-    drm.asInstanceOf[CheckpointedFlinkDrm[K]]
-  }
-
-  implicit def checkpointedDrmToFlinkDrm[K: TypeInformation: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
-    val flinkDrm = castCheckpointedDrm(cp)
-    new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
-  }
-
-  /** Adding Flink-specific ops */
-  implicit def cpDrm2cpDrmFlinkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrmOps[K] =
-    new CheckpointedFlinkDrmOps[K](drm)
-
-  implicit def drm2cpDrmFlinkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedFlinkDrmOps[K] = drm: CheckpointedDrm[K]
-
-
-  private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m)
-  private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v)
-  private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get()
-  private[flinkbindings] implicit def unwrapFromWritable(w: VectorWritable): Vector = w.get()
-
-
-  def readCsv(file: String, delim: String = ",", comment: String = "#")
-             (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
-    val vectors = dc.env.readTextFile(file)
-      .filter((in: String) => {
-        !in.startsWith(comment)
-      })
-      .map(new MapFunction[String, Vector] {
-        def map(in: String): Vector = {
-          val array = in.split(delim).map(_.toDouble)
-          new DenseVector(array)
-        }
-      })
-    datasetToDrm(vectors)
-  }
-
-  def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Long] = {
-    val zipped = ds.zipWithIndex
-    datasetWrap(zipped)
-  }
-
-  def datasetWrap[K: ClassTag](dataset: DataSet[(K, Vector)]): CheckpointedDrm[K] = {
-    implicit val typeInformation = FlinkEngine.generateTypeInformation[K]
-    new CheckpointedFlinkDrm[K](dataset)
-  }
-
-  private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag
-
-  private[flinkbindings] def getMahoutHome() = {
-    var mhome = System.getenv("MAHOUT_HOME")
-    if (mhome == null) mhome = System.getProperty("mahout.home")
-    require(mhome != null, "MAHOUT_HOME is required to spawn mahout-based flink jobs")
-    mhome
-  }
-}
\ No newline at end of file
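
As a usage note, the helpers in this package object were typically driven from a Flink ExecutionEnvironment wrapped into a Mahout DistributedContext; a minimal hedged sketch (the CSV path is an assumption for illustration):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings._
    import org.apache.mahout.math.drm.DistributedContext

    implicit val dc: DistributedContext = wrapContext(ExecutionEnvironment.getExecutionEnvironment)

    // parse a comma-separated text file into a CheckpointedDrm[Long] via readCsv/datasetToDrm
    val drm = readCsv("/tmp/points.csv")
    println((drm.nrow, drm.ncol))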

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
deleted file mode 100644
index 094c45b..0000000
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.test.DistributedMahoutSuite
-import org.scalatest.{ConfigMap, Suite}
-
-import scala.concurrent.duration.FiniteDuration
-
-trait DistributedFlinkSuite extends DistributedMahoutSuite { this: Suite =>
-
-  protected implicit var mahoutCtx: DistributedContext = _
-  protected var env: ExecutionEnvironment = null
-
-  var cluster: Option[ForkableFlinkMiniCluster] = None
-  val parallelism = 4
-  protected val DEFAULT_AKKA_ASK_TIMEOUT: Long = 1000
-  protected var DEFAULT_TIMEOUT: FiniteDuration = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS)
-
-  def initContext() {
-    mahoutCtx = wrapContext(env)
-  }
-
-  override def beforeEach() {
-    initContext()
-  }
-
-  override def afterEach() {
-    super.afterEach()
-  }
-
-  override protected def afterAll(configMap: ConfigMap): Unit = {
-    super.afterAll(configMap)
-    cluster.foreach(c => TestBaseUtils.stopCluster(c, DEFAULT_TIMEOUT))
-  }
-
-  override protected def beforeAll(configMap: ConfigMap): Unit = {
-    super.beforeAll(configMap)
-
-    val cl = TestBaseUtils.startCluster(
-      1,
-      parallelism,
-      false,
-      false,
-      true)
-
-    env = ExecutionEnvironment.createLocalEnvironment(parallelism)
-
-    cluster = Some(cl)
-    initContext()
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
deleted file mode 100644
index 288561b..0000000
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import org.apache.mahout.logging.info
-import org.apache.mahout.math.DenseMatrix
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.scalatest.FunSuite
-
-
-class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
-
-  test("norm") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    (inCoreA.norm - A.norm) should be < 1e-6
-  }
-
-  test("colSums") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    
-    (inCoreA.colSums - A.colSums).norm(2) should be < 1e-6
-  }
-
-  test("rowSums") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    
-    (inCoreA.rowSums - A.rowSums).norm(2) should be < 1e-6
-  }
-
-  test("rowMeans") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    
-    (inCoreA.rowMeans - A.rowMeans).norm(2) should be < 1e-6
-  }
-
-  test("numNonZeroElementsPerColumn") {
-    val A = dense((0, 2), (3, 0), (0, -30))
-    val drmA = drmParallelize(A, numPartitions = 2)
-
-    drmA.numNonZeroElementsPerColumn() should equal(A.numNonZeroElementsPerColumn())
-  }
-
-
-  test("drmParallelizeEmpty") {
-    val emptyDrm = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2)
-    val expected = dense((0, 0), (0, 0))
-
-    (emptyDrm.collect - expected).norm should be < 1e-6
-  }
-
-  test("Aggregating transpose") {
-
-    val mxA = new DenseMatrix(20, 10) := 1
-
-    val drmA = drmParallelize(mxA, numPartitions = 3)
-
-    val reassignedA = drmA.mapBlock() { case (keys, block) ⇒
-      keys.map(_ % 3) → block
-    }
-
-    val mxAggrA = reassignedA.t(::, 0 until 3).collect
-
-    info(mxAggrA.toString)
-
-    mxAggrA(0,0) shouldBe 7
-    mxAggrA(0,1) shouldBe 7
-    mxAggrA(0,2) shouldBe 6
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
deleted file mode 100644
index 4953647..0000000
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements. See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership. The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
-package org.apache.mahout.flinkbindings
-
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.scalatest.FunSuite
-
-class FlinkByteBCastSuite extends FunSuite {
-
-  test("BCast vector") {
-    val v = dvec(1, 2, 3)
-    val vBc = FlinkByteBCast.wrap(v)
-    assert((v - vBc.value).norm(2) <= 1e-6)
-  }
-
-  test("BCast matrix") {
-    val m = dense((1, 2), (3, 4))
-    val mBc = FlinkByteBCast.wrap(m)
-    assert((m - mBc.value).norm <= 1e-6)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
deleted file mode 100644
index 3e14d76..0000000
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
+++ /dev/null
@@ -1,326 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.junit.runner.RunWith
-import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-
-
-class RLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
-
-  val LOGGER = LoggerFactory.getLogger(getClass())
-
-  test("A %*% x") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val x: Vector = (0, 1, 2)
-
-    val res = A %*% x
-
-    val b = res.collect(::, 0)
-    assert(b == dvec(8, 11, 14))
-  }
-
-  test("A.t") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val res = A.t.collect
-
-    val expected = inCoreA.t
-    assert((res - expected).norm < 1e-6)
-  }
-
-  test("A.t %*% x") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val x = dvec(3, 11)
-    val res = (A.t %*% x).collect(::, 0)
-
-    val expected = inCoreA.t %*% x 
-    assert((res - expected).norm(2) < 1e-6)
-  }
-
-  test("A.t %*% B") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val res = A.t %*% B
-
-    val expected = inCoreA.t %*% inCoreB
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A %*% B.t") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val res = A %*% B.t
-
-    val expected = inCoreA %*% inCoreB.t
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A.t %*% A") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = A.t %*% A
-
-    val expected = inCoreA.t %*% inCoreA
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A %*% B") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4)).t
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val res = A %*% B
-
-    val expected = inCoreA %*% inCoreB
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A %*% B.t test 2") {
-    val mxA = Matrices.symmetricUniformView(10, 7, 80085)
-    val mxB = Matrices.symmetricUniformView(30, 7, 31337)
-    val A = drmParallelize(mxA, 3)
-    val B = drmParallelize(mxB, 4)
-
-    val ABt = (A %*% B.t).collect
-    (ABt - mxA %*% mxB.t).norm should be < 1e-7
-  }
-
-  test("ABt test") {
-    val mxX = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8))
-    val mxY = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8),
-                    (1, 2), (2, 3), (3, 4), (5, 6), (7, 8))
-
-    val drmX = drmParallelize(mxX, 3)
-    val drmY = drmParallelize(mxY, 4)
-
-    val XYt = (drmX %*% drmY.t).collect
-    val control = mxX %*% mxY.t
-    (XYt - control).norm should be < 1e-7
-  }
-
-
-  test("A * scalar") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = A * 5
-    assert((res.collect - inCoreA * 5).norm < 1e-6)
-  }
-
-  test("A / scalar") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4)).t
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = A / 5
-    assert((res.collect - (inCoreA / 5)).norm < 1e-6)
-  }
-
-  test("A + scalar") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = A + 5
-    assert((res.collect - (inCoreA + 5)).norm < 1e-6)
-  }
-
-  test("A - scalar") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = A - 5
-    assert((res.collect - (inCoreA - 5)).norm < 1e-6)
-  }
-
-  test("A * B") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val res = A * B
-    val expected = inCoreA * inCoreB
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A / B") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val res = A / B
-    val expected = inCoreA / inCoreB
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A + B") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val res = A + B
-    val expected = inCoreA + inCoreB
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A - B") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val res = A - B
-    val expected = inCoreA - inCoreB
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A cbind B") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val res = A cbind B
-    val expected = dense((1, 2, 1, 2), (2, 3, 3, 4), (3, 4, 11, 4))
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("1 cbind A") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = 1 cbind A
-    val expected = dense((1, 1, 2), (1, 2, 3), (1, 3, 4))
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A cbind 1") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = A cbind 1
-    val expected = dense((1, 2, 1), (2, 3, 1), (3, 4, 1))
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A rbind B") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val res = A rbind B
-    val expected = dense((1, 2), (2, 3), (3, 4), (1, 2), (3, 4), (11, 4))
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A row slice") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4), (4, 4), (5, 5), (6, 7))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = A(2 until 5, ::)
-    val expected = inCoreA(2 until 5, ::)
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A column slice") {
-    val inCoreA = dense((1, 2, 1, 2), (2, 3, 3, 4), (3, 4, 11, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = A(::, 0 until 2)
-    val expected = inCoreA(::, 0 until 2)
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("A %*% inCoreB") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4)).t
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val res = A %*% inCoreB
-
-    val expected = inCoreA %*% inCoreB
-    assert((res.collect - expected).norm < 1e-6)
-  }
-
-  test("drmBroadcast") {
-    val inCoreA = dense((1, 2), (3, 4), (11, 4))
-    val x = dvec(1, 2)
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val b = drmBroadcast(x)
-
-    val res = A.mapBlock(1) { case (idx, block) =>
-      (idx, (block %*% b).toColMatrix)
-    }
-
-    val expected = inCoreA %*% x
-    assert((res.collect(::, 0) - expected).norm(2) < 1e-6)
-  }
-
-  test("A.t %*% B with Long keys") {
-    val inCoreA = dense((1, 2), (3, 4), (3, 5))
-    val inCoreB = dense((3, 5), (4, 6), (0, 1))
-
-    val A = drmParallelize(inCoreA, numPartitions = 2).mapBlock()({
-      case (keys, block) => (keys.map(_.toLong), block)
-    })
-
-    val B = drmParallelize(inCoreB, numPartitions = 2).mapBlock()({
-      case (keys, block) => (keys.map(_.toLong), block)
-    })
-
-    val C = A.t %*% B
-    val inCoreC = C.collect
-    val expected = inCoreA.t %*% inCoreB
-
-    (inCoreC - expected).norm should be < 1E-10
-  }
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
deleted file mode 100644
index fa49114..0000000
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.function.IntIntFunction
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.scalatest.FunSuite
-import org.slf4j.LoggerFactory
-
-import scala.util.hashing.MurmurHash3
-
-class UseCasesSuite extends FunSuite with DistributedFlinkSuite {
-
-  val LOGGER = LoggerFactory.getLogger(getClass())
-
-  test("use case: Power interation 1000 x 1000 matrix") {
-    val dim = 1000
-
-    // we want a symmetric matrix so we can have real eigenvalues
-    val inCoreA = symmtericMatrix(dim, max = 2000)
-
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    var x: Vector = 1 to dim map (_ => 1.0 / Math.sqrt(dim))
-    var converged = false
-
-    var iteration = 1
-
-    while (!converged) {
-      LOGGER.info(s"iteration #$iteration...")
-
-      val Ax = A %*% x
-      var x_new = Ax.collect(::, 0)
-      x_new = x_new / x_new.norm(2)
-
-      val diff = (x_new - x).norm(2)
-      LOGGER.info(s"difference norm is $diff")
-
-      converged = diff < 1e-6
-      iteration = iteration + 1
-      x = x_new
-    }
-
-    LOGGER.info("converged")
-    // TODO: add test that it's the 1st PC
-  }
-
-  def symmtericMatrix(dim: Int, max: Int, seed: Int = 0x31337) = {
-    Matrices.functionalMatrixView(dim, dim, new IntIntFunction {
-      def apply(i: Int, j: Int): Double = {
-        val arr = Array(i + j, i * j, i + j + 31, i / (j + 1) + j / (i + 1))
-        Math.abs(MurmurHash3.arrayHash(arr, seed) % max)
-      }
-    })
-  }
-
-  test("use case: OLS Regression") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8), (9, 10))
-    val x = dvec(1, 2, 2, 3, 3, 3)
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val AtA = A.t %*% A
-    val Atx = A.t %*% x
-
-    val w = solve(AtA, Atx)
-
-    val expected = solve(inCoreA.t %*% inCoreA, inCoreA.t %*% x)
-    assert((w(::, 0) - expected).norm(2) < 1e-6)
-  }
-
-  test("use case: Ridge Regression") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8), (9, 10))
-    val x = dvec(1, 2, 2, 3, 3, 3)
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val lambda = 1.0
-    val reg = drmParallelize(diag(lambda, 2)) 
-
-    val w = solve(A.t %*% A + reg, A.t %*% x)
-
-    val expected = solve(inCoreA.t %*% inCoreA + diag(lambda, 2), inCoreA.t %*% x)
-    assert((w(::, 0) - expected).norm(2) < 1e-6)
-  }
-
-  // TODO: doesn't pass! 
-  // Call to localhost/127.0.0.1:6498 failed on local exception
-  ignore("use case: trimmed-EVD via power iteration") {
-    val dim = 1000
-    val k = 3
-
-    val inCoreA = symmetricMatrix(dim, max = 2000)
-    var A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val eigenvectors = for (i <- 0 until k) yield {
-      var x: Vector = 1 to dim map (_ => 1.0 / Math.sqrt(dim))
-      var converged = false
-
-      while (!converged) {
-        val Ax = A %*% x
-        var x_new = Ax.collect(::, 0)
-        x_new = x_new / x_new.norm(2)
-
-        val diff = (x_new - x).norm(2)
-
-        converged = diff < 1e-6
-        x = x_new
-      }
-
-      println(s"${i}th principal component found...")
-      // assuming 0th component of x is not zero
-      val evalue = (A %*% x).collect(0, 0) / x(0) 
-      val evdComponent = drmParallelize(evalue * x cross x)
-
-      A = A - evdComponent
-
-      x
-    }
-
-    eigenvectors.foreach(println(_))
-  }
-
-}
\ No newline at end of file
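
For reference, the power iteration that the first use case above runs against a distributed matrix can be sketched entirely in core with the scalabindings; this is a rough sketch, and the small symmetric matrix, starting vector, and 1e-6 tolerance are illustrative choices, not part of the deleted suite:

    import org.apache.mahout.math.Vector
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    // Dominant eigenvector of a small symmetric matrix by power iteration.
    val m = dense((2, 1, 0), (1, 3, 1), (0, 1, 2))
    var x: Vector = dvec(1, 1, 1) / Math.sqrt(3)
    var converged = false
    while (!converged) {
      var xNew: Vector = m %*% x    // multiply by the matrix ...
      xNew = xNew / xNew.norm(2)    // ... and re-normalize
      converged = (xNew - x).norm(2) < 1e-6
      x = xNew
    }
    // x now approximates the eigenvector of the largest eigenvalue (here 4.0).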

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
deleted file mode 100644
index 95d0969..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import org.apache.flink.api.scala._
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.logical.{OpAx, _}
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.scalatest.FunSuite
-
-class LATestSuite extends FunSuite with DistributedFlinkSuite {
-
-  test("Ax blockified") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val x: Vector = (0, 1, 2)
-
-    val opAx = new OpAx(A, x)
-    val res = FlinkOpAx.blockifiedBroadcastAx(opAx, A)
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds)
-    val output = drm.collect
-
-    val b = output(::, 0)
-    assert(b == dvec(8, 11, 14))
-  }
-
-  test("At sparseTrick") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val opAt = new OpAt(A)
-    val res = FlinkOpAt.sparseTrick(opAt, A)
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.ncol, _ncol=inCoreA.nrow)
-    val output = drm.collect
-
-    assert((output - inCoreA.t).norm < 1e-6)
-  }
-
-  test("AtB notZippable") {
-    val inCoreAt = dense((1, 2), (2, 3), (3, 4))
-
-    val At = drmParallelize(m = inCoreAt, numPartitions = 2)
-
-    val inCoreB = dense((1, 2), (3, 4), (11, 4))
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val opAtB = new OpAtB(At, B)
-    val res = FlinkOpAtB.notZippable(opAtB, At, B)
-
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreAt.ncol, _ncol=inCoreB.ncol)
-    val output = drm.collect
-
-    val expected = inCoreAt.t %*% inCoreB
-    assert((output - expected).norm < 1e-6)
-  }
-
-  test("AewScalar opScalarNoSideEffect") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val scalar = 5.0
-
-    val op = new OpAewScalar(A, scalar, "*") 
-    val res = FlinkOpAewScalar.opScalarNoSideEffect(op, A, scalar)
-
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
-    val output = drm.collect
-
-    val expected = inCoreA  * scalar
-    assert((output - expected).norm < 1e-6)
-  }
-
-  test("AewB rowWiseJoinNoSideEffect") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val op = new OpAewB(A, A, "*")
-    val res = FlinkOpAewB.rowWiseJoinNoSideEffect(op, A, A)
-
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, _ncol=inCoreA.ncol)
-    val output = drm.collect
-
-    assert((output - (inCoreA  * inCoreA)).norm < 1e-6)
-  }
-
-  test("Cbind") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val inCoreB = dense((4, 4), (5, 5), (6, 7))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val B = drmParallelize(m = inCoreB, numPartitions = 2)
-
-    val op = new OpCbind(A, B)
-    val res = FlinkOpCBind.cbind(op, A, B)
-
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
-        _ncol= inCoreA.ncol + inCoreB.ncol)
-    val output = drm.collect
-
-    val expected = dense((1, 2, 4, 4), (2, 3, 5, 5), (3, 4, 6, 7))
-    assert((output - expected).norm < 1e-6)
-  }
-
-  test("CbindScalar left") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val op = new OpCbindScalar(A, 1, true)
-    val res = FlinkOpCBind.cbindScalar(op, A, 1)
-
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
-        _ncol= inCoreA.ncol + 1)
-    val output = drm.collect
-
-    val expected = dense((1, 1, 2), (1, 2, 3), (1, 3, 4))
-    assert((output - expected).norm < 1e-6)
-  }
-
-  test("CbindScalar right") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val op = new OpCbindScalar(A, 1, false)
-    val res = FlinkOpCBind.cbindScalar(op, A, 1)
-
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
-        _ncol= inCoreA.ncol + 1)
-    val output = drm.collect
-
-    val expected = dense((1, 2, 1), (2, 3, 1), (3, 4, 1))
-    assert((output - expected).norm < 1e-6)
-  }
-
-  test("slice") {
-    val inCoreA = dense((1, 2), (2, 3), (3, 4), (4, 4), (5, 5), (6, 7))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val range = 2 until 5
-    val op = new OpRowRange(A, range)
-    val res = FlinkOpRowRange.slice(op, A)
-
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow,
-        _ncol=inCoreA.ncol)
-    val output = drm.collect
-
-    val expected = inCoreA(2 until 5, ::)
-    assert((output - expected).norm < 1e-6)
-  }
-
-  test("A times inCoreB") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 1), (3, 4, 4), (4, 4, 5), (5, 5, 7), (6, 7, 11))
-    val inCoreB = dense((2, 1), (3, 4), (5, 11))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val op = new OpTimesRightMatrix(A, inCoreB)
-    val res = FlinkOpTimesRightMatrix.drmTimesInCore(op, A, inCoreB)
-
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow,
-        _ncol=op.ncol)
-    val output = drm.collect
-
-    val expected = inCoreA %*% inCoreB
-    assert((output - expected).norm < 1e-6)
-  }
-
-  test("At A slim") {
-    val inCoreA = dense((1, 2, 3), (2, 3, 1), (3, 4, 4), (4, 4, 5), (5, 5, 7), (6, 7, 11))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-
-    val op = new OpAtA(A)
-    val output = FlinkOpAtA.slim(op, A)
-
-    val expected = inCoreA.t %*% inCoreA
-    assert((output - expected).norm < 1e-6)
-  }
-
-  test("At A fat") {
-    val inCoreA = dense((1, 2, 3, 2, 3, 1), (3, 4, 4, 4, 4, 5), (5, 5, 7, 6, 7, 11))
-    val A = drmParallelize(m = inCoreA, numPartitions = 2)
-    val Aany = A.asInstanceOf[CheckpointedDrm[Any]]
-
-    val op = new OpAtA(Aany)
-
-    val res = FlinkOpAtA.fat(op, Aany)
-    val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=op.nrow, _ncol=op.ncol)
-    val output = drm.collect
-    println(output)
-
-    val expected = inCoreA.t %*% inCoreA
-    assert((output - expected).norm < 1e-6)
-  }
-
-}
\ No newline at end of file
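
Every test in the suite above follows the same pattern: build the logical operator, run the corresponding Flink physical operator, wrap the resulting row-wise DataSet in a CheckpointedFlinkDrm with the known dimensions, collect, and compare against the in-core expectation. A hypothetical helper for the final comparison step (the name assertNear is illustrative and not part of the suite) could read:

    import org.apache.mahout.math.Matrix
    import org.apache.mahout.math.scalabindings.RLikeOps._

    // Passes when the collected result agrees with the in-core expectation up to eps.
    def assertNear(actual: Matrix, expected: Matrix, eps: Double = 1e-6): Unit =
      assert((actual - expected).norm < eps, s"matrices differ by more than $eps")

With such a helper, the body of, say, the AtB test would reduce to assertNear(drm.collect, inCoreAt.t %*% inCoreB).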

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
deleted file mode 100644
index 4e713c7..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.examples
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.flinkbindings._
-
-object ReadCsvExample {
-
-  def main(args: Array[String]): Unit = {
-    val filePath = "file:///c:/tmp/data/slashdot0902/Slashdot0902.txt"
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    implicit val ctx = new FlinkDistributedContext(env)
-
-    val drm = readCsv(filePath, delim = "\t", comment = "#")
-    val C = drm.t %*% drm
-    println(C.collect)
-  }
-
-}
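
The example above loads a tab-separated text file into a DRM and computes its Gramian drm.t %*% drm. The same computation can be sanity-checked on a small in-core matrix; a sketch, assuming an implicit FlinkDistributedContext is in scope as in main above:

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val inCore = dense((1, 2), (3, 4), (5, 6))
    val drm = drmParallelize(inCore, numPartitions = 2)

    // Distributed Gramian versus its in-core counterpart.
    assert(((drm.t %*% drm).collect - inCore.t %*% inCore).norm < 1e-10)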

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/ClusteringSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/ClusteringSuite.scala
 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/ClusteringSuite.scala
deleted file mode 100644
index ea86c91..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/ClusteringSuite.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.flinkbindings.standard
-
-import org.apache.mahout.flinkbindings.DistributedFlinkSuite
-import org.apache.mahout.math.algorithms.ClusteringSuiteBase
-import org.scalatest.FunSuite
-
-class ClusteringSuite extends FunSuite
-  with DistributedFlinkSuite with ClusteringSuiteBase
-

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
deleted file mode 100644
index 3752187..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements. See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership. The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
-package org.apache.mahout.flinkbindings.standard
-
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math.drm._
-import org.scalatest.FunSuite
-
-class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite
-      with DrmLikeOpsSuiteBase {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
deleted file mode 100644
index 0a1653b..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements. See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership. The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
-package org.apache.mahout.flinkbindings.standard
-
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math.drm._
-import org.scalatest.FunSuite
-
-class DrmLikeSuite extends FunSuite with DistributedFlinkSuite
-      with DrmLikeSuiteBase {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/FlinkDistributedDecompositionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/FlinkDistributedDecompositionsSuite.scala
 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/FlinkDistributedDecompositionsSuite.scala
deleted file mode 100644
index a1054af..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/FlinkDistributedDecompositionsSuite.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements. See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership. The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
-package org.apache.mahout.flinkbindings.standard
-
-import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math.{Matrices, SparseRowMatrix}
-import org.apache.mahout.math.decompositions._
-import org.apache.mahout.math.drm.{CacheHint, _}
-import org.scalatest.{FunSuite, Matchers}
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import RLikeDrmOps._
-
-import scala.math._
-
-// Exact copy of the DistributedDecompositionsSuiteBase trait with the exception of the
-// matrix size in the dals test which has been lowered to 350 x 350 from 500 x 500
-// due to some Flink serialization issues.
-
-class FlinkDistributedDecompositionsSuite extends FunSuite with DistributedFlinkSuite
-      with Matchers {this:FunSuite =>
-
-
-  test("thin distributed qr") {
-
-    val inCoreA = dense(
-      (1, 2, 3, 4),
-      (2, 3, 4, 5),
-      (3, -4, 5, 6),
-      (4, 5, 6, 7),
-      (8, 6, 7, 8)
-    )
-
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-    val (drmQ, inCoreR) = dqrThin(drmA, checkRankDeficiency = false)
-
-    // Assert optimizer still knows Q and A are identically partitioned
-    drmQ.partitioningTag should equal(drmA.partitioningTag)
-
-    //    drmQ.rdd.partitions.size should be(A.rdd.partitions.size)
-    //
-    //    // Should also be zippable
-    //    drmQ.rdd.zip(other = A.rdd)
-
-    val inCoreQ = drmQ.collect
-
-    printf("A=\n%s\n", inCoreA)
-    printf("Q=\n%s\n", inCoreQ)
-    printf("R=\n%s\n", inCoreR)
-
-    val (qControl, rControl) = qr(inCoreA)
-    printf("qControl=\n%s\n", qControl)
-    printf("rControl=\n%s\n", rControl)
-
-    // Validate with Cholesky
-    val ch = chol(inCoreA.t %*% inCoreA)
-    printf("A'A=\n%s\n", inCoreA.t %*% inCoreA)
-    printf("L:\n%s\n", ch.getL)
-
-    val rControl2 = (ch.getL cloned).t
-    val qControl2 = ch.solveRight(inCoreA)
-    printf("qControl2=\n%s\n", qControl2)
-    printf("rControl2=\n%s\n", rControl2)
-
-    // Householder approach seems to be a little bit more stable
-    (rControl - inCoreR).norm should be < 1E-5
-    (qControl - inCoreQ).norm should be < 1E-5
-
-    // Assert identity with the in-core Cholesky-based result -- this should be tighter.
-    (rControl2 - inCoreR).norm should be < 1E-10
-    (qControl2 - inCoreQ).norm should be < 1E-10
-
-    // Assert orthogonality:
-    // (a) Q[,j] dot Q[,j] == 1.0 for all j
-    // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
-    for (col <- 0 until inCoreQ.ncol)
-      ((inCoreQ(::, col) dot inCoreQ(::, col)) - 1.0).abs should be < 1e-10
-    for (col1 <- 0 until inCoreQ.ncol - 1; col2 <- col1 + 1 until inCoreQ.ncol)
-      (inCoreQ(::, col1) dot inCoreQ(::, col2)).abs should be < 1e-10
-
-
-  }
-
-  test("dssvd - the naive-est - q=0") {
-    dssvdNaive(q = 0)
-  }
-
-  test("ddsvd - naive - q=1") {
-    dssvdNaive(q = 1)
-  }
-
-  test("ddsvd - naive - q=2") {
-    dssvdNaive(q = 2)
-  }
-
-
-  def dssvdNaive(q: Int) {
-    val inCoreA = dense(
-      (1, 2, 3, 4),
-      (2, 3, 4, 5),
-      (3, -4, 5, 6),
-      (4, 5, 6, 7),
-      (8, 6, 7, 8)
-    )
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    val (drmU, drmV, s) = dssvd(drmA, k = 4, q = q)
-    val (inCoreU, inCoreV) = (drmU.collect, drmV.collect)
-
-    printf("U:\n%s\n", inCoreU)
-    printf("V:\n%s\n", inCoreV)
-    printf("Sigma:\n%s\n", s)
-
-    (inCoreA - (inCoreU %*%: diagv(s)) %*% inCoreV.t).norm should be < 1E-5
-  }
-
-  test("dspca") {
-
-    val rnd = RandomUtils.getRandom
-
-    // Number of points
-    val m = 500
-    // Length of actual spectrum
-    val spectrumLen = 40
-
-    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
-    printf("spectrum:%s\n", spectrum)
-
-    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
-      ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
-
-    // PCA Rotation matrix -- should also be orthonormal.
-    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
-
-    val input = (u %*%: diagv(spectrum)) %*% tr.t
-    val drmInput = drmParallelize(m = input, numPartitions = 2)
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to
-    // ensure to zero stochastic error and assert only functional correctness of the method's pca-
-    // specific additions.
-    val k = 10
-
-    // Calculate just first 10 principal factors and reduce dimensionality.
-    var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1)
-    // Un-normalized pca data:
-    drmPCA = drmPCA %*% diagv(s)
-
-    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
-
-    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
-    // generated input was not centered. So here, we'd just brute-solve pca to verify
-    val xi = input.colMeans()
-    for (r <- 0 until input.nrow) input(r, ::) -= xi
-    var (pcaControl, _, sControl) = svd(m = input)
-    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
-
-    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
-    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
-
-    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
-
-  }
-
-  test("dals") {
-
-    val rnd = RandomUtils.getRandom
-
-    // Number of points
-    val m = 350
-    val n = 350
-
-    // Length of actual spectrum
-    val spectrumLen = 40
-
-    // Create singular values with decay
-    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
-    printf("spectrum:%s\n", spectrum)
-
-    // Create A as an ideal input
-    val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*%
-      qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t
-    val drmA = drmParallelize(inCoreA, numPartitions = 2)
-
-    // Decompose using ALS
-    val (drmU, drmV, rmse) = dals(drmA = drmA, k = 20).toTuple
-    val inCoreU = drmU.collect
-    val inCoreV = drmV.collect
-
-    val predict = inCoreU %*% inCoreV.t
-
-    printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3))
-    printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 
until 3))
-
-    val err = (inCoreA - predict).norm
-    printf("norm of residuals %f\n", err)
-    printf("train iteration rmses: %s\n", rmse)
-
-    err should be < 15e-2
-
-  }
-
-}
\ No newline at end of file
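
As a complement to the orthogonality assertions in the thin-QR test above, the decomposition can also be checked by reconstruction, since A should be recovered as Q %*% R. A minimal sketch, assuming an implicit distributed context is in scope as in the suite, and with the 1e-10 tolerance being our assumption for this small, well-conditioned matrix:

    import org.apache.mahout.math.decompositions._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val inCoreA = dense((1, 2, 3, 4), (2, 3, 4, 5), (3, -4, 5, 6), (4, 5, 6, 7), (8, 6, 7, 8))
    val drmA = drmParallelize(inCoreA, numPartitions = 2)

    // Thin distributed QR: Q stays distributed, R comes back in core.
    val (drmQ, inCoreR) = dqrThin(drmA, checkRankDeficiency = false)

    // Reconstruction check: Q %*% R should reproduce A up to numerical error.
    assert((inCoreA - drmQ.collect %*% inCoreR).norm < 1e-10)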

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
deleted file mode 100644
index 0f1d6bc..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements. See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership. The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
-package org.apache.mahout.flinkbindings.standard
-
-import org.apache.mahout.classifier.naivebayes.NBTestBase
-import org.apache.mahout.flinkbindings._
-import org.scalatest.FunSuite
-
-
-class NaiveBayesTestSuite extends FunSuite with DistributedFlinkSuite
-      with NBTestBase {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/PreprocessorSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/PreprocessorSuite.scala
 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/PreprocessorSuite.scala
deleted file mode 100644
index 5e2b4ee..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/PreprocessorSuite.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.flinkbindings.standard
-
-
-import org.apache.mahout.flinkbindings.DistributedFlinkSuite
-import org.apache.mahout.math.algorithms.PreprocessorSuiteBase
-import org.scalatest.FunSuite
-
-class PreprocessorSuite extends FunSuite
-  with DistributedFlinkSuite with PreprocessorSuiteBase

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
deleted file mode 100644
index 8bb1b02..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements. See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership. The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
-package org.apache.mahout.flinkbindings.standard
-
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math.drm._
-import org.scalatest.FunSuite
-
-class RLikeDrmOpsSuite extends FunSuite with DistributedFlinkSuite
-      with RLikeDrmOpsSuiteBase {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RegressionSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RegressionSuite.scala
 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RegressionSuite.scala
deleted file mode 100644
index 5cb6183..0000000
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RegressionSuite.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.flinkbindings.standard
-
-
-import org.apache.mahout.flinkbindings.DistributedFlinkSuite
-import org.apache.mahout.math.algorithms.RegressionSuiteBase
-import org.scalatest.FunSuite
-
-class RegressionSuite extends FunSuite
-  with DistributedFlinkSuite with RegressionSuiteBase
-
