http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
deleted file mode 100644
index 4d21166..0000000
--- a/flink/pom.xml
+++ /dev/null
@@ -1,237 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.mahout</groupId>
-    <artifactId>mahout</artifactId>
-    <version>0.13.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>mahout-flink_${scala.compat.version}</artifactId>
-  <name>Mahout Flink bindings</name>
-  <description>
-    Mahout Bindings for Apache Flink
-  </description>
-
-  <packaging>jar</packaging>
-
-  <build>
-    <plugins>
-      <!-- copy jars to top directory, which is MAHOUT_HOME -->
-      <plugin>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <version>1.4</version>
-        <executions>
-          <execution>
-            <id>copy</id>
-            <phase>package</phase>
-            <configuration>
-              <tasks>
-                <copy 
file="target/mahout-flink_${scala.compat.version}-${version}.jar" 
tofile="../mahout-flink_${scala.compat.version}-${version}.jar"/>
-              </tasks>
-            </configuration>
-            <goals>
-              <goal>run</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-javadoc-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <artifactId>maven-source-plugin</artifactId>
-      </plugin>
-
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>add-scala-sources</id>
-            <phase>initialize</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>scala-compile</id>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>scala-test-compile</id>
-            <phase>process-test-resources</phase>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <!-- this is what ScalaTest recommends for enabling Scala tests -->
-
-      <!-- disable surefire -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <systemPropertyVariables>
-            <mahout.home>${project.build.directory}</mahout.home>
-          </systemPropertyVariables>
-          <skipTests>true</skipTests>
-        </configuration>
-      </plugin>
-
-      <!-- enable scalatest -->
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>test</id>
-            <goals>
-              <goal>test</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <systemProperties>
-            <mahout.home>${project.build.directory}</mahout.home>
-          </systemProperties>
-          <argLine>-Xmx4g</argLine>
-        </configuration>
-      </plugin>
-      <!-- remove jars from top directory on clean -->
-      <plugin>
-        <artifactId>maven-clean-plugin</artifactId>
-        <version>3.0.0</version>
-        <configuration>
-          <filesets>
-            <fileset>
-              <directory>../</directory>
-              <includes>
-                <include>mahout-flink*.jar</include>
-              </includes>
-              <followSymlinks>false</followSymlinks>
-            </fileset>
-          </filesets>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime_${scala.compat.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala_${scala.compat.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-java</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-core</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${scala.compat.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-test-utils_${scala.compat.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-tests_${scala.compat.version}</artifactId>
-      <version>${flink.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-math-scala_${scala.compat.version}</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.bytedeco</groupId>
-      <artifactId>javacpp</artifactId>
-      <version>1.2.2</version>
-    </dependency>
-
-    <!-- pin the kryo version used as of Mahout 0.10.1 -->
-    <dependency>
-      <groupId>com.esotericsoftware.kryo</groupId>
-      <artifactId>kryo</artifactId>
-      <version>2.24.0</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-hdfs</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>asm</groupId>
-          <artifactId>asm</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.mahout</groupId>
-      <artifactId>mahout-math-scala_${scala.compat.version}</artifactId>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
-
-    <!--  3rd-party -->
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <!-- tests -->
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.compat.version}</artifactId>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink/src/main/resources/log4j.properties 
b/flink/src/main/resources/log4j.properties
deleted file mode 100644
index 073a00d..0000000
--- a/flink/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,8 +0,0 @@
-# Root logger option
-log4j.rootLogger=info, stdout
-
-# Direct log messages to stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.target=System.out 
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.conversionPattern=%d{ABSOLUTE} %5p %t %c{1}:%M:%L 
- %m%n 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
deleted file mode 100644
index 5cdfb79..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.MatrixWritable
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.VectorWritable
-import org.apache.mahout.math.drm.BCast
-
-import com.google.common.io.ByteStreams
-
-/**
- * FlinkByteBCast wraps a vector or matrix object and represents it as a byte array.
- * When used inside a UDF, the byte array is serialized with standard Java serialization
- * along with the UDF (as part of its closure) and shipped to the worker nodes.
- *
- * A smarter way would be to rewrite the UDF (e.g. with a macro) and append
- * `withBroadcastSet` to the Flink DataSet pipeline, but that is not implemented at the moment.
- */
-class FlinkByteBCast[T](private val arr: Array[Byte]) extends BCast[T] with 
Serializable {
-
-  private lazy val _value = {
-    val stream = ByteStreams.newDataInput(arr)
-    val streamType = stream.readInt()
-
-    if (streamType == FlinkByteBCast.StreamTypeVector) {
-      val writeable = new VectorWritable()
-      writeable.readFields(stream)
-    //  printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T])
-      writeable.get.asInstanceOf[T]
-    } else if (streamType == FlinkByteBCast.StreamTypeMatrix) {
-      val writeable = new MatrixWritable()
-      writeable.readFields(stream)
-     // printf("broadcastValue: \n%s\n",writeable.get.asInstanceOf[T])
-      writeable.get.asInstanceOf[T]
-    } else {
-      throw new IllegalArgumentException(s"unexpected type tag $streamType")
-    }
-
-  }
-
-  override def value: T = _value
-
-  override def close: Unit = {
-    // nothing to close
-  }
-
-}
-
-object FlinkByteBCast {
-
-  val StreamTypeVector = 0x0000
-  val StreamTypeMatrix = 0xFFFF
-
-  def wrap(v: Vector): FlinkByteBCast[Vector] = {
-    val writeable = new VectorWritable(v)
-    val dataOutput = ByteStreams.newDataOutput()
-    dataOutput.writeInt(StreamTypeVector)
-    writeable.write(dataOutput)
-    val array = dataOutput.toByteArray()
-    new FlinkByteBCast[Vector](array)
-  }
-
-  def wrap(m: Matrix): FlinkByteBCast[Matrix] = {
-    val writeable = new MatrixWritable(m)
-    val dataOutput = ByteStreams.newDataOutput()
-    dataOutput.writeInt(StreamTypeMatrix)
-    writeable.write(dataOutput)
-    val array = dataOutput.toByteArray()
-    new FlinkByteBCast[Matrix](array)
-  }
-
-}
\ No newline at end of file
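
For reference, a minimal REPL-style sketch (illustrative only, not taken from the removed module) of the wrap()/value round trip above, assuming the Mahout scalabindings DSL (dvec) and the FlinkByteBCast class shown here:

    import org.apache.mahout.math.Vector
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.flinkbindings.FlinkByteBCast

    // wrap() writes a type tag (vector vs. matrix) followed by the Writable bytes;
    // value lazily deserializes the payload on first access.
    val v: Vector = dvec(1.0, 2.0, 3.0)
    val bcast = FlinkByteBCast.wrap(v)
    println(bcast.value)   // prints the reconstructed vector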

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
deleted file mode 100644
index 7a61ee6..0000000
--- 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.mahout.math.drm.DistributedContext
-import org.apache.mahout.math.drm.DistributedEngine
-
-class FlinkDistributedContext(val env: ExecutionEnvironment) extends 
DistributedContext {
-
-  val mahoutHome = getMahoutHome()
-
-  GlobalConfiguration.loadConfiguration(mahoutHome + 
"/src/conf/flink-config.yaml")
-
-  val conf = GlobalConfiguration.getConfiguration
-
-  var degreeOfParallelism: Int = 0
-
-  if (conf != null) {
-    degreeOfParallelism = conf.getInteger("parallelism.default", 
Runtime.getRuntime.availableProcessors)
-  } else {
-    degreeOfParallelism = Runtime.getRuntime.availableProcessors
-  }
-
-  env.setParallelism(degreeOfParallelism)
-  
-  val engine: DistributedEngine = FlinkEngine
-
-  override def close() {
-    // TODO
-  }
-
-}
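
A hedged usage sketch of the context above (it assumes MAHOUT_HOME is set and that $MAHOUT_HOME/src/conf/flink-config.yaml exists, since the constructor loads it):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings.FlinkDistributedContext

    // Parallelism comes from "parallelism.default" in flink-config.yaml when present,
    // otherwise it falls back to Runtime.getRuntime.availableProcessors.
    val env = ExecutionEnvironment.getExecutionEnvironment
    implicit val dc = new FlinkDistributedContext(env)
    println(dc.degreeOfParallelism)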

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
deleted file mode 100644
index f1d23b2..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ /dev/null
@@ -1,416 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.utils.DataSetUtils
-import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
-import org.apache.mahout.flinkbindings.blas._
-import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.flinkbindings.io.{HDFSUtil, Hadoop2HDFSUtil}
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.math.indexeddataset.{BiDictionary, IndexedDataset, 
Schema}
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-
-import scala.collection.JavaConversions._
-import scala.reflect._
-
-object FlinkEngine extends DistributedEngine {
-
-  // By default, use Hadoop 2 utils
-  var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
-
-  /**
-    * Load DRM from hdfs (as in Mahout DRM format).
-    *
-    * @param path   The DFS path to load from
-    * @param parMin Minimum parallelism after load (equivalent to 
#par(min=...)).
-    */
-  override def drmDfsRead(path: String, parMin: Int = 1)
-                         (implicit dc: DistributedContext): CheckpointedDrm[_] 
= {
-
-    // Require that context is actually Flink context.
-    require(dc.isInstanceOf[FlinkDistributedContext], "Supplied context must 
be for the Flink backend.")
-
-    // Extract the Flink Environment variable
-    implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
-
-    // set the parallelism of the env to parMin
-    env.setParallelism(parMin)
-
-    // get the header of a SequenceFile in the path
-    val metadata = hdfsUtils.readDrmHeader(path + "//")
-
-    val keyClass: Class[_] = metadata.keyTypeWritable
-
-    // from the header determine which function to use to unwrap the key
-    val unwrapKey = metadata.unwrapKeyFunction
-
-    // Map to the correct DrmLike based on the metadata information
-    if (metadata.keyClassTag == ClassTag.Int) {
-      val ds = env.readSequenceFile(classOf[IntWritable], 
classOf[VectorWritable], path)
-
-      val res = ds.map(new MapFunction[(IntWritable, VectorWritable), (Int, 
Vector)] {
-        def map(tuple: (IntWritable, VectorWritable)): (Int, Vector) = {
-          (unwrapKey(tuple._1).asInstanceOf[Int], tuple._2.get())
-        }
-      })
-      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Int]])
-    } else if (metadata.keyClassTag == ClassTag.Long) {
-      val ds = env.readSequenceFile(classOf[LongWritable], 
classOf[VectorWritable], path)
-
-      val res = ds.map(new MapFunction[(LongWritable, VectorWritable), (Long, 
Vector)] {
-        def map(tuple: (LongWritable, VectorWritable)): (Long, Vector) = {
-          (unwrapKey(tuple._1).asInstanceOf[Long], tuple._2.get())
-        }
-      })
-      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Long]])
-    } else if (metadata.keyClassTag == ClassTag(classOf[String])) {
-      val ds = env.readSequenceFile(classOf[Text], classOf[VectorWritable], 
path)
-
-      val res = ds.map(new MapFunction[(Text, VectorWritable), (String, 
Vector)] {
-        def map(tuple: (Text, VectorWritable)): (String, Vector) = {
-          (unwrapKey(tuple._1).asInstanceOf[String], tuple._2.get())
-        }
-      })
-      datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[String]])
-    } else throw new IllegalArgumentException(s"Unsupported DRM key 
type:${keyClass.getName}")
-
-  }
-
-  override def indexedDatasetDFSRead(src: String, schema: Schema, 
existingRowIDs: Option[BiDictionary])
-                                    (implicit sc: DistributedContext): 
IndexedDataset = ???
-
-  override def indexedDatasetDFSReadElements(src: String, schema: Schema, 
existingRowIDs: Option[BiDictionary])
-                                            (implicit sc: DistributedContext): 
IndexedDataset = ???
-
-
-  /**
-    * Perform default expression rewrite. Return physical plan that we can 
pass to exec(). <P>
-    *
-    * A particular physical engine implementation may choose to either use or 
not use these rewrites
-    * as a useful basic rewriting rule.<P>
-    */
-  override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = 
super.optimizerRewrite(action)
-
-  /**
-    * Translates logical plan into Flink execution plan.
-    **/
-  override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: 
CacheHint.CacheHint): CheckpointedDrm[K] = {
-    // Flink-specific Physical Plan translation.
-
-    implicit val typeInformation = generateTypeInformation[K]
-    val drm = flinkTranslate(plan)
-    val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = 
plan.nrow, _ncol = plan.ncol)
-    newcp.cache()
-  }
-
-
-  private def flinkTranslate[K](oper: DrmLike[K]): FlinkDrm[K] = {
-    implicit val kTag = oper.keyClassTag
-    implicit val typeInformation = generateTypeInformation[K]
-    oper match {
-      case OpAtAnyKey(_) ⇒
-        throw new IllegalArgumentException("\"A\" must be Int-keyed in this 
A.t expression.")
-      case op@OpAx(a, x) ⇒
-        FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a))
-      case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ 
FlinkOpAt.sparseTrick(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
-      case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
-        FlinkOpAx.atx_with_broadcast(op, 
flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
-      case op@OpAtB(a, b) ⇒ FlinkOpAtB.notZippable(op, flinkTranslate(a),
-        flinkTranslate(b)).asInstanceOf[FlinkDrm[K]]
-      case op@OpABt(a, b) ⇒
-        // express ABt via AtB: let C=At and D=Bt, and calculate CtD
-        // TODO: create specific implementation of ABt, see MAHOUT-1750
-        val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-        val at = FlinkOpAt.sparseTrick(opAt, 
flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
-        val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, 
_ncol = opAt.ncol)
-        val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-        val bt = FlinkOpAt.sparseTrick(opBt, 
flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
-        val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, 
_ncol = opBt.ncol)
-        FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), 
flinkTranslate(d)).asInstanceOf[FlinkDrm[K]]
-      case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ 
FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
-      case op@OpTimesRightMatrix(a, b) ⇒
-        FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a), b)
-      case op@OpAewUnaryFunc(a, _, _) ⇒
-        FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a))
-      case op@OpAewUnaryFuncFusion(a, _) ⇒
-        FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a))
-      // deprecated
-      case op@OpAewScalar(a, scalar, _) ⇒
-        FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a), scalar)
-      case op@OpAewB(a, b, _) ⇒
-        FlinkOpAewB.rowWiseJoinNoSideEffect(op, flinkTranslate(a), 
flinkTranslate(b))
-      case op@OpCbind(a, b) ⇒
-        FlinkOpCBind.cbind(op, flinkTranslate(a), flinkTranslate(b))
-      case op@OpRbind(a, b) ⇒
-        FlinkOpRBind.rbind(op, flinkTranslate(a), flinkTranslate(b))
-      case op@OpCbindScalar(a, x, _) ⇒
-        FlinkOpCBind.cbindScalar(op, flinkTranslate(a), x)
-      case op@OpRowRange(a, _) ⇒
-        FlinkOpRowRange.slice(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
-      case op@OpABAnyKey(a, b) if a.keyClassTag != b.keyClassTag ⇒
-        throw new IllegalArgumentException("DRMs A and B have different 
indices, cannot multiply them")
-      case op: OpMapBlock[_, K] ⇒
-        FlinkOpMapBlock.apply(flinkTranslate(op.A), op.ncol, op)
-      case cp: CheckpointedDrm[K] ⇒ cp
-      case _ ⇒
-        throw new NotImplementedError(s"operator $oper is not implemented yet")
-    }
-  }
-
-  /**
-    * returns a vector that contains a column-wise sum from DRM
-    */
-  override def colSums[K](drm: CheckpointedDrm[K]): Vector = {
-    implicit val kTag: ClassTag[K] = drm.keyClassTag
-    implicit val typeInformation = generateTypeInformation[K]
-
-
-    val sum = drm.ds.map {
-      tuple => tuple._2
-    }.reduce(_ + _)
-
-    val list = sum.collect
-    list.head
-  }
-
-  /** Engine-specific numNonZeroElementsPerColumn implementation based on a 
checkpoint. */
-  override def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector 
= {
-    implicit val kTag: ClassTag[K] = drm.keyClassTag
-    implicit val typeInformation = generateTypeInformation[K]
-
-
-    val result = drm.asBlockified.ds.map {
-      tuple =>
-        val block = tuple._2
-        val acc = block(0, ::).like()
-
-        block.foreach { v =>
-          v.nonZeroes().foreach { el => acc(el.index()) = acc(el.index()) + 1 }
-        }
-
-        acc
-    }.reduce(_ + _)
-
-    val list = result.collect
-    list.head
-  }
-
-  /**
-    * returns a vector that contains a column-wise mean from DRM
-    */
-  override def colMeans[K](drm: CheckpointedDrm[K]): Vector = {
-    drm.colSums() / drm.nrow
-  }
-
-  /**
-    * Calculates the Frobenius norm of a matrix (square root of the sum of squared elements)
-    */
-  override def norm[K](drm: CheckpointedDrm[K]): Double = {
-    implicit val kTag: ClassTag[K] = drm.keyClassTag
-    implicit val typeInformation = generateTypeInformation[K]
-
-    val sumOfSquares = drm.ds.map {
-      tuple => tuple match {
-        case (idx, vec) => vec dot vec
-      }
-    }.reduce(_ + _)
-
-    val list = sumOfSquares.collect
-
-    // check on this --why is it returning a list?
-    math.sqrt(list.head)
-  }
-
-  /** Broadcast support */
-  override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): 
BCast[Vector] =
-    FlinkByteBCast.wrap(v)
-
-  /** Broadcast support */
-  override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): 
BCast[Matrix] =
-    FlinkByteBCast.wrap(m)
-
-  /** Parallelize in-core matrix as flink distributed matrix, using row 
ordinal indices as data set keys. */
-  // The 'numPartitions' parameter is not honored in this call,
-  // as Flink sets a global parallelism in ExecutionEnvironment
-  override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
-                                           (implicit dc: DistributedContext): 
CheckpointedDrm[Int] = {
-
-    val parallelDrm = parallelize(m, numPartitions)
-
-    new CheckpointedFlinkDrm(ds = parallelDrm, _nrow = m.numRows(), _ncol = 
m.numCols())
-  }
-
-  // The 'parallelismDegree' parameter is not honored in this call,
-  // as Flink sets a global parallelism in ExecutionEnvironment
-  private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
-                                        (implicit dc: DistributedContext): 
DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))
-    val dataSetType = TypeExtractor.getForObject(rows.head)
-    dc.env.fromCollection(rows).partitionByRange(0)
-  }
-
-  /** Parallelize in-core matrix as flink distributed matrix, using row labels 
as a data set keys. */
-  // The 'numPartitions' parameter is not honored in this call,
-  // as Flink sets a global parallelism in ExecutionEnvironment
-  override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
-                                          (implicit dc: DistributedContext): 
CheckpointedDrm[String] = {
-
-    val rb = m.getRowLabelBindings
-    val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), 
::)
-
-    new CheckpointedFlinkDrm[String](dc.env.fromCollection(p),
-      _nrow = m.nrow, _ncol = m.ncol, cacheHint = CacheHint.NONE)
-  }
-
-  /** This creates an empty DRM with specified number of partitions and 
cardinality. */
-  override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 
10)
-                                  (implicit dc: DistributedContext): 
CheckpointedDrm[Int] = {
-    val nonParallelResult = (0 to numPartitions).flatMap { part ⇒
-      val partNRow = (nrow - 1) / numPartitions + 1
-      val partStart = partNRow * part
-      val partEnd = Math.min(partStart + partNRow, nrow)
-
-      for (i <- partStart until partEnd) yield (i, new 
RandomAccessSparseVector(ncol): Vector)
-    }
-    val result = dc.env.fromCollection(nonParallelResult)
-    new CheckpointedFlinkDrm[Int](ds = result, _nrow = nrow, _ncol = ncol)
-  }
-
-  /** Creates empty DRM with non-trivial height */
-  override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: 
Int = 10)
-                                      (implicit dc: DistributedContext): 
CheckpointedDrm[Long] = {
-
-    val nonParallelResult = (0 to numPartitions).flatMap { part ⇒
-        val partNRow = (nrow - 1) / numPartitions + 1
-        val partStart = partNRow * part
-        val partEnd = Math.min(partStart + partNRow, nrow)
-
-      for (i ← partStart until partEnd) yield (i, new 
RandomAccessSparseVector(ncol): Vector)
-    }
-
-    val result = dc.env.fromCollection(nonParallelResult)
-    new CheckpointedFlinkDrm[Long](ds = result, _nrow = nrow, _ncol = ncol, 
cacheHint = CacheHint.NONE)
-  }
-
-  /**
-   * Convert a non-int-keyed matrix to an int-keyed one, optionally computing a mapping
-   * from the old keys to row indices in the new one. The mapping, if requested, is
-   * returned as a 1-column matrix.
-   */
-  def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): 
(DrmLike[Int], Option[DrmLike[K]]) = {
-    implicit val ktag = drmX.keyClassTag
-    implicit val kTypeInformation = generateTypeInformation[K]
-
-    if (ktag == ClassTag.Int) {
-      drmX.asInstanceOf[DrmLike[Int]] → None
-    } else {
-      val drmXcp = drmX.checkpoint(CacheHint.MEMORY_ONLY)
-      val ncol = drmXcp.asInstanceOf[CheckpointedFlinkDrm[K]].ncol
-      val nrow = drmXcp.asInstanceOf[CheckpointedFlinkDrm[K]].nrow
-
-      // Compute sequential int key numbering.
-      val (intDataset, keyMap) = blas.rekeySeqInts(drmDataSet = drmXcp, 
computeMap = computeMap)
-
-      // Convert computed key mapping to a matrix.
-      val mxKeyMap = keyMap.map { dataSet ⇒
-        datasetWrap(dataSet.map {
-          tuple: (K, Int) => {
-            val ordinal = tuple._2
-            val key = tuple._1
-            key -> (dvec(ordinal): Vector)
-          }
-        })
-      }
-
-      intDataset -> mxKeyMap
-    }
-  }
-
-  /**
-   * (Optional) Sampling operation.
-   */
-  def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: 
Boolean = false): DrmLike[K] = {
-    implicit val kTag: ClassTag[K] =  drmX.keyClassTag
-    implicit val typeInformation = generateTypeInformation[K]
-
-    val sample = DataSetUtils(drmX.dataset).sample(replacement, fraction)
-
-    val res = if (kTag != ClassTag.Int) {
-      new CheckpointedFlinkDrm[K](sample)
-    }
-    else {
-      blas.rekeySeqInts(new RowsFlinkDrm[K](sample, ncol = drmX.ncol), 
computeMap = false)._1
-        .asInstanceOf[DrmLike[K]]
-    }
-
-    res
-  }
-
-  def drmSampleKRows[K](drmX: DrmLike[K], numSamples:Int, replacement: Boolean 
= false): Matrix = {
-    implicit val kTag: ClassTag[K] =  drmX.keyClassTag
-    implicit val typeInformation = generateTypeInformation[K]
-
-    val sample = DataSetUtils(drmX.dataset).sampleWithSize(replacement, 
numSamples)
-    val sampleArray = sample.collect().toArray
-    val isSparse = sampleArray.exists { case (_, vec) ⇒ !vec.isDense }
-
-    val vectors = sampleArray.map(_._2)
-    val labels = sampleArray.view.zipWithIndex
-      .map { case ((key, _), idx) ⇒ key.toString → (idx: Integer) }.toMap
-
-    val mx: Matrix = if (isSparse) sparse(vectors: _*) else dense(vectors)
-    mx.setRowLabelBindings(labels)
-
-    mx
-  }
-
-  /** Engine-specific all reduce tensor operation. */
-  def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: 
BlockReduceFunc): Matrix = {
-    implicit val kTag: ClassTag[K] = drm.keyClassTag
-    implicit val typeInformation = generateTypeInformation[K]
-
-    val res = drm.asBlockified.ds.map(par => bmf(par)).reduce(rf)
-    res.collect().head
-  }
-
-  def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
-    implicit val ktag = classTag[K]
-
-    generateTypeInformationFromTag(ktag)
-  }
-
-  private def generateTypeInformationFromTag[K](tag: ClassTag[K]): 
TypeInformation[K] = {
-    if (tag.runtimeClass.equals(classOf[Int])) {
-      createTypeInformation[Int].asInstanceOf[TypeInformation[K]]
-    } else if (tag.runtimeClass.equals(classOf[Long])) {
-      createTypeInformation[Long].asInstanceOf[TypeInformation[K]]
-    } else if (tag.runtimeClass.equals(classOf[String])) {
-      createTypeInformation[String].asInstanceOf[TypeInformation[K]]
-    } else {
-      throw new IllegalArgumentException(s"index type $tag is not supported")
-    }
-  }
-}
\ No newline at end of file
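
As a rough end-to-end sketch of the engine API above (assuming a working FlinkDistributedContext as in the previous file and the scalabindings dense() DSL):

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.mahout.flinkbindings.{FlinkDistributedContext, FlinkEngine}
    import org.apache.mahout.math.scalabindings._

    implicit val dc = new FlinkDistributedContext(ExecutionEnvironment.getExecutionEnvironment)

    // Distribute a small in-core matrix by row index, then reduce it column-wise.
    val a = dense((1.0, 2.0), (3.0, 4.0))
    val drmA = FlinkEngine.drmParallelizeWithRowIndices(a)
    println(FlinkEngine.colSums(drmA))   // expected column sums: 4.0 and 6.0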

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
deleted file mode 100644
index a3be618..0000000
--- 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.mahout.flinkbindings.blas
-
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm}
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.drm.logical.OpAewB
-import org.apache.mahout.math.scalabindings.RLikeOps._
-
-/**
- * Implementation of Flink OpAewB
- */
-object FlinkOpAewB {
-
-  def rowWiseJoinNoSideEffect[K: TypeInformation](op: OpAewB[K], A: 
FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
-    val function = AewBOpsCloning.strToFunction(op.op)
-
-    val rowsA = A.asRowWise.ds
-    val rowsB = B.asRowWise.ds
-    implicit val kTag = op.keyClassTag
-
-    val res: DataSet[(K, Vector)] =
-      rowsA
-        .coGroup(rowsB)
-        .where(0)
-        .equalTo(0) {
-        (left, right, out: Collector[(K, Vector)]) =>
-          (left.toIterable.headOption, right.toIterable.headOption) match {
-            case (Some((idx, a)), Some((_, b))) => out.collect((idx, 
function(a, b)))
-            case (None, Some(b)) => out.collect(b)
-            case (Some(a), None) => out.collect(a)
-            case (None, None) => throw new RuntimeException("At least one side 
of the co group " +
-              "must be non-empty.")
-          }
-      }
-
-
-    new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol)
-  }
-}
-
-
-object AewBOpsCloning {
-  type VectorVectorFunc = (Vector, Vector) => Vector
-
-  def strToFunction(op: String): VectorVectorFunc = op match {
-    case "+" => plus
-    case "-" => minus
-    case "*" => times
-    case "/" => div
-    case _ => throw new IllegalArgumentException(s"Unsupported elementwise 
operator: $op")
-  }
-
-  val plus: VectorVectorFunc = (a, b) => a + b
-  val minus: VectorVectorFunc = (a, b) => a - b
-  val times: VectorVectorFunc = (a, b) => a * b
-  val div: VectorVectorFunc = (a, b) => a / b
-}
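
AewBOpsCloning simply maps the operator string to an in-core vector function, which can be checked without Flink (a sketch assuming the dvec DSL):

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.flinkbindings.blas.AewBOpsCloning

    val plus = AewBOpsCloning.strToFunction("+")
    // the same element-wise function the coGroup above applies to each matched row pair
    println(plus(dvec(1.0, 2.0), dvec(3.0, 4.0)))   // expected: (4.0, 6.0)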

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
deleted file mode 100644
index 6b034b8..0000000
--- 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewScalar, 
TEwFunc}
-import org.apache.mahout.math.scalabindings.RLikeOps._
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.scala._
-
-/**
- * Implementation of Flink OpAewScalar
- */
-object FlinkOpAewScalar {
-
-  final val PROPERTY_AEWB_INPLACE = "mahout.math.AewB.inplace"
-  private def isInplace = System.getProperty(PROPERTY_AEWB_INPLACE, 
"false").toBoolean
-
-  @Deprecated
-  def opScalarNoSideEffect[K: TypeInformation](op: OpAewScalar[K], A: 
FlinkDrm[K], scalar: Double): FlinkDrm[K] = {
-    val function = EWOpsCloning.strToFunction(op.op)
-    implicit val kTag = op.keyClassTag
-
-
-    val res = A.asBlockified.ds.map{
-      tuple => (tuple._1, function(tuple._2, scalar))
-    }
-
-    new BlockifiedFlinkDrm(res, op.ncol)
-  }
-
-  def opUnaryFunction[K: TypeInformation](op: AbstractUnaryOp[K, K] with 
TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = {
-    val f = op.f
-    val inplace = isInplace
-
-
-    implicit val kTag = op.keyClassTag
-
-    val res = if (op.evalZeros) {
-      A.asBlockified.ds.map{
-        tuple =>
-          val (keys, block) = tuple
-          val newBlock = if (inplace) block else block.cloned
-          newBlock := ((_, _, x) => f(x))
-          (keys, newBlock)
-      }
-    } else {
-      A.asBlockified.ds.map{
-        tuple =>
-          val (keys, block) = tuple
-          val newBlock = if (inplace) block else block.cloned
-          for (row <- newBlock; el <- row.nonZeroes) el := f(el.get)
-          (keys, newBlock)
-      }
-    }
-
-    new BlockifiedFlinkDrm(res, op.ncol)
-
-  }
-
-}
-
-@Deprecated
-object EWOpsCloning {
-
-  type MatrixScalarFunc = (Matrix, Double) => Matrix
-
-  def strToFunction(op: String): MatrixScalarFunc = op match {
-    case "+" => plusScalar
-    case "-" => minusScalar
-    case "*" => timesScalar
-    case "/" => divScalar
-    case "-:" => scalarMinus
-    case "/:" => scalarDiv
-    case _ => throw new IllegalArgumentException(s"Unsupported elementwise 
operator: $op")
-  }
-
-  val plusScalar: MatrixScalarFunc = (A, s) => A + s
-  val minusScalar: MatrixScalarFunc = (A, s) => A - s
-  val scalarMinus: MatrixScalarFunc = (A, s) => s -: A
-  val timesScalar: MatrixScalarFunc = (A, s) => A * s
-  val divScalar: MatrixScalarFunc = (A, s) => A / s
-  val scalarDiv: MatrixScalarFunc = (A, s) => s /: A
-}
-
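
Likewise, EWOpsCloning resolves the (deprecated) scalar operators to plain in-core matrix functions; a small sketch, assuming the dense() DSL:

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.flinkbindings.blas.EWOpsCloning

    // "-:" is the reversed operator, i.e. scalar minus matrix (s -: A)
    val scalarMinus = EWOpsCloning.strToFunction("-:")
    println(scalarMinus(dense((1.0, 2.0), (3.0, 4.0)), 10.0))   // 10 minus each element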

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
deleted file mode 100644
index 5093216..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import org.apache.flink.api.scala._
-import org.apache.mahout.flinkbindings.drm.{FlinkDrm, RowsFlinkDrm}
-import org.apache.mahout.math.{SequentialAccessSparseVector, Vector}
-import org.apache.mahout.math.drm.logical.OpAt
-import org.apache.mahout.math.scalabindings.RLikeOps._
-
-import scala.Array.canBuildFrom
-
-/**
- * Implementation of Flink At
- */
-object FlinkOpAt {
-
-  /**
-   * The idea here is simple: compile vertical column vectors of every 
partition block as sparse
-   * vectors of the <code>A.nrow</code> length; then group them by their 
column index and sum the
-   * groups into final rows of the transposed matrix.
-   */
-  def sparseTrick(op: OpAt, A: FlinkDrm[Int]): FlinkDrm[Int] = {
-    val ncol = op.ncol // # of rows of A, i.e. # of columns of A^T
-
-    val sparseParts = A.asBlockified.ds.flatMap {
-      blockifiedTuple =>
-        val keys = blockifiedTuple._1
-        val block = blockifiedTuple._2
-
-        (0 until block.ncol).map {
-          columnIndex =>
-            val columnVector: Vector = new SequentialAccessSparseVector(ncol)
-
-            keys.zipWithIndex.foreach {
-              case (key, idx) => columnVector(key) += block(idx, columnIndex)
-            }
-
-            (columnIndex, columnVector)
-        }
-    }
-
-    val regrouped = sparseParts.groupBy(0)
-
-    val sparseTotal = regrouped.reduce{
-      (left, right) =>
-        (left._1, left._2 + right._2)
-    }
-
-    // TODO: densify or not?
-    new RowsFlinkDrm(sparseTotal, ncol)
-  }
-
-
-
-}
\ No newline at end of file
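
A small in-core illustration of the sparse trick above (not the Flink pipeline itself; the block, keys, and sizes are made-up values): each column of a blockified tuple becomes a sparse vector of length A.nrow keyed by the block's row keys, and summing these slices per column index yields the rows of A^T.

    import org.apache.mahout.math.{SequentialAccessSparseVector, Vector}
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val keys = Array(0, 1)                              // row keys of this block
    val block = dense((1.0, 0.0, 5.0), (0.0, 2.0, 0.0)) // 2 x 3 block of A
    val nrowOfA = 2

    val columnSlices = (0 until block.ncol).map { col =>
      val cv: Vector = new SequentialAccessSparseVector(nrowOfA)
      keys.zipWithIndex.foreach { case (key, idx) => cv(key) += block(idx, col) }
      col -> cv                                         // (column index, partial row of A^T)
    }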

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
deleted file mode 100644
index 10f1b92..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements. See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership. The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
-package org.apache.mahout.flinkbindings.blas
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions._
-import org.apache.flink.api.scala._
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.shaded.com.google.common.collect.Lists
-import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.math.drm.logical.OpAtA
-import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _}
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.{Matrix, UpperTriangular, _}
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection._
-
-/**
- * Implementation of Flink A' * A
- *
- */
-object FlinkOpAtA {
-
-  final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol"
-  final val PROPERTY_ATA_MAXINMEMNCOL_DEFAULT = "200"
-
-  def at_a[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {
-    val maxInMemStr = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, 
PROPERTY_ATA_MAXINMEMNCOL_DEFAULT)
-    val maxInMemNCol = maxInMemStr.toInt
-    maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer")
-
-    implicit val kTag = A.classTag
-
-    if (op.ncol <= maxInMemNCol) {
-      implicit val ctx = A.context
-      val inCoreAtA = slim(op, A)
-      val result = drmParallelize(inCoreAtA, numPartitions = 1)
-      result
-    } else {
-      fat(op.asInstanceOf[OpAtA[K]], A.asInstanceOf[FlinkDrm[K]])
-    }
-  }
-
-  def slim[K](op: OpAtA[K], A: FlinkDrm[K]): Matrix = {
-    val ds = A.asRowWise.ds
-    val ncol = op.ncol
-
-    // Compute backing vector of tiny-upper-triangular accumulator across all 
the data.
-    val res = ds.mapPartition(pIter => {
-
-      val ut = new UpperTriangular(ncol)
-
-      // Strategy is to add the outer product of each row to the upper triangular accumulator.
-      pIter.foreach({ case (k, v) =>
-
-        // Use slightly different traversal strategies for dense vs. sparse sources.
-        if (v.isDense) {
-
-          // Update upper-triangular pattern only (due to symmetry).
-          // Note: Scala for-comprehensions are said to be fairly inefficient here, but this is
-          // such a spectacular case of what they were designed for. Yes, I do observe about a 20%
-          // difference compared to while loops with no other payload, but the other payload is
-          // usually much heavier than this overhead, so I am keeping this as is for the time being.
-
-          for (row <- 0 until v.length; col <- row until v.length)
-            ut(row, col) = ut(row, col) + v(row) * v(col)
-
-        } else {
-
-          // Sparse source.
-          v.nonZeroes().view
-
-            // Outer iterator iterates over rows of outer product.
-            .foreach(elrow => {
-
-            // Inner loop for columns of outer product.
-            v.nonZeroes().view
-
-              // Filter out non-upper nonzero elements from the double loop.
-              .filter(_.index >= elrow.index)
-
-              // Incrementally update the outer product value in the upper triangular accumulator.
-              .foreach(elcol => {
-
-              val row = elrow.index
-              val col = elcol.index
-              ut(row, col) = ut(row, col) + elrow.get() * elcol.get()
-
-            })
-          })
-
-        }
-      })
-
-      Iterator(dvec(ddata = ut.getData).asInstanceOf[Vector]: Vector)
-    }).reduce(_ + _).collect()
-
-    new DenseSymmetricMatrix(res.head)
-  }
-
-  def fat[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = {
-    val nrow = op.A.nrow
-    val ncol = op.A.ncol
-    val ds = A.asBlockified.ds
-
-    val numberOfPartitions: DataSet[Int] = ds.map(new MapFunction[(Array[K], 
Matrix), Int] {
-      def map(a: (Array[K], Matrix)): Int = 1
-    }).reduce(new ReduceFunction[Int] {
-      def reduce(a: Int, b: Int): Int = a + b
-    })
-
-    val subresults: DataSet[(Int, Matrix)] =
-          ds.flatMap(new RichFlatMapFunction[(Array[K], Matrix), (Int, 
Matrix)] {
-
-      var ranges: Array[Range] = _
-
-      override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext
-        val dsX: java.util.List[Int] = 
runtime.getBroadcastVariable("numberOfPartitions")
-        val parts = dsX.get(0)
-        val numParts = estimatePartitions(nrow, ncol, parts)
-        ranges = computeEvenSplits(ncol, numParts)
-      }
-
-      def flatMap(tuple: (Array[K], Matrix), out: Collector[(Int, Matrix)]): 
Unit = {
-        val block = tuple._2
-
-        ranges.zipWithIndex.foreach { case (range, idx) => 
-          out.collect(idx -> block(::, range).t %*% block)
-        }
-      }
-
-    }).withBroadcastSet(numberOfPartitions, "numberOfPartitions")
-
-    val res = subresults.groupBy(0)
-                        .reduceGroup(new RichGroupReduceFunction[(Int, 
Matrix), BlockifiedDrmTuple[Int]] {
-
-      var ranges: Array[Range] = _
-
-      override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext
-        val dsX: java.util.List[Int] = 
runtime.getBroadcastVariable("numberOfPartitions")
-        val parts = dsX.get(0)
-        val numParts = estimatePartitions(nrow, ncol, parts)
-        ranges = computeEvenSplits(ncol, numParts)
-      }
-
-      def reduce(values: Iterable[(Int, Matrix)], out: 
Collector[BlockifiedDrmTuple[Int]]): Unit = {
-        val it = Lists.newArrayList(values).asScala
-        val (blockKey, _) = it.head
-
-        val block = it.map { _._2 }.reduce { (m1, m2) => m1 + m2 }
-        val blockStart = ranges(blockKey).start
-        val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
-
-        out.collect(rowKeys -> block)
-      }
-    }).withBroadcastSet(numberOfPartitions, "numberOfPartitions")
-
-    new BlockifiedFlinkDrm(res, ncol)
-  }
-
-  def estimatePartitions(nrow: Long, ncol: Int, parts:Int): Int = {
-    // per-partition element density 
-    val epp = nrow.toDouble * ncol / parts
-
-    // product partitions
-    val prodParts = nrow * ncol / epp
-
-    val nparts = math.round(prodParts).toInt max 1
-
-    // Constrain nparts to a maximum of nrow to prevent guaranteed-empty partitions.
-    if (nparts > nrow) { 
-      nrow.toInt 
-    } else { 
-      nparts
-    }
-  }
-
-  def computeEvenSplits(nrow: Long, numSplits: Int): Array[Range] = {
-    require(numSplits <= nrow, "Requested amount of splits greater than number 
of data points.")
-    require(nrow >= 1)
-    require(numSplits >= 1)
-
-    // Base split -- what is our base split size?
-    val baseSplit = safeToNonNegInt(nrow / numSplits)
-
-    // Slack -- how many splits will have to be incremented by 1 though?
-    val slack = safeToNonNegInt(nrow % numSplits)
-
-    // Compute ranges. We need to set ranges so that numSplits - slack splits 
have size of baseSplit;
-    // and `slack` splits have size baseSplit + 1. Here is how we do it: 
First, we compute the range
-    // offsets:
-    val offsets = (0 to numSplits).map(i => i * (baseSplit + 1) - (0 max i - 
slack))
-    // And then we connect the ranges using gaps between offsets:
-
-    val ranges = offsets.sliding(2).map { offs => offs(0) until offs(1) }
-    ranges.toArray
-  }
-}
\ No newline at end of file
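
For intuition on computeEvenSplits above, a quick hand-worked example (the numbers are just illustrative):

    import org.apache.mahout.flinkbindings.blas.FlinkOpAtA

    // 10 rows into 3 splits: baseSplit = 3, slack = 10 % 3 = 1,
    // so one split gets 4 rows and the other two get 3.
    val ranges = FlinkOpAtA.computeEvenSplits(nrow = 10, numSplits = 3)
    // ranges covers 0 until 4, 4 until 7, 7 until 10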

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
deleted file mode 100644
index c10c2e4..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import java.lang.Iterable
-
-import com.google.common.collect.Lists
-import org.apache.flink.api.common.functions.{FlatMapFunction, 
GroupReduceFunction}
-import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
-import org.apache.mahout.math.{Matrix, Vector}
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.logical.OpAtB
-import org.apache.mahout.math.scalabindings.RLikeOps._
-
-import scala.collection.JavaConverters.asScalaBufferConverter
-
-/**
- * Implementation of Flink A' * B
- */
-object FlinkOpAtB {
-
-  def notZippable[A](op: OpAtB[A], At: FlinkDrm[A], B: FlinkDrm[A]): 
FlinkDrm[Int] = {
-    val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
-    val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
-    val joined = rowsAt.join(rowsB).where(0).equalTo(0)
-
-    val ncol = op.ncol
-    val nrow = op.nrow.toInt
-    val blockHeight = 10
-    val blockCount = safeToNonNegInt((nrow - 1) / blockHeight + 1)
-
-    val preProduct: DataSet[(Int, Matrix)] =
-             joined.flatMap(new FlatMapFunction[((A, Vector), (A, Vector)), 
(Int, Matrix)] {
-      def flatMap(in: ((A, Vector), (A, Vector)), out: Collector[(Int, 
Matrix)]): Unit = {
-        val avec = in._1._2
-        val bvec = in._2._2
-
-        0.until(blockCount) map { blockKey =>
-          val blockStart = blockKey * blockHeight
-          val blockEnd = Math.min(nrow, blockStart + blockHeight)
-
-          val outer = avec(blockStart until blockEnd) cross bvec
-          out.collect(blockKey -> outer)
-          out
-        }
-      }
-    })
-
-    val res: BlockifiedDrmDataSet[Int] = 
-      preProduct.groupBy(0).reduceGroup(
-        new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
-          def reduce(values: Iterable[(Int, Matrix)], out: 
Collector[BlockifiedDrmTuple[Int]]): Unit = {
-            val it = Lists.newArrayList(values).asScala
-            val (idx, _) = it.head
-
-            val block = it.map { t => t._2 }.reduce { (m1, m2) => m1 + m2 }
-
-            val blockStart = idx * blockHeight
-            val keys = Array.tabulate(block.nrow)(blockStart + _)
-
-            out.collect(keys -> block)
-          }
-        }
-      )
-
-    new BlockifiedFlinkDrm[Int](res, ncol)
-  }
-
-}
\ No newline at end of file
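
Each joined pair in notZippable contributes an outer product of a vertical slice of the A' row with the matching B row; the in-core equivalent under the same RLikeOps DSL (a sketch with made-up values):

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val avec = dvec(1.0, 2.0)     // slice of the A' row for one vertical block
    val bvec = dvec(3.0, 4.0)     // matching row of B
    val outer = avec cross bvec   // 2 x 2 block: outer(i, j) = avec(i) * bvec(j)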

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
deleted file mode 100644
index 18a3c4b..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import java.util
-
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.FlinkEngine
-import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm, RowsFlinkDrm}
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.logical.{OpAtx, OpAx}
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.{Matrix, Vector}
-
-/**
- * Implementation of Flink Ax
- */
-object FlinkOpAx {
-
-  def blockifiedBroadcastAx[K: TypeInformation](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = {
-    implicit val ctx = A.context
-    implicit val kTag = op.keyClassTag
-
-    val singletonDataSetX = ctx.env.fromElements(op.x)
-
-    val out = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-      var x: Vector = null
-
-      override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext
-        val dsX: util.List[Vector] = runtime.getBroadcastVariable("vector")
-        x = dsX.get(0)
-      }
-
-      override def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
-        case (keys, mat) => (keys, (mat %*% x).toColMatrix)
-      }
-    }).withBroadcastSet(singletonDataSetX, "vector")
-
-    new BlockifiedFlinkDrm(out, op.nrow.toInt)
-  }
-
-
-  def atx_with_broadcast(op: OpAtx, srcA: FlinkDrm[Int]): FlinkDrm[Int] = {
-    implicit val ctx = srcA.context
-
-    val dataSetA = srcA.asBlockified.ds
-
-    // broadcast the vector x to the back end
-    val bcastX = drmBroadcast(op.x)
-
-    implicit val typeInformation = createTypeInformation[(Array[Int],Matrix)]
-    val inCoreM = dataSetA.map {
-      tuple =>
-        tuple._1.zipWithIndex.map {
-          case (key, idx) => tuple._2(idx, ::) * bcastX.value(key)
-        }
-          .reduce(_ += _)
-    }
-      // All-reduce
-      .reduce(_ += _)
-
-      // collect result
-      .collect().head
-
-      // Convert back to mtx
-      .toColMatrix
-
-    // This doesn't do anything now
-    val res = FlinkEngine.parallelize(inCoreM, parallelismDegree = 1)
-
-    new RowsFlinkDrm[Int](res, 1)
-
-  }
-
-}
\ No newline at end of file
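
Both operators above follow the same broadcast pattern: the (small) vector x is shipped to every task as a broadcast variable and each block of A is multiplied locally; for A'x the per-block partial results are then summed in an all-reduce. A rough plain-Scala sketch of the per-block Ax work follows, with dense arrays in place of Mahout matrices (object and method names are illustrative only).

    // Plain-Scala sketch of the broadcast Ax pattern: x is available everywhere,
    // so each row block of A is turned into the matching slice of A * x locally.
    object AxSketch {
      type Block = (Array[Int], Array[Array[Double]])   // (row keys, dense row block)

      // Dense block times vector: one dot product per row of the block.
      def blockTimes(block: Array[Array[Double]], x: Array[Double]): Array[Double] =
        block.map(row => row.zip(x).map { case (a, b) => a * b }.sum)

      // Map side of the operator: every block keeps its keys and carries its slice of A * x.
      def ax(blocks: Seq[Block], x: Array[Double]): Seq[(Array[Int], Array[Double])] =
        blocks.map { case (keys, block) => keys -> blockTimes(block, x) }
    }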

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
deleted file mode 100644
index 0e16fa8..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.mahout.flinkbindings.drm._
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm.logical.{OpCbind, OpCbindScalar}
-import org.apache.mahout.math.scalabindings.RLikeOps._
-
-import scala.collection.JavaConversions._
-
-/**
- * Implementation of Flink's cbind
- */
-object FlinkOpCBind {
-
-  def cbind[K: TypeInformation](op: OpCbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
-    val n = op.ncol
-    val n1 = op.A.ncol
-    val n2 = op.B.ncol
-
-    implicit val classTag = op.A.keyClassTag
-
-    val rowsA = A.asRowWise.ds
-    val rowsB = B.asRowWise.ds
-
-    val res: DataSet[(K, Vector)] =
-      rowsA.coGroup(rowsB).where(0).equalTo(0) {
-        (left, right) =>
-          (left.toIterable.headOption, right.toIterable.headOption) match {
-            case (Some((idx, a)), Some((_, b))) =>
-              val result = if (a.isDense && b.isDense) {
-                new DenseVector(n)
-              } else {
-                new SequentialAccessSparseVector(n)
-              }
-
-              result(0 until n1) := a
-              result(n1 until n) := b
-
-              (idx, result)
-            case (Some((idx, a)), None) =>
-              val result: Vector = if (a.isDense) {
-                new DenseVector(n)
-              } else {
-                new SequentialAccessSparseVector(n)
-              }
-              result(n1 until n) := a
-
-              (idx, result)
-            case (None, Some((idx, b))) =>
-              val result: Vector = if (b.isDense) {
-                new DenseVector(n)
-              } else {
-                new SequentialAccessSparseVector(n)
-              }
-              result(n1 until n) := b
-
-              (idx, result)
-            case (None, None) =>
-              throw new RuntimeException("CoGroup should have at least one non-empty input.")
-          }
-      }
-
-    new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol=op.ncol)
-  }
-
-  def cbindScalar[K: TypeInformation](op: OpCbindScalar[K], A: FlinkDrm[K], x: Double): FlinkDrm[K] = {
-    val left = op.leftBind
-    val ds = A.asBlockified.ds
-
-    implicit val kTag= op.keyClassTag
-
-    def cbind(mat: Matrix, x: Double, left: Boolean): Matrix = {
-      val ncol = mat.ncol
-      val newMat = mat.like(mat.nrow, ncol + 1)
-
-      if (left) {
-        newMat.zip(mat).foreach { case (newVec, origVec) =>
-          newVec(0) = x
-          newVec(1 to ncol) := origVec
-        }
-      } else {
-        newMat.zip(mat).foreach { case (newVec, origVec) =>
-          newVec(ncol) = x
-          newVec(0 until ncol) := origVec
-        }
-      }
-
-      newMat
-    }
-
-    val out = A.asBlockified.ds.map {
-      tuple => (tuple._1, cbind(tuple._2, x, left))
-    }
-
-    new BlockifiedFlinkDrm(out, op.ncol)
-  }
-}
\ No newline at end of file
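
cbind above is a co-group on row keys: when both sides contain a row, the two vectors are concatenated; when only one side contains it, the other half stays zero. A minimal plain-Scala sketch of that join logic, using maps and dense arrays and written with the intended column placement (left vector in columns 0 until n1, right vector in the remaining n2 columns); the object name is illustrative only.

    // Plain-Scala sketch of cbind as a full outer join on row keys,
    // zero-padding whichever side is missing a given row.
    object CbindSketch {
      def cbind(rowsA: Map[Int, Array[Double]],
                rowsB: Map[Int, Array[Double]],
                n1: Int, n2: Int): Map[Int, Array[Double]] = {
        val keys = rowsA.keySet ++ rowsB.keySet
        keys.map { k =>
          val a = rowsA.getOrElse(k, Array.fill(n1)(0.0))   // left half, zeros if absent
          val b = rowsB.getOrElse(k, Array.fill(n2)(0.0))   // right half, zeros if absent
          k -> (a ++ b)
        }.toMap
      }
    }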

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
deleted file mode 100644
index c22fa9a..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
-import org.apache.mahout.math.drm.logical.OpMapBlock
-import org.apache.mahout.math.scalabindings.RLikeOps._
-
-/**
- * Implementation of Flink's MapBlock
- */
-object FlinkOpMapBlock {
-
-  def apply[S, R: TypeInformation](src: FlinkDrm[S], ncol: Int, operator: OpMapBlock[S,R]): FlinkDrm[R] = {
-
-    implicit val rtag = operator.keyClassTag
-    val bmf = operator.bmf
-    val ncol = operator.ncol
-    val res = src.asBlockified.ds.map {
-      block =>
-        val result = bmf(block)
-        assert(result._2.nrow == block._2.nrow, "block mapping must return same number of rows.")
-        assert(result._2.ncol == ncol, s"block map must return $ncol number of columns.")
-        result
-    }
-
-    new BlockifiedFlinkDrm[R](res, ncol)
-  }
-}
\ No newline at end of file
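
mapBlock applies the user-supplied block function to each (keys, block) pair and enforces that the function preserves the number of rows and produces the declared number of columns. A small plain-Scala sketch of that contract check (names are illustrative only):

    // Plain-Scala sketch of the mapBlock contract: the block function must keep the
    // row count of each block and produce exactly `ncol` columns.
    object MapBlockSketch {
      type Block[K] = (Array[K], Array[Array[Double]])

      def mapBlock[K](blocks: Seq[Block[K]], ncol: Int)(bmf: Block[K] => Block[K]): Seq[Block[K]] =
        blocks.map { block =>
          val result = bmf(block)
          require(result._2.length == block._2.length, "block mapping must return the same number of rows")
          require(result._2.forall(_.length == ncol), s"block mapping must return $ncol columns")
          result
        }
    }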

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
deleted file mode 100644
index 685486c..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.drm.logical.OpRbind
-
-/**
-  * Implementation of RBind
-  */
-object FlinkOpRBind {
-
-  def rbind[K: TypeInformation](op: OpRbind[K], A: FlinkDrm[K], B: FlinkDrm[K]): FlinkDrm[K] = {
-    // note that indexes of B are already re-arranged prior to executing this code
-    implicit val kTag = op.keyClassTag
-    val res = A.asRowWise.ds.union(B.asRowWise.ds)
-    new RowsFlinkDrm(res.asInstanceOf[DataSet[(K, Vector)]], ncol = op.ncol)
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
deleted file mode 100644
index c002002..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import org.apache.flink.api.scala._
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
-import org.apache.mahout.math.drm.logical.OpRowRange
-
-/**
- * Implementation of Flink OpRowRange
- */
-object FlinkOpRowRange {
-
-  def slice(op: OpRowRange, A: FlinkDrm[Int]): FlinkDrm[Int] = {
-    val rowRange = op.rowRange
-    val firstIdx = rowRange.head
-
-    val filtered = A.asRowWise.ds.filter {
-      tuple => rowRange.contains(tuple._1)
-    }
-
-    val res = filtered.map {
-      tuple => (tuple._1 - firstIdx, tuple._2)
-    }
-
-    new RowsFlinkDrm(res, op.ncol)
-  }
-
-}
\ No newline at end of file
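
Row-range slicing keeps only the rows whose key falls inside the requested range and then shifts the surviving keys down so the slice is indexed from 0. The same filter-and-shift in a one-function plain-Scala sketch (illustrative names):

    // Plain-Scala sketch of row-range slicing: keep rows in `range`, then rebase
    // their keys by subtracting range.head so the result starts at index 0.
    object RowRangeSketch {
      def slice(rows: Seq[(Int, Array[Double])], range: Range): Seq[(Int, Array[Double])] =
        rows.collect { case (idx, vec) if range.contains(idx) => (idx - range.head, vec) }
    }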

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
deleted file mode 100644
index a4c5373..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.mahout.flinkbindings.blas
-
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.scala._
-import org.apache.flink.configuration.Configuration
-import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
-import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.{DenseMatrix, Matrix}
-
-/**
- * Implementation of OpTimesRightMatrix:
- */
-object FlinkOpTimesRightMatrix {
-
-  def drmTimesInCore[K: TypeInformation](op: OpTimesRightMatrix[K], A: FlinkDrm[K], inCoreB: Matrix): FlinkDrm[K] = {
-    implicit val ctx = A.context
-    implicit val kTag = op.keyClassTag
-
-    /* HACK: broadcasting the matrix using Flink's .withBroadcastSet(singletonDataSetB) on a matrix causes a backend Kryo
-     * Issue resulting in a stackOverflow error.
-     * 
-     * Quick fix is to instead break the matrix down into a list of rows and then rebuild it on the back end
-     * 
-     * TODO: this is obviously very inefficient... need to use the correct broadcast on the matrix itself.
-     */
-    val rows = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::)))
-    val dataSetType = TypeExtractor.getForObject(rows.head)
-    val singletonDataSetB = ctx.env.fromCollection(rows)
-
-    val res = A.asBlockified.ds.map(new RichMapFunction[(Array[K], Matrix), (Array[K], Matrix)] {
-      var inCoreB: Matrix = _
-
-      override def open(params: Configuration): Unit = {
-        val runtime = this.getRuntimeContext
-        val dsB: java.util.List[(Int, org.apache.mahout.math.Vector)] = runtime.getBroadcastVariable("matrix")
-        val m = dsB.size()
-        val n = dsB.get(0)._2.size
-        val isDense = dsB.get(0)._2.isDense
-
-        inCoreB = isDense match {
-          case true => new DenseMatrix(m, n)
-          case false => new DenseMatrix(m, n)
-        }
-        for (i <- 0 until m) {
-          inCoreB(i, ::) := dsB.get(i)._2
-        }
-
-      }
-     
-      override def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = tuple match {
-        case (keys, block_A) => (keys, block_A %*% inCoreB)
-      }
-
-    }).withBroadcastSet(singletonDataSetB, "matrix")
-
-    new BlockifiedFlinkDrm(res, op.ncol)
-  }
-
-}
\ No newline at end of file
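
The HACK noted above sidesteps a Kryo serialization problem by decomposing the in-core matrix B into (rowIndex, row) pairs, broadcasting those, and reassembling B inside open() before the map runs. A rough plain-Scala sketch of that decompose/rebuild round trip, with dense arrays in place of Mahout matrices (names are illustrative only):

    // Plain-Scala sketch of the row-wise broadcast workaround: B travels as indexed
    // rows and is rebuilt on the receiving side before any block is multiplied.
    object BroadcastByRowsSketch {
      type Mat = Array[Array[Double]]

      // Sender side: decompose the in-core matrix into (row index, row) pairs.
      def decompose(b: Mat): Seq[(Int, Array[Double])] =
        b.indices.map(i => i -> b(i))

      // Receiver side (the open() step above): rebuild B from the broadcast rows.
      def rebuild(rows: Seq[(Int, Array[Double])]): Mat = {
        val out = Array.ofDim[Array[Double]](rows.size)
        rows.foreach { case (i, row) => out(i) = row.clone() }
        out
      }

      // Per-block work of the operator: blockA times the rebuilt B.
      def times(blockA: Mat, b: Mat): Mat =
        blockA.map { row =>
          Array.tabulate(b.head.length) { j =>
            row.indices.map(k => row(k) * b(k)(j)).sum
          }
        }
    }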

http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
deleted file mode 100644
index 32a8cac..0000000
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements. See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership. The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License. You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing,
-  * software distributed under the License is distributed on an
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  * KIND, either express or implied. See the License for the
-  * specific language governing permissions and limitations
-  * under the License.
-  */
-package org.apache.mahout.flinkbindings
-
-import java.lang.Iterable
-
-import org.apache.flink.api.common.functions.RichMapPartitionFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.utils._
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.drm.DrmLike
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.{RandomAccessSparseVector, Vector}
-
-import scala.reflect.ClassTag
-
-package object blas {
-
-  /**
-    * Rekey matrix dataset keys to consecutive int keys.
-    * @param drmDataSet incoming matrix row-wise dataset
-    * @param computeMap if true, also compute mapping between old and new keys
-    * @tparam K existing key parameter
-    * @return
-    */
-  private[flinkbindings] def rekeySeqInts[K: ClassTag: TypeInformation](drmDataSet: FlinkDrm[K],
-                                                                 computeMap: Boolean = true): (DrmLike[Int],
-    Option[DataSet[(K, Int)]]) = {
-
-    implicit val dc = drmDataSet.context
-
-    val datasetA = drmDataSet.asRowWise.ds
-
-    val ncols = drmDataSet.asRowWise.ncol
-
-    // Flink environment
-    val env = datasetA.getExecutionEnvironment
-
-    // First, compute partition sizes.
-    val partSizes = DataSetUtils(datasetA).countElementsPerPartition.collect().toList
-
-    // Starting indices
-    var startInd = new Array[Int](datasetA.getParallelism)
-
-    // Save counts
-    for (pc <- partSizes) startInd(pc._1) = pc._2.toInt
-
-    // compute cumulative sum
-    val cumulativeSum = startInd.scanLeft(0)(_ + _).init
-
-    val vector: Vector = new RandomAccessSparseVector(cumulativeSum.length)
-
-    cumulativeSum.indices.foreach { i => vector(i) = cumulativeSum(i).toDouble }
-
-    val bCast = FlinkEngine.drmBroadcast(vector)
-
-    implicit val typeInformation = createTypeInformation[(K, Int)]
-
-    // Compute key -> int index map:
-    val keyMap = if (computeMap) {
-      Some(
-        datasetA.mapPartition(new RichMapPartitionFunction[(K, Vector), (K, Int)] {
-
-          // partition number
-          var part: Int = 0
-
-          // get the index of the partition
-          override def open(params: Configuration): Unit = {
-            part = getRuntimeContext.getIndexOfThisSubtask
-          }
-
-          override def mapPartition(iterable: Iterable[(K, Vector)], collector: Collector[(K, Int)]): Unit = {
-            val k = iterable.iterator().next._1
-            val si = bCast.value.get(part)
-            collector.collect(k -> (part + si).toInt)
-          }
-        }))
-    } else {
-      None
-    }
-
-    // Finally, do the transform
-    val intDataSet = datasetA
-
-      // Re-number each partition
-      .mapPartition(new RichMapPartitionFunction[(K, Vector), (Int, Vector)] {
-
-        // partition number
-        var part: Int = 0
-
-        // get the index of the partition
-        override def open(params: Configuration): Unit = {
-          part = getRuntimeContext.getIndexOfThisSubtask
-        }
-
-        override def mapPartition(iterable: Iterable[(K, Vector)], collector: Collector[(Int, Vector)]): Unit = {
-          val k = iterable.iterator().next._2
-          val si = bCast.value.get(part)
-          collector.collect((part + si).toInt -> k)
-        }
-      })
-
-    // Finally, return drm -> keymap result
-    datasetWrap(intDataSet) -> keyMap
-  }
-}
\ No newline at end of file
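
rekeySeqInts derives consecutive Int keys from per-partition element counts: the starting offset of partition p is the cumulative sum of the sizes of partitions 0 .. p-1, and rows inside a partition are numbered from that offset. A minimal single-process sketch of the intended numbering, with partitions modeled as plain nested sequences (names are illustrative only):

    // Plain-Scala sketch of rekeying arbitrary keys to consecutive Int keys.
    // offsets(p) = total size of partitions 0 .. p-1; the i-th element of
    // partition p gets the new key offsets(p) + i.
    object RekeySketch {
      def rekey[K](partitions: Seq[Seq[(K, Array[Double])]])
          : (Seq[(Int, Array[Double])], Map[K, Int]) = {

        val sizes = partitions.map(_.size)
        val offsets = sizes.scanLeft(0)(_ + _).init     // exclusive prefix sums

        // Re-number every row with its partition offset plus its local position.
        val rekeyed = for {
          (part, p) <- partitions.zipWithIndex
          ((_, vec), i) <- part.zipWithIndex
        } yield (offsets(p) + i) -> vec

        // Optional old-key -> new-key map (the computeMap = true case above).
        val keyMap = (for {
          (part, p) <- partitions.zipWithIndex
          ((k, _), i) <- part.zipWithIndex
        } yield k -> (offsets(p) + i)).toMap

        (rekeyed, keyMap)
      }
    }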
