Repository: mahout
Updated Branches:
  refs/heads/flink-binding bf1cdd428 -> 3e5491535
WIP, migrating to Flink 0.10 and the Flink Scala API

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/3e549153
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/3e549153
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/3e549153

Branch: refs/heads/flink-binding
Commit: 3e5491535695048d0f7ada9bd741cf76379eb643
Parents: bf1cdd4
Author: smarthi <[email protected]>
Authored: Mon Nov 9 23:13:28 2015 -0500
Committer: smarthi <[email protected]>
Committed: Mon Nov 9 23:13:28 2015 -0500

----------------------------------------------------------------------
 .../scala/org/apache/mahout/flinkbindings/FlinkEngine.scala | 6 ++----
 .../main/scala/org/apache/mahout/h2obindings/H2OEngine.scala | 2 +-
 .../test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala | 2 +-
 pom.xml | 2 +-
 4 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mahout/blob/3e549153/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 3db0319..6b12d11 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -64,12 +64,10 @@ object FlinkEngine extends DistributedEngine { implicit val env = dc.asInstanceOf[FlinkDistributedContext].env val metadata = hdfsUtils.readDrmHeader(path) - println(metadata) val unwrapKey = metadata.unwrapKeyFunction - println(unwrapKey) - val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable], - classOf[Writable], classOf[VectorWritable], path) + + val dataset = env.readSequenceFile(classOf[Writable], 
classOf[VectorWritable], path) val res = dataset.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] { def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/3e549153/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 463e9f5..000b292 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -112,7 +112,7 @@ object H2OEngine extends DistributedEngine { case op@OpRowRange(a, r) => RowRange.exec(tr2phys(a)(op.classTagA), r) // Custom operators case blockOp: OpMapBlock[K, _] => MapBlock.exec(tr2phys(blockOp.A)(blockOp.classTagA), blockOp.ncol, blockOp.bmf, - (blockOp.keyClassTag == implicitly[ClassTag[String]]), blockOp.classTagA, blockOp.keyClassTag) + blockOp.keyClassTag == implicitly[ClassTag[String]], blockOp.classTagA, blockOp.keyClassTag) case op@OpPar(a, m, e) => Par.exec(tr2phys(a)(op.classTagA), m, e) case cp: CheckpointedDrm[K] => cp.h2odrm case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s." 
http://git-wip-us.apache.org/repos/asf/mahout/blob/3e549153/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala b/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala
index 3ec5ec1..4635e95 100644
--- a/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala
@@ -43,7 +43,7 @@ trait TFIDFtestBase extends DistributedMahoutSuite with Matchers { val dictMap = documents.unzip._2.mkString(" ").toLowerCase.split(" ").groupBy(identity).mapValues(_.length) // create a dictionary with an index for each term - val dictIndex = dictMap.zipWithIndex.map(x => x._1._1 -> x._2).toMap + val dictIndex = dictMap.zipWithIndex.map(x => x._1._1 -> x._2) val docFrequencyCount = new Array[Int](dictMap.size)

http://git-wip-us.apache.org/repos/asf/mahout/blob/3e549153/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3adf972..8c77399 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,7 +121,7 @@ <scala.compat.version>2.10</scala.compat.version> <scala.version>2.10.4</scala.version> <spark.version>1.3.1</spark.version> - <flink.version>0.10-SNAPSHOT</flink.version> + <flink.version>1.0-SNAPSHOT</flink.version> <h2o.version>0.1.25</h2o.version> </properties> <issueManagement>
