Repository: mahout
Updated Branches:
  refs/heads/flink-binding 1d9b6322e -> e943b0a0d


Removed unused imports


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e943b0a0
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e943b0a0
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e943b0a0

Branch: refs/heads/flink-binding
Commit: e943b0a0df28b20cf93aba8778a02eb68fead6c4
Parents: 1d9b632
Author: smarthi <[email protected]>
Authored: Sat Oct 24 23:51:15 2015 -0400
Committer: smarthi <[email protected]>
Committed: Sat Oct 24 23:51:15 2015 -0400

----------------------------------------------------------------------
 .../org/apache/mahout/flinkbindings/FlinkEngine.scala | 14 ++++----------
 .../apache/mahout/flinkbindings/drm/FlinkDrm.scala    |  5 ++---
 2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/e943b0a0/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 9820b86..5915c0a 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -28,8 +28,6 @@ import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.tuple.Tuple2
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.SequenceFileInputFormat
 import org.apache.mahout.flinkbindings.blas._
 import org.apache.mahout.flinkbindings.drm._
@@ -101,20 +99,19 @@ object FlinkEngine extends DistributedEngine {
   private def flinkTranslate[K: ClassTag](oper: DrmLike[K]): FlinkDrm[K] = 
oper match {
     case op @ OpAx(a, x) => FlinkOpAx.blockifiedBroadcastAx(op, 
flinkTranslate(a)(op.classTagA))
     case op @ OpAt(a) => FlinkOpAt.sparseTrick(op, 
flinkTranslate(a)(op.classTagA))
-    case op @ OpAtx(a, x) => {
+    case op @ OpAtx(a, x) =>
       // express Atx as (A.t) %*% x
-      // TODO: create specific implementation of Atx, see MAHOUT-1749 
+      // TODO: create specific implementation of Atx, see MAHOUT-1749
       val opAt = OpAt(a)
       val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA))
       val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, 
_ncol=opAt.ncol)
       val opAx = OpAx(atCast, x)
       FlinkOpAx.blockifiedBroadcastAx(opAx, 
flinkTranslate(atCast)(op.classTagA))
-    }
     case op @ OpAtB(a, b) => FlinkOpAtB.notZippable(op, 
flinkTranslate(a)(op.classTagA), 
         flinkTranslate(b)(op.classTagA))
-    case op @ OpABt(a, b) => {
+    case op @ OpABt(a, b) =>
       // express ABt via AtB: let C=At and D=Bt, and calculate CtD
-      // TODO: create specific implementation of ABt, see MAHOUT-1750 
+      // TODO: create specific implementation of ABt, see MAHOUT-1750
       val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
       val at = FlinkOpAt.sparseTrick(opAt, 
flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
       val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, 
_ncol=opAt.ncol)
@@ -125,7 +122,6 @@ object FlinkEngine extends DistributedEngine {
 
       FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d))
                 .asInstanceOf[FlinkDrm[K]]
-    }
     case op @ OpAtA(a) => FlinkOpAtA.at_a(op, flinkTranslate(a)(op.classTagA))
     case op @ OpTimesRightMatrix(a, b) => 
       FlinkOpTimesRightMatrix.drmTimesInCore(op, 
flinkTranslate(a)(op.classTagA), b)
@@ -170,8 +166,6 @@ object FlinkEngine extends DistributedEngine {
 
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a 
checkpoint. */
   override def numNonZeroElementsPerColumn[K: ClassTag](drm: 
CheckpointedDrm[K]): Vector = {
-    val n = drm.ncol
-
     val result = drm.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), 
Vector] {
       def map(tuple: (Array[K], Matrix)): Vector = {
         val (_, block) = tuple

http://git-wip-us.apache.org/repos/asf/mahout/blob/e943b0a0/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
index d00a335..4a16724 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala
@@ -56,7 +56,7 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl
 
   def isBlockified = false
 
-  def asBlockified(): BlockifiedFlinkDrm[K] = {
+  def asBlockified : BlockifiedFlinkDrm[K] = {
     val ncolLocal = ncol
     val classTag = implicitly[ClassTag[K]]
 
@@ -100,9 +100,8 @@ class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol:
     val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] {
       def flatMap(typle: (Array[K], Matrix), out: Collector[DrmTuple[K]]): 
Unit = typle match {
         case (keys, block) => keys.view.zipWithIndex.foreach {
-          case (key, idx) => {
+          case (key, idx) =>
             out.collect((key, block(idx, ::)))
-          }
         }
       }
     })

Reply via email to