Repository: mahout Updated Branches: refs/heads/flink-binding [created] c72698aca
MAHOUT-1570: initial skeleton for Mahout DSL on Apache Flink Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/bb4c4bca Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/bb4c4bca Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/bb4c4bca Branch: refs/heads/flink-binding Commit: bb4c4bcaf452d67b75b3c8d7c500cca6aeb31036 Parents: 9d14053 Author: Alexey Grigorev <[email protected]> Authored: Thu Apr 9 11:54:01 2015 +0200 Committer: Alexey Grigorev <[email protected]> Committed: Fri Sep 25 17:41:36 2015 +0200 ---------------------------------------------------------------------- flink/pom.xml | 176 +++++++++++++++++++ .../flinkbindings/FlinkDistributedContext.scala | 31 ++++ .../mahout/flinkbindings/FlinkEngine.scala | 115 ++++++++++++ .../apache/mahout/flinkbindings/package.scala | 11 ++ .../mahout/math/drm/DistributedContext.scala | 2 +- pom.xml | 8 +- 6 files changed, 341 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/bb4c4bca/flink/pom.xml ---------------------------------------------------------------------- diff --git a/flink/pom.xml b/flink/pom.xml new file mode 100644 index 0000000..9f62b5f --- /dev/null +++ b/flink/pom.xml @@ -0,0 +1,176 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.mahout</groupId>
+    <artifactId>mahout</artifactId>
+    <version>0.10.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>mahout-flink_2.10</artifactId>
+  <name>Mahout Flink bindings</name>
+  <description>
+    Mahout Bindings for Apache Flink
+  </description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <artifactId>maven-javadoc-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-scala-sources</id>
+            <phase>initialize</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-compile</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- this is what scalatest recommends to do to enable scala tests -->
+
+      <!-- disable surefire -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <skipTests>true</skipTests>
+        </configuration>
+      </plugin>
+      <!-- enable scalatest -->
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- create an all dependencies job.jar (disabled until flink/src/main/assembly/dependency-reduced.xml exists; without that descriptor the package phase fails) -->
+      <!-- todo: before release we need a better way to do this MAHOUT-1636
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>dependency-reduced</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <descriptors>
+                <descriptor>src/main/assembly/dependency-reduced.xml</descriptor>
+              </descriptors>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin> -->
+
+
+    </plugins>
+  </build>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-scala</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.mahout</groupId>
+      <artifactId>mahout-math-scala_${scala.compat.version}</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.mahout</groupId>
+      <artifactId>mahout-hdfs</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.mahout</groupId>
+      <artifactId>mahout-math-scala_${scala.compat.version}</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- 3rd-party -->
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <!-- scala stuff -->
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.compat.version}</artifactId>
+    </dependency>
+
+  </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/mahout/blob/bb4c4bca/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
new file mode 100644
index 0000000..1124126
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.flinkbindings
+
+import org.apache.mahout.math.drm.{ DistributedEngine, DistributedContext }
+import org.apache.flink.api.scala.ExecutionEnvironment
+
+class FlinkDistributedContext(val env: ExecutionEnvironment) extends DistributedContext {
+
+  val engine: DistributedEngine = FlinkEngine
+
+  override def close(): Unit = {
+    // TODO: nothing is released yet
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/bb4c4bca/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
new file mode 100644
index 0000000..66c1089
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -0,0 +1,115 @@
+package org.apache.mahout.flinkbindings
+
+import scala.reflect.ClassTag
+
+import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.Vector
+import org.apache.mahout.math.drm.BCast
+import org.apache.mahout.math.drm.CacheHint
+import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.drm.DistributedEngine
+import org.apache.mahout.math.drm.DrmLike
+import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetElementReadSchema
+import org.apache.mahout.math.indexeddataset.DefaultIndexedDatasetReadSchema
+import org.apache.mahout.math.indexeddataset.IndexedDataset
+import org.apache.mahout.math.indexeddataset.Schema
+
+import com.google.common.collect.BiMap
+import com.google.common.collect.HashBiMap
+
+object FlinkEngine extends DistributedEngine {
+  // Skeleton only: every operation below is a stub that throws scala.NotImplementedError (via ???).
+  /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
+  override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
+    ???
+  }
+
+  /** Engine-specific colSums implementation based on a checkpoint. */
+  override def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    ???
+  }
+
+  /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
+  override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    ???
+  }
+
+  /** Engine-specific colMeans implementation based on a checkpoint. */
+  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    ???
+  }
+
+  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = {
+    ??? // TODO: engine-specific norm; a placeholder value would be silently wrong
+  }
+
+  /** Broadcast support */
+  override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = {
+    ???
+  }
+
+  /** Broadcast support */
+  override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = {
+    ???
+  }
+
+  /**
+   * Load DRM from hdfs (as in Mahout DRM format).
+   * <P/>
+   * @param path The DFS path to load from
+   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+   */
+  override def drmDfsRead(path: String, parMin: Int = 0)
+      (implicit sc: DistributedContext): CheckpointedDrm[_] = {
+    ???
+  }
+
+  /** Parallelize in-core matrix as a Flink distributed matrix, using row ordinal indices as data set keys. */
+  override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+    ???
+  }
+
+  /** Parallelize in-core matrix as a Flink distributed matrix, using row labels as data set keys. */
+  override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+      (implicit sc: DistributedContext): CheckpointedDrm[String] = {
+    ???
+  }
+
+  /** This creates an empty DRM with specified number of partitions and cardinality. */
+  override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+    ???
+  }
+
+  /** Creates empty DRM with non-trivial height */
+  override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+      (implicit sc: DistributedContext): CheckpointedDrm[Long] = {
+    ???
+  }
+
+  /**
+   * Load IndexedDataset from text delimited format.
+   * @param src comma delimited URIs to read from
+   * @param schema defines format of file(s)
+   */
+  override def indexedDatasetDFSRead(src: String,
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext): IndexedDataset = {
+    ???
+  }
+
+  /**
+   * Load IndexedDataset from text delimited format, one element per line
+   * @param src comma delimited URIs to read from
+   * @param schema defines format of file(s)
+   */
+  override def indexedDatasetDFSReadElements(src: String,
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: BiMap[String, Int] = HashBiMap.create())
+      (implicit sc: DistributedContext): IndexedDataset = {
+    ???
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/bb4c4bca/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
new file mode 100644
index 0000000..fb0780e
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -0,0 +1,11 @@
+package org.apache.mahout
+
+import org.slf4j.LoggerFactory
+
+package object flinkbindings {
+
+  /** Logger shared by the org.apache.mahout.flinkbindings package (named after the package). */
+  private[flinkbindings] val log = LoggerFactory.getLogger("org.apache.mahout.flinkbindings")
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/bb4c4bca/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala index 39bab90..e1833d8 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala @@ -22,6 +22,6 @@ import java.io.Closeable /** Distributed context (a.k.a. distributed session handle) */ trait DistributedContext extends Closeable { - val engine:DistributedEngine + /** Backing engine; FlinkDistributedContext binds it to FlinkEngine. */ val engine: DistributedEngine } http://git-wip-us.apache.org/repos/asf/mahout/blob/bb4c4bca/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 38cd82c..f90b41b 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,7 @@ <scala.compat.version>2.10</scala.compat.version> <scala.version>2.10.4</scala.version> <spark.version>1.3.1</spark.version> + <flink.version>0.8.1</flink.version> <!-- used by flink/pom.xml for flink-core and flink-scala --> <h2o.version>0.1.25</h2o.version> </properties> <issueManagement> @@ -228,7 +229,6 @@ <groupId>${project.groupId}</groupId> <version>${project.version}</version> </dependency> - <dependency> <artifactId>mahout-spark-shell_${scala.compat.version}</artifactId> <groupId>${project.groupId}</groupId> @@ -236,6 +236,12 @@ </dependency> <dependency> + <artifactId>mahout-flink_${scala.compat.version}</artifactId> + <groupId>${project.groupId}</groupId> + <version>${project.version}</version> + </dependency> + + <dependency> <artifactId>mahout-h2o_${scala.compat.version}</artifactId> <groupId>${project.groupId}</groupId> <version>${project.version}</version>
