http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/clustering/Canopy.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/clustering/Canopy.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/clustering/Canopy.scala deleted file mode 100644 index 96d1968..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/clustering/Canopy.scala +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

package org.apache.mahout.math.algorithms.clustering

import org.apache.mahout.math.algorithms.common.distance.{DistanceMetric, DistanceMetricSelector}
import org.apache.mahout.math._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.function.VectorFunction
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.{Matrix, Vector}

/**
 * Model holding a matrix of canopy centers (one center per row) and the name of the
 * distance metric used to compare rows against those centers.
 *
 * @param canopies matrix whose rows are the canopy centers
 * @param dm       metric name; must be a key of `DistanceMetricSelector.namedMetricLookup`
 */
class CanopyClusteringModel(canopies: Matrix, dm: Symbol) extends ClusteringModel {

  val canopyCenters = canopies
  val distanceMetric = dm

  /**
   * Assigns every row of `input` to the canopy center with the smallest distance.
   *
   * @param input rows to assign
   * @return a one-column DRM with the same keys; each value is the row index of the
   *         closest center in `canopyCenters`, or -1 when no center compared smaller
   *         than the initial "infinite" distance (e.g. there are no centers, or every
   *         distance is NaN)
   */
  def cluster[K](input: DrmLike[K]): DrmLike[K] = {

    implicit val ctx = input.context
    implicit val ktag = input.keyClassTag

    val bcCanopies = drmBroadcast(canopyCenters)
    // The metric is shipped as its numeric code wrapped in a one-element vector.
    val bcDM = drmBroadcast(dvec(DistanceMetricSelector.namedMetricLookup(distanceMetric)))

    input.mapBlock(1) {
      case (keys, block: Matrix) =>
        val outputMatrix = new DenseMatrix(block.nrow, 1)
        val localCanopies: Matrix = bcCanopies.value
        // Resolve the metric once per block instead of once per row.
        val metric = DistanceMetricSelector.select(bcDM.value.get(0))

        for (i <- 0 until block.nrow) {
          // (index of best center so far, its distance); strict `<` keeps the first of equal minima
          val best = (0 until localCanopies.nrow).foldLeft((-1, Double.PositiveInfinity)) { (acc, r) =>
            val dist = metric.distance(localCanopies(r, ::), block(i, ::))
            if (dist < acc._2) (r, dist) else acc
          }
          outputMatrix(i, ::) = dvec(best._1)
        }
        keys -> outputMatrix
    }
  }
}


/**
 * Fitter that builds a [[CanopyClusteringModel]].
 *
 * Hyperparameters (all optional): `'t1` (default 0.5), `'t2` (default 0.1),
 * `'t3` (defaults to t1), `'t4` (defaults to t2), `'distanceMeasure` (default `'Cosine`).
 * t1/t2 are used while finding centers inside each block, t3/t4 while merging the
 * per-block results.
 */
class CanopyClustering extends ClusteringFitter {

  var t1: Double = _  // loose distance
  var t2: Double = _  // tight distance
  var t3: Double = _
  var t4: Double = _
  var distanceMeasure: Symbol = _

  /** Reads a numeric hyperparameter, accepting any boxed number (Int, Double, ...). */
  private def doubleParam(hyperparameters: Map[Symbol, Any], key: Symbol, default: Double): Double =
    hyperparameters.get(key) match {
      case Some(n: Number) => n.doubleValue()
      case Some(other) =>
        // same exception type the previous unchecked cast produced
        throw new ClassCastException(s"Hyperparameter ${key} must be numeric, got: ${other}")
      case None => default
    }

  /**
   * Populates t1..t4 and distanceMeasure from `hyperparameters`, falling back to the
   * defaults listed on the class.
   *
   * @throws ClassCastException if a supplied value has the wrong type
   */
  def setStandardHyperparameters(hyperparameters: Map[Symbol, Any] = Map('foo -> None)): Unit = {
    t1 = doubleParam(hyperparameters, 't1, 0.5)
    t2 = doubleParam(hyperparameters, 't2, 0.1)
    t3 = doubleParam(hyperparameters, 't3, t1)
    t4 = doubleParam(hyperparameters, 't4, t2)

    distanceMeasure = hyperparameters.get('distanceMeasure) match {
      case Some(s: Symbol) => s
      case Some(other) =>
        throw new ClassCastException(s"Hyperparameter 'distanceMeasure must be a Symbol, got: ${other}")
      case None => 'Cosine
    }
  }

  /**
   * Finds canopy centers in every block of `input` and merges the per-block centers
   * into a single matrix.
   *
   * @param input           data set, one observation per row
   * @param hyperparameters see class documentation
   * @return model wrapping the merged centers and the chosen distance metric
   */
  def fit[K](input: DrmLike[K],
             hyperparameters: (Symbol, Any)*): CanopyClusteringModel = {

    setStandardHyperparameters(hyperparameters.toMap)
    implicit val ctx = input.context
    implicit val ktag = input.keyClassTag

    val dmNumber = DistanceMetricSelector.namedMetricLookup(distanceMeasure)

    // Layout: 0 -> t1, 1 -> t2, 2 -> t3, 3 -> t4, 4 -> metric code
    val distanceBC = drmBroadcast(dvec(t1, t2, t3, t4, dmNumber))
    val canopies = input.allreduceBlock(
      {
        // Find centers inside a single block
        case (keys, block: Matrix) =>
          val params = distanceBC.value
          CanopyFn.findCenters(block, DistanceMetricSelector.select(params.get(4)), params.get(0), params.get(1))
      }, {
        // Merge centers that are close enough. Both partial results must take part:
        // previously `newM` was dropped, discarding the centers found in other blocks.
        case (oldM: Matrix, newM: Matrix) =>
          val params = distanceBC.value
          CanopyFn.findCenters(oldM rbind newM, DistanceMetricSelector.select(params.get(4)), params.get(2), params.get(3))
      })

    val model = new CanopyClusteringModel(canopies, distanceMeasure)
    model.summary = s"""CanopyClusteringModel\n${canopies.nrow} Clusters\n${distanceMeasure} distance metric used for calculating distances\nCanopy centers stored in model.canopies where row n coresponds to canopy n"""
    model
  }


}

object CanopyFn extends Serializable {

  /**
   * Picks canopy centers from the rows of `block`.
   *
   * Repeatedly takes the first row not yet assigned as a new center, then scans the
   * remaining rows: rows closer than `t2` to the new center are assigned and blanked
   * out, rows closer than `t1` are assigned only. Blanked (all-zero) rows are skipped
   * by later scans, so a genuinely all-zero input row is never assigned by proximity
   * and ends up as its own center.
   *
   * @param block           candidate rows; not modified (a copy is consumed instead)
   * @param distanceMeasure metric used to compare a row with the current center
   * @param t1              loose threshold
   * @param t2              tight threshold
   * @return matrix with one row per chosen center, in the order they were chosen
   */
  def findCenters(block: Matrix, distanceMeasure: DistanceMetric, t1: Double, t2: Double): Matrix = {
    // Work on a private copy: rows are zeroed as they are consumed. (A hard-coded
    // debugging matrix used to shadow `block` here, so the real input was ignored.)
    val work: Matrix = block.cloned
    val rowAssignedToCanopy = Array.fill(work.nrow) { false }
    val clusterBuf = scala.collection.mutable.ListBuffer.empty[org.apache.mahout.math.Vector]
    while (rowAssignedToCanopy.contains(false)) {
      val next = rowAssignedToCanopy.indexOf(false)
      val center = work(next, ::).cloned
      clusterBuf += center
      work(next, ::) = svec(Nil, cardinality = work.ncol)
      rowAssignedToCanopy(next) = true
      for (i <- 0 until work.nrow) {
        if (work(i, ::).getNumNonZeroElements > 0) {
          val d = distanceMeasure.distance(work(i, ::), center)
          if (d < t2) {
            // tightly bound: assign and remove from all further consideration
            rowAssignedToCanopy(i) = true
            work(i, ::) = svec(Nil, cardinality = work.ncol)
          } else if (d < t1) {
            // loosely bound: assigned, but still visible to later scans
            rowAssignedToCanopy(i) = true
          }
        }
      }
    }
    dense(clusterBuf)
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/clustering/ClusteringModel.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/clustering/ClusteringModel.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/clustering/ClusteringModel.scala deleted file mode 100644 index 8ab1170..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/clustering/ClusteringModel.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

package org.apache.mahout.math.algorithms.clustering

import org.apache.mahout.math.algorithms.{UnsupervisedFitter, UnsupervisedModel}
import org.apache.mahout.math.drm.DrmLike

/** An unsupervised model that can map the rows of a DRM to cluster assignments. */
trait ClusteringModel extends UnsupervisedModel {

  /**
   * @param input rows to cluster
   * @return a DRM keyed like `input` describing the assignment of each row
   */
  def cluster[K](input: DrmLike[K]): DrmLike[K]

}

/** A fitter that produces a [[ClusteringModel]]. */
trait ClusteringFitter extends UnsupervisedFitter {

  // holds the most recently fitted model when `fitCluster` is used
  var model: ClusteringModel = _

  /** Fits a clustering model on `input` using the given hyperparameters. */
  def fit[K](input: DrmLike[K],
             hyperparameters: (Symbol, Any)*): ClusteringModel

  /**
   * Fits a model on `input`, stores it in `model`, and immediately clusters the same
   * input with it.
   */
  def fitCluster[K](input: DrmLike[K],
                    hyperparameters: (Symbol, Any)*): DrmLike[K] = {
    model = fit(input, hyperparameters: _*)
    val fitted = model
    fitted.cluster(input)
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/common/distance/DistanceMetrics.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/common/distance/DistanceMetrics.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/common/distance/DistanceMetrics.scala deleted file mode 100644 index 00495fd..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/common/distance/DistanceMetrics.scala +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.mahout.math.algorithms.common.distance

import org.apache.mahout.math.function.Functions
import org.apache.mahout.math.{CardinalityException, Vector}

/** A serializable function measuring the distance between two vectors. */
trait DistanceMetric extends Serializable {
  def distance(v1: Vector, v2: Vector): Double
}


/**
 * Maps metric names to numeric codes and codes back to implementations, so that a
 * metric choice can be passed around as a plain Double.
 */
object DistanceMetricSelector extends Serializable {

  /** Metric name -> numeric code understood by [[select]]. */
  val namedMetricLookup = Map('Chebyshev -> 1.0, 'Cosine -> 2.0)

  /**
   * Resolves a numeric code from [[namedMetricLookup]] to its metric.
   *
   * @param dm numeric metric code
   * @throws IllegalArgumentException if `dm` is not a known code (previously this
   *                                  surfaced as a `MatchError` with no explanation)
   */
  def select(dm: Double): DistanceMetric = {
    dm match {
      case 1.0 => Chebyshev
      case 2.0 => Cosine
      case other =>
        throw new IllegalArgumentException(
          s"Unknown distance metric code: ${other}. Known codes: ${namedMetricLookup}")
    }
  }
}

/** Chebyshev distance: the largest absolute element-wise difference. */
object Chebyshev extends DistanceMetric {
  /** @throws CardinalityException if the vectors differ in size */
  def distance(v1: Vector, v2: Vector): Double = {
    if (v1.size != v2.size) throw new CardinalityException(v1.size, v2.size)
    v1.aggregate(v2, Functions.MAX_ABS, Functions.MINUS)
  }
}

/**
 * Cosine distance: one minus the cosine of the angle between the vectors.
 * The norm product is used as a divisor without a zero check, so a zero-length
 * vector yields a non-finite result.
 */
object Cosine extends DistanceMetric {
  def distance(v1: Vector, v2: Vector): Double = 1.0 - v1.dot(v2) / (Math.sqrt(v1.getLengthSquared) * Math.sqrt(v2.getLengthSquared))
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/AsFactor.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/AsFactor.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/AsFactor.scala deleted file mode 100644 index 2e2a3dd..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/AsFactor.scala +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

package org.apache.mahout.math.algorithms.preprocessing



import collection._
import JavaConversions._
import org.apache.mahout.math._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.{Vector => MahoutVector}
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._
import MahoutCollections._

/**
 * Fitter for [[AsFactorModel]]: derives, per input column, a value taken from the data
 * (the column maximum once blocks are merged) and turns those values into column
 * offsets for an indicator-style encoding.
 */
class AsFactor extends PreprocessorFitter {

  /**
   * Builds an [[AsFactorModel]] from `input`.
   *
   * The model's cardinality is the sum of the per-column values; its offset vector is
   * the running sum of those values starting at 0.0 (one entry per column).
   *
   * `hyperparameters` is not read by this method.
   */
  def fit[K](input: DrmLike[K],
             hyperparameters: (Symbol, Any)*): AsFactorModel = {

    import org.apache.mahout.math.function.VectorFunction
    // Map step passes each block through unchanged; the reduce step row-binds two
    // partial results and collapses them to a single row of column maxima.
    // NOTE(review): if the reduce step never runs (e.g. a single block), row 0 below
    // would presumably be the first data row rather than the column maxima — confirm.
    val factorMap = input.allreduceBlock(
      { case (keys, block: Matrix) => block },
      { case (oldM: Matrix, newM: Matrix) =>
        // someday we'll replace this with block.max: Vector
        // or better yet- block.distinct

        dense((oldM rbind newM).aggregateColumns( new VectorFunction {
          def apply(f: Vector): Double = f.max
        }))
      })(0, ::)
    /*
    val A = drmParallelize(dense(
      (3, 2, 1),
      (0, 0, 0),
      (1, 1, 1))
      -> (4,2,2), now 4,3,2
     */
    // NOTE(review): the example above suggests each column needs max + 1 slots, but
    // the code sums the maxima themselves — verify the intended cardinality.
    new AsFactorModel(factorMap.sum.toInt,
      dvec(factorMap.toArray.scanLeft(0.0)((l, r) => l + r ).take(factorMap.length))
      //  factorMap
    )
  }

}

/**
 * Model produced by [[AsFactor]].
 *
 * @param cardinality number of columns of the encoded output
 * @param factorVec   per-input-column offset into the encoded output
 */
class AsFactorModel(cardinality: Int, factorVec: MahoutVector) extends PreprocessorModel {

  val factorMap: MahoutVector = factorVec

  /**
   * Encodes each row of `input` as a sparse row with `cardinality` columns: for the
   * m-th element of the row, the cell at column `factorMap(m) + element` is set to 1.0.
   */
  def transform[K](input: DrmLike[K]): DrmLike[K] ={

    implicit val ctx = input.context

    val bcastK = drmBroadcast(dvec(cardinality))
    val bcastFactorMap = drmBroadcast(factorMap)

    implicit val ktag = input.keyClassTag

    val res = input.mapBlock(cardinality) {
      case (keys, block: Matrix) => {
        // read back from the broadcast; shadows the constructor parameter on purpose
        val cardinality: Int = bcastK.value.get(0).toInt
        val output = new SparseMatrix(block.nrow, cardinality)
        // This is how we take a vector of mapping to a map
        val fm = bcastFactorMap.value
        for (n <- 0 until output.nrow){
          // m tracks the position of the current element within the row
          var m = 0
          for (e <- block(n, ::).all() ){
            output(n, fm.get(m).toInt + e.get().toInt ) = 1.0
            m += 1
          }
        }
        (keys, output)
      }
    }
    res
  }

  /**
   * Attempts to map encoded rows back to one value per original column by walking the
   * non-zero entries of each row.
   *
   * NOTE(review): several things here look unfinished and should be confirmed before
   * relying on this method:
   *  - `k`, `fm` and `indexArray` are computed but never used afterwards;
   *  - `output` is sized with `bcastK.value.length`, and `bcastK` is built from
   *    `dvec(cardinality)` (apparently a one-element vector), while each row `v`
   *    assigned into it has `factorMap.length` entries;
   *  - the decoded value is `e.index - m` (position), whereas `transform` offsets by
   *    `factorMap(m)`;
   *  - rows are cast to `RandomAccessSparseVector`, which fails for other row types.
   */
  override def invTransform[K](input: DrmLike[K]): DrmLike[K] = {
    implicit val ctx = input.context

    val bcastK = drmBroadcast(dvec(cardinality))
    val bcastFactorMap = drmBroadcast(factorMap)

    implicit val ktag = input.keyClassTag

    val res = input.mapBlock(cardinality) {
      case (keys, block: Matrix) => {
        val k: Int = bcastK.value.get(0).toInt
        val output = new DenseMatrix(block.nrow, bcastK.value.length)
        // This is how we take a vector of mapping to a map
        val fm = bcastFactorMap.all.toSeq.map(e => e.get -> e.index).toMap

        import MahoutCollections._
        val indexArray = Array(1.0) ++ bcastFactorMap.value.toArray.map(i => i.toInt)
        for (n <- 0 until output.nrow){
          val v = new DenseVector(bcastFactorMap.value.length)
          // m counts the non-zero entries seen so far in this row
          var m = 0
          for (e <- block(n, ::).asInstanceOf[RandomAccessSparseVector].iterateNonZero() ){
            v.setQuick(m, e.index - m)
            m += 1
          }
          output(n, ::) = v
        }
        (keys, output)
      }
    }
    res
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/MeanCenter.scala ---------------------------------------------------------------------- diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/MeanCenter.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/MeanCenter.scala deleted file mode 100644 index 258ad1b..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/MeanCenter.scala +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

package org.apache.mahout.math.algorithms.preprocessing

import collection._
import JavaConversions._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.Matrix
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.{Vector => MahoutVector}



/** Fitter that learns the column means of a data set. */
class MeanCenter extends PreprocessorFitter {

  /**
   * Computes the column means of `input` and wraps them in a model.
   *
   * @param input           the DRM whose column means are taken
   * @param hyperparameters not read by this method
   */
  def fit[K](input: DrmLike[K],
             hyperparameters: (Symbol, Any)*): MeanCenterModel = {
    val columnMeans = input.colMeans()
    new MeanCenterModel(columnMeans) // could add centers here
  }

}

/**
 * Shifts every row by a fixed per-column vector: the fitted means by default, or the
 * means plus the vector given to `setCenters`.
 *
 * @param means per-column means of the data the model was fitted on
 */
class MeanCenterModel(means: MahoutVector) extends PreprocessorModel {

  // vector subtracted by `transform` and added back by `invTransform`
  var colCentersV: MahoutVector = means

  /**
   * Replaces the shift vector with `means + centers`.
   *
   * @throws Exception if `centers` does not have the same length as the fitted means
   */
  def setCenters(centers: MahoutVector): Unit = {
    val sameLength = means.length == centers.length
    if (!sameLength) {
      throw new Exception(s"Length of centers vector (${centers.length}) must equal length of means vector ((${means.length}) (e.g. the number of columns in the orignally fit input).")
    }
    colCentersV = means + centers
  }

  /** Returns a copy of `input` with `colCentersV` subtracted from every row. */
  def transform[K](input: DrmLike[K]): DrmLike[K] = {

    implicit val ctx = input.context
    implicit val ktag = input.keyClassTag

    val bcastV = drmBroadcast(colCentersV)

    input.mapBlock(input.ncol) {
      case (keys, block: Matrix) =>
        // work on a copy so the incoming block is left untouched
        val shifted: Matrix = block.cloned
        for (row <- shifted) row -= bcastV.value
        (keys, shifted)
    }
  }

  /** Returns a copy of `input` with `colCentersV` added to every row. */
  def invTransform[K](input: DrmLike[K]): DrmLike[K] = {

    implicit val ctx = input.context
    implicit val ktag = input.keyClassTag
    val bcastV = drmBroadcast(colCentersV)

    input.mapBlock(input.ncol) {
      case (keys, block: Matrix) =>
        val restored: Matrix = block.cloned
        for (row <- restored) row += bcastV.value
        (keys, restored)
    }
  }

}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/PreprocessorModel.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/PreprocessorModel.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/PreprocessorModel.scala deleted file mode 100644 index 5adb87d..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/PreprocessorModel.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

package org.apache.mahout.math.algorithms.preprocessing

import org.apache.mahout.math.algorithms.{UnsupervisedFitter, UnsupervisedModel}
import org.apache.mahout.math.drm.DrmLike

/** An unsupervised model that rewrites a DRM and can undo that rewrite. */
trait PreprocessorModel extends UnsupervisedModel {

  /**
   * Applies the fitted transformation to `input`.
   *
   * @param input the DRM to transform
   * @tparam K row key type
   * @return the transformed DRM
   */
  def transform[K](input: DrmLike[K]): DrmLike[K]

  /**
   * Convenience inverse of `transform`: maps transformed data back to the original
   * representation.
   *
   * @param input previously transformed DRM
   * @tparam K row key type
   * @return the DRM in its original representation
   */
  def invTransform[K](input: DrmLike[K]): DrmLike[K]

}

/** A fitter that produces a [[PreprocessorModel]]. */
trait PreprocessorFitter extends UnsupervisedFitter {

  // holds the most recently fitted model when `fitTransform` is used
  var model: PreprocessorModel = _

  /** Fits a preprocessing model on `input` using the given hyperparameters. */
  def fit[K](input: DrmLike[K],
             hyperparameters: (Symbol, Any)*): PreprocessorModel

  /**
   * Fits a model on `input`, stores it in `model`, and immediately transforms the same
   * input with it.
   */
  def fitTransform[K](input: DrmLike[K],
                      hyperparameters: (Symbol, Any)*): DrmLike[K] = {
    model = fit(input, hyperparameters: _*)
    val fitted = model
    fitted.transform(input)
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/StandardScaler.scala ---------------------------------------------------------------------- diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/StandardScaler.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/StandardScaler.scala deleted file mode 100644 index 5863330..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/preprocessing/StandardScaler.scala +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */

package org.apache.mahout.math.algorithms.preprocessing

import collection._
import JavaConversions._

import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.{Vector => MahoutVector, Matrix}

/**
 * Scales columns to mean 0 and unit variance.
 *
 * An important note: the equivalent call in R would be something like
 * ```r
 * N <- nrow(x)
 * scale(x, scale = apply(x, 2, sd) * sqrt((N-1)/N))
 * ```
 *
 * This is because R uses degrees of freedom = 1 to calculate standard deviation.
 * Multiplying the standard deviation by sqrt((N-1)/N) 'undoes' this correction.
 *
 * The StandardScaler of sklearn uses degrees of freedom = 0 for its calculation, so
 * results should be similar.
 */
class StandardScaler extends PreprocessorFitter {

  /**
   * Computes per-column means and variances of `input` and returns a model holding
   * the means and the square roots of the variances.
   *
   * @param hyperparameters not read by this method
   */
  def fit[K](input: DrmLike[K],
             hyperparameters: (Symbol, Any)*): StandardScalerModel = {
    val meansAndVariances = dcolMeanVars(input)
    val columnMeans = meansAndVariances._1
    val columnStdevs = meansAndVariances._2.sqrt
    new StandardScalerModel(columnMeans, columnStdevs)
  }

}

/**
 * Standardizes rows with a fixed mean and standard-deviation vector.
 *
 * @param meanVec per-column mean
 * @param stdev   per-column standard deviation; used as a divisor without a zero check
 */
class StandardScalerModel(val meanVec: MahoutVector,
                          val stdev: MahoutVector
                         ) extends PreprocessorModel {

  /** Returns a copy of `input` where every row becomes `(row - meanVec) / stdev`. */
  def transform[K](input: DrmLike[K]): DrmLike[K] = {
    implicit val ctx = input.context

    val bcastMu = drmBroadcast(meanVec)
    val bcastSigma = drmBroadcast(stdev)

    // Some mapBlock() calls need it
    implicit val ktag = input.keyClassTag

    input.mapBlock(input.ncol) {
      case (keys, block: Matrix) =>
        // work on a copy so the incoming block is left untouched
        val scaled: Matrix = block.cloned
        for (row <- scaled) row := (row - bcastMu) / bcastSigma
        (keys, scaled)
    }
  }

  /**
   * Undoes `transform`: returns a copy of `input` where every row becomes
   * `row * stdev + meanVec`, e.g. a normalized column back to its original values.
   *
   * @param input previously standardized DRM
   * @tparam K row key type
   * @return the DRM on the original scale
   */
  def invTransform[K](input: DrmLike[K]): DrmLike[K] = { // [K: ClassTag]

    implicit val ctx = input.context

    // Some mapBlock() calls need it
    implicit val ktag = input.keyClassTag

    val bcastMu = drmBroadcast(meanVec)
    val bcastSigma = drmBroadcast(stdev)

    input.mapBlock(input.ncol) {
      case (keys, block: Matrix) =>
        val restored: Matrix = block.cloned
        for (row <- restored) row := (row * bcastSigma ) + bcastMu
        (keys, restored)
    }
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/CochraneOrcuttModel.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/CochraneOrcuttModel.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/CochraneOrcuttModel.scala deleted file mode 100644 index 3e5a496..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/CochraneOrcuttModel.scala +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */

package org.apache.mahout.math.algorithms.regression

import org.apache.mahout.math.algorithms.regression.tests._
import org.apache.mahout.math.drm.{CacheHint, DrmLike, safeToNonNegInt}
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.function.Functions
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.{Vector => MahoutVector}


/**
 * Result of a Cochrane-Orcutt fit. Predictions are delegated to the wrapped regressor
 * model; `betas`, `dws` and `rhos` hold the per-iteration coefficient vectors,
 * Durbin-Watson statistics and autocorrelation estimates filled in by the fitter.
 *
 * @param regressor the model whose `predict` is used
 */
class CochraneOrcuttModel[K](regressor: LinearRegressorModel[K]) extends LinearRegressorModel[K] {
  // https://en.wikipedia.org/wiki/Cochrane%E2%80%93Orcutt_estimation

  var betas: Array[MahoutVector] = _
  var dws: Array[Double] = _
  var rhos: Array[Double] = _

  def predict(drmPredictors: DrmLike[K]): DrmLike[K] = {
    regressor.predict(drmPredictors)
  }

}

/**
 * Cochrane-Orcutt estimation on top of another linear regressor.
 *
 * Hyperparameters (all optional): `'regressor` (default `new OrdinaryLeastSquares()`),
 * `'iterations` (default 3, must be at least 2), `'cacheHint`
 * (default `CacheHint.MEMORY_ONLY`), plus the standard linear-regressor ones.
 *
 * Note that `fit` re-applies defaults from its own arguments only, so values given to
 * the constructor are replaced on every call to `fit`.
 */
class CochraneOrcutt[K](hyperparameters: (Symbol, Any)*) extends LinearRegressorFitter[K] {

  var regressor: LinearRegressorFitter[K] = _
  var iterations: Int = _
  var cacheHint: CacheHint.CacheHint = _
  // For larger inputs, CacheHint.MEMORY_AND_DISK2 is recommended.

  /** Populates `regressor`, `iterations` and `cacheHint` (and the standard settings). */
  def setHyperparameters(hyperparameters: Map[Symbol, Any] = Map('foo -> None)): Unit = {
    setStandardHyperparameters(hyperparameters.toMap)
    regressor = hyperparameters.asInstanceOf[Map[Symbol, LinearRegressorFitter[K]]].getOrElse('regressor, new OrdinaryLeastSquares())
    // statistics are switched on selectively inside `fit`
    regressor.calcStandardErrors = false
    regressor.calcCommonStatistics = false
    iterations = hyperparameters.asInstanceOf[Map[Symbol, Int]].getOrElse('iterations, 3)
    cacheHint = hyperparameters.asInstanceOf[Map[Symbol, CacheHint.CacheHint]].getOrElse('cacheHint, CacheHint.MEMORY_ONLY)
  }

  setHyperparameters(hyperparameters.toMap)

  /**
   * Estimates the autocorrelation coefficient from the first column of `errorDrm` by
   * regressing each residual on its predecessor through the origin.
   *
   * The DRM is collected in-core.
   */
  def calculateRho(errorDrm: DrmLike[K]): Double ={
    val error = errorDrm.collect.viewColumn(0)
    val n = error.length - 1
    val e2: MahoutVector = error.viewPart(1, n)
    val e3: MahoutVector = error.viewPart(0, n)
    // regression through the origin lm(e2 ~e3 -1) is sum(e2 * e3) / e3^2
    e3.times(e2).sum / e3.assign(Functions.SQUARE).sum
  }

  /**
   * Runs the iterative procedure and returns the model of the last iteration.
   *
   * @param drmFeatures     predictors
   * @param drmTarget       response (first column is used)
   * @param hyperparameters see class documentation
   * @throws IllegalArgumentException if fewer than 2 iterations are configured
   */
  def fit(drmFeatures: DrmLike[K], drmTarget: DrmLike[K], hyperparameters: (Symbol, Any)*): CochraneOrcuttModel[K] = {

    setHyperparameters(hyperparameters.toMap[Symbol, Any])

    // The final model reads rhos(iterations - 2); with fewer than two iterations that
    // index is negative, which used to fail only after the regression had been run.
    require(iterations >= 2, s"CochraneOrcutt requires 'iterations >= 2, got ${iterations}")

    val betas = new Array[MahoutVector](iterations)
    val models = new Array[LinearRegressorModel[K]](iterations)
    val dws = new Array[Double](iterations)
    val rhos = new Array[Double](iterations)

    // Current observations (rows 1..n-1) and their one-step lags (rows 0..n-2)
    val n = safeToNonNegInt(drmTarget.nrow)
    val Y = drmTarget(1 until n, 0 until 1).checkpoint(cacheHint)
    val Y_lag = drmTarget(0 until n - 1, 0 until 1).checkpoint(cacheHint)
    val X = drmFeatures(1 until n, 0 until drmFeatures.ncol).checkpoint(cacheHint)
    val X_lag = drmFeatures(0 until n - 1, 0 until drmFeatures.ncol).checkpoint(cacheHint)

    // Step 1: Normal Regression
    regressor.calcStandardErrors = true
    regressor.calcCommonStatistics = true
    models(0) = regressor.fit(drmFeatures, drmTarget)
    regressor.calcStandardErrors = false
    regressor.calcCommonStatistics = false
    betas(0) = models(0).beta
    var residuals = drmTarget - models(0).predict(drmFeatures)

    // dws(0) and rhos(iterations - 1) are never assigned and keep their 0.0 default.
    for (i <- 1 until iterations){
      // Step 2: Calculate Rho
      val rho_hat = calculateRho(residuals)
      rhos(i-1) = rho_hat

      // Step 3: Transform Variables
      val drmYprime = Y - (Y_lag * rho_hat)
      val drmXprime = X - (X_lag * rho_hat)

      // Step 4: Get Estimates of Transformed Equation
      if (i == iterations - 1 ){
        // get standard errors on last iteration only
        regressor.calcStandardErrors = true
        regressor.calcCommonStatistics = true
      }
      models(i) = regressor.fit(drmXprime, drmYprime)
      // Make this optional- only for parity with R reported dw-stat, doesn't really mean anything
      dws(i) = AutocorrelationTests.DurbinWatson( models(i),
        drmTarget - models(i).predict(drmFeatures))
        .testResults.get('durbinWatsonTestStatistic).get.asInstanceOf[Double]

      models(i).beta(X.ncol) = models(i).beta(X.ncol) / (1 - rho_hat) // intercept adjust
      betas(i) = models(i).beta

      // Step 5: Use Betas from (4) to recalculate model from (1)
      residuals = drmTarget - models(i).predict(drmFeatures)

      /** Step 6: repeat Step 2 through 5 until a stopping criteria is met.
        *  some models call for convergence-
        *  Kutner et al. recommend 3 iterations; if you don't achieve desired results, use
        *  an alternative method.
        **/
    }

    val finalModel = new CochraneOrcuttModel[K](models(iterations -1))
    finalModel.betas = betas
    finalModel.dws = dws
    finalModel.rhos = rhos
    finalModel.tScore = models(iterations -1).tScore
    finalModel.pval = models(iterations -1).pval
    finalModel.beta = models(iterations -1).beta
    // the intercept's standard error gets the same adjustment as the intercept itself
    // (this updates the last model's `se` vector in place)
    val se = models(iterations -1).se
    se(se.length -1) = se(se.length -1) / (1 - rhos(iterations - 2))
    finalModel.se = se
    finalModel.summary = "Original Model:\n" + models(0).summary +
      "\n\nTransformed Model:\n" +
      generateSummaryString(finalModel) +
      "\n\nfinal rho: " + finalModel.rhos(iterations - 2) +
      s"\nMSE: ${models(iterations -1 ).mse}\nR2: ${models(iterations -1 ).r2}\n"

    if (models(0).addIntercept == true){
      finalModel.summary = finalModel.summary.replace(s"X${X.ncol}", "(Intercept)")
    }

    finalModel
  }

}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/LinearRegressorModel.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/LinearRegressorModel.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/LinearRegressorModel.scala deleted file mode 100644 index 7b87a1a..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/LinearRegressorModel.scala +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.mahout.math.algorithms.regression - -import org.apache.mahout.math.algorithms.regression.tests.FittnessTests -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.DrmLike -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.scalabindings.dvec -import org.apache.mahout.math.{Matrix, Vector => MahoutVector} -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.commons.math3.distribution._ - -import scala.language.higherKinds - -trait LinearRegressorModel[K] extends RegressorModel[K] { - - var beta: MahoutVector = _ - var se: MahoutVector = _ - var tScore: MahoutVector = _ - var pval: MahoutVector = _ - - - -} - -trait LinearRegressorFitter[K] extends RegressorFitter[K] { - - var calcStandardErrors: Boolean = _ - var calcCommonStatistics: Boolean = _ - - def fit(drmX: DrmLike[K], - drmTarget: DrmLike[K], - hyperparameters: (Symbol, Any)*): LinearRegressorModel[K] - - - def setStandardHyperparameters(hyperparameters: Map[Symbol, Any] = Map('foo -> None)): Unit = { - calcCommonStatistics = hyperparameters.asInstanceOf[Map[Symbol, Boolean]].getOrElse('calcCommonStatistics, true) - calcStandardErrors = hyperparameters.asInstanceOf[Map[Symbol, Boolean]].getOrElse('calcStandardErrors, true) - addIntercept = hyperparameters.asInstanceOf[Map[Symbol, Boolean]].getOrElse('addIntercept, true) - } - - - def calculateStandardError[M[K] <: LinearRegressorModel[K]](X: DrmLike[K], - drmTarget: DrmLike[K], - drmXtXinv: Matrix, - model: M[K]): M[K] = { - import 
org.apache.mahout.math.function.Functions.SQRT - import org.apache.mahout.math.scalabindings.MahoutCollections._ - - val yhat = X %*% model.beta - val residuals = drmTarget - yhat - - // Setting modelOut.rss - // Changed name from ete, to rssModel. This is residual sum of squares for model of yhat vs y - var modelOut = FittnessTests.calculateResidualSumOfSquares(model,residuals) - - val n = drmTarget.nrow - val k = safeToNonNegInt(X.ncol) - val invDegFreedomKindOf = 1.0 / (n - k) - val varCovarMatrix = invDegFreedomKindOf * modelOut.rss * drmXtXinv - val se = varCovarMatrix.viewDiagonal.assign(SQRT) - val tScore = model.beta / se - val tDist = new TDistribution(n-k) - - val pval = dvec(tScore.toArray.map(t => 2 * (1.0 - tDist.cumulativeProbability(Math.abs(t))) )) - - // ^^ TODO bug in this calculation- fix and add test - //degreesFreedom = k - modelOut.se = se - modelOut.tScore = tScore - modelOut.pval = pval - modelOut.degreesOfFreedom = safeToNonNegInt(X.ncol) - modelOut.trainingExamples = safeToNonNegInt(n) - - if (calcCommonStatistics){ - modelOut = calculateCommonStatistics(modelOut, drmTarget, residuals) - } - - // Let Statistics Get Calculated prior to assigning the summary - modelOut.summary = generateSummaryString(modelOut) - - modelOut - } - - - def calculateCommonStatistics[M[K] <: LinearRegressorModel[K]](model: M[K], - drmTarget: DrmLike[K], - residuals: DrmLike[K]): M[K] ={ - var modelOut = model - modelOut = FittnessTests.CoefficientOfDetermination(model, drmTarget, residuals) - modelOut = FittnessTests.MeanSquareError(model, residuals) - modelOut = FittnessTests.FTest(model, drmTarget) - - - modelOut - } - - def modelPostprocessing[M[K] <: LinearRegressorModel[K]](model: M[K], - X: DrmLike[K], - drmTarget: DrmLike[K], - drmXtXinv: Matrix): M[K] = { - var modelOut = model - if (calcStandardErrors) { - modelOut = calculateStandardError(X, drmTarget, drmXtXinv, model ) - } else { - modelOut.summary = "Coef.\t\tEstimate\n" + - (0 until X.ncol).map(i => 
s"X${i}\t${modelOut.beta(i)}").mkString("\n") - if (calcCommonStatistics) { // we do this in calcStandard errors to avoid calculating residuals twice - val residuals = drmTarget - (X %*% modelOut.beta) - // If rss is already set, then this will drop through to calculateCommonStatistics - modelOut = FittnessTests.calculateResidualSumOfSquares(modelOut,residuals) - modelOut = calculateCommonStatistics(modelOut, drmTarget, residuals) - } - - modelOut - } - - if (addIntercept) { - model.summary.replace(s"X${X.ncol - 1}", "(Intercept)") - model.addIntercept = true - } - model - } - - def generateSummaryString[M[K] <: LinearRegressorModel[K]](model: M[K]): String = { - - /* Model after R implementation ... - Call: - lm(formula = target ~ a + b + c + d, data = df1) - - Residuals: - 1 2 3 4 5 6 7 8 9 - -4.2799 0.5059 -2.2783 4.3765 -1.3455 0.7202 -1.8063 1.2889 2.8184 - - Coefficients: - Estimate Std. Error t value Pr(>|t|) - (Intercept) 163.179 51.915 3.143 0.0347 * - a -1.336 2.688 -0.497 0.6452 - b -13.158 5.394 -2.439 0.0713 . - c -4.153 1.785 -2.327 0.0806 . - d -5.680 1.887 -3.010 0.0395 * - --- - Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1 - */ - - val k = model.beta.length - - // Using Formatted Print here to pretty print the columns - var summaryString = "\nCoef.\t\tEstimate\t\tStd.
Error\t\tt-score\t\t\tPr(Beta=0)\n" + - (0 until k).map(i => "X%-3d\t\t%+5.5f\t\t%+5.5f\t\t%+5.5f\t\t%+5.5f".format(i,model.beta(i),model.se(i),model.tScore(i),model.pval(i))).mkString("\n") - if(calcCommonStatistics) { - summaryString += "\nF-statistic: " + model.fScore + " on " + (model.degreesOfFreedom - 1) + " and " + - (model.trainingExamples - model.degreesOfFreedom) + " DF, p-value: " + 0.009545 + "\n" - summaryString += s"\nMean Squared Error: ${model.mse}" - summaryString += s"\nR^2: ${model.r2}" - - } - summaryString - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/OrdinaryLeastSquaresModel.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/OrdinaryLeastSquaresModel.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/OrdinaryLeastSquaresModel.scala deleted file mode 100644 index fd9924e..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/OrdinaryLeastSquaresModel.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.mahout.math.algorithms.regression - -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.drm.DrmLike -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.scalabindings.RLikeOps._ - -class OrdinaryLeastSquaresModel[K] - extends LinearRegressorModel[K] { - // https://en.wikipedia.org/wiki/Ordinary_least_squares - - def predict(drmPredictors: DrmLike[K]): DrmLike[K] = { - var X = drmPredictors - if (addIntercept) { - X = X cbind 1 - } - X %*% beta - } - -} - -class OrdinaryLeastSquares[K] extends LinearRegressorFitter[K] { - - - def fit(drmFeatures: DrmLike[K], - drmTarget: DrmLike[K], - hyperparameters: (Symbol, Any)*): OrdinaryLeastSquaresModel[K] = { - - assert(drmTarget.ncol == 1, s"drmTarget must be a single column matrix, found ${drmTarget.ncol} columns") - var model = new OrdinaryLeastSquaresModel[K]() - setStandardHyperparameters(hyperparameters.toMap) - - - if (drmFeatures.nrow != drmTarget.nrow){ - throw new Exception(s"${drmFeatures.nrow} observations in features, ${drmTarget.nrow} observations in target, must be equal.") - } - - var X = drmFeatures - - if (addIntercept) { - X = X cbind 1 - } - - val XtX = (X.t %*% X).collect - val drmXtXinv = solve(XtX) - val drmXty = (X.t %*% drmTarget).collect // this fails when number of columns^2 size matrix won't fit in driver - model.beta = (drmXtXinv %*% drmXty)(::, 0) - - - this.modelPostprocessing(model, X, drmTarget, drmXtXinv) - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/RegressorModel.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/RegressorModel.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/RegressorModel.scala deleted file mode 100644 index aa3dad4..0000000 --- 
a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/RegressorModel.scala +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.mahout.math.algorithms.regression - -import org.apache.mahout.math.algorithms.{SupervisedFitter, SupervisedModel} -import org.apache.mahout.math.drm.DrmLike - -trait RegressorModel[K] extends SupervisedModel[K] { - - def predict(drmPredictors: DrmLike[K]): DrmLike[K] - - var addIntercept: Boolean = _ - // Common Applicable Tests- here only for convenience. - var mse: Double = _ - var r2: Double = _ - var fpval: Double = _ - // default rss to a negative number to ensure rss gets set. - var rss:Double = -9999.0 - var fScore: Double = _ - var degreesOfFreedom: Int = _ - var trainingExamples :Int = _ - - /** - * Syntactic sugar for fetching test results. Will return test result if it exists, otherwise None - * @param testSymbol - symbol of the test result to fetch, e.g.
`'mse` - * @tparam T - The Type - * @return - */ - def getTestResult[T](testSymbol: Symbol): Option[T] = { - Some(testResults.get(testSymbol).asInstanceOf[T]) - } -} - -trait RegressorFitter[K] extends SupervisedFitter[K, RegressorModel[K]] { - - var addIntercept: Boolean = _ - - def fitPredict(drmX: DrmLike[K], - drmTarget: DrmLike[K], - hyperparameters: (Symbol, Any)* ): DrmLike[K] = { - - model = this.fit(drmX, drmTarget, hyperparameters: _* ) - model.predict(drmX) - } - - // used to store the model if `fitTransform` method called - var model: RegressorModel[K] = _ - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/tests/AutocorrelationTests.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/tests/AutocorrelationTests.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/tests/AutocorrelationTests.scala deleted file mode 100644 index 2b16b74..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/tests/AutocorrelationTests.scala +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.mahout.math.algorithms.regression.tests - -import org.apache.mahout.math.algorithms.regression.RegressorModel -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.DrmLike -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.function.Functions.SQUARE -import org.apache.mahout.math.scalabindings.RLikeOps._ -import scala.language.higherKinds - -object AutocorrelationTests { - - //https://en.wikipedia.org/wiki/Durbin%E2%80%93Watson_statistic - /* - To test for positive autocorrelation at significance α, the test statistic d is compared to lower and upper critical values (dL,α and dU,α): - If d < dL,α, there is statistical evidence that the error terms are positively autocorrelated. - If d > dU,α, there is no statistical evidence that the error terms are positively autocorrelated. - If dL,α < d < dU,α, the test is inconclusive. 
- - Rule of Thumb: - d < 2 : positive auto-correlation - d = 2 : no auto-correlation - d > 2 : negative auto-correlation - */ - def DurbinWatson[R[K] <: RegressorModel[K], K](model: R[K], residuals: DrmLike[K]): R[K] = { - - val n = safeToNonNegInt(residuals.nrow) - val e: DrmLike[K] = residuals(1 until n , 0 until 1) - val e_t_1: DrmLike[K] = residuals(0 until n - 1, 0 until 1) - val numerator = (e - e_t_1).assign(SQUARE).colSums() - val denominator = residuals.assign(SQUARE).colSums() - val dw = numerator / denominator - model.testResults += ('durbinWatsonTestStatistic -> dw.get(0)) - model.summary += s"\nDurbin Watson Test Statistic: ${dw.toString}" - model - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/tests/FittnessTests.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/tests/FittnessTests.scala b/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/tests/FittnessTests.scala deleted file mode 100644 index c2d634b..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/algorithms/regression/tests/FittnessTests.scala +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.mahout.math.algorithms.regression.tests - - - - - -import org.apache.commons.math3.distribution.FDistribution -import org.apache.mahout.math.algorithms.regression.RegressorModel -import org.apache.mahout.math.algorithms.preprocessing.MeanCenter -import org.apache.mahout.math.drm.DrmLike -import org.apache.mahout.math.function.Functions.SQUARE -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.drm.RLikeDrmOps._ - -import scala.language.higherKinds - -object FittnessTests { - - // https://en.wikipedia.org/wiki/Coefficient_of_determination - def CoefficientOfDetermination[R[K] <: RegressorModel[K], K](model: R[K], - drmTarget: DrmLike[K], - residuals: DrmLike[K]): R[K] = { - val sumSquareResiduals = residuals.assign(SQUARE).sum - val mc = new MeanCenter() - val totalResiduals = mc.fitTransform(drmTarget) - val sumSquareTotal = totalResiduals.assign(SQUARE).sum - val r2 = 1 - (sumSquareResiduals / sumSquareTotal) - model.r2 = r2 - model.testResults += ('r2 -> r2) // need setResult and setSummary method incase you change in future, also to initialize map if non exists or update value if it does - //model.summary += s"\nR^2: ${r2}" - model - } - - // https://en.wikipedia.org/wiki/Mean_squared_error - def MeanSquareError[R[K] <: RegressorModel[K], K](model: R[K], residuals: DrmLike[K]): R[K] = { - // TODO : I think mse denom should be (row - col) ?? 
<-- https://en.wikipedia.org/wiki/Mean_squared_error see regression section - val mse = residuals.assign(SQUARE).sum / residuals.nrow - model.mse = mse - model.testResults += ('mse -> mse) - //model.summary += s"\nMean Squared Error: ${mse}" - model - } - - // Since rss is needed for multiple test statistics, use this function to cache this value - def calculateResidualSumOfSquares[R[K] <: RegressorModel[K], K](model: R[K],residuals: DrmLike[K]) : R[K] = { - // This is a check so that model.rss isnt unnecessarily computed - // by default setting this value to negative, so that the first time its garaunteed to evaluate. - if (model.rss < 0) { - val ete = (residuals.t %*% residuals).collect // 1x1 - model.rss = ete(0, 0) - } - model - } - - - // https://en.wikipedia.org/wiki/F-test - /* - # R Prototype - # Cereal Dataframe - df1 <- data.frame( - "X0" = c(1,1,1,1,1,1,1,1,1), - "a" = c(2,1,1,2,1,2,6,3,3), - "b" = c( 2,2,1,1,2,1,2,2,3), - "c" = c( 10.5,12,12, 11,12, 16,17, 13,13), - "d" = c( 10,12,13,13,11,8, 1, 7, 4), - "target" = c( 29.509541,18.042851,22.736446,32.207582,21.871292,36.187559,50.764999,40.400208,45.811716)) - - # Create linear regression models adding features one by one - lrfit0 <- lm(data=df1, formula = target ~ 1 ) - lrfit1 <- lm(data=df1, formula = target ~ a ) - lrfit2 <- lm(data=df1, formula = target ~ a + b ) - lrfit3 <- lm(data=df1, formula = target ~ a + b + c ) - lrfit4 <- lm(data=df1, formula = target ~ a + b + c + d) - - ###################################### - # Fscore Calculation - ###################################### - - # So in the anova report using lm ... 
- # These are the residual sum of squares for each model - rssint <- sum(lrfit0$residuals^2) - rssa <- sum(lrfit1$residuals^2) - rssb <- sum(lrfit2$residuals^2) - rssc <- sum(lrfit3$residuals^2) - rssd <- sum(lrfit4$residuals^2) - - #Ftest in overall model - (rssint - rssd)/4 / (rssd/4) # g = 4, n - g - 1 = 4 - # Compare with R - summary(lrfit4) - - */ - def FTest[R[K] <: RegressorModel[K], K](model: R[K] , drmTarget: DrmLike[K]): R[K] = { - - val targetMean: Double = drmTarget.colMeans().get(0) - - // rssint is the Residual Sum of Squares for model using only based on the intercept - val rssint: Double = ((drmTarget - targetMean ).t %*% (drmTarget - targetMean)).zSum() - // K-1 is model.degreesOfFreedom-1 - // N-K is model.trainingExamples - model.degreesOfFreedom - - val fScore = ((rssint - model.rss) / (model.degreesOfFreedom-1) / ( model.rss / (model.trainingExamples - model.degreesOfFreedom))) - val fDist = new FDistribution(model.degreesOfFreedom-1,model.trainingExamples-model.degreesOfFreedom) - val fpval = 1.0 - fDist.cumulativeProbability(fScore) - model.fpval = fpval - - model.fScore = fScore - model.testResults += ('fScore -> fScore) - //model.summary += s"\nFscore : ${fScore}" - model - } - - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/backend/Backend.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/backend/Backend.scala b/math-scala/src/main/scala/org/apache/mahout/math/backend/Backend.scala deleted file mode 100644 index 9dfb7f2..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/backend/Backend.scala +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.mahout.math.backend - -import org.apache.mahout.math.backend.jvm.JvmBackend - -import collection._ -import scala.reflect.{ClassTag, classTag} -import jvm.JvmBackend - -/** - * == Overview == - * - * Backend representing collection of 
in-memory solvers or distributed operators. - * - * == Note to implementors == - * - * Backend is expected to initialize & verify its own viability lazily either upon first time the - * class is loaded, or upon the first invocation of any of its methods. After that, the value of - * [[Backend.isAvailable]] must be cached and defined. - * - * A Backend is also a [[SolverFactory]] of course in a sense that it enumerates solvers made - * available via the backend. - */ -trait Backend extends SolverFactory { - - /** - * If backend has loaded (lazily) ok and verified its availability/functionality, - * this must return `true`. - * - * @return `true` - */ - def isAvailable: Boolean - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/backend/RootSolverFactory.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/backend/RootSolverFactory.scala b/math-scala/src/main/scala/org/apache/mahout/math/backend/RootSolverFactory.scala deleted file mode 100644 index 0904ea5..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/backend/RootSolverFactory.scala +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.mahout.math.backend - -import org.apache.mahout.logging._ -import org.apache.mahout.math.backend.jvm.JvmBackend -import org.apache.mahout.math.scalabindings.{MMBinaryFunc, MMul, _} - -import scala.collection._ -import scala.reflect.{ClassTag, classTag} - - -final object RootSolverFactory extends SolverFactory { - - import org.apache.mahout.math.backend.incore._ - - private implicit val logger = getLog(RootSolverFactory.getClass) - - private val solverTagsToScan = - classTag[MMulSolver] :: - classTag[MMulSparseSolver] :: - classTag[MMulDenseSolver] :: - Nil - - private val defaultBackendPriority = - JvmBackend.getClass.getName :: Nil - - private def initBackends(): Unit = { - - } - - // TODO: MAHOUT-1909: Cache Modular Backend solvers after probing - // That is, lazily initialize the map, query backends, and build resolution rules. - override protected[backend] val solverMap = new mutable.HashMap[ClassTag[_], Any]() - - validateMap() - - // Default solver is JVM - var clazz: MMBinaryFunc = MMul - - // TODO: Match on implicit Classtag - - def getOperator[C: ClassTag]: MMBinaryFunc = { - - try { - logger.info("Creating org.apache.mahout.viennacl.opencl.GPUMMul solver") - clazz = Class.forName("org.apache.mahout.viennacl.opencl.GPUMMul$").getField("MODULE$").get(null).asInstanceOf[MMBinaryFunc] - logger.info("Successfully created org.apache.mahout.viennacl.opencl.GPUMMul solver") - - } catch { - case x: Exception => - logger.info("Unable to create class GPUMMul: attempting OpenMP version") - try { - // Attempt to instantiate the OpenMP version, assuming we've - // created a separate OpenMP-only module (none exist yet) - logger.info("Creating org.apache.mahout.viennacl.openmp.OMPMMul solver") - clazz = Class.forName("org.apache.mahout.viennacl.openmp.OMPMMul$").getField("MODULE$").get(null).asInstanceOf[MMBinaryFunc] -
logger.info("Successfully created org.apache.mahout.viennacl.openmp.OMPMMul solver") - - } catch { - case xx: Exception => - logger.info(xx.getMessage) - // Fall back to JVM; don't need to dynamically assign since MMul is in the same package. - logger.info("Unable to create class OMPMMul: falling back to java version") - clazz = MMul - } - } - clazz - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/backend/SolverFactory.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/backend/SolverFactory.scala b/math-scala/src/main/scala/org/apache/mahout/math/backend/SolverFactory.scala deleted file mode 100644 index 756b971..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/backend/SolverFactory.scala +++ /dev/null @@ -1,55 +0,0 @@ -package org.apache.mahout.math.backend - -import scala.collection.{Iterable, Map} -import scala.reflect.{ClassTag, classTag} - -/** - * == Overview == - * - * Solver factory is an essence a collection of lazily initialized strategy singletons solving some - * (any) problem in context of the Mahout project. - * - * We intend to use it _mainly_ for problems that are super-linear problems, and often involve more - * than one argument (operand). - * - * The main method to probe for an available solver is [[RootSolverFactory.getSolver]]. - */ -trait SolverFactory { - /** - * We take an implicit context binding, the classTag, of the trait of the solver desired. 
- * - * == Note to callers == - * - * Due to Scala semantics, it is usually not enough to request a solver via merely {{{ - * val s:SolverType = backend.getSolver - * }}} but instead requires an explicit solver tag, i.e.: {{{ - * val s = backend.getSolver[SolverType] - * }}} - * - * - */ - def getSolver[S: ClassTag]: Option[S] = { - solverMap.get(classTag[S]).flatMap { - _ match { - case s: S => Some(s) - case _ => None - } - } - } - - lazy val availableSolverTags: Iterable[ClassTag[_]] = solverMap.keySet - - - - protected[backend] val solverMap: Map[ClassTag[_], Any] - - protected[backend] def validateMap(): Unit = { - - for ((tag, instance) <- solverMap) { - require(tag.runtimeClass.isAssignableFrom(instance.getClass), - s"Solver implementation class `${instance.getClass.getName}` is not a subclass of solver trait `${tag}`.") - - } - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/backend/incore/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/backend/incore/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/backend/incore/package.scala deleted file mode 100644 index 1bb4480..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/backend/incore/package.scala +++ /dev/null @@ -1,17 +0,0 @@ -package org.apache.mahout.math.backend - -import org.apache.mahout.math.scalabindings.{MMBinaryFunc, MMUnaryFunc} - -package object incore { - - trait MMulSolver extends MMBinaryFunc - trait MMulDenseSolver extends MMulSolver - trait MMulSparseSolver extends MMulSolver - trait AAtSolver extends MMUnaryFunc - trait AAtDenseSolver extends AAtSolver - trait AAtSparseSolver extends AAtSolver - trait AtASolver extends MMUnaryFunc - trait AtADenseSolver extends AtASolver - trait AtASparseSolver extends AtASolver - -}
http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/backend/jvm/JvmBackend.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/backend/jvm/JvmBackend.scala b/math-scala/src/main/scala/org/apache/mahout/math/backend/jvm/JvmBackend.scala deleted file mode 100644 index 6588243..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/backend/jvm/JvmBackend.scala +++ /dev/null @@ -1,51 +0,0 @@ -package org.apache.mahout.math.backend.jvm - -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.backend.Backend -import org.apache.mahout.math.scalabindings.MMul - -import scala.collection.Map -import scala.reflect._ - -object JvmBackend extends Backend { - - import org.apache.mahout.math.backend.incore._ - - /** - * If backend has loaded (lazily) ok and verified its availability/functionality, - * this must return `true`. 
- * - * @return `true` - */ - override def isAvailable: Boolean = true - - // TODO: In a future release, Refactor MMul optimizations into this object - override protected[backend] val solverMap: Map[ClassTag[_], Any] = Map( - classTag[MMulSolver] -> MMul - // classTag[MMulDenseSolver] -> MMul, - // classTag[MMulSparseSolver] -> MMul, - // classTag[AtASolver] -> new AtASolver { - // override def apply(a: Matrix, r: Option[Matrix]): Matrix = MMul(a.t, a, r) - // }// , - // classTag[AtADenseSolver] -> { (a: Matrix, r: Option[Matrix]) => MMul(a.t, a, r) }, - // classTag[AtASparseSolver] -> { (a: Matrix, r: Option[Matrix]) => MMul(a.t, a, r) }, - // classTag[AAtSolver] -> { (a: Matrix, r: Option[Matrix]) => MMul(a, a.t, r) }, - // classTag[AAtDenseSolver] -> { (a: Matrix, r: Option[Matrix]) => MMul(a, a.t, r) }, - // classTag[AAtSparseSolver] -> { (a: Matrix, r: Option[Matrix]) => MMul(a, a.t, r) } - ) - validateMap() - - private val mmulSolver = new MMulSolver with MMulDenseSolver with MMulSparseSolver { - override def apply(a: Matrix, b: Matrix, r: Option[Matrix]): Matrix = MMul(a, b, r) - } - - private val ataSolver = new AtASolver with AtADenseSolver with AtASparseSolver { - override def apply(a: Matrix, r: Option[Matrix]): Matrix = MMul(a.t, a, r) - } - - private val aatSolver = new AAtSolver { - override def apply(a: Matrix, r: Option[Matrix]): Matrix = MMul(a, a.t, r) - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala deleted file mode 100644 index f69bf81..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala +++ /dev/null @@ -1,453 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or
more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.math.cf

import org.apache.mahout.math._
import org.apache.mahout.math.indexeddataset.IndexedDataset
import scalabindings._
import RLikeOps._
import drm._
import RLikeDrmOps._
import scala.collection.JavaConversions._
import org.apache.mahout.math.stats.LogLikelihood
import collection._
import org.apache.mahout.math.function.{VectorFunction, Functions}

import scala.util.Random


/**
 * Based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
 * available at http://www.mapr.com/practical-machine-learning
 *
 * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
 *           Scalable Similarity-Based Neighborhood Methods with MapReduce
 *           ACM Conference on Recommender Systems 2012"
 */
object SimilarityAnalysis extends Serializable {

  /**
   * Orders (index, score) pairs by score, descending: a pair is "less than" another when its score is
   * higher. A `PriorityQueue` built on this ordering therefore keeps the LOWEST score at its head.
   */
  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}

  lazy val defaultParOpts = ParOpts()

  /**
   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
   * and returns a list of similarity and cross-similarity matrices
   *
   * @param drmARaw Primary interaction matrix
   * @param randomSeed when kept to a constant will make repeatable downsampling
   * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
   * @param maxNumInteractions max number of interactions after downsampling, default: 500
   * @param drmBs secondary interaction matrices; each one yields a cross-cooccurrence matrix A'B
   * @param parOpts partitioning params for drm.par(...)
   * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and
   *         cross-cooccurrence; the first element is llr(A'A), followed by one llr(A'B) per entry of `drmBs`
   */
  def cooccurrences(
    drmARaw: DrmLike[Int],
    randomSeed: Int = 0xdeadbeef,
    maxInterestingItemsPerThing: Int = 50,
    maxNumInteractions: Int = 500,
    drmBs: Array[DrmLike[Int]] = Array(),
    parOpts: ParOpts = defaultParOpts)
    : List[DrmLike[Int]] = {

    implicit val distributedContext = drmARaw.context

    // backend partitioning defaults to 'auto', which is often better decided by calling function
    // todo: this should ideally be different per drm
    // NOTE(review): the value returned by par(...) is discarded here, as in the original code;
    // confirm against the DRM API whether par(...) has any effect when its result is not used.
    drmARaw.par(min = parOpts.minPar, exact = parOpts.exactPar, auto = parOpts.autoPar)

    // Apply selective downsampling, pin resulting matrix
    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)

    // num users, which equals the maximum number of interactions per item
    val numUsers = drmA.nrow.toInt

    // Compute & broadcast the number of interactions per thing in A
    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)

    // Compute cooccurrence matrix A'A
    val drmAtA = drmA.t %*% drmA

    // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
    val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing,
      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)

    // Now look at cross cooccurrences; the map is eager, so the matrices are processed in input order
    val crossSimilarities = drmBs.toList.map { drmBRaw =>
      // Apply the partitioning options to the secondary matrix itself (the original code mistakenly
      // re-applied them to drmARaw on every iteration).
      // todo: this should ideally be different per drm
      drmBRaw.par(min = parOpts.minPar, exact = parOpts.exactPar, auto = parOpts.autoPar)

      // Down-sample and pin other interaction matrix
      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()

      // Compute & broadcast the number of interactions per thing in B
      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)

      // Compute cross-cooccurrence matrix A'B
      val drmAtB = drmA.t %*% drmB

      val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
        bcastInteractionsPerItemA, bcastInteractionsPerThingB)

      drmB.uncache()

      drmSimilarityAtB
    }

    // Unpin downsampled interaction matrix
    drmA.uncache()

    // Return list of similarity matrices
    drmSimilarityAtA :: crossSimilarities
  }

  /**
   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
   * a list of similarity and cross-similarity matrices. Somewhat easier to use method, which handles the ID
   * dictionaries correctly
   *
   * @param indexedDatasets first in array is primary/A matrix all others are treated as secondary
   * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
   * @param maxInterestingItemsPerThing max similarities per items
   * @param maxNumInteractions max number of input items per item
   * @param parOpts partitioning params for drm.par(...)
   * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
   *         IndexedDatasets for cooccurrence and cross-cooccurrence
   */
  def cooccurrencesIDSs(
    indexedDatasets: Array[IndexedDataset],
    randomSeed: Int = 0xdeadbeef,
    maxInterestingItemsPerThing: Int = 50,
    maxNumInteractions: Int = 500,
    parOpts: ParOpts = defaultParOpts):
    List[IndexedDataset] = {
    val drms = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]])
    val primaryDataset = indexedDatasets(0)
    val coocMatrices = cooccurrences(drms(0), randomSeed, maxInterestingItemsPerThing,
      maxNumInteractions, drms.drop(1), parOpts)

    // The i-th result pairs the primary dataset's column IDs (rows of A'X) with the column IDs of the
    // i-th input dataset (columns of A'X).
    coocMatrices.zipWithIndex.map {
      case (drm, i) =>
        primaryDataset.create(drm, primaryDataset.columnIDs, indexedDatasets(i).columnIDs)
    }
  }

  /**
   * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
   * a list of similarity and cross-occurrence matrices. Somewhat easier to use method, which handles the ID
   * dictionaries correctly and contains info about downsampling in each model calc.
   *
   * @param datasets first in list is primary/A matrix all others are treated as secondary, includes information
   *                 used to downsample the input drm as well as the output llr(A'A), llr(A'B). The information
   *                 is contained in each dataset in the list and applies to the model calculation of A' with
   *                 the dataset, including the optional per-dataset partitioning params for drm.par(...).
   *                 Todo: ignoring absolute threshold for now.
   * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
   * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
   *         IndexedDatasets for cooccurrence and cross-cooccurrence
   */
  def crossOccurrenceDownsampled(
    datasets: List[DownsamplableCrossOccurrenceDataset],
    randomSeed: Int = 0xdeadbeef):
    List[IndexedDataset] = {

    val crossDatasets = datasets.drop(1) // drop A
    val primaryDataset = datasets.head // use A throughout
    val drmARaw = primaryDataset.iD.matrix

    implicit val distributedContext = primaryDataset.iD.matrix.context

    // backend partitioning defaults to 'auto', which is often better decided by calling function
    // NOTE(review): the value returned by par(...) is discarded here, as in the original code;
    // confirm against the DRM API whether par(...) has any effect when its result is not used.
    val parOptsA = primaryDataset.parOpts.getOrElse(defaultParOpts)
    drmARaw.par(min = parOptsA.minPar, exact = parOptsA.exactPar, auto = parOptsA.autoPar)

    // Apply selective downsampling, pin resulting matrix
    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, primaryDataset.maxElementsPerRow)

    // num users, which equals the maximum number of interactions per item
    val numUsers = drmA.nrow.toInt

    // Compute & broadcast the number of interactions per thing in A
    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)

    // Compute cooccurrence matrix A'A
    val drmAtA = drmA.t %*% drmA

    // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
    val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, primaryDataset.maxInterestingElements,
      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false,
      minLLROpt = primaryDataset.minLLROpt)

    // Now look at cross cooccurrences; List.map is eager, so datasets are processed in input order
    val crossSimilarities = crossDatasets.map { dataset =>
      // backend partitioning defaults to 'auto', which is often better decided by calling function
      val parOptsB = dataset.parOpts.getOrElse(defaultParOpts)
      dataset.iD.matrix.par(min = parOptsB.minPar, exact = parOptsB.exactPar, auto = parOptsB.autoPar)

      // Downsample and pin other interaction matrix
      val drmB = sampleDownAndBinarize(dataset.iD.matrix, randomSeed, dataset.maxElementsPerRow).checkpoint()

      // Compute & broadcast the number of interactions per thing in B
      val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)

      // Compute cross-cooccurrence matrix A'B
      val drmAtB = drmA.t %*% drmB

      val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, dataset.maxInterestingElements,
        bcastInteractionsPerItemA, bcastInteractionsPerThingB, minLLROpt = dataset.minLLROpt)

      drmB.uncache()

      drmSimilarityAtB
    }

    // Unpin downsampled interaction matrix
    drmA.uncache()

    // Return list of datasets: result i corresponds to input dataset i (A'A first, then each A'B),
    // so the two lists are walked in lock-step instead of indexing into the List.
    (drmSimilarityAtA :: crossSimilarities).zip(datasets).map {
      case (drm, dataset) =>
        primaryDataset.iD.create(drm, primaryDataset.iD.columnIDs, dataset.iD.columnIDs)
    }
  }

  /**
   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows
   *
   * @param drmARaw Primary interaction matrix
   * @param randomSeed when kept to a constant will make repeatable downsampling
   * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
   * @param maxNumInteractions max number of interactions after downsampling, default: 500
   * @param parOpts partitioning options used for drm.par(...)
   * @return the sparsified llr(AA') similarity matrix
   */
  def rowSimilarity(
    drmARaw: DrmLike[Int],
    randomSeed: Int = 0xdeadbeef,
    maxInterestingSimilaritiesPerRow: Int = 50,
    maxNumInteractions: Int = 500,
    parOpts: ParOpts = defaultParOpts): DrmLike[Int] = {

    implicit val distributedContext = drmARaw.context

    // backend partitioning defaults to 'auto', which is often better decided by calling function
    // todo: should this ideally be different per drm?
    // NOTE(review): the value returned by par(...) is discarded here, as in the original code;
    // confirm against the DRM API whether par(...) has any effect when its result is not used.
    drmARaw.par(min = parOpts.minPar, exact = parOpts.exactPar, auto = parOpts.autoPar)

    // Apply selective downsampling, pin resulting matrix
    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)

    // num columns, which equals the maximum number of interactions per item
    val numCols = drmA.ncol

    // Compute & broadcast the number of interactions per row in A
    val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerRow)

    // Compute row similarity cooccurrence matrix AA'
    val drmAAt = drmA %*% drmA.t

    // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
    computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow,
      bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
  }

  /**
   * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
   * Uses IndexedDatasets, which handle external ID dictionaries properly
   *
   * @param indexedDataset compare each row to every other
   * @param randomSeed  use default to make repeatable, otherwise pass in system time or some randomizing seed
   * @param maxInterestingSimilaritiesPerRow max elements returned in each row
   * @param maxObservationsPerRow max number of input elements to use
   * @return an IndexedDataset wrapping llr(AA') whose row and column IDs are both the input's row IDs
   */
  def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
      maxInterestingSimilaritiesPerRow: Int = 50,
      maxObservationsPerRow: Int = 500):
    IndexedDataset = {
    val coocMatrix = rowSimilarity(indexedDataset.matrix, randomSeed, maxInterestingSimilaritiesPerRow,
      maxObservationsPerRow)
    indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
  }

  /**
   * Compute loglikelihood ratio, see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details.
   *
   * Builds the 2x2 contingency table (k11 = both, k12 = A only, k21 = B only, k22 = neither) from the
   * marginal counts and delegates to `LogLikelihood.logLikelihoodRatio`.
   *
   * @param numInteractionsWithA number of interactions involving A
   * @param numInteractionsWithB number of interactions involving B
   * @param numInteractionsWithAandB number of interactions involving both A and B
   * @param numInteractions total number of interactions
   */
  def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
    numInteractionsWithAandB: Long, numInteractions: Long) = {

    val k11 = numInteractionsWithAandB
    val k12 = numInteractionsWithA - numInteractionsWithAandB
    val k21 = numInteractionsWithB - numInteractionsWithAandB
    val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB

    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)

  }

  /**
   * Replaces each cooccurrence count in `drm` by its log-likelihood ratio score and keeps, per row, only the
   * highest scoring entries.
   *
   * @param drm cooccurrence (or cross-cooccurrence) count matrix
   * @param numUsers total number of interactions passed to the LLR computation
   * @param maxInterestingItemsPerThing maximum number of entries kept per row
   * @param bcastNumInteractionsB interaction counts, looked up by the ROW key of `drm`
   * @param bcastNumInteractionsA interaction counts, looked up by the COLUMN index of `drm`
   * @param crossCooccurrence when false, entries whose column index equals the row key are skipped
   * @param minLLROpt optional absolute LLR threshold; entries scoring below it are dropped before the
   *                  per-row maximum is applied
   */
  def computeSimilarities(
    drm: DrmLike[Int],
    numUsers: Int,
    maxInterestingItemsPerThing: Int,
    bcastNumInteractionsB: BCast[Vector],
    bcastNumInteractionsA: BCast[Vector],
    crossCooccurrence: Boolean = true,
    minLLROpt: Option[Double] = None) = {

    drm.mapBlock() {
      case (keys, block) =>

        val llrBlock = block.like()
        val numInteractionsB: Vector = bcastNumInteractionsB
        val numInteractionsA: Vector = bcastNumInteractionsA

        for (index <- 0 until keys.size) {

          val thingB = keys(index)

          // PriorityQueue to select the top-k items; its head is the lowest score currently retained
          val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)

          block(index, ::).nonZeroes().foreach { elem =>
            val thingA = elem.index
            val cooccurrences = elem.get

            // exclude co-occurrences of the item with itself
            if (crossCooccurrence || thingB != thingA) {
              // Compute loglikelihood ratio
              val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
                cooccurrences.toLong, numUsers)

              val candidate = thingA -> llr

              // legacy hadoop code maps values to range (0..1) via
              // val normalizedLLR = 1.0 - (1.0 / (1.0 + llr))
              // val candidate = thingA -> normalizedLLR

              // Enqueue item with score, if belonging to the top-k.
              // The llr threshold takes precedence over max per row; no threshold accepts every score.
              if (minLLROpt.forall(llr >= _)) {
                if (topItemsPerThing.size < maxInterestingItemsPerThing) {
                  topItemsPerThing.enqueue(candidate)
                } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
                  // candidate scores higher than the weakest retained item: swap them
                  topItemsPerThing.dequeue()
                  topItemsPerThing.enqueue(candidate)
                }
              }
            }
          }

          // Add top-k interesting items to the output matrix
          topItemsPerThing.dequeueAll.foreach {
            case (otherThing, llrScore) =>
              llrBlock(index, otherThing) = llrScore
          }
        }

        keys -> llrBlock
    }
  }

  /**
   * Selectively downsample rows and items with an anomalous amount of interactions, inspired by
   * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
   *
   * additionally binarizes input matrix, as we're only interested in knowing whether interactions happened or not
   *
   * @param drmM matrix to downsample
   * @param seed random number generator seed, keep to a constant if repeatability is necessary
   * @param maxNumInteractions number of elements in a row of the returned matrix
   * @return the downsampled DRM
   */
  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {

    implicit val distributedContext = drmM.context

    // Pin raw interaction matrix
    val drmI = drmM.checkpoint()

    // Broadcast vector containing the number of interactions with each thing
    val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)

    val downSampledDrmI = drmI.mapBlock() {
      case (keys, block) =>
        val numInteractions: Vector = bcastNumInteractions

        // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of
        // failures
        val random = new Random(MurmurHash.hash(keys(0), seed))

        val downsampledBlock = block.like()

        // Downsample the interaction vector of each row
        for (rowIndex <- 0 until keys.size) {

          val interactionsInRow = block(rowIndex, ::)

          val numInteractionsPerRow = interactionsInRow.getNumNonZeroElements()

          // Divide as Double. With an integral count (presumably Int - verify against the Vector API) the
          // original quotient was truncated to 0 or 1, which dropped nearly every interaction of a row
          // exceeding maxNumInteractions, and an empty row raised "/ by zero".
          val perRowSampleRate =
            math.min(maxNumInteractions, numInteractionsPerRow).toDouble / numInteractionsPerRow

          interactionsInRow.nonZeroes().foreach { elem =>
            val numInteractionsWithThing = numInteractions(elem.index)
            val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing

            if (random.nextDouble() <= math.min(perRowSampleRate, perThingSampleRate)) {
              // We ignore the original interaction value and create a binary 0-1 matrix
              // as we only consider whether interactions happened or did not happen
              downsampledBlock(rowIndex, elem.index) = 1
            }
          }
        }

        keys -> downsampledBlock
    }

    // Unpin raw interaction matrix
    drmI.uncache()

    downSampledDrmI
  }
}

case class ParOpts( // this will contain the default `par` params except for auto = true
  minPar: Int = -1,
  exactPar: Int = -1,
  autoPar: Boolean = true)

/* Used to pass in data and params for downsampling the input data as well as output A'A, A'B, etc. */
case class DownsamplableCrossOccurrenceDataset(
  iD: IndexedDataset,
  maxElementsPerRow: Int = 500, // usually items per user in the input dataset, used to randomly downsample
  maxInterestingElements: Int = 50, // number of items/columns to keep in the A'A, A'B etc. where iD == A, B, C ...
  minLLROpt: Option[Double] = None, // absolute threshold, takes precedence over maxInterestingElements if present
  parOpts: Option[ParOpts] = None) // these can be set per dataset and are applied to each of the drms
                                   // in crossOccurrenceDownsampled
