http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java new file mode 100644 index 0000000..5abb800 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.decomposer; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.math.MatrixSlice; +import org.apache.mahout.math.SparseRowMatrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.decomposer.EigenStatus; +import org.apache.mahout.math.decomposer.SimpleEigenVerifier; +import org.apache.mahout.math.decomposer.SingularVectorVerifier; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * Takes the output of an eigendecomposition (specified as a Path location) and verifies its correctness, in + * terms of the following: if you have a vector e and a matrix m, then let e' = m.timesSquared(e); the measure of + * eigenvector-ness is the cosine of the angle between e and e', and the error reported is |1 - cos|: + * </p> + * + * <pre> + * cos(e,e') = e.dot(e') / (e.norm(2)*e'.norm(2)) + * </pre> + * <p> + * A set of eigenvectors should also all be very close to orthogonal, so this job computes all inner products between + * eigenvectors, and checks that this is close to the identity matrix. 
+ * </p> + * <p> + * Parameters used in the cleanup (other than in the input/output path options) include --minEigenvalue, which specifies + * the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies the maximum + * error (as defined above) to be tolerated in an eigenvector. + * </p> + * <p> + * If all the eigenvectors can fit in memory, --inMemory allows for a speedier completion of this task by doing so. + * </p> + */ +@Deprecated +public class EigenVerificationJob extends AbstractJob { + + public static final String CLEAN_EIGENVECTORS = "cleanEigenvectors"; + + private static final Logger log = LoggerFactory.getLogger(EigenVerificationJob.class); + + private SingularVectorVerifier eigenVerifier; + + private VectorIterable eigensToVerify; + + private VectorIterable corpus; + + private double maxError; + + private double minEigenValue; + + // private boolean loadEigensInMemory; + + private Path tmpOut; + + private Path outPath; + + private int maxEigensToKeep; + + private Path cleanedEigensPath; + + public void setEigensToVerify(VectorIterable eigens) { + eigensToVerify = eigens; + } + + @Override + public int run(String[] args) throws Exception { + Map<String,List<String>> argMap = handleArgs(args); + if (argMap == null) { + return -1; + } + if (argMap.isEmpty()) { + return 0; + } + // parse out the arguments + runJob(getConf(), new Path(getOption("eigenInput")), new Path(getOption("corpusInput")), getOutputPath(), + getOption("inMemory") != null, Double.parseDouble(getOption("maxError")), + // Double.parseDouble(getOption("minEigenvalue")), + Integer.parseInt(getOption("maxEigens"))); + return 0; + } + + /** + * Run the job with the given arguments + * + * @param corpusInput + * the corpus input Path + * @param eigenInput + * the eigenvector input Path + * @param output + * the output Path + * @param tempOut + * temporary output Path + * @param maxError + * a double representing the maximum error + * @param minEigenValue + * a double representing the minimum eigenvalue + * @param inMemory + * a boolean requesting in-memory preparation + * @param conf + * the Configuration to use, or null if a default is ok (saves referencing Configuration in calling classes + * unless needed) + */ + public int run(Path corpusInput, Path eigenInput, Path output, Path tempOut, double maxError, double minEigenValue, + boolean inMemory, Configuration conf) throws IOException { + this.outPath = output; + this.tmpOut = tempOut; + this.maxError = maxError; + this.minEigenValue = minEigenValue; + + if (eigenInput != null && eigensToVerify == null) { + prepareEigens(conf, eigenInput, inMemory); + } + DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tempOut, 1, 1); + c.setConf(conf); + corpus = c; + + // set up eigenverifier and orthoverifier TODO: allow multithreaded execution + + eigenVerifier = new SimpleEigenVerifier(); + + // we don't currently verify orthonormality here. 
+ // VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts(); + + Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens(); + + List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData); + + saveCleanEigens(new Configuration(), prunedEigenMeta); + return 0; + } + + private Map<String,List<String>> handleArgs(String[] args) throws IOException { + addOutputOption(); + addOption("eigenInput", "ei", + "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>).", null); + addOption("corpusInput", "ci", "The Path for corpus input files (SequenceFile<WritableComparable,VectorWritable>)."); + addOption(DefaultOptionCreator.outputOption().create()); + addOption(DefaultOptionCreator.helpOption()); + addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false"); + addOption("maxError", "err", "Maximum acceptable error", "0.05"); + addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0"); + addOption("maxEigens", "max", "Maximum number of eigenvectors to keep (0 means all)", "0"); + + return parseArguments(args); + } + + private void saveCleanEigens(Configuration conf, Collection<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta) + throws IOException { + Path path = new Path(outPath, CLEAN_EIGENVECTORS); + FileSystem fs = FileSystem.get(path.toUri(), conf); + SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class); + try { + IntWritable iw = new IntWritable(); + int numEigensWritten = 0; + int index = 0; + for (Map.Entry<MatrixSlice,EigenStatus> pruneSlice : prunedEigenMeta) { + MatrixSlice s = pruneSlice.getKey(); + EigenStatus meta = pruneSlice.getValue(); + EigenVector ev = new EigenVector(s.vector(), meta.getEigenValue(), Math.abs(1 - meta.getCosAngle()), s.index()); + // log.info("appending {} to {}", ev, path); + Writable vw = new VectorWritable(ev); + iw.set(index++); + seqWriter.append(iw, vw); + + // increment the number of eigenvectors written and see if we've + // reached our specified limit, or if we wish to write all eigenvectors + // (the latter is built in, since numEigensWritten will always be > 0) + numEigensWritten++; + if (numEigensWritten == maxEigensToKeep) { + log.info("{} of the {} total eigens have been written", maxEigensToKeep, prunedEigenMeta.size()); + break; + } + } + } finally { + Closeables.close(seqWriter, false); + } + cleanedEigensPath = path; + } + + private List<Map.Entry<MatrixSlice,EigenStatus>> pruneEigens(Map<MatrixSlice,EigenStatus> eigenMetaData) { + List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = Lists.newArrayList(); + + for (Map.Entry<MatrixSlice,EigenStatus> entry : eigenMetaData.entrySet()) { + if (Math.abs(1 - entry.getValue().getCosAngle()) < maxError && entry.getValue().getEigenValue() > minEigenValue) { + prunedEigenMeta.add(entry); + } + } + + Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice,EigenStatus>>() { + @Override + public int compare(Map.Entry<MatrixSlice,EigenStatus> e1, Map.Entry<MatrixSlice,EigenStatus> e2) { + // sort eigens on eigenvalues in descending order + Double eg1 = e1.getValue().getEigenValue(); + Double eg2 = e2.getValue().getEigenValue(); + return eg2.compareTo(eg1); + } + }); + + // iterate through the eigens, at each step picking the one most orthogonal to those already selected + List<Map.Entry<MatrixSlice,EigenStatus>> selectedEigenMeta = Lists.newArrayList(); + Map.Entry<MatrixSlice,EigenStatus> e1 = 
prunedEigenMeta.remove(0); + selectedEigenMeta.add(e1); + int selectedEigenMetaLength = selectedEigenMeta.size(); + int prunedEigenMetaLength = prunedEigenMeta.size(); + + while (prunedEigenMetaLength > 0) { + double sum = Double.MAX_VALUE; // smallest cumulative overlap with the already-selected eigens seen so far + int index = 0; + for (int i = 0; i < prunedEigenMetaLength; i++) { + Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.get(i); + double tmp = 0; + for (int j = 0; j < selectedEigenMetaLength; j++) { + Map.Entry<MatrixSlice,EigenStatus> ee = selectedEigenMeta.get(j); + tmp += ee.getKey().vector().times(e.getKey().vector()).norm(2); + } + if (tmp < sum) { + sum = tmp; + index = i; + } + } + Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.remove(index); + selectedEigenMeta.add(e); + selectedEigenMetaLength++; + prunedEigenMetaLength--; + } + + return selectedEigenMeta; + } + + private Map<MatrixSlice,EigenStatus> verifyEigens() { + Map<MatrixSlice,EigenStatus> eigenMetaData = Maps.newHashMap(); + + for (MatrixSlice slice : eigensToVerify) { + EigenStatus status = eigenVerifier.verify(corpus, slice.vector()); + eigenMetaData.put(slice, status); + } + return eigenMetaData; + } + + private void prepareEigens(Configuration conf, Path eigenInput, boolean inMemory) { + DistributedRowMatrix eigens = new DistributedRowMatrix(eigenInput, tmpOut, 1, 1); + eigens.setConf(conf); + if (inMemory) { + List<Vector> eigenVectors = Lists.newArrayList(); + for (MatrixSlice slice : eigens) { + eigenVectors.add(slice.vector()); + } + eigensToVerify = new SparseRowMatrix(eigenVectors.size(), eigenVectors.get(0).size(), + eigenVectors.toArray(new Vector[eigenVectors.size()]), true, true); + + } else { + eigensToVerify = eigens; + } + } + + public Path getCleanedEigensPath() { + return cleanedEigensPath; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new EigenVerificationJob(), args); + } + + /** + * Programmatic invocation of run() + * + * @param eigenInput + * Output of LanczosSolver + * @param corpusInput + * Input of LanczosSolver + */ + public void runJob(Configuration conf, Path eigenInput, Path corpusInput, Path output, boolean inMemory, + double maxError, int maxEigens) throws IOException { + // no need to handle command line arguments + outPath = output; + tmpOut = new Path(outPath, "tmp"); + maxEigensToKeep = maxEigens; + this.maxError = maxError; + if (eigenInput != null && eigensToVerify == null) { + prepareEigens(new Configuration(conf), eigenInput, inMemory); + } + + DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tmpOut, 1, 1); + c.setConf(new Configuration(conf)); + corpus = c; + + eigenVerifier = new SimpleEigenVerifier(); + + Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens(); + List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData); + saveCleanEigens(conf, prunedEigenMeta); + } +}
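
As a side note to the class javadoc above (this sketch is not part of the patch): the per-vector check boils down to a few calls against the Mahout math API. A minimal illustration, assuming the corpus is available as a VectorIterable; the class and method names here are invented for the example, but timesSquared(), dot() and norm(2) are the calls the javadoc refers to, and |1 - cos| is the error that saveCleanEigens() stores in each EigenVector:

import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorIterable;

final class EigenErrorSketch {
  // e' = m.timesSquared(e); a perfect eigenvector gives cos = 1, i.e. error 0
  static double eigenError(VectorIterable corpus, Vector e) {
    Vector ePrime = corpus.timesSquared(e);
    double cos = e.dot(ePrime) / (e.norm(2) * ePrime.norm(2));
    return Math.abs(1 - cos);
  }
}

A vector survives pruneEigens() exactly when this value is below --maxError and its eigenvalue exceeds --minEigenvalue.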
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/HdfsBackedLanczosState.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/HdfsBackedLanczosState.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/HdfsBackedLanczosState.java new file mode 100644 index 0000000..f1874a8 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/HdfsBackedLanczosState.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.decomposer; + +import java.io.IOException; +import java.util.Map; + +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.decomposer.lanczos.LanczosState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HdfsBackedLanczosState extends LanczosState implements Configurable { + + private static final Logger log = LoggerFactory.getLogger(HdfsBackedLanczosState.class); + + public static final String BASIS_PREFIX = "basis"; + public static final String SINGULAR_PREFIX = "singular"; + //public static final String METADATA_FILE = "metadata"; + + private Configuration conf; + private final Path baseDir; + private final Path basisPath; + private final Path singularVectorPath; + private FileSystem fs; + + public HdfsBackedLanczosState(VectorIterable corpus, int desiredRank, Vector initialVector, Path dir) { + super(corpus, desiredRank, initialVector); + baseDir = dir; + //Path metadataPath = new Path(dir, METADATA_FILE); + basisPath = new Path(dir, BASIS_PREFIX); + singularVectorPath = new Path(dir, SINGULAR_PREFIX); + if (corpus instanceof Configurable) { + setConf(((Configurable)corpus).getConf()); + } + } + + @Override public void setConf(Configuration configuration) { + conf = configuration; + try { + setupDirs(); + updateHdfsState(); + } catch (IOException e) { + log.error("Could not retrieve filesystem: {}", conf, e); + } + } + + @Override public Configuration getConf() { + return conf; + } + + private void setupDirs() throws IOException { + fs = baseDir.getFileSystem(conf); + 
createDirIfNotExist(baseDir); + createDirIfNotExist(basisPath); + createDirIfNotExist(singularVectorPath); + } + + private void createDirIfNotExist(Path path) throws IOException { + if (!fs.exists(path) && !fs.mkdirs(path)) { + throw new IOException("Unable to create: " + path); + } + } + + @Override + public void setIterationNumber(int i) { + super.setIterationNumber(i); + try { + updateHdfsState(); + } catch (IOException e) { + log.error("Could not update HDFS state: ", e); + } + } + + protected void updateHdfsState() throws IOException { + if (conf == null) { + return; + } + int numBasisVectorsOnDisk = 0; + Path nextBasisVectorPath = new Path(basisPath, BASIS_PREFIX + '_' + numBasisVectorsOnDisk); + while (fs.exists(nextBasisVectorPath)) { + nextBasisVectorPath = new Path(basisPath, BASIS_PREFIX + '_' + ++numBasisVectorsOnDisk); + } + Vector nextVector; + while (numBasisVectorsOnDisk < iterationNumber + && (nextVector = getBasisVector(numBasisVectorsOnDisk)) != null) { + persistVector(nextBasisVectorPath, numBasisVectorsOnDisk, nextVector); + nextBasisVectorPath = new Path(basisPath, BASIS_PREFIX + '_' + ++numBasisVectorsOnDisk); + } + if (scaleFactor <= 0) { + scaleFactor = getScaleFactor(); // load from disk if possible + } + diagonalMatrix = getDiagonalMatrix(); // load from disk if possible + Vector norms = new DenseVector(diagonalMatrix.numCols() - 1); + Vector projections = new DenseVector(diagonalMatrix.numCols()); + int i = 0; + while (i < diagonalMatrix.numCols() - 1) { + norms.set(i, diagonalMatrix.get(i, i + 1)); + projections.set(i, diagonalMatrix.get(i, i)); + i++; + } + projections.set(i, diagonalMatrix.get(i, i)); + persistVector(new Path(baseDir, "projections"), 0, projections); + persistVector(new Path(baseDir, "norms"), 0, norms); + persistVector(new Path(baseDir, "scaleFactor"), 0, new DenseVector(new double[] {scaleFactor})); + for (Map.Entry<Integer, Vector> entry : singularVectors.entrySet()) { + persistVector(new Path(singularVectorPath, SINGULAR_PREFIX + '_' + entry.getKey()), + entry.getKey(), entry.getValue()); + } + super.setIterationNumber(numBasisVectorsOnDisk); + } + + protected void persistVector(Path p, int key, Vector vector) throws IOException { + SequenceFile.Writer writer = null; + try { + if (fs.exists(p)) { + log.warn("{} exists, will overwrite", p); + fs.delete(p, true); + } + writer = new SequenceFile.Writer(fs, conf, p, + IntWritable.class, VectorWritable.class); + writer.append(new IntWritable(key), new VectorWritable(vector)); + } finally { + Closeables.close(writer, false); + } + } + + protected Vector fetchVector(Path p, int keyIndex) throws IOException { + if (!fs.exists(p)) { + return null; + } + SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf); + try { + IntWritable key = new IntWritable(); + VectorWritable vw = new VectorWritable(); + while (reader.next(key, vw)) { + if (key.get() == keyIndex) { + return vw.get(); + } + } + return null; + } finally { + // the reader was previously leaked; make sure it is always closed + Closeables.close(reader, true); + } + } + + @Override + public Vector getBasisVector(int i) { + if (!basis.containsKey(i)) { + try { + Vector v = fetchVector(new Path(basisPath, BASIS_PREFIX + '_' + i), i); + basis.put(i, v); + } catch (IOException e) { + log.error("Could not load basis vector: {}", i, e); + } + } + return super.getBasisVector(i); + } + + @Override + public Vector getRightSingularVector(int i) { + if (!singularVectors.containsKey(i)) { + try { + // must match the SINGULAR_PREFIX naming used by updateHdfsState(), not BASIS_PREFIX + Vector v = fetchVector(new Path(singularVectorPath, SINGULAR_PREFIX + '_' + i), i); + singularVectors.put(i, v); + } catch (IOException e) { + log.error("Could not load 
singular vector: {}", i, e); + } + } + return super.getRightSingularVector(i); + } + + @Override + public double getScaleFactor() { + if (scaleFactor <= 0) { + try { + Vector v = fetchVector(new Path(baseDir, "scaleFactor"), 0); + if (v != null && v.size() > 0) { + scaleFactor = v.get(0); + } + } catch (IOException e) { + log.error("could not load scaleFactor:", e); + } + } + return scaleFactor; + } + + @Override + public Matrix getDiagonalMatrix() { + if (diagonalMatrix == null) { + diagonalMatrix = new DenseMatrix(desiredRank, desiredRank); + } + if (diagonalMatrix.get(0, 1) <= 0) { + try { + Vector norms = fetchVector(new Path(baseDir, "norms"), 0); + Vector projections = fetchVector(new Path(baseDir, "projections"), 0); + if (norms != null && projections != null) { + int i = 0; + while (i < projections.size() - 1) { + diagonalMatrix.set(i, i, projections.get(i)); + diagonalMatrix.set(i, i + 1, norms.get(i)); + diagonalMatrix.set(i + 1, i, norms.get(i)); + i++; + } + diagonalMatrix.set(i, i, projections.get(i)); + } + } catch (IOException e) { + log.error("Could not load diagonal matrix of norms and projections: ", e); + } + } + return diagonalMatrix; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/SeedVectorUtil.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/SeedVectorUtil.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/SeedVectorUtil.java new file mode 100644 index 0000000..9119f69 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/SeedVectorUtil.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.hadoop.similarity; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.clustering.canopy.Canopy; +import org.apache.mahout.clustering.kmeans.Kluster; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; + +final class SeedVectorUtil { + + private static final Logger log = LoggerFactory.getLogger(SeedVectorUtil.class); + + private SeedVectorUtil() { + } + + public static List<NamedVector> loadSeedVectors(Configuration conf) { + + String seedPathStr = conf.get(VectorDistanceSimilarityJob.SEEDS_PATH_KEY); + if (seedPathStr == null || seedPathStr.isEmpty()) { + return Collections.emptyList(); + } + + List<NamedVector> seedVectors = Lists.newArrayList(); + long item = 0; + for (Writable value + : new SequenceFileDirValueIterable<>(new Path(seedPathStr), + PathType.LIST, + PathFilters.partFilter(), + conf)) { + Class<? extends Writable> valueClass = value.getClass(); + if (valueClass.equals(Kluster.class)) { + // get the cluster info + Kluster cluster = (Kluster) value; + Vector vector = cluster.getCenter(); + if (vector instanceof NamedVector) { + seedVectors.add((NamedVector) vector); + } else { + seedVectors.add(new NamedVector(vector, cluster.getIdentifier())); + } + } else if (valueClass.equals(Canopy.class)) { + // get the cluster info + Canopy canopy = (Canopy) value; + Vector vector = canopy.getCenter(); + if (vector instanceof NamedVector) { + seedVectors.add((NamedVector) vector); + } else { + seedVectors.add(new NamedVector(vector, seedPathStr + '.' + item++)); + } + } else if (valueClass.equals(Vector.class)) { + Vector vector = (Vector) value; + if (vector instanceof NamedVector) { + seedVectors.add((NamedVector) vector); + } else { + seedVectors.add(new NamedVector(vector, seedPathStr + '.' + item++)); + } + } else if (VectorWritable.class.isAssignableFrom(valueClass)) { // also matches subclasses of VectorWritable + VectorWritable vw = (VectorWritable) value; + Vector vector = vw.get(); + if (vector instanceof NamedVector) { + seedVectors.add((NamedVector) vector); + } else { + seedVectors.add(new NamedVector(vector, seedPathStr + '.' + item++)); + } + } else { + throw new IllegalStateException("Bad value class: " + valueClass); + } + } + if (seedVectors.isEmpty()) { + throw new IllegalStateException("No seeds found. 
Check your path: " + seedPathStr); + } + log.info("Seed Vectors size: {}", seedVectors.size()); + return seedVectors; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceInvertedMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceInvertedMapper.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceInvertedMapper.java new file mode 100644 index 0000000..c45d55a --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceInvertedMapper.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.similarity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; +import java.util.List; + +/** + * Similar to {@link VectorDistanceMapper}, except it outputs + * <input, Vector>, where the vector is a dense vector containing one entry for every seed vector + */ +public final class VectorDistanceInvertedMapper + extends Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> { + + private DistanceMeasure measure; + private List<NamedVector> seedVectors; + + @Override + protected void map(WritableComparable<?> key, VectorWritable value, Context context) + throws IOException, InterruptedException { + String keyName; + Vector valVec = value.get(); + if (valVec instanceof NamedVector) { + keyName = ((NamedVector) valVec).getName(); + } else { + keyName = key.toString(); + } + Vector outVec = new DenseVector(new double[seedVectors.size()]); + int i = 0; + for (NamedVector seedVector : seedVectors) { + outVec.setQuick(i++, measure.distance(seedVector, valVec)); + } + context.write(new Text(keyName), new VectorWritable(outVec)); + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + measure = + ClassUtils.instantiateAs(conf.get(VectorDistanceSimilarityJob.DISTANCE_MEASURE_KEY), DistanceMeasure.class); + measure.configure(conf); + seedVectors = SeedVectorUtil.loadSeedVectors(conf); + } +} 
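
For orientation (not part of the patch): the "vector"-style output written by this mapper is a SequenceFile of Text keys (the input vector's name) and VectorWritable values holding one distance per seed, in seed iteration order. A hypothetical reader sketch, using the same sequence-file iterator utilities that SeedVectorUtil uses above; the output path is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.apache.mahout.math.VectorWritable;

final class InvertedOutputReaderSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    Path output = new Path("/distances"); // placeholder path
    // key = input vector name, value = dense vector of distances to each seed
    for (Pair<Text,VectorWritable> record
        : new SequenceFileDirIterable<Text,VectorWritable>(output, PathType.LIST, PathFilters.partFilter(), conf)) {
      System.out.println(record.getFirst() + " -> " + record.getSecond().get());
    }
  }
}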
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceMapper.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceMapper.java new file mode 100644 index 0000000..9fccd8e --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceMapper.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.similarity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.StringTuple; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; +import java.util.List; + +public final class VectorDistanceMapper + extends Mapper<WritableComparable<?>, VectorWritable, StringTuple, DoubleWritable> { + + private DistanceMeasure measure; + private List<NamedVector> seedVectors; + private boolean usesThreshold = false; + private double maxDistance; + + @Override + protected void map(WritableComparable<?> key, VectorWritable value, Context context) + throws IOException, InterruptedException { + String keyName; + Vector valVec = value.get(); + if (valVec instanceof NamedVector) { + keyName = ((NamedVector) valVec).getName(); + } else { + keyName = key.toString(); + } + + for (NamedVector seedVector : seedVectors) { + double distance = measure.distance(seedVector, valVec); + if (!usesThreshold || distance <= maxDistance) { + StringTuple outKey = new StringTuple(); + outKey.add(seedVector.getName()); + outKey.add(keyName); + context.write(outKey, new DoubleWritable(distance)); + } + } + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + + String maxDistanceParam = conf.get(VectorDistanceSimilarityJob.MAX_DISTANCE); + if (maxDistanceParam != null) { + usesThreshold = true; + maxDistance = Double.parseDouble(maxDistanceParam); + } + + measure = ClassUtils.instantiateAs(conf.get(VectorDistanceSimilarityJob.DISTANCE_MEASURE_KEY), + DistanceMeasure.class); + measure.configure(conf); + seedVectors = SeedVectorUtil.loadSeedVectors(conf); + } +} 
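
To make the emission rule concrete (again, not part of the patch): for every (seed, input) pair this mapper writes a StringTuple(seedName, inputName) key with the distance as the value, unless a --maxDistance threshold is configured and exceeded. A self-contained sketch of that rule under assumed example inputs; the measure class and the vectors are arbitrary:

import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.Vector;

final class PairwiseEmissionSketch {
  public static void main(String[] args) {
    DistanceMeasure measure = new EuclideanDistanceMeasure();
    NamedVector seed = new NamedVector(new DenseVector(new double[] {0, 0}), "seed-0");
    Vector row = new DenseVector(new double[] {3, 4});
    double maxDistance = 10.0;              // stands in for the optional --maxDistance threshold
    double d = measure.distance(seed, row); // 5.0 for these two points
    if (d <= maxDistance) {
      // corresponds to one (StringTuple(seedName, rowName), DoubleWritable(d)) record
      System.out.println(seed.getName() + "\trow-0\t" + d);
    }
  }
}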
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceSimilarityJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceSimilarityJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceSimilarityJob.java new file mode 100644 index 0000000..9f58f1e --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/VectorDistanceSimilarityJob.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.similarity; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.StringTuple; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure; +import org.apache.mahout.math.VectorWritable; + +import com.google.common.base.Preconditions; + +import java.io.IOException; + +/** + * This class does a map-side join between a set of seed vectors (the seeds may also be cluster centers) and a list of + * other vectors, and emits a tuple of (seed id, other id, distance). It is a more generic version of the KMeans mapper. + */ +public class VectorDistanceSimilarityJob extends AbstractJob { + + public static final String SEEDS = "seeds"; + public static final String SEEDS_PATH_KEY = "seedsPath"; + public static final String DISTANCE_MEASURE_KEY = "vectorDistSim.measure"; + public static final String OUT_TYPE_KEY = "outType"; + public static final String MAX_DISTANCE = "maxDistance"; + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new VectorDistanceSimilarityJob(), args); + } + + @Override + public int run(String[] args) throws Exception { + + addInputOption(); + addOutputOption(); + addOption(DefaultOptionCreator.distanceMeasureOption().create()); + addOption(SEEDS, "s", "The set of vectors to compute distances against. 
Must fit in memory on the mapper"); + addOption(MAX_DISTANCE, "mx", "set an upper-bound on distance (double) such that any pair of vectors with a" + + " distance greater than this value is ignored in the output. Ignored for non pairwise output!"); + addOption(DefaultOptionCreator.overwriteOption().create()); + addOption(OUT_TYPE_KEY, "ot", "[pw|v] -- Define the output style: pairwise, the default, (pw) or vector (v). " + + "Pairwise is a tuple of <seed, other, distance>, vector is <other, <Vector of size the number of seeds>>.", + "pw"); + + if (parseArguments(args) == null) { + return -1; + } + + Path input = getInputPath(); + Path output = getOutputPath(); + Path seeds = new Path(getOption(SEEDS)); + String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION); + if (measureClass == null) { + measureClass = SquaredEuclideanDistanceMeasure.class.getName(); + } + if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { + HadoopUtil.delete(getConf(), output); + } + DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class); + String outType = getOption(OUT_TYPE_KEY, "pw"); + + Double maxDistance = null; + + if ("pw".equals(outType)) { + String maxDistanceArg = getOption(MAX_DISTANCE); + if (maxDistanceArg != null) { + maxDistance = Double.parseDouble(maxDistanceArg); + Preconditions.checkArgument(maxDistance > 0.0d, "value for " + MAX_DISTANCE + " must be greater than zero"); + } + } + + run(getConf(), input, seeds, output, measure, outType, maxDistance); + return 0; + } + + public static void run(Configuration conf, + Path input, + Path seeds, + Path output, + DistanceMeasure measure, String outType) + throws IOException, ClassNotFoundException, InterruptedException { + run(conf, input, seeds, output, measure, outType, null); + } + + public static void run(Configuration conf, + Path input, + Path seeds, + Path output, + DistanceMeasure measure, String outType, Double maxDistance) + throws IOException, ClassNotFoundException, InterruptedException { + if (maxDistance != null) { + conf.set(MAX_DISTANCE, String.valueOf(maxDistance)); + } + conf.set(DISTANCE_MEASURE_KEY, measure.getClass().getName()); + conf.set(SEEDS_PATH_KEY, seeds.toString()); + Job job = new Job(conf, "Vector Distance Similarity: seeds: " + seeds + " input: " + input); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + if ("pw".equalsIgnoreCase(outType)) { + job.setMapOutputKeyClass(StringTuple.class); + job.setOutputKeyClass(StringTuple.class); + job.setMapOutputValueClass(DoubleWritable.class); + job.setOutputValueClass(DoubleWritable.class); + job.setMapperClass(VectorDistanceMapper.class); + } else if ("v".equalsIgnoreCase(outType)) { + job.setMapOutputKeyClass(Text.class); + job.setOutputKeyClass(Text.class); + job.setMapOutputValueClass(VectorWritable.class); + job.setOutputValueClass(VectorWritable.class); + job.setMapperClass(VectorDistanceInvertedMapper.class); + } else { + throw new IllegalArgumentException("Invalid outType specified: " + outType); + } + + job.setNumReduceTasks(0); + FileInputFormat.addInputPath(job, input); + FileOutputFormat.setOutputPath(job, output); + + job.setJarByClass(VectorDistanceSimilarityJob.class); + HadoopUtil.delete(conf, output); + if (!job.waitForCompletion(true)) { + throw new IllegalStateException("VectorDistance Similarity failed processing " + seeds); + } + } +} 
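
Putting the options together (not part of the patch), a hypothetical programmatic invocation of this job; all paths and the measure class are placeholders, and --outType pw selects the pairwise (seed, other, distance) output handled by VectorDistanceMapper:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.hadoop.similarity.VectorDistanceSimilarityJob;

final class VectorDistanceDriverSketch {
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new VectorDistanceSimilarityJob(), new String[] {
        "--input", "/vectors",    // placeholder input path
        "--output", "/distances", // placeholder output path
        "--seeds", "/seeds",      // placeholder seeds path
        "--distanceMeasure", "org.apache.mahout.common.distance.CosineDistanceMeasure",
        "--outType", "pw",
        "--maxDistance", "0.5",
        "--overwrite"
    });
  }
}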
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/MutableElement.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/MutableElement.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/MutableElement.java new file mode 100644 index 0000000..ecd0d94 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/MutableElement.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.similarity.cooccurrence; + +import org.apache.mahout.math.Vector; + +public class MutableElement implements Vector.Element { + + private int index; + private double value; + + MutableElement(int index, double value) { + this.index = index; + this.value = value; + } + + @Override + public double get() { + return value; + } + + @Override + public int index() { + return index; + } + + public void setIndex(int index) { + this.index = index; + } + + @Override + public void set(double value) { + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java new file mode 100644 index 0000000..fb28821 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java @@ -0,0 +1,562 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.hadoop.similarity.cooccurrence; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.mapreduce.VectorSumCombiner; +import org.apache.mahout.common.mapreduce.VectorSumReducer; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.Vector.Element; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures; +import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure; +import org.apache.mahout.math.map.OpenIntIntHashMap; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +public class RowSimilarityJob extends AbstractJob { + + public static final double NO_THRESHOLD = Double.MIN_VALUE; + public static final long NO_FIXED_RANDOM_SEED = Long.MIN_VALUE; + + private static final String SIMILARITY_CLASSNAME = RowSimilarityJob.class + ".distributedSimilarityClassname"; + private static final String NUMBER_OF_COLUMNS = RowSimilarityJob.class + ".numberOfColumns"; + private static final String MAX_SIMILARITIES_PER_ROW = RowSimilarityJob.class + ".maxSimilaritiesPerRow"; + private static final String EXCLUDE_SELF_SIMILARITY = RowSimilarityJob.class + ".excludeSelfSimilarity"; + + private static final String THRESHOLD = RowSimilarityJob.class + ".threshold"; + private static final String NORMS_PATH = RowSimilarityJob.class + ".normsPath"; + private static final String MAXVALUES_PATH = RowSimilarityJob.class + ".maxWeightsPath"; + + private static final String NUM_NON_ZERO_ENTRIES_PATH = RowSimilarityJob.class + ".nonZeroEntriesPath"; + private static final int DEFAULT_MAX_SIMILARITIES_PER_ROW = 100; + + private static final String OBSERVATIONS_PER_COLUMN_PATH = RowSimilarityJob.class + ".observationsPerColumnPath"; + + private static final String MAX_OBSERVATIONS_PER_ROW = RowSimilarityJob.class + ".maxObservationsPerRow"; + private static final String MAX_OBSERVATIONS_PER_COLUMN = RowSimilarityJob.class + ".maxObservationsPerColumn"; + private static final String RANDOM_SEED = RowSimilarityJob.class + ".randomSeed"; + + private static final int DEFAULT_MAX_OBSERVATIONS_PER_ROW = 500; + private static final int DEFAULT_MAX_OBSERVATIONS_PER_COLUMN = 500; + + private static final int NORM_VECTOR_MARKER = Integer.MIN_VALUE; + private static final int MAXVALUE_VECTOR_MARKER = Integer.MIN_VALUE + 1; + private static final int NUM_NON_ZERO_ENTRIES_VECTOR_MARKER = Integer.MIN_VALUE + 2; + + enum Counters { ROWS, USED_OBSERVATIONS, NEGLECTED_OBSERVATIONS, COOCCURRENCES, PRUNED_COOCCURRENCES } + + public static void main(String[] args) throws Exception { 
+ ToolRunner.run(new RowSimilarityJob(), args); + } + + @Override + public int run(String[] args) throws Exception { + + addInputOption(); + addOutputOption(); + addOption("numberOfColumns", "r", "Number of columns in the input matrix", false); + addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use " + + "one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')'); + addOption("maxSimilaritiesPerRow", "m", "Number of maximum similarities per row (default: " + + DEFAULT_MAX_SIMILARITIES_PER_ROW + ')', String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ROW)); + addOption("excludeSelfSimilarity", "ess", "compute similarity of rows to themselves?", String.valueOf(false)); + addOption("threshold", "tr", "discard row pairs with a similarity value below this", false); + addOption("maxObservationsPerRow", null, "sample rows down to this number of entries", + String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_ROW)); + addOption("maxObservationsPerColumn", null, "sample columns down to this number of entries", + String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_COLUMN)); + addOption("randomSeed", null, "use this seed for sampling", false); + addOption(DefaultOptionCreator.overwriteOption().create()); + + Map<String,List<String>> parsedArgs = parseArguments(args); + if (parsedArgs == null) { + return -1; + } + + int numberOfColumns; + + if (hasOption("numberOfColumns")) { + // Number of columns explicitly specified via CLI + numberOfColumns = Integer.parseInt(getOption("numberOfColumns")); + } else { + // else get the number of columns by determining the cardinality of a vector in the input matrix + numberOfColumns = getDimensions(getInputPath()); + } + + String similarityClassnameArg = getOption("similarityClassname"); + String similarityClassname; + try { + similarityClassname = VectorSimilarityMeasures.valueOf(similarityClassnameArg).getClassname(); + } catch (IllegalArgumentException iae) { + similarityClassname = similarityClassnameArg; + } + + // Clear the output and temp paths if the overwrite option has been set + if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { + // Clear the temp path + HadoopUtil.delete(getConf(), getTempPath()); + // Clear the output path + HadoopUtil.delete(getConf(), getOutputPath()); + } + + int maxSimilaritiesPerRow = Integer.parseInt(getOption("maxSimilaritiesPerRow")); + boolean excludeSelfSimilarity = Boolean.parseBoolean(getOption("excludeSelfSimilarity")); + double threshold = hasOption("threshold") + ? Double.parseDouble(getOption("threshold")) : NO_THRESHOLD; + long randomSeed = hasOption("randomSeed") + ? 
Long.parseLong(getOption("randomSeed")) : NO_FIXED_RANDOM_SEED; + + int maxObservationsPerRow = Integer.parseInt(getOption("maxObservationsPerRow")); + int maxObservationsPerColumn = Integer.parseInt(getOption("maxObservationsPerColumn")); + + Path weightsPath = getTempPath("weights"); + Path normsPath = getTempPath("norms.bin"); + Path numNonZeroEntriesPath = getTempPath("numNonZeroEntries.bin"); + Path maxValuesPath = getTempPath("maxValues.bin"); + Path pairwiseSimilarityPath = getTempPath("pairwiseSimilarity"); + + Path observationsPerColumnPath = getTempPath("observationsPerColumn.bin"); + + AtomicInteger currentPhase = new AtomicInteger(); + + Job countObservations = prepareJob(getInputPath(), getTempPath("notUsed"), CountObservationsMapper.class, + NullWritable.class, VectorWritable.class, SumObservationsReducer.class, NullWritable.class, + VectorWritable.class); + countObservations.setCombinerClass(VectorSumCombiner.class); + countObservations.getConfiguration().set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString()); + countObservations.setNumReduceTasks(1); + countObservations.waitForCompletion(true); + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class, + VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class); + normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class); + Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration(); + normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold)); + normsAndTransposeConf.set(NORMS_PATH, normsPath.toString()); + normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString()); + normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString()); + normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname); + normsAndTransposeConf.set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString()); + normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_ROW, String.valueOf(maxObservationsPerRow)); + normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_COLUMN, String.valueOf(maxObservationsPerColumn)); + normsAndTransposeConf.set(RANDOM_SEED, String.valueOf(randomSeed)); + + boolean succeeded = normsAndTranspose.waitForCompletion(true); + if (!succeeded) { + return -1; + } + } + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class, + IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class); + pairwiseSimilarity.setCombinerClass(VectorSumReducer.class); + Configuration pairwiseConf = pairwiseSimilarity.getConfiguration(); + pairwiseConf.set(THRESHOLD, String.valueOf(threshold)); + pairwiseConf.set(NORMS_PATH, normsPath.toString()); + pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString()); + pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString()); + pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname); + pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns); + pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity); + boolean succeeded = pairwiseSimilarity.waitForCompletion(true); + if (!succeeded) { + return -1; + } + } + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class, + IntWritable.class, VectorWritable.class, 
MergeToTopKSimilaritiesReducer.class, IntWritable.class,
+          VectorWritable.class);
+      asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
+      asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
+      boolean succeeded = asMatrix.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+    }
+
+    return 0;
+  }
+
+  public static class CountObservationsMapper extends Mapper<IntWritable,VectorWritable,NullWritable,VectorWritable> {
+
+    private Vector columnCounts = new RandomAccessSparseVector(Integer.MAX_VALUE);
+
+    @Override
+    protected void map(IntWritable rowIndex, VectorWritable rowVectorWritable, Context ctx)
+      throws IOException, InterruptedException {
+
+      Vector row = rowVectorWritable.get();
+      for (Vector.Element elem : row.nonZeroes()) {
+        columnCounts.setQuick(elem.index(), columnCounts.getQuick(elem.index()) + 1);
+      }
+    }
+
+    @Override
+    protected void cleanup(Context ctx) throws IOException, InterruptedException {
+      ctx.write(NullWritable.get(), new VectorWritable(columnCounts));
+    }
+  }
+
+  public static class SumObservationsReducer extends Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> {
+    @Override
+    protected void reduce(NullWritable nullWritable, Iterable<VectorWritable> partialVectors, Context ctx)
+      throws IOException, InterruptedException {
+      Vector counts = Vectors.sum(partialVectors.iterator());
+      Vectors.write(counts, new Path(ctx.getConfiguration().get(OBSERVATIONS_PER_COLUMN_PATH)), ctx.getConfiguration());
+    }
+  }
+
+  public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private VectorSimilarityMeasure similarity;
+    private Vector norms;
+    private Vector nonZeroEntries;
+    private Vector maxValues;
+    private double threshold;
+
+    private OpenIntIntHashMap observationsPerColumn;
+    private int maxObservationsPerRow;
+    private int maxObservationsPerColumn;
+
+    private Random random;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+
+      Configuration conf = ctx.getConfiguration();
+
+      similarity = ClassUtils.instantiateAs(conf.get(SIMILARITY_CLASSNAME), VectorSimilarityMeasure.class);
+      norms = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      nonZeroEntries = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      maxValues = new RandomAccessSparseVector(Integer.MAX_VALUE);
+      threshold = Double.parseDouble(conf.get(THRESHOLD));
+
+      observationsPerColumn = Vectors.readAsIntMap(new Path(conf.get(OBSERVATIONS_PER_COLUMN_PATH)), conf);
+      maxObservationsPerRow = conf.getInt(MAX_OBSERVATIONS_PER_ROW, DEFAULT_MAX_OBSERVATIONS_PER_ROW);
+      maxObservationsPerColumn = conf.getInt(MAX_OBSERVATIONS_PER_COLUMN, DEFAULT_MAX_OBSERVATIONS_PER_COLUMN);
+
+      long seed = Long.parseLong(conf.get(RANDOM_SEED));
+      if (seed == NO_FIXED_RANDOM_SEED) {
+        random = RandomUtils.getRandom();
+      } else {
+        random = RandomUtils.getRandom(seed);
+      }
+    }
+
+    private Vector sampleDown(Vector rowVector, Context ctx) {
+
+      int observationsPerRow = rowVector.getNumNondefaultElements();
+      double rowSampleRate = (double) Math.min(maxObservationsPerRow, observationsPerRow) / (double) observationsPerRow;
+
+      Vector downsampledRow = rowVector.like();
+      long usedObservations = 0;
+      long neglectedObservations = 0;
+
+      for (Vector.Element elem : rowVector.nonZeroes()) {
+
+        int columnCount = observationsPerColumn.get(elem.index());
+        double columnSampleRate = (double) Math.min(maxObservationsPerColumn, columnCount) / (double) columnCount;
+
+        if (random.nextDouble() <= Math.min(rowSampleRate, columnSampleRate)) {
+          downsampledRow.setQuick(elem.index(), elem.get());
+          usedObservations++;
+        } else {
+          neglectedObservations++;
+        }
+      }
+
+      ctx.getCounter(Counters.USED_OBSERVATIONS).increment(usedObservations);
+      ctx.getCounter(Counters.NEGLECTED_OBSERVATIONS).increment(neglectedObservations);
+
+      return downsampledRow;
+    }
+
+    @Override
+    protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
+      throws IOException, InterruptedException {
+
+      Vector sampledRowVector = sampleDown(vectorWritable.get(), ctx);
+
+      Vector rowVector = similarity.normalize(sampledRowVector);
+
+      int numNonZeroEntries = 0;
+      double maxValue = Double.MIN_VALUE;
+
+      for (Vector.Element element : rowVector.nonZeroes()) {
+        RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
+        partialColumnVector.setQuick(row.get(), element.get());
+        ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
+
+        numNonZeroEntries++;
+        if (maxValue < element.get()) {
+          maxValue = element.get();
+        }
+      }
+
+      if (threshold != NO_THRESHOLD) {
+        nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
+        maxValues.setQuick(row.get(), maxValue);
+      }
+      norms.setQuick(row.get(), similarity.norm(rowVector));
+
+      ctx.getCounter(Counters.ROWS).increment(1);
+    }
+
+    @Override
+    protected void cleanup(Context ctx) throws IOException, InterruptedException {
+      ctx.write(new IntWritable(NORM_VECTOR_MARKER), new VectorWritable(norms));
+      ctx.write(new IntWritable(NUM_NON_ZERO_ENTRIES_VECTOR_MARKER), new VectorWritable(nonZeroEntries));
+      ctx.write(new IntWritable(MAXVALUE_VECTOR_MARKER), new VectorWritable(maxValues));
+    }
+  }
+
+  private static class MergeVectorsCombiner extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
+      throws IOException, InterruptedException {
+      ctx.write(row, new VectorWritable(Vectors.merge(partialVectors)));
+    }
+  }
+
+  public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private Path normsPath;
+    private Path numNonZeroEntriesPath;
+    private Path maxValuesPath;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      normsPath = new Path(ctx.getConfiguration().get(NORMS_PATH));
+      numNonZeroEntriesPath = new Path(ctx.getConfiguration().get(NUM_NON_ZERO_ENTRIES_PATH));
+      maxValuesPath = new Path(ctx.getConfiguration().get(MAXVALUES_PATH));
+    }
+
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
+      throws IOException, InterruptedException {
+      Vector partialVector = Vectors.merge(partialVectors);
+
+      if (row.get() == NORM_VECTOR_MARKER) {
+        Vectors.write(partialVector, normsPath, ctx.getConfiguration());
+      } else if (row.get() == MAXVALUE_VECTOR_MARKER) {
+        Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
+      } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
+        Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
+      } else {
+        ctx.write(row, new VectorWritable(partialVector));
+      }
+    }
+  }
+
+
+  public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private VectorSimilarityMeasure similarity;
+
+    private OpenIntIntHashMap numNonZeroEntries;
+    private Vector maxValues;
+    private double threshold;
+
+    private static final Comparator<Vector.Element> BY_INDEX = new Comparator<Vector.Element>() {
+      @Override
+      public int compare(Vector.Element one, Vector.Element two) {
+        return Ints.compare(one.index(), two.index());
+      }
+    };
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      similarity = ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
+          VectorSimilarityMeasure.class);
+      numNonZeroEntries = Vectors.readAsIntMap(new Path(ctx.getConfiguration().get(NUM_NON_ZERO_ENTRIES_PATH)),
+          ctx.getConfiguration());
+      maxValues = Vectors.read(new Path(ctx.getConfiguration().get(MAXVALUES_PATH)), ctx.getConfiguration());
+      threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+    }
+
+    private boolean consider(Vector.Element occurrenceA, Vector.Element occurrenceB) {
+      int numNonZeroEntriesA = numNonZeroEntries.get(occurrenceA.index());
+      int numNonZeroEntriesB = numNonZeroEntries.get(occurrenceB.index());
+
+      double maxValueA = maxValues.get(occurrenceA.index());
+      double maxValueB = maxValues.get(occurrenceB.index());
+
+      return similarity.consider(numNonZeroEntriesA, numNonZeroEntriesB, maxValueA, maxValueB, threshold);
+    }
+
+    @Override
+    protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
+      throws IOException, InterruptedException {
+      Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
+      Arrays.sort(occurrences, BY_INDEX);
+
+      int cooccurrences = 0;
+      int prunedCooccurrences = 0;
+      for (int n = 0; n < occurrences.length; n++) {
+        Vector.Element occurrenceA = occurrences[n];
+        Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
+        for (int m = n; m < occurrences.length; m++) {
+          Vector.Element occurrenceB = occurrences[m];
+          if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
+            dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
+            cooccurrences++;
+          } else {
+            prunedCooccurrences++;
+          }
+        }
+        ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
+      }
+      ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
+      ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
+    }
+  }
+
+
+  public static class SimilarityReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private VectorSimilarityMeasure similarity;
+    private int numberOfColumns;
+    private boolean excludeSelfSimilarity;
+    private Vector norms;
+    private double threshold;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      similarity = ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
+          VectorSimilarityMeasure.class);
+      numberOfColumns = ctx.getConfiguration().getInt(NUMBER_OF_COLUMNS, -1);
+      Preconditions.checkArgument(numberOfColumns > 0,
+          "Number of columns must be greater than 0! But numberOfColumns = " + numberOfColumns);
+      excludeSelfSimilarity = ctx.getConfiguration().getBoolean(EXCLUDE_SELF_SIMILARITY, false);
+      norms = Vectors.read(new Path(ctx.getConfiguration().get(NORMS_PATH)), ctx.getConfiguration());
+      threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+    }
+
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
+      throws IOException, InterruptedException {
+      Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
+      Vector dots = partialDotsIterator.next().get();
+      while (partialDotsIterator.hasNext()) {
+        Vector toAdd = partialDotsIterator.next().get();
+        for (Element nonZeroElement : toAdd.nonZeroes()) {
+          dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
+        }
+      }
+
+      Vector similarities = dots.like();
+      double normA = norms.getQuick(row.get());
+      for (Element b : dots.nonZeroes()) {
+        double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
+        if (similarityValue >= threshold) {
+          similarities.set(b.index(), similarityValue);
+        }
+      }
+      if (excludeSelfSimilarity) {
+        similarities.setQuick(row.get(), 0);
+      }
+      ctx.write(row, new VectorWritable(similarities));
+    }
+  }
+
+  public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private int maxSimilaritiesPerRow;
+
+    @Override
+    protected void setup(Mapper.Context ctx) throws IOException, InterruptedException {
+      maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
+      Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Maximum number of similarities per row must be greater than 0!");
+    }
+
+    @Override
+    protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
+      throws IOException, InterruptedException {
+      Vector similarities = similaritiesWritable.get();
+      // For performance, transposedPartial is created once outside the loop below and reused for each element
+      Vector transposedPartial = new RandomAccessSparseVector(similarities.size(), 1);
+      TopElementsQueue topKQueue = new TopElementsQueue(maxSimilaritiesPerRow);
+      for (Element nonZeroElement : similarities.nonZeroes()) {
+        MutableElement top = topKQueue.top();
+        double candidateValue = nonZeroElement.get();
+        if (candidateValue > top.get()) {
+          top.setIndex(nonZeroElement.index());
+          top.set(candidateValue);
+          topKQueue.updateTop();
+        }
+
+        transposedPartial.setQuick(row.get(), candidateValue);
+        ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
+        transposedPartial.setQuick(row.get(), 0.0);
+      }
+      Vector topKSimilarities = new RandomAccessSparseVector(similarities.size(), maxSimilaritiesPerRow);
+      for (Vector.Element topKSimilarity : topKQueue.getTopElements()) {
+        topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
+      }
+      ctx.write(row, new VectorWritable(topKSimilarities));
+    }
+  }
+
+  public static class MergeToTopKSimilaritiesReducer
+      extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    private int maxSimilaritiesPerRow;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException {
+      maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
+      Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Maximum number of similarities per row must be greater than 0!");
+    }
+
+    @Override
+    protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
+      throws IOException, InterruptedException {
+      Vector allSimilarities = Vectors.merge(partials);
+      Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
+      ctx.write(row, new VectorWritable(topKSimilarities));
+    }
+  }
+
+}
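
To make the down-sampling in VectorNormMapper.sampleDown() above concrete, here is a small self-contained sketch of the rates it computes; the limits and counts are invented for illustration, and it mirrors (rather than calls) the job's logic:

    // An observation survives with probability min(rowSampleRate, columnSampleRate),
    // so over-represented rows and columns are thinned while sparse ones stay intact.
    int maxObservationsPerRow = 500;      // hypothetical limits
    int maxObservationsPerColumn = 10000;

    int observationsPerRow = 2000;        // a very active row
    int columnCount = 40000;              // a very popular column

    double rowSampleRate =
        (double) Math.min(maxObservationsPerRow, observationsPerRow) / (double) observationsPerRow;  // 0.25
    double columnSampleRate =
        (double) Math.min(maxObservationsPerColumn, columnCount) / (double) columnCount;             // 0.25
    double keepProbability = Math.min(rowSampleRate, columnSampleRate);                              // 0.25
    // A row with only 100 non-zeros in unpopular columns gets rate 1.0 and is kept entirely.
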
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence;
+
+import com.google.common.collect.Lists;
+import org.apache.lucene.util.PriorityQueue;
+
+import java.util.Collections;
+import java.util.List;
+
+public class TopElementsQueue extends PriorityQueue<MutableElement> {
+
+  private final int maxSize;
+
+  private static final int SENTINEL_INDEX = Integer.MIN_VALUE;
+
+  public TopElementsQueue(int maxSize) {
+    super(maxSize);
+    this.maxSize = maxSize;
+  }
+
+  public List<MutableElement> getTopElements() {
+    List<MutableElement> topElements = Lists.newArrayListWithCapacity(maxSize);
+    while (size() > 0) {
+      MutableElement top = pop();
+      // filter out "sentinel" objects necessary for maintaining an efficient priority queue
+      if (top.index() != SENTINEL_INDEX) {
+        topElements.add(top);
+      }
+    }
+    Collections.reverse(topElements);
+    return topElements;
+  }
+
+  @Override
+  protected MutableElement getSentinelObject() {
+    return new MutableElement(SENTINEL_INDEX, Double.MIN_VALUE);
+  }
+
+  @Override
+  protected boolean lessThan(MutableElement e1, MutableElement e2) {
+    return e1.get() < e2.get();
+  }
+}
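
A brief usage sketch of the queue above, following the same select-the-top-k pattern used by UnsymmetrifyMapper and Vectors.topKElements(); the values are made up, and the snippet assumes the imports of the surrounding file:

    // Keep the k = 2 largest of four (index, value) pairs.
    TopElementsQueue queue = new TopElementsQueue(2);
    double[] values = {0.3, 0.9, 0.1, 0.7};
    for (int index = 0; index < values.length; index++) {
      MutableElement smallest = queue.top();      // smallest element currently retained
      if (values[index] > smallest.get()) {
        smallest.setIndex(index);                 // overwrite it in place ...
        smallest.set(values[index]);
        queue.updateTop();                        // ... and let it sift back into position
      }
    }
    // getTopElements() pops the heap (smallest first) and reverses, so the
    // result comes back largest first: (1, 0.9), (3, 0.7).
    List<MutableElement> top2 = queue.getTopElements();
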
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
new file mode 100644
index 0000000..66fb0ae
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.iterator.FixedSizeSamplingIterator;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Varint;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
+
+public final class Vectors {
+
+  private Vectors() {}
+
+  public static Vector maybeSample(Vector original, int sampleSize) {
+    if (original.getNumNondefaultElements() <= sampleSize) {
+      return original;
+    }
+    Vector sample = new RandomAccessSparseVector(original.size(), sampleSize);
+    Iterator<Element> sampledElements =
+        new FixedSizeSamplingIterator<>(sampleSize, original.nonZeroes().iterator());
+    while (sampledElements.hasNext()) {
+      Element elem = sampledElements.next();
+      sample.setQuick(elem.index(), elem.get());
+    }
+    return sample;
+  }
+
+  public static Vector topKElements(int k, Vector original) {
+    if (original.getNumNondefaultElements() <= k) {
+      return original;
+    }
+
+    TopElementsQueue topKQueue = new TopElementsQueue(k);
+    for (Element nonZeroElement : original.nonZeroes()) {
+      MutableElement top = topKQueue.top();
+      double candidateValue = nonZeroElement.get();
+      if (candidateValue > top.get()) {
+        top.setIndex(nonZeroElement.index());
+        top.set(candidateValue);
+        topKQueue.updateTop();
+      }
+    }
+
+    Vector topKSimilarities = new RandomAccessSparseVector(original.size(), k);
+    for (Vector.Element topKSimilarity : topKQueue.getTopElements()) {
+      topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
+    }
+    return topKSimilarities;
+  }
+
+  public static Vector merge(Iterable<VectorWritable> partialVectors) {
+    Iterator<VectorWritable> vectors = partialVectors.iterator();
+    Vector accumulator = vectors.next().get();
+    while (vectors.hasNext()) {
+      VectorWritable v = vectors.next();
+      if (v != null) {
+        for (Element nonZeroElement : v.get().nonZeroes()) {
+          accumulator.setQuick(nonZeroElement.index(), nonZeroElement.get());
+        }
+      }
+    }
+    return accumulator;
+  }
+
+  public static Vector sum(Iterator<VectorWritable> vectors) {
+    Vector sum = vectors.next().get();
+    while (vectors.hasNext()) {
+      sum.assign(vectors.next().get(), Functions.PLUS);
+    }
+    return sum;
+  }
+
+  static class TemporaryElement implements Vector.Element {
+
+    private final int index;
+    private double value;
+
+    TemporaryElement(int index, double value) {
+      this.index = index;
+      this.value = value;
+    }
+
+    TemporaryElement(Vector.Element toClone) {
+      this(toClone.index(), toClone.get());
+    }
+
+    @Override
+    public double get() {
+      return value;
+    }
+
+    @Override
+    public int index() {
+      return index;
+    }
+
+    @Override
+    public void set(double value) {
+      this.value = value;
+    }
+  }
+
+  public static Vector.Element[] toArray(VectorWritable vectorWritable) {
+    Vector.Element[] elements = new Vector.Element[vectorWritable.get().getNumNondefaultElements()];
+    int k = 0;
+    for (Element nonZeroElement : vectorWritable.get().nonZeroes()) {
+      elements[k++] = new TemporaryElement(nonZeroElement.index(), nonZeroElement.get());
+    }
+    return elements;
+  }
+
+  public static void write(Vector vector, Path path, Configuration conf) throws IOException {
+    write(vector, path, conf, false);
+  }
+
+  public static void write(Vector vector, Path path, Configuration conf, boolean laxPrecision) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    FSDataOutputStream out = fs.create(path);
+    try {
+      VectorWritable vectorWritable = new VectorWritable(vector);
+      vectorWritable.setWritesLaxPrecision(laxPrecision);
+      vectorWritable.write(out);
+    } finally {
+      Closeables.close(out, false);
+    }
+  }
+
+  public static OpenIntIntHashMap readAsIntMap(Path path, Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    FSDataInputStream in = fs.open(path);
+    try {
+      return readAsIntMap(in);
+    } finally {
+      Closeables.close(in, true);
+    }
+  }
+
+  /* ugly optimization for loading sparse vectors containing ints only */
+  private static OpenIntIntHashMap readAsIntMap(DataInput in) throws IOException {
+    int flags = in.readByte();
+    Preconditions.checkArgument(flags >> VectorWritable.NUM_FLAGS == 0,
+        "Unknown flags set: %s", Integer.toString(flags, 2));
+    boolean dense = (flags & VectorWritable.FLAG_DENSE) != 0;
+    boolean sequential = (flags & VectorWritable.FLAG_SEQUENTIAL) != 0;
+    boolean laxPrecision = (flags & VectorWritable.FLAG_LAX_PRECISION) != 0;
+    Preconditions.checkState(!dense && !sequential, "Only for reading sparse vectors!");
+
+    Varint.readUnsignedVarInt(in); // skip the vector's cardinality
+
+    OpenIntIntHashMap values = new OpenIntIntHashMap();
+    int numNonDefaultElements = Varint.readUnsignedVarInt(in);
+    for (int i = 0; i < numNonDefaultElements; i++) {
+      int index = Varint.readUnsignedVarInt(in);
+      double value = laxPrecision ? in.readFloat() : in.readDouble();
+      values.put(index, (int) value);
+    }
+    return values;
+  }
+
+  public static Vector read(Path path, Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    FSDataInputStream in = fs.open(path);
+    try {
+      return VectorWritable.readVector(in);
+    } finally {
+      Closeables.close(in, true);
+    }
+  }
+}
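
As a quick illustration of the I/O helpers above, a round-trip of a sparse vector through the filesystem; the path is hypothetical, and the snippet assumes the imports of the surrounding file:

    Configuration conf = new Configuration();
    Path path = new Path("/tmp/norms.bin");      // hypothetical location

    Vector norms = new RandomAccessSparseVector(Integer.MAX_VALUE);
    norms.setQuick(3, 1.5);
    norms.setQuick(7, 2.25);

    Vectors.write(norms, path, conf);            // serialized via VectorWritable
    Vector restored = Vectors.read(path, conf);  // restored.getQuick(7) == 2.25
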
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence.measures;
+
+public class CityBlockSimilarity extends CountbasedMeasure {
+
+  @Override
+  public double similarity(double dots, double normA, double normB, int numberOfColumns) {
+    return 1.0 / (1.0 + normA + normB - 2 * dots);
+  }
+}
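
Because CityBlockSimilarity extends CountbasedMeasure (defined at the end of this mail), norm(...) is the number of non-zeros in a row and aggregate(...) always contributes 1, so dots ends up being the number of columns two rows share; the formula then inverts the Manhattan distance between the binary occurrence vectors. A small worked example:

    // Rows A and B have 3 and 4 non-zero columns respectively and share 2 of
    // them, so their Manhattan distance is 3 + 4 - 2*2 = 3.
    CityBlockSimilarity cityBlock = new CityBlockSimilarity();
    double s = cityBlock.similarity(2, 3, 4, 100);  // 1.0 / (1.0 + 3) = 0.25
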
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence.measures;
+
+public class CooccurrenceCountSimilarity extends CountbasedMeasure {
+
+  @Override
+  public double similarity(double dots, double normA, double normB, int numberOfColumns) {
+    return dots;
+  }
+
+  @Override
+  public boolean consider(int numNonZeroEntriesA, int numNonZeroEntriesB, double maxValueA, double maxValueB,
+      double threshold) {
+    return numNonZeroEntriesA >= threshold && numNonZeroEntriesB >= threshold;
+  }
+}
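
For this measure, similarity(...) reports the raw co-occurrence count, and consider(...) lets CooccurrencesMapper prune pairs that cannot reach the threshold: a row with fewer than threshold non-zeros can never co-occur threshold times. For instance:

    CooccurrenceCountSimilarity counts = new CooccurrenceCountSimilarity();
    counts.similarity(12, 0, 0, 100);       // 12.0 -- the co-occurrence count itself
    counts.consider(5, 9, 1.0, 1.0, 10.0);  // false: both rows have fewer than 10 non-zeros
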
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence.measures;
+
+import org.apache.mahout.math.Vector;
+
+public class CosineSimilarity implements VectorSimilarityMeasure {
+
+  @Override
+  public Vector normalize(Vector vector) {
+    return vector.normalize();
+  }
+
+  @Override
+  public double norm(Vector vector) {
+    return VectorSimilarityMeasure.NO_NORM;
+  }
+
+  @Override
+  public double aggregate(double valueA, double nonZeroValueB) {
+    return valueA * nonZeroValueB;
+  }
+
+  @Override
+  public double similarity(double dots, double normA, double normB, int numberOfColumns) {
+    return dots;
+  }
+
+  @Override
+  public boolean consider(int numNonZeroEntriesA, int numNonZeroEntriesB, double maxValueA, double maxValueB,
+      double threshold) {
+    return numNonZeroEntriesB >= threshold / maxValueA
+        && numNonZeroEntriesA >= threshold / maxValueB;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CountbasedMeasure.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CountbasedMeasure.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CountbasedMeasure.java
new file mode 100644
index 0000000..105df2b
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/CountbasedMeasure.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.similarity.cooccurrence.measures;
+
+import org.apache.mahout.math.Vector;
+
+public abstract class CountbasedMeasure implements VectorSimilarityMeasure {
+
+  @Override
+  public Vector normalize(Vector vector) {
+    return vector;
+  }
+
+  @Override
+  public double norm(Vector vector) {
+    return vector.norm(0);
+  }
+
+  @Override
+  public double aggregate(double valueA, double nonZeroValueB) {
+    return 1;
+  }
+
+  @Override
+  public boolean consider(int numNonZeroEntriesA, int numNonZeroEntriesB, double maxValueA, double maxValueB,
+      double threshold) {
+    return true;
+  }
+}
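
Tying the CosineSimilarity hooks together: rows are scaled to unit length by normalize(...) in the map phase, aggregate(...) accumulates element-wise products, and similarity(...) returns the summed dot product, which for unit vectors is exactly the cosine of the angle. A worked sketch with two small dense vectors (assumes org.apache.mahout.math.DenseVector in addition to the imports above):

    CosineSimilarity cosine = new CosineSimilarity();
    Vector a = new DenseVector(new double[] {3, 4});  // length 5
    Vector b = new DenseVector(new double[] {4, 3});  // length 5
    Vector na = cosine.normalize(a);                  // (0.6, 0.8)
    Vector nb = cosine.normalize(b);                  // (0.8, 0.6)
    double dots = cosine.aggregate(na.get(0), nb.get(0))
        + cosine.aggregate(na.get(1), nb.get(1));     // 0.48 + 0.48 = 0.96
    double s = cosine.similarity(dots, 0, 0, 2);      // 0.96
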
