Repository: mahout
Updated Branches:
  refs/heads/master e5dca8599 -> 4b1c13332
MAHOUT-1659: Remove deprecated Lanczos solver from spectral clustering in mr-legacy, this closes #88 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/4b1c1333 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/4b1c1333 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/4b1c1333 Branch: refs/heads/master Commit: 4b1c133325da0119e693b69811a54a16cd77aa55 Parents: e5dca85 Author: Suneel Marthi <[email protected]> Authored: Mon Mar 30 02:29:00 2015 -0400 Committer: Suneel Marthi <[email protected]> Committed: Mon Mar 30 02:29:00 2015 -0400 ---------------------------------------------------------------------- CHANGELOG | 2 + .../clustering/display/DisplayClustering.java | 29 ++---- .../display/DisplaySpectralKMeans.java | 21 ++--- .../spectral/kmeans/SpectralKMeansDriver.java | 97 ++++++-------------- 4 files changed, 51 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/4b1c1333/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 6e15846..7b47a0f 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,8 @@ Mahout Change Log Release 0.10.0 - unreleased + MAHOUT-1659: Remove deprecated Lanczos solver from spectral clustering in mr-legacy (Shannon Quinn) + MAHOUT-1612: NullPointerException happens during JSON output format for clusterdumper (smarthi, Manoj Awasthi) MAHOUT-1652: Java 7 update (smarthi) http://git-wip-us.apache.org/repos/asf/mahout/blob/4b1c1333/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java b/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java index f4c5429..ad85c6a 100644 --- 
a/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java +++ b/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java @@ -17,18 +17,14 @@ package org.apache.mahout.clustering.display; -import java.awt.BasicStroke; -import java.awt.Color; -import java.awt.Frame; -import java.awt.Graphics; -import java.awt.Graphics2D; -import java.awt.Toolkit; +import java.awt.*; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; import java.awt.geom.AffineTransform; import java.awt.geom.Ellipse2D; import java.awt.geom.Rectangle2D; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -43,8 +39,8 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.mahout.clustering.AbstractCluster; import org.apache.mahout.clustering.Cluster; -import org.apache.mahout.clustering.classify.WeightedVectorWritable; import org.apache.mahout.clustering.UncommonDistributions; +import org.apache.mahout.clustering.classify.WeightedVectorWritable; import org.apache.mahout.clustering.iterator.ClusterWritable; import org.apache.mahout.common.Pair; import org.apache.mahout.common.RandomUtils; @@ -58,9 +54,6 @@ import org.apache.mahout.math.VectorWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.io.Closeables; - public class DisplayClustering extends Frame { private static final Logger log = LoggerFactory.getLogger(DisplayClustering.class); @@ -69,11 +62,11 @@ public class DisplayClustering extends Frame { protected static final int SIZE = 8; // screen size in inches - private static final Collection<Vector> SAMPLE_PARAMS = Lists.newArrayList(); + private static final Collection<Vector> SAMPLE_PARAMS = new ArrayList<>(); - protected static final List<VectorWritable> SAMPLE_DATA = Lists.newArrayList(); + protected static final 
List<VectorWritable> SAMPLE_DATA = new ArrayList<>(); - protected static final List<List<Cluster>> CLUSTERS = Lists.newArrayList(); + protected static final List<List<Cluster>> CLUSTERS = new ArrayList<>(); static final Color[] COLORS = { Color.red, Color.orange, Color.yellow, Color.green, Color.blue, Color.magenta, Color.lightGray }; @@ -198,7 +191,7 @@ public class DisplayClustering extends Frame { Path clusteredPointsPath = new Path(data, "clusteredPoints"); Path inputPath = new Path(clusteredPointsPath, "part-m-00000"); - Map<Integer,Color> colors = new HashMap<Integer,Color>(); + Map<Integer,Color> colors = new HashMap<>(); int point = 0; for (Pair<IntWritable,WeightedVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedVectorWritable>( inputPath, new Configuration())) { @@ -318,19 +311,17 @@ public class DisplayClustering extends Frame { protected static void writeSampleData(Path output) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(output.toUri(), conf); - SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output, Text.class, VectorWritable.class); - try { + + try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output, Text.class, VectorWritable.class)) { int i = 0; for (VectorWritable vw : SAMPLE_DATA) { writer.append(new Text("sample_" + i++), vw); } - } finally { - Closeables.close(writer, false); } } protected static List<Cluster> readClustersWritable(Path clustersIn) { - List<Cluster> clusters = Lists.newArrayList(); + List<Cluster> clusters = new ArrayList<>(); Configuration conf = new Configuration(); for (ClusterWritable value : new SequenceFileDirValueIterable<ClusterWritable>(clustersIn, PathType.LIST, PathFilters.logsCRCFilter(), conf)) { http://git-wip-us.apache.org/repos/asf/mahout/blob/4b1c1333/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java ---------------------------------------------------------------------- 
diff --git a/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java b/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java index 00e654e..2b70749 100644 --- a/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java +++ b/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java @@ -19,12 +19,10 @@ package org.apache.mahout.clustering.display; import java.awt.Graphics; import java.awt.Graphics2D; -import java.io.File; +import java.io.BufferedWriter; +import java.io.FileWriter; import java.io.Writer; -import com.google.common.base.Charsets; -import com.google.common.io.Closeables; -import com.google.common.io.Files; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -63,21 +61,20 @@ public class DisplaySpectralKMeans extends DisplayClustering { if (!fs.exists(output)) { fs.mkdirs(output); } - Writer writer = null; - try { - writer = Files.newWriter(new File(affinities.toString()), Charsets.UTF_8); + + try (Writer writer = new BufferedWriter(new FileWriter(affinities.toString()))){ for (int i = 0; i < SAMPLE_DATA.size(); i++) { for (int j = 0; j < SAMPLE_DATA.size(); j++) { - writer.write(i + "," + j + ',' + measure.distance(SAMPLE_DATA.get(i).get(), SAMPLE_DATA.get(j).get()) + '\n'); + writer.write(i + "," + j + ',' + measure.distance(SAMPLE_DATA.get(i).get(), + SAMPLE_DATA.get(j).get()) + '\n'); } } - } finally { - Closeables.close(writer, false); } + int maxIter = 10; double convergenceDelta = 0.001; - SpectralKMeansDriver.run(new Configuration(), affinities, output, SAMPLE_DATA.size(), 3, measure, convergenceDelta, - maxIter, tempDir, false); + SpectralKMeansDriver.run(new Configuration(), affinities, output, SAMPLE_DATA.size(), 3, measure, + convergenceDelta, maxIter, tempDir); new DisplaySpectralKMeans(); } 
http://git-wip-us.apache.org/repos/asf/mahout/blob/4b1c1333/mrlegacy/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java ---------------------------------------------------------------------- diff --git a/mrlegacy/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java b/mrlegacy/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java index 00c06da..427de91 100644 --- a/mrlegacy/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java +++ b/mrlegacy/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -18,6 +18,7 @@ package org.apache.mahout.clustering.spectral.kmeans; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -43,23 +44,17 @@ import org.apache.mahout.common.commandline.DefaultOptionCreator; import org.apache.mahout.common.distance.DistanceMeasure; import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; import org.apache.mahout.math.Vector; -import org.apache.mahout.math.decomposer.lanczos.LanczosState; import org.apache.mahout.math.hadoop.DistributedRowMatrix; -import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver; -import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob; import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - /** * Performs spectral k-means clustering on the top k eigenvectors of the input affinity matrix. */ public class SpectralKMeansDriver extends AbstractJob { private static final Logger log = LoggerFactory.getLogger(SpectralKMeansDriver.class); - public static final double OVERSHOOTMULTIPLIER = 2.0; public static final int REDUCERS = 10; public static final int BLOCKHEIGHT = 30000; public static final int OVERSAMPLING = 15; @@ -87,7 +82,7 @@ public class SpectralKMeansDriver extends AbstractJob { addOption("oversampling", "p", "Oversampling parameter for SSVD", String.valueOf(OVERSAMPLING)); addOption("powerIter", "q", "Additional power iterations for SSVD", String.valueOf(POWERITERS)); - Map<String,List<String>> parsedArgs = parseArguments(arg0); + Map<String, List<String>> parsedArgs = parseArguments(arg0); if (parsedArgs == null) { return 0; } @@ -106,25 +101,20 @@ public class SpectralKMeansDriver extends AbstractJob { int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION)); Path tempdir = new Path(getOption("tempDir")); - boolean ssvd = parsedArgs.containsKey("--usessvd"); - if 
(ssvd) { - int reducers = Integer.parseInt(getOption("reduceTasks")); - int blockheight = Integer.parseInt(getOption("outerProdBlockHeight")); - int oversampling = Integer.parseInt(getOption("oversampling")); - int poweriters = Integer.parseInt(getOption("powerIter")); - run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempdir, true, reducers, - blockheight, oversampling, poweriters); - } else { - run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempdir, false); - } + int reducers = Integer.parseInt(getOption("reduceTasks")); + int blockheight = Integer.parseInt(getOption("outerProdBlockHeight")); + int oversampling = Integer.parseInt(getOption("oversampling")); + int poweriters = Integer.parseInt(getOption("powerIter")); + run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempdir, reducers, + blockheight, oversampling, poweriters); return 0; } public static void run(Configuration conf, Path input, Path output, int numDims, int clusters, - DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir, boolean ssvd) + DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir) throws IOException, InterruptedException, ClassNotFoundException { - run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempDir, ssvd, REDUCERS, + run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempDir, REDUCERS, BLOCKHEIGHT, OVERSAMPLING, POWERITERS); } @@ -149,8 +139,6 @@ public class SpectralKMeansDriver extends AbstractJob { * the int maximum number of iterations for the k-Means calculations * @param tempDir * Temporary directory for intermediate calculations - * @param ssvd - * Flag to indicate the eigensolver to use * @param numReducers * Number of reducers * @param blockHeight @@ -158,9 +146,9 @@ public class SpectralKMeansDriver extends AbstractJob { * @param 
poweriters */ public static void run(Configuration conf, Path input, Path output, int numDims, int clusters, - DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir, boolean ssvd, int numReducers, - int blockHeight, int oversampling, int poweriters) throws IOException, InterruptedException, - ClassNotFoundException { + DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir, + int numReducers, int blockHeight, int oversampling, int poweriters) + throws IOException, InterruptedException, ClassNotFoundException { HadoopUtil.delete(conf, tempDir); Path outputCalc = new Path(tempDir, "calculations"); @@ -188,46 +176,21 @@ public class SpectralKMeansDriver extends AbstractJob { Path data; - if (ssvd) { - // SSVD requires an array of Paths to function. So we pass in an array of length one - Path[] LPath = new Path[1]; - LPath[0] = L.getRowPath(); + // SSVD requires an array of Paths to function. So we pass in an array of length one + Path[] LPath = new Path[1]; + LPath[0] = L.getRowPath(); - Path SSVDout = new Path(outputCalc, "SSVD"); + Path SSVDout = new Path(outputCalc, "SSVD"); - SSVDSolver solveIt = new SSVDSolver(depConf, LPath, SSVDout, blockHeight, clusters, oversampling, numReducers); + SSVDSolver solveIt = new SSVDSolver(depConf, LPath, SSVDout, blockHeight, clusters, oversampling, numReducers); - solveIt.setComputeV(false); - solveIt.setComputeU(true); - solveIt.setOverwrite(true); - solveIt.setQ(poweriters); - // solveIt.setBroadcast(false); - solveIt.run(); - data = new Path(solveIt.getUPath()); - } else { - // Perform eigen-decomposition using LanczosSolver - // since some of the eigen-output is spurious and will be eliminated - // upon verification, we have to aim to overshoot and then discard - // unnecessary vectors later - int overshoot = Math.min((int) (clusters * OVERSHOOTMULTIPLIER), numDims); - DistributedLanczosSolver solver = new DistributedLanczosSolver(); - LanczosState state = new 
LanczosState(L, overshoot, DistributedLanczosSolver.getInitialVector(L)); - Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors"); - - solver.runJob(conf, state, overshoot, true, lanczosSeqFiles.toString()); - - // perform a verification - EigenVerificationJob verifier = new EigenVerificationJob(); - Path verifiedEigensPath = new Path(outputCalc, "eigenverifier"); - verifier.runJob(conf, lanczosSeqFiles, L.getRowPath(), verifiedEigensPath, true, 1.0, clusters); - - Path cleanedEigens = verifier.getCleanedEigensPath(); - DistributedRowMatrix W = new DistributedRowMatrix(cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, - numDims); - W.setConf(depConf); - DistributedRowMatrix Wtrans = W.transpose(); - data = Wtrans.getRowPath(); - } + solveIt.setComputeV(false); + solveIt.setComputeU(true); + solveIt.setOverwrite(true); + solveIt.setQ(poweriters); + // solveIt.setBroadcast(false); + solveIt.run(); + data = new Path(solveIt.getUPath()); // Normalize the rows of Wt to unit length // normalize is important because it reduces the occurrence of two unique clusters combining into one @@ -250,7 +213,7 @@ public class SpectralKMeansDriver extends AbstractJob { // Restore name to id mapping and read through the cluster assignments Path mappingPath = new Path(new Path(conf.get("hadoop.tmp.dir")), "generic_input_mapping"); - List<String> mapping = Lists.newArrayList(); + List<String> mapping = new ArrayList<>(); FileSystem fs = FileSystem.get(mappingPath.toUri(), conf); if (fs.exists(mappingPath)) { SequenceFile.Reader reader = new SequenceFile.Reader(fs, mappingPath, conf); @@ -269,7 +232,7 @@ public class SpectralKMeansDriver extends AbstractJob { Path inputPath = new Path(clusteredPointsPath, "part-m-00000"); int id = 0; for (Pair<IntWritable, WeightedVectorWritable> record : - new SequenceFileIterable<IntWritable, WeightedVectorWritable>(inputPath, conf)) { + new SequenceFileIterable<IntWritable, WeightedVectorWritable>(inputPath, conf)) { if 
(!mapping.isEmpty()) { log.info("{}: {}", mapping.get(id++), record.getFirst().get()); } else {
