Repository: mahout
Updated Branches:
  refs/heads/flink-binding bf1cdd428 -> 3e5491535


WIP, migrating to Flink 0.10 and the Flink Scala API


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/3e549153
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/3e549153
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/3e549153

Branch: refs/heads/flink-binding
Commit: 3e5491535695048d0f7ada9bd741cf76379eb643
Parents: bf1cdd4
Author: smarthi <[email protected]>
Authored: Mon Nov 9 23:13:28 2015 -0500
Committer: smarthi <[email protected]>
Committed: Mon Nov 9 23:13:28 2015 -0500

----------------------------------------------------------------------
 .../scala/org/apache/mahout/flinkbindings/FlinkEngine.scala    | 6 ++----
 .../main/scala/org/apache/mahout/h2obindings/H2OEngine.scala   | 2 +-
 .../test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala | 2 +-
 pom.xml                                                        | 2 +-
 4 files changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/3e549153/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 3db0319..6b12d11 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -64,12 +64,10 @@ object FlinkEngine extends DistributedEngine {
     implicit val env = dc.asInstanceOf[FlinkDistributedContext].env
 
     val metadata = hdfsUtils.readDrmHeader(path)
-    println(metadata)
 
     val unwrapKey = metadata.unwrapKeyFunction
-    println(unwrapKey)
-    val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, 
VectorWritable],
-      classOf[Writable], classOf[VectorWritable], path)
+
+    val dataset = env.readSequenceFile(classOf[Writable], 
classOf[VectorWritable], path)
 
     val res = dataset.map(new MapFunction[Tuple2[Writable, VectorWritable], 
(Any, Vector)] {
       def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/3e549153/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 463e9f5..000b292 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -112,7 +112,7 @@ object H2OEngine extends DistributedEngine {
       case op@OpRowRange(a, r) => RowRange.exec(tr2phys(a)(op.classTagA), r)
       // Custom operators
       case blockOp: OpMapBlock[K, _] => 
MapBlock.exec(tr2phys(blockOp.A)(blockOp.classTagA), blockOp.ncol, blockOp.bmf,
-        (blockOp.keyClassTag == implicitly[ClassTag[String]]), 
blockOp.classTagA, blockOp.keyClassTag)
+        blockOp.keyClassTag == implicitly[ClassTag[String]], 
blockOp.classTagA, blockOp.keyClassTag)
       case op@OpPar(a, m, e) => Par.exec(tr2phys(a)(op.classTagA), m, e)
       case cp: CheckpointedDrm[K] => cp.h2odrm
       case _ => throw new IllegalArgumentException("Internal:Optimizer has no 
exec policy for operator %s."

http://git-wip-us.apache.org/repos/asf/mahout/blob/3e549153/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala b/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala
index 3ec5ec1..4635e95 100644
--- a/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/nlp/tfidf/TFIDFtestBase.scala
@@ -43,7 +43,7 @@ trait TFIDFtestBase extends DistributedMahoutSuite with Matchers {
     val dictMap = documents.unzip._2.mkString(" ").toLowerCase.split(" 
").groupBy(identity).mapValues(_.length)
 
     // create a dictionary with an index for each term
-    val dictIndex = dictMap.zipWithIndex.map(x => x._1._1 -> x._2).toMap
+    val dictIndex = dictMap.zipWithIndex.map(x => x._1._1 -> x._2)
 
     val docFrequencyCount = new Array[Int](dictMap.size)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3e549153/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3adf972..8c77399 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,7 +121,7 @@
     <scala.compat.version>2.10</scala.compat.version>
     <scala.version>2.10.4</scala.version>
     <spark.version>1.3.1</spark.version>
-    <flink.version>0.10-SNAPSHOT</flink.version>
+    <flink.version>1.0-SNAPSHOT</flink.version>
     <h2o.version>0.1.25</h2o.version>
   </properties>
   <issueManagement>

Reply via email to