MAHOUT-1570: Flink: imports cleaned
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/851eebcb Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/851eebcb Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/851eebcb Branch: refs/heads/flink-binding Commit: 851eebcb648752f1a3bff08191d19ce3d6f66ab5 Parents: f66477f Author: Alexey Grigorev <[email protected]> Authored: Wed Jun 24 14:57:14 2015 +0200 Committer: Alexey Grigorev <[email protected]> Committed: Fri Sep 25 17:41:55 2015 +0200 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 79 ++++++++++++++------ .../mahout/flinkbindings/blas/FlinkOpAewB.scala | 21 +++--- .../mahout/flinkbindings/blas/FlinkOpAt.scala | 30 ++++---- .../mahout/flinkbindings/blas/FlinkOpAtB.scala | 31 ++++---- .../mahout/flinkbindings/blas/FlinkOpAx.scala | 20 +++-- .../flinkbindings/blas/FlinkOpCBind.scala | 20 ++--- .../flinkbindings/blas/FlinkOpMapBlock.scala | 13 +--- .../flinkbindings/blas/FlinkOpRBind.scala | 7 +- .../flinkbindings/blas/FlinkOpRowRange.scala | 6 +- .../blas/FlinkOpTimesRightMatrix.scala | 12 ++- .../drm/CheckpointedFlinkDrm.scala | 47 ++++++------ .../mahout/flinkbindings/drm/FlinkDrm.scala | 28 ++++--- 12 files changed, 168 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index 5039d21..3076933 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -19,33 +19,66 @@ package org.apache.mahout.flinkbindings import java.util.Collection -import scala.reflect.ClassTag + import scala.collection.JavaConverters._ -import com.google.common.collect._ -import org.apache.mahout.math._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.indexeddataset._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.drm.DrmTuple -import org.apache.mahout.math.drm.logical._ -import org.apache.mahout.math.indexeddataset.BiDictionary -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.drm._ -import org.apache.mahout.flinkbindings.blas._ -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.functions._ +import scala.reflect.ClassTag + import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.scala.DataSet -import org.apache.flink.api.java.io.TypeSerializerInputFormat -import org.apache.flink.api.common.io.SerializedInputFormat +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.FileInputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.SequenceFileInputFormat -import org.apache.hadoop.mapred.FileInputFormat -import org.apache.mahout.flinkbindings.io._ -import org.apache.hadoop.io.Writable -import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.mahout.flinkbindings.blas.FlinkOpAewB +import org.apache.mahout.flinkbindings.blas.FlinkOpAewScalar +import org.apache.mahout.flinkbindings.blas.FlinkOpAt +import org.apache.mahout.flinkbindings.blas.FlinkOpAtB +import org.apache.mahout.flinkbindings.blas.FlinkOpAx +import org.apache.mahout.flinkbindings.blas.FlinkOpCBind +import org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock +import org.apache.mahout.flinkbindings.blas.FlinkOpRBind +import org.apache.mahout.flinkbindings.blas.FlinkOpRowRange +import org.apache.mahout.flinkbindings.blas.FlinkOpTimesRightMatrix +import org.apache.mahout.flinkbindings._ +import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm +import org.apache.mahout.flinkbindings.drm.FlinkDrm +import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm +import org.apache.mahout.flinkbindings.io.HDFSUtil +import org.apache.mahout.flinkbindings.io.Hadoop1HDFSUtil +import org.apache.mahout.math.Matrix +import org.apache.mahout.math.Vector +import org.apache.mahout.math.VectorWritable +import org.apache.mahout.math.drm.BCast +import org.apache.mahout.math.drm.BlockMapFunc2 +import org.apache.mahout.math.drm.BlockReduceFunc +import org.apache.mahout.math.drm.CacheHint +import org.apache.mahout.math.drm.CheckpointedDrm +import org.apache.mahout.math.drm.DistributedContext +import org.apache.mahout.math.drm.DistributedEngine +import org.apache.mahout.math.drm.DrmLike +import org.apache.mahout.math.drm.DrmTuple +import org.apache.mahout.math.drm.drm2drmCpOps +import org.apache.mahout.math.drm.logical.OpABt +import org.apache.mahout.math.drm.logical.OpAewB +import org.apache.mahout.math.drm.logical.OpAewScalar +import org.apache.mahout.math.drm.logical.OpAewUnaryFunc +import org.apache.mahout.math.drm.logical.OpAt +import org.apache.mahout.math.drm.logical.OpAtA +import org.apache.mahout.math.drm.logical.OpAtB +import org.apache.mahout.math.drm.logical.OpAtx +import org.apache.mahout.math.drm.logical.OpAx +import org.apache.mahout.math.drm.logical.OpCbind +import org.apache.mahout.math.drm.logical.OpMapBlock +import org.apache.mahout.math.drm.logical.OpRbind +import org.apache.mahout.math.drm.logical.OpRowRange +import org.apache.mahout.math.drm.logical.OpTimesRightMatrix +import org.apache.mahout.math.indexeddataset.BiDictionary +import org.apache.mahout.math.indexeddataset.IndexedDataset +import org.apache.mahout.math.indexeddataset.Schema +import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math.scalabindings.RLikeOps._ object FlinkEngine extends DistributedEngine { @@ -142,6 +175,8 @@ object FlinkEngine extends DistributedEngine { } case op @ OpTimesRightMatrix(a, b) => FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b) + case op @ OpAewUnaryFunc(a, f, _) => + FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA), f) case op @ OpAewScalar(a, scalar, _) => FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a)(op.classTagA), scalar) case op @ OpAewB(a, b, _) => http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala index ed25d08..2b35685 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala @@ -1,19 +1,20 @@ package org.apache.mahout.flinkbindings.blas -import org.apache.mahout.math.drm.logical.OpAewB +import java.lang.Iterable + +import scala.collection.JavaConverters.asScalaBufferConverter import scala.reflect.ClassTag -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.math.Vector -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.flink.api.java.DataSet + import org.apache.flink.api.common.functions.CoGroupFunction -import java.lang.Iterable +import org.apache.flink.api.java.DataSet import org.apache.flink.util.Collector -import com.google.common.collect.Lists -import scala.collection.JavaConverters._ -import scala.collection.immutable.Nil +import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm +import org.apache.mahout.math.Vector +import org.apache.mahout.math.drm.logical._ +import org.apache.mahout.math.scalabindings.RLikeOps._ + +import com.google.common.collect.Lists /** * Implementation is inspired by Spark-binding's OpAewB http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala index 6ba3fd5..ac6837c 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala @@ -18,25 +18,23 @@ */ package org.apache.mahout.flinkbindings.blas -import org.apache.mahout.math.drm.logical.OpAt -import org.apache.mahout.flinkbindings.DrmDataSet -import org.apache.mahout.flinkbindings.drm.FlinkDrm +import java.lang.Iterable + +import scala.Array.canBuildFrom +import scala.collection.JavaConverters.asScalaBufferConverter + import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.mahout.math.Matrix -import scala.reflect.ClassTag -import org.apache.flink.util.Collector -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.mahout.math.drm.DrmTuple -import java.lang.Iterable -import scala.collection.JavaConverters._ -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.flink.api.java.functions.KeySelector -import java.util.ArrayList import org.apache.flink.shaded.com.google.common.collect.Lists +import org.apache.flink.util.Collector +import org.apache.mahout.flinkbindings.drm.FlinkDrm +import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm +import org.apache.mahout.math.Matrix +import org.apache.mahout.math.SequentialAccessSparseVector +import org.apache.mahout.math.Vector +import org.apache.mahout.math.drm.DrmTuple +import org.apache.mahout.math.drm.logical.OpAt +import org.apache.mahout.math.scalabindings.RLikeOps._ /** * Implementation is taken from Spark's At http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala index 462dc4a..297f676 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala @@ -18,27 +18,28 @@ */ package org.apache.mahout.flinkbindings.blas +import java.lang.Iterable + +import scala.collection.JavaConverters.asScalaBufferConverter import scala.reflect.ClassTag -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.math.drm.logical.OpAtB + +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.functions.GroupReduceFunction import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.java.tuple.Tuple2 -import org.apache.mahout.math.Vector -import org.apache.mahout.math.Matrix -import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.util.Collector -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.flink.api.common.functions.GroupReduceFunction -import java.lang.Iterable -import scala.collection.JavaConverters._ -import com.google.common.collect.Lists -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm import org.apache.mahout.flinkbindings.BlockifiedDrmDataSet -import org.apache.flink.api.scala._ -import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.mahout.flinkbindings.DrmDataSet +import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm +import org.apache.mahout.flinkbindings.drm.FlinkDrm +import org.apache.mahout.math.Matrix +import org.apache.mahout.math.Vector +import org.apache.mahout.math.drm.BlockifiedDrmTuple +import org.apache.mahout.math.drm.logical.OpAtB +import org.apache.mahout.math.drm.safeToNonNegInt +import org.apache.mahout.math.scalabindings.RLikeOps._ + +import com.google.common.collect.Lists /** http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala index 72de022..b473a4c 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala @@ -18,20 +18,18 @@ */ package org.apache.mahout.flinkbindings.blas +import java.util.List + import scala.reflect.ClassTag -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math.drm.drmBroadcast -import org.apache.mahout.math.drm.logical.OpAx -import org.apache.mahout.math.Matrix -import org.apache.flink.api.common.functions.MapFunction -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ + import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.configuration.Configuration -import java.util.List +import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm +import org.apache.mahout.flinkbindings.drm.FlinkDrm +import org.apache.mahout.math.Matrix +import org.apache.mahout.math.Vector +import org.apache.mahout.math.drm.logical.OpAx +import org.apache.mahout.math.scalabindings.RLikeOps._ /** http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala index 7f6e3fa..88155d6 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala @@ -19,20 +19,22 @@ package org.apache.mahout.flinkbindings.blas import java.lang.Iterable -import scala.reflect.ClassTag + import scala.collection.JavaConverters._ -import org.apache.mahout.math.drm.logical.OpCbind -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.math.Vector -import org.apache.flink.api.java.DataSet +import scala.reflect.ClassTag + import org.apache.flink.api.common.functions.CoGroupFunction +import org.apache.flink.api.java.DataSet import org.apache.flink.util.Collector -import com.google.common.collect.Lists +import org.apache.mahout.flinkbindings.drm.FlinkDrm +import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm import org.apache.mahout.math.DenseVector import org.apache.mahout.math.SequentialAccessSparseVector -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm +import org.apache.mahout.math.Vector +import org.apache.mahout.math.drm.logical.OpCbind +import org.apache.mahout.math.scalabindings.RLikeOps._ + +import com.google.common.collect.Lists /** http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala index c8c1fa4..cd745e4 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpMapBlock.scala @@ -19,15 +19,13 @@ package org.apache.mahout.flinkbindings.blas import scala.reflect.ClassTag -import org.apache.mahout.math.drm.BlockMapFunc -import org.apache.mahout.flinkbindings.drm.FlinkDrm + import org.apache.flink.api.common.functions.MapFunction import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm - +import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.math.Matrix -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ +import org.apache.mahout.math.drm.BlockMapFunc +import org.apache.mahout.math.scalabindings.RLikeOps._ /** * Implementation is taken from Spark's MapBlock @@ -36,9 +34,6 @@ import RLikeOps._ object FlinkOpMapBlock { def apply[S, R: ClassTag](src: FlinkDrm[S], ncol: Int, function: BlockMapFunc[S, R]): FlinkDrm[R] = { - - - val res = src.blockify.ds.map(new MapFunction[(Array[S], Matrix), (Array[R], Matrix)] { def map(block: (Array[S], Matrix)): (Array[R], Matrix) = { val out = function(block) http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala index 0a4d08a..f8fbea0 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala @@ -19,13 +19,12 @@ package org.apache.mahout.flinkbindings.blas import scala.reflect.ClassTag -import org.apache.mahout.math.drm.logical.OpRbind + +import org.apache.flink.api.java.DataSet import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.java.DataSet import org.apache.mahout.math.Vector +import org.apache.mahout.math.drm.logical.OpRbind object FlinkOpRBind { http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala index edae80e..f720125 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRowRange.scala @@ -18,12 +18,12 @@ */ package org.apache.mahout.flinkbindings.blas -import org.apache.mahout.math.drm.logical.OpRowRange -import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.flink.api.common.functions.FilterFunction -import org.apache.mahout.math.Vector import org.apache.flink.api.common.functions.MapFunction +import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm +import org.apache.mahout.math.Vector +import org.apache.mahout.math.drm.logical.OpRowRange /** * Implementation is taken from Spark's OpRowRange http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala index dd96066..92724d8 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala @@ -19,16 +19,14 @@ package org.apache.mahout.flinkbindings.blas import scala.reflect.ClassTag -import org.apache.mahout.math.drm.logical.OpTimesRightMatrix -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.DiagonalMatrix + import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.configuration.Configuration import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ +import org.apache.mahout.flinkbindings.drm.FlinkDrm +import org.apache.mahout.math.Matrix +import org.apache.mahout.math.drm.logical.OpTimesRightMatrix +import org.apache.mahout.math.scalabindings.RLikeOps._ /** * Implementation is taken from Spark's OpTimesRightMatrix: http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index 45c944a..ecd8b39 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -18,37 +18,34 @@ */ package org.apache.mahout.flinkbindings.drm -import scala.reflect.ClassTag -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math.drm.CheckpointedDrm -import org.apache.mahout.math.Matrix -import org.apache.mahout.flinkbindings.FlinkDistributedContext -import org.apache.flink.api.scala.ExecutionEnvironment -import org.apache.mahout.math.drm.CacheHint -import scala.util.Random -import org.apache.mahout.math.drm.DistributedContext -import org.apache.mahout.math.DenseMatrix -import org.apache.mahout.math.SparseMatrix -import org.apache.flink.api.java.io.LocalCollectionOutputFormat -import java.util.ArrayList import scala.collection.JavaConverters._ +import scala.util.Random +import scala.reflect.ClassTag import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.api.java.DataSet -import org.apache.hadoop.io.Writable +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat +import org.apache.flink.api.java.tuple.Tuple2 import org.apache.hadoop.io.IntWritable -import org.apache.hadoop.io.Text import org.apache.hadoop.io.LongWritable -import org.apache.mahout.math.VectorWritable -import org.apache.mahout.math.Vector -import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.io.Text +import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.FileOutputFormat -import org.apache.flink.api.java.tuple.Tuple2 -import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.SequenceFileOutputFormat +import org.apache.mahout.flinkbindings._ +import org.apache.mahout.flinkbindings.DrmDataSet +import org.apache.mahout.math.DenseMatrix +import org.apache.mahout.math.Matrix +import org.apache.mahout.math.SparseMatrix +import org.apache.mahout.math.Vector +import org.apache.mahout.math.VectorWritable +import org.apache.mahout.math.drm._ +import org.apache.mahout.math.drm.CacheHint +import org.apache.mahout.math.drm.CheckpointedDrm +import org.apache.mahout.math.drm.DistributedContext +import org.apache.mahout.math.drm.DrmTuple +import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math.scalabindings.RLikeOps._ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN, http://git-wip-us.apache.org/repos/asf/mahout/blob/851eebcb/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala index c959455..82c0d29 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala @@ -18,27 +18,25 @@ */ package org.apache.mahout.flinkbindings.drm +import java.lang.Iterable + +import scala.collection.JavaConverters.iterableAsScalaIterableConverter +import scala.reflect.ClassTag + import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.api.java.DataSet +import org.apache.flink.api.common.functions.MapPartitionFunction import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.util.Collector +import org.apache.mahout.flinkbindings.BlockifiedDrmDataSet +import org.apache.mahout.flinkbindings.DrmDataSet import org.apache.mahout.flinkbindings.FlinkDistributedContext -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.flinkbindings._ -import org.apache.flink.api.common.functions.MapPartitionFunction -import org.apache.mahout.math.Vector -import java.lang.Iterable -import scala.collection.JavaConverters._ +import org.apache.mahout.flinkbindings.wrapContext import org.apache.mahout.math.DenseMatrix -import scala.reflect.ClassTag +import org.apache.mahout.math.Matrix import org.apache.mahout.math.SparseRowMatrix -import scala.reflect.ClassTag -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.codegen.TypeInformationGen -import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.mahout.math.drm.DrmTuple +import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math.scalabindings.RLikeOps._ trait FlinkDrm[K] { def executionEnvironment: ExecutionEnvironment
