MAHOUT-1734: Flink: DRM IO
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0e7b0b46 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0e7b0b46 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0e7b0b46 Branch: refs/heads/flink-binding Commit: 0e7b0b4623ff802323027a5e6cd4568e60b2a793 Parents: 1806ca8 Author: Alexey Grigorev <[email protected]> Authored: Tue Jun 2 16:04:47 2015 +0200 Committer: Alexey Grigorev <[email protected]> Committed: Fri Sep 25 17:41:49 2015 +0200 ---------------------------------------------------------------------- .../mahout/flinkbindings/DataSetOps.scala | 60 ++++++++ .../mahout/flinkbindings/FlinkEngine.scala | 147 ++++++++++--------- .../mahout/flinkbindings/blas/package.scala | 2 +- .../drm/CheckpointedFlinkDrm.scala | 74 +++++++--- .../mahout/flinkbindings/io/DrmMetadata.scala | 53 +++++++ .../flinkbindings/io/HDFSPathSearch.scala | 82 +++++++++++ .../mahout/flinkbindings/io/HDFSUtil.scala | 33 +++++ .../flinkbindings/io/Hadoop1HDFSUtil.scala | 85 +++++++++++ .../apache/mahout/flinkbindings/package.scala | 50 ++++++- .../flinkbindings/examples/ReadCsvExample.scala | 21 +++ pom.xml | 3 +- 11 files changed, 511 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala new file mode 100644 index 0000000..c7a92c2 --- /dev/null +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala @@ -0,0 +1,60 @@ +package org.apache.mahout.flinkbindings + +import java.lang.Iterable +import java.util.Collections +import java.util.Comparator +import scala.collection.JavaConverters._ +import org.apache.flink.util.Collector +import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.common.functions.RichMapPartitionFunction +import org.apache.flink.configuration.Configuration +import scala.reflect.ClassTag + + +class DataSetOps[K: ClassTag](val ds: DataSet[K]) { + + /** + * Implementation taken from http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink + * + * TODO: remove when FLINK-2152 is committed and released + */ + def zipWithIndex(): DataSet[(Long, K)] = { + + // first for each partition count the number of elements - to calculate the offsets + val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Long)] { + override def mapPartition(values: Iterable[K], out: Collector[(Int, Long)]): Unit = { + val cnt: Long = values.asScala.count(_ => true) + val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask + out.collect(subtaskIdx -> cnt) + } + }) + + // then use the offsets to index items of each partition + val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Long, K)] { + var offset: Long = 0 + + override def open(parameters: Configuration): Unit = { + val offsetsJava: java.util.List[(Int, Long)] = + getRuntimeContext.getBroadcastVariable("counts") + val offsets = offsetsJava.asScala + + val sortedOffsets = + offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt } + + val subtaskId = getRuntimeContext.getIndexOfThisSubtask + offset = sortedOffsets.take(subtaskId).sum + } + + override def mapPartition(values: Iterable[K], out: Collector[(Long, K)]): Unit = { + val it = values.asScala + it.zipWithIndex.foreach { case (value, idx) => + out.collect((idx + offset, value)) + } + } + }).withBroadcastSet(counts, "counts"); + + zipped + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index f174871..1b0464e 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -1,66 +1,79 @@ package org.apache.mahout.flinkbindings +import java.util.Collection import scala.reflect.ClassTag -import org.apache.flink.api.scala.DataSet -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm +import scala.collection.JavaConverters._ +import com.google.common.collect._ import org.apache.mahout.math._ -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.Vector -import org.apache.mahout.math.drm.BCast -import org.apache.mahout.math.drm.CacheHint -import org.apache.mahout.math.drm.CheckpointedDrm -import org.apache.mahout.math.drm.DistributedContext -import org.apache.mahout.math.drm.DistributedEngine -import org.apache.mahout.math.drm.DrmLike -import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetElementReadSchema -import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetReadSchema -import org.apache.mahout.math.indexeddataset.IndexedDataset -import org.apache.mahout.math.indexeddataset.Schema +import org.apache.mahout.math.drm._ +import org.apache.mahout.math.indexeddataset._ import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.scalabindings.RLikeOps._ -import com.google.common.collect.BiMap -import com.google.common.collect.HashBiMap -import scala.collection.JavaConverters._ -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.mahout.math.drm.DrmTuple -import java.util.Collection -import org.apache.mahout.flinkbindings.drm.FlinkDrm +import org.apache.mahout.math.drm.logical._ +import org.apache.mahout.math.indexeddataset.BiDictionary +import org.apache.mahout.flinkbindings._ +import org.apache.mahout.flinkbindings.drm._ import org.apache.mahout.flinkbindings.blas._ -import org.apache.mahout.math.drm.logical.OpAx -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.mahout.math.drm.logical.OpAt -import org.apache.mahout.math.drm.logical.OpAtx -import org.apache.mahout.math.drm.logical.OpAtx -import org.apache.mahout.math.drm.logical.OpAtB -import org.apache.mahout.math.drm.logical.OpABt -import org.apache.mahout.math.drm.logical.OpAtB -import org.apache.mahout.math.drm.logical.OpAtA -import org.apache.mahout.math.drm.logical.OpAewScalar -import org.apache.mahout.math.drm.logical.OpAewB -import org.apache.mahout.math.drm.logical.OpCbind -import org.apache.mahout.math.drm.logical.OpRbind -import org.apache.mahout.math.drm.logical.OpMapBlock -import org.apache.mahout.math.drm.logical.OpRowRange -import org.apache.mahout.math.drm.logical.OpTimesRightMatrix +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.mahout.math.indexeddataset.BiDictionary +import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.scala.DataSet +import org.apache.flink.api.java.io.TypeSerializerInputFormat +import org.apache.flink.api.common.io.SerializedInputFormat +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.SequenceFileInputFormat +import org.apache.hadoop.mapred.FileInputFormat +import org.apache.mahout.flinkbindings.io._ +import org.apache.hadoop.io.Writable +import org.apache.flink.api.java.tuple.Tuple2 object FlinkEngine extends DistributedEngine { - /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */ + // By default, use Hadoop 1 utils + var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil + + /** + * Load DRM from hdfs (as in Mahout DRM format). + * + * @param path The DFS path to load from + * @param parMin Minimum parallelism after load (equivalent to #par(min=...)). + */ + override def drmDfsRead(path: String, parMin: Int = 0) + (implicit dc: DistributedContext): CheckpointedDrm[_] = { + val metadata = hdfsUtils.readDrmHeader(path) + val unwrapKey = metadata.unwrapKeyFunction + + val job = new JobConf + val hadoopInput = new SequenceFileInputFormat[Writable, VectorWritable] + FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(path)) + + val writables = dc.env.createHadoopInput(hadoopInput, classOf[Writable], classOf[VectorWritable], job) + + val res = writables.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] { + def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = { + unwrapKey(tuple.f0) -> tuple.f1 + } + }) + + datasetWrap(res)(metadata.keyClassTag.asInstanceOf[ClassTag[Any]]) + } + + override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary]) + (implicit sc: DistributedContext): IndexedDataset = ??? + + override def indexedDatasetDFSReadElements(src: String,schema: Schema, existingRowIDs: Option[BiDictionary]) + (implicit sc: DistributedContext): IndexedDataset = ??? + + + /** + * Translates logical plan into Flink execution plan. + **/ override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = { // Flink-specific Physical Plan translation. val drm = flinkTranslate(plan) - - val newcp = new CheckpointedFlinkDrm( - ds = drm.deblockify.ds, - _nrow = plan.nrow, - _ncol = plan.ncol - ) - + val newcp = new CheckpointedFlinkDrm(ds = drm.deblockify.ds, _nrow = plan.nrow, _ncol = plan.ncol) newcp.cache() } @@ -117,11 +130,10 @@ object FlinkEngine extends DistributedEngine { case cp: CheckpointedFlinkDrm[K] => new RowsFlinkDrm(cp.ds, cp.ncol) case _ => throw new NotImplementedError(s"operator $oper is not implemented yet") } - - - def translate[K: ClassTag](oper: DrmLike[K]): DataSet[K] = ??? - /** Engine-specific colSums implementation based on a checkpoint. */ + /** + * returns a vector that contains a column-wise sum from DRM + */ override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = { val sum = drm.ds.map(new MapFunction[(K, Vector), Vector] { def map(tuple: (K, Vector)): Vector = tuple._2 @@ -129,18 +141,23 @@ object FlinkEngine extends DistributedEngine { def reduce(v1: Vector, v2: Vector) = v1 + v2 }) - val list = CheckpointedFlinkDrm.flinkCollect(sum, "FlinkEngine colSums()") + val list = sum.collect.asScala.toList list.head } /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */ override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = ??? - /** Engine-specific colMeans implementation based on a checkpoint. */ + /** + * returns a vector that contains a column-wise mean from DRM + */ override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = { drm.colSums() / drm.nrow } + /** + * Calculates the element-wise squared norm of a matrix + */ override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = { val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] { def map(tuple: (K, Vector)): Double = tuple match { @@ -150,7 +167,7 @@ object FlinkEngine extends DistributedEngine { def reduce(v1: Double, v2: Double) = v1 + v2 }) - val list = CheckpointedFlinkDrm.flinkCollect(sumOfSquares, "FlinkEngine norm()") + val list = sumOfSquares.collect.asScala.toList list.head } @@ -160,29 +177,21 @@ object FlinkEngine extends DistributedEngine { /** Broadcast support */ override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = ??? - /** - * Load DRM from hdfs (as in Mahout DRM format). - * <P/> - * @param path The DFS path to load from - * @param parMin Minimum parallelism after load (equivalent to #par(min=...)). - */ - override def drmDfsRead(path: String, parMin: Int = 0) - (implicit sc: DistributedContext): CheckpointedDrm[_] = ??? /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */ override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) - (implicit sc: DistributedContext): CheckpointedDrm[Int] = { + (implicit dc: DistributedContext): CheckpointedDrm[Int] = { val parallelDrm = parallelize(m, numPartitions) new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols()) } private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int) - (implicit sc: DistributedContext): DrmDataSet[Int] = { + (implicit dc: DistributedContext): DrmDataSet[Int] = { val rows = (0 until m.nrow).map(i => (i, m(i, ::))) val rowsJava: Collection[DrmTuple[Int]] = rows.asJava val dataSetType = TypeExtractor.getForObject(rows.head) - sc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree) + dc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree) } /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */ @@ -196,10 +205,4 @@ object FlinkEngine extends DistributedEngine { /** Creates empty DRM with non-trivial height */ override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10) (implicit sc: DistributedContext): CheckpointedDrm[Long] = ??? - - override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary]) - (implicit sc: DistributedContext): IndexedDataset = ??? - - override def indexedDatasetDFSReadElements(src: String,schema: Schema, existingRowIDs: Option[BiDictionary]) - (implicit sc: DistributedContext): IndexedDataset = ??? } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala index af5ccc8..fb154e4 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala @@ -11,5 +11,5 @@ package object blas { def tuple_1[K: ClassTag] = new KeySelector[(Int, K), Integer] { def getKey(tuple: Tuple2[Int, K]): Integer = tuple._1 } - + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index e7d9dcd..0df75ca 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -20,13 +20,24 @@ import scala.collection.JavaConverters._ import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.java.DataSet +import org.apache.hadoop.io.Writable +import org.apache.hadoop.io.IntWritable +import org.apache.hadoop.io.Text +import org.apache.hadoop.io.LongWritable +import org.apache.mahout.math.VectorWritable +import org.apache.mahout.math.Vector +import org.apache.hadoop.mapred.SequenceFileOutputFormat +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.FileOutputFormat +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], - private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN, - private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN, - // private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, - override protected[mahout] val partitioningTag: Long = Random.nextLong(), - private var _canHaveMissingRows: Boolean = false) extends CheckpointedDrm[K] { + private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN, + private var _ncol: Int = CheckpointedFlinkDrm.UNKNOWN, + override protected[mahout] val partitioningTag: Long = Random.nextLong(), + private var _canHaveMissingRows: Boolean = false + ) extends CheckpointedDrm[K] { lazy val nrow: Long = if (_nrow >= 0) _nrow else computeNRow lazy val ncol: Int = if (_ncol >= 0) _ncol else computeNCol @@ -38,7 +49,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], def reduce(a1: Long, a2: Long) = a1 + a2 }) - val list = CheckpointedFlinkDrm.flinkCollect(count, "CheckpointedFlinkDrm computeNRow()") + val list = count.collect().asScala.toList list.head } @@ -49,7 +60,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], def reduce(a1: Int, a2: Int) = Math.max(a1, a2) }) - val list = CheckpointedFlinkDrm.flinkCollect(max, "CheckpointedFlinkDrm computeNCol()") + val list = max.collect().asScala.toList list.head } @@ -69,7 +80,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this def collect: Matrix = { - val data = CheckpointedFlinkDrm.flinkCollect(ds, "Checkpointed Flink Drm collect()") + val data = ds.collect().asScala.toList val isDense = data.forall(_._2.isDense) val m = if (isDense) { @@ -98,7 +109,42 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], m } - def dfsWrite(path: String) = ??? + def dfsWrite(path: String): Unit = { + val env = ds.getExecutionEnvironment + + val keyTag = implicitly[ClassTag[K]] + val convertKey = keyToWritableFunc(keyTag) + + val writableDataset = ds.map(new MapFunction[(K, Vector), Tuple2[Writable, VectorWritable]] { + def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match { + case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec)) + } + }) + + val job = new JobConf + val sequenceFormat = new SequenceFileOutputFormat[Writable, VectorWritable] + FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path)) + + val hadoopOutput = new HadoopOutputFormat(sequenceFormat, job) + writableDataset.output(hadoopOutput) + + env.execute(s"dfsWrite($path)") + } + + private def keyToWritableFunc[K: ClassTag](keyTag: ClassTag[K]): (K) => Writable = { + if (keyTag.runtimeClass == classOf[Int]) { + (x: K) => new IntWritable(x.asInstanceOf[Int]) + } else if (keyTag.runtimeClass == classOf[String]) { + (x: K) => new Text(x.asInstanceOf[String]) + } else if (keyTag.runtimeClass == classOf[Long]) { + (x: K) => new LongWritable(x.asInstanceOf[Long]) + } else if (classOf[Writable].isAssignableFrom(keyTag.runtimeClass)) { + (x: K) => x.asInstanceOf[Writable] + } else { + throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(keyTag)) + } + } + def newRowCardinality(n: Int): CheckpointedDrm[K] = ??? override val context: DistributedContext = ds.getExecutionEnvironment @@ -108,14 +154,4 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], object CheckpointedFlinkDrm { val UNKNOWN = -1 - // needed for backwards compatibility with flink 0.8.1 - def flinkCollect[K](dataset: DataSet[K], jobName: String = "flinkCollect()"): List[K] = { - val dataJavaList = new ArrayList[K] - val outputFormat = new LocalCollectionOutputFormat[K](dataJavaList) - dataset.output(outputFormat) - val data = dataJavaList.asScala - dataset.getExecutionEnvironment.execute(jobName) - data.toList - } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala new file mode 100644 index 0000000..6efe99b --- /dev/null +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/DrmMetadata.scala @@ -0,0 +1,53 @@ +package org.apache.mahout.flinkbindings.io + +import scala.reflect.ClassTag +import org.apache.hadoop.io._ +import java.util.Arrays + +/** + * Copied from /spark/src/main/scala/org/apache/mahout/common + */ +class DrmMetadata( + + /** Writable key type as a sub-type of Writable */ + val keyTypeWritable: Class[_], + + /** Value writable type, as a sub-type of Writable */ + val valueTypeWritable: Class[_]) { + + import DrmMetadata._ + + /** + * @param keyClassTag: Actual drm key class tag once converted out of writable + * @param keyW2ValFunc: Conversion from Writable to value type of the DRM key + */ + val (keyClassTag: ClassTag[_], unwrapKeyFunction: ((Writable) => Any)) = keyTypeWritable match { + case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _ + case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _ + case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _ + case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _ + case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _ + case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _ + case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _ + case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}") + } + +} + +object DrmMetadata { + + private[io] def w2int(w: Writable) = w.asInstanceOf[IntWritable].get() + + private[io] def w2long(w: Writable) = w.asInstanceOf[LongWritable].get() + + private[io] def w2double(w: Writable) = w.asInstanceOf[DoubleWritable].get() + + private[io] def w2float(w: Writable) = w.asInstanceOf[FloatWritable].get() + + private[io] def w2string(w: Writable) = w.asInstanceOf[Text].toString() + + private[io] def w2bool(w: Writable) = w.asInstanceOf[BooleanWritable].get() + + private[io] def w2bytes(w: Writable) = Arrays.copyOf(w.asInstanceOf[BytesWritable].getBytes(), + w.asInstanceOf[BytesWritable].getLength()) +} http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala new file mode 100644 index 0000000..fc97234 --- /dev/null +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSPathSearch.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.flinkbindings.io + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} + +/** + * Returns a [[java.lang.String]], which is comma delimited list of URIs discovered based on parameters + * in the constructor. + * The String is formatted to be input into [[org.apache.spark.SparkContext#textFile()]] + * @param pathURI Where to start looking for inFiles, may be a list of comma delimited URIs + * @param filePattern regex that must match the entire filename to have the file returned + * @param recursive true traverses the filesystem recursively, default = false + * + * Copied from /spark/src/main/scala/org/apache/mahout/common + */ +case class HDFSPathSearch(pathURI: String, filePattern: String = "", recursive: Boolean = false) { + + val conf = new Configuration() + val fs = FileSystem.get(conf) + + /** + * Returns a string of comma delimited URIs matching the filePattern + * When pattern matching dirs are never returned, only traversed. + */ + def uris: String = { + if (!filePattern.isEmpty){ // have file pattern so + val pathURIs = pathURI.split(",") + var files = "" + for ( uri <- pathURIs ){ + files = findFiles(uri, filePattern, files) + } + if (files.length > 0 && files.endsWith(",")) files = files.dropRight(1) // drop the last comma + files + }else{ + pathURI + } + } + + /** + * Find matching files in the dir, recursively call self when another directory is found + * Only files are matched, directories are traversed but never return a match + */ + private def findFiles(dir: String, filePattern: String = ".*", files: String = ""): String = { + val seed = fs.getFileStatus(new Path(dir)) + var f: String = files + + if (seed.isDir) { + val fileStatuses: Array[FileStatus] = fs.listStatus(new Path(dir)) + for (fileStatus <- fileStatuses) { + if (fileStatus.getPath().getName().matches(filePattern) + && !fileStatus.isDir) { + // found a file + if (fileStatus.getLen() != 0) { + // file is not empty + f = f + fileStatus.getPath.toUri.toString + "," + } + } else if (fileStatus.isDir && recursive) { + f = findFiles(fileStatus.getPath.toString, filePattern, f) + } + } + } else { f = dir }// was a filename not dir + f + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala new file mode 100644 index 0000000..7629385 --- /dev/null +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/HDFSUtil.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.flinkbindings.io + +/** + * High level Hadoop version-specific hdfs manipulations we need in context of our operations. + * + * Copied from /spark/src/main/scala/org/apache/mahout/common + */ +trait HDFSUtil { + + /** + * Read DRM header information off (H)DFS. + */ + def readDrmHeader(path: String): DrmMetadata + +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala new file mode 100644 index 0000000..120edb4 --- /dev/null +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop1HDFSUtil.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.flinkbindings.io + +import org.apache.hadoop.io.{ Writable, SequenceFile } +import org.apache.hadoop.fs.{ FileSystem, Path } +import org.apache.hadoop.conf.Configuration +import collection._ +import JavaConversions._ + +/** + * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work + * with Hadoop 2.0 + * + * Copied from /spark/src/main/scala/org/apache/mahout/common + */ +object Hadoop1HDFSUtil extends HDFSUtil { + + /** + * Read the header of a sequence file and determine the Key and Value type + * @param path + * @return + */ + def readDrmHeader(path: String): DrmMetadata = { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + val partFilePath: Path = fs.listStatus(dfsPath) + + // Filter out anything starting with . + .filter { s => + !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir + } + + // Take path + .map(_.getPath) + + // Take only one, if any + .headOption + + // Require there's at least one partition file found. + .getOrElse { + throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") + } + + val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) + try { + new DrmMetadata( + keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), + valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])) + } finally { + reader.close() + } + + } + + /** + * Delete a path from the filesystem + * @param path + */ + def delete(path: String) { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + if (fs.exists(dfsPath)) { + fs.delete(dfsPath, true) + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index 0b26781..6f04551 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -1,17 +1,23 @@ package org.apache.mahout +import scala.reflect.ClassTag +import org.slf4j.LoggerFactory import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.ExecutionEnvironment +import org.apache.flink.api.common.functions.MapFunction +import org.apache.mahout.math.Vector +import org.apache.mahout.math.DenseVector +import org.apache.mahout.math.Matrix +import org.apache.mahout.math.MatrixWritable +import org.apache.mahout.math.VectorWritable +import org.apache.mahout.math.drm._ +import org.apache.mahout.math.scalabindings._ import org.apache.mahout.flinkbindings.FlinkDistributedContext +import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.mahout.math.drm._ -import org.slf4j.LoggerFactory -import scala.reflect.ClassTag -import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm +import org.apache.flink.api.common.functions.FilterFunction package object flinkbindings { @@ -46,4 +52,36 @@ package object flinkbindings { new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol) } + private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m) + private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v) + private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = w.get() + private[flinkbindings] implicit def unwrapFromWritable(w: VectorWritable): Vector = w.get() + + def readCsv(file: String, delim: String = ",", comment: String = "#") + (implicit dc: DistributedContext): CheckpointedDrm[Long] = { + val vectors = dc.env.readTextFile(file) + .filter(new FilterFunction[String] { + def filter(in: String): Boolean = { + !in.startsWith(comment) + } + }) + .map(new MapFunction[String, Vector] { + def map(in: String): Vector = { + val array = in.split(delim).map(_.toDouble) + new DenseVector(array) + } + }) + datasetToDrm(vectors) + } + + def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Long] = { + val zipped = new DataSetOps(ds).zipWithIndex + datasetWrap(zipped) + } + + def datasetWrap[K: ClassTag](dataset: DataSet[(K, Vector)]): CheckpointedDrm[K] = { + new CheckpointedFlinkDrm[K](dataset) + } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala new file mode 100644 index 0000000..27b17d0 --- /dev/null +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala @@ -0,0 +1,21 @@ +package org.apache.mahout.flinkbindings.examples + +import org.apache.flink.api.java.ExecutionEnvironment +import org.apache.mahout.math.drm._ +import org.apache.mahout.math.drm.RLikeDrmOps._ +import org.apache.mahout.flinkbindings._ + +object ReadCsvExample { + + def main(args: Array[String]): Unit = { + val filePath = "file:///c:/tmp/data/slashdot0902/Slashdot0902.txt" + + val env = ExecutionEnvironment.getExecutionEnvironment + implicit val ctx = new FlinkDistributedContext(env) + + val drm = readCsv(filePath, delim = "\t", comment = "#") + val C = drm.t %*% drm + println(C.collect) + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/0e7b0b46/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 67d2eee..4e7cecd 100644 --- a/pom.xml +++ b/pom.xml @@ -121,7 +121,8 @@ <scala.compat.version>2.10</scala.compat.version> <scala.version>2.10.4</scala.version> <spark.version>1.3.1</spark.version> - <flink.version>0.9.0-milestone-1</flink.version> + <!-- TODO: Remove snapshot dependency when Flink 0.9.1 is released --> + <flink.version>0.9-SNAPSHOT</flink.version> <h2o.version>0.1.25</h2o.version> </properties> <issueManagement>
