http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/SplitInput.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/SplitInput.java b/integration/src/main/java/org/apache/mahout/utils/SplitInput.java deleted file mode 100644 index 6178f80..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/SplitInput.java +++ /dev/null @@ -1,673 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.utils; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.Charset; -import java.util.BitSet; - -import com.google.common.base.Preconditions; -import org.apache.commons.cli2.OptionException; -import org.apache.commons.io.Charsets; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.ToolRunner; -import org.apache.mahout.common.AbstractJob; -import org.apache.mahout.common.CommandLineUtil; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.Pair; -import org.apache.mahout.common.RandomUtils; -import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.common.iterator.sequencefile.PathFilters; -import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator; -import org.apache.mahout.math.jet.random.sampling.RandomSampler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A utility for splitting files in the input format used by the Bayes - * classifiers, or anything else that has one item per line, or SequenceFiles (key/value), - * into training and test sets in order to perform cross-validation. - * <p/> - * This class can be used to split directories of files or individual files into - * training and test sets using a number of different methods. - * <p/> - * When executed via {@link #splitDirectory(Path)} or {@link #splitFile(Path)}, - * the lines read from one or more input files are written to files of the same - * name into the directories specified by the - * {@link #setTestOutputDirectory(Path)} and - * {@link #setTrainingOutputDirectory(Path)} methods. - * <p/> - * The composition of the test set is determined using one of the following - * approaches: - * <ul> - * <li>A contiguous set of items can be chosen from the input file(s) using the - * {@link #setTestSplitSize(int)} or {@link #setTestSplitPct(int)} methods. 
- * {@link #setTestSplitSize(int)} allocates a fixed number of items, while - * {@link #setTestSplitPct(int)} allocates a percentage of the original input, - * rounded to the nearest integer. {@link #setSplitLocation(int)} is used to - * control the position in the input from which the test data is extracted and - * is described further below.</li> - * <li>A random sampling of items can be chosen from the input file(s) using - * the {@link #setTestRandomSelectionSize(int)} or - * {@link #setTestRandomSelectionPct(int)} methods, each choosing a fixed test - * set size or percentage of the input set size as described above. The - * {@link RandomSampler} class from {@code mahout-math} is used to create a sample - * of the appropriate size.</li> - * </ul> - * <p/> - * Any one of the methods above can be used to control the size of the test set. - * If multiple methods are called, a runtime exception will be thrown when the - * split is executed. - * <p/> - * The {@link #setSplitLocation(int)} method is passed an integer from 0 to 100 - * (inclusive) which is translated into the position of the start of the test - * data within the input file. - * <p/> - * Given: - * <ul> - * <li>an input file of 1500 lines</li> - * <li>a desired test data size of 10 percent</li> - * </ul> - * <p/> - * <ul> - * <li>A split location of 0 will cause the first 150 items appearing in the - * input set to be written to the test set.</li> - * <li>A split location of 25 will cause items 375-525 to be written to the test - * set.</li> - * <li>A split location of 100 will cause the last 150 items in the input to be - * written to the test set</li> - * </ul> - * The start of the split will always be adjusted towards the start of the input - * in order to ensure that the desired test set size is allocated. Split location - * has no effect if random sampling is employed. - */ -public class SplitInput extends AbstractJob { - - private static final Logger log = LoggerFactory.getLogger(SplitInput.class); - - private int testSplitSize = -1; - private int testSplitPct = -1; - private int splitLocation = 100; - private int testRandomSelectionSize = -1; - private int testRandomSelectionPct = -1; - private int keepPct = 100; - private Charset charset = Charsets.UTF_8; - private boolean useSequence; - private boolean useMapRed; - - private Path inputDirectory; - private Path trainingOutputDirectory; - private Path testOutputDirectory; - private Path mapRedOutputDirectory; - - private SplitCallback callback; - - @Override - public int run(String[] args) throws Exception { - - if (parseArgs(args)) { - splitDirectory(); - } - return 0; - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new SplitInput(), args); - } - - /** - * Configure this instance based on the command-line arguments contained within the provided array. - * Calls {@link #validate()} to ensure consistency of configuration. - * - * @return true if the arguments were parsed successfully and execution should proceed. - * @throws Exception if there is a problem parsing the command-line arguments or the particular - * combination would violate class invariants. 
- */ - private boolean parseArgs(String[] args) throws Exception { - - addInputOption(); - addOption("trainingOutput", "tr", "The training data output directory", false); - addOption("testOutput", "te", "The test data output directory", false); - addOption("testSplitSize", "ss", "The number of documents held back as test data for each category", false); - addOption("testSplitPct", "sp", "The % of documents held back as test data for each category", false); - addOption("splitLocation", "sl", "Location for start of test data expressed as a percentage of the input file " - + "size (0=start, 50=middle, 100=end)", false); - addOption("randomSelectionSize", "rs", "The number of items to be randomly selected as test data ", false); - addOption("randomSelectionPct", "rp", "Percentage of items to be randomly selected as test data when using " - + "mapreduce mode", false); - addOption("charset", "c", "The name of the character encoding of the input files (not needed if using " - + "SequenceFiles)", false); - addOption(buildOption("sequenceFiles", "seq", "Set if the input files are sequence files. Default is false", - false, false, "false")); - addOption(DefaultOptionCreator.methodOption().create()); - addOption(DefaultOptionCreator.overwriteOption().create()); - //TODO: extend this to sequential mode - addOption("keepPct", "k", "The percentage of total data to keep in map-reduce mode, the rest will be ignored. " - + "Default is 100%", false); - addOption("mapRedOutputDir", "mro", "Output directory for map reduce jobs", false); - - if (parseArguments(args) == null) { - return false; - } - - try { - inputDirectory = getInputPath(); - - useMapRed = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.MAPREDUCE_METHOD); - - if (useMapRed) { - if (!hasOption("randomSelectionPct")) { - throw new OptionException(getCLIOption("randomSelectionPct"), - "must set randomSelectionPct when mapRed option is used"); - } - if (!hasOption("mapRedOutputDir")) { - throw new OptionException(getCLIOption("mapRedOutputDir"), - "mapRedOutputDir must be set when mapRed option is used"); - } - mapRedOutputDirectory = new Path(getOption("mapRedOutputDir")); - if (hasOption("keepPct")) { - keepPct = Integer.parseInt(getOption("keepPct")); - } - if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { - HadoopUtil.delete(getConf(), mapRedOutputDirectory); - } - } else { - if (!hasOption("trainingOutput") - || !hasOption("testOutput")) { - throw new OptionException(getCLIOption("trainingOutput"), - "trainingOutput and testOutput must be set if mapRed option is not used"); - } - if (!hasOption("testSplitSize") - && !hasOption("testSplitPct") - && !hasOption("randomSelectionPct") - && !hasOption("randomSelectionSize")) { - throw new OptionException(getCLIOption("testSplitSize"), - "must set one of test split size/percentage or randomSelectionSize/percentage"); - } - - trainingOutputDirectory = new Path(getOption("trainingOutput")); - testOutputDirectory = new Path(getOption("testOutput")); - FileSystem fs = trainingOutputDirectory.getFileSystem(getConf()); - if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { - HadoopUtil.delete(fs.getConf(), trainingOutputDirectory); - HadoopUtil.delete(fs.getConf(), testOutputDirectory); - } - fs.mkdirs(trainingOutputDirectory); - fs.mkdirs(testOutputDirectory); - } - - if (hasOption("charset")) { - charset = Charset.forName(getOption("charset")); - } - - if (hasOption("testSplitSize") && hasOption("testSplitPct")) { - throw new 
OptionException(getCLIOption("testSplitPct"), "must have either split size or split percentage " - + "option, not BOTH"); - } - - if (hasOption("testSplitSize")) { - setTestSplitSize(Integer.parseInt(getOption("testSplitSize"))); - } - - if (hasOption("testSplitPct")) { - setTestSplitPct(Integer.parseInt(getOption("testSplitPct"))); - } - - if (hasOption("splitLocation")) { - setSplitLocation(Integer.parseInt(getOption("splitLocation"))); - } - - if (hasOption("randomSelectionSize")) { - setTestRandomSelectionSize(Integer.parseInt(getOption("randomSelectionSize"))); - } - - if (hasOption("randomSelectionPct")) { - setTestRandomSelectionPct(Integer.parseInt(getOption("randomSelectionPct"))); - } - - useSequence = hasOption("sequenceFiles"); - - } catch (OptionException e) { - log.error("Command-line option Exception", e); - CommandLineUtil.printHelp(getGroup()); - return false; - } - - validate(); - return true; - } - - /** - * Perform a split on directory specified by {@link #setInputDirectory(Path)} by calling {@link #splitFile(Path)} - * on each file found within that directory. - */ - public void splitDirectory() throws IOException, ClassNotFoundException, InterruptedException { - this.splitDirectory(inputDirectory); - } - - /** - * Perform a split on the specified directory by calling {@link #splitFile(Path)} on each file found within that - * directory. - */ - public void splitDirectory(Path inputDir) throws IOException, ClassNotFoundException, InterruptedException { - Configuration conf = getConf(); - splitDirectory(conf, inputDir); - } - - /* - * See also splitDirectory(Path inputDir) - * */ - public void splitDirectory(Configuration conf, Path inputDir) - throws IOException, ClassNotFoundException, InterruptedException { - FileSystem fs = inputDir.getFileSystem(conf); - if (fs.getFileStatus(inputDir) == null) { - throw new IOException(inputDir + " does not exist"); - } - if (!fs.getFileStatus(inputDir).isDir()) { - throw new IOException(inputDir + " is not a directory"); - } - - if (useMapRed) { - SplitInputJob.run(conf, inputDir, mapRedOutputDirectory, - keepPct, testRandomSelectionPct); - } else { - // input dir contains one file per category. - FileStatus[] fileStats = fs.listStatus(inputDir, PathFilters.logsCRCFilter()); - for (FileStatus inputFile : fileStats) { - if (!inputFile.isDir()) { - splitFile(inputFile.getPath()); - } - } - } - } - - /** - * Perform a split on the specified input file. Results will be written to files of the same name in the specified - * training and test output directories. The {@link #validate()} method is called prior to executing the split. 
- */ - public void splitFile(Path inputFile) throws IOException { - Configuration conf = getConf(); - FileSystem fs = inputFile.getFileSystem(conf); - if (fs.getFileStatus(inputFile) == null) { - throw new IOException(inputFile + " does not exist"); - } - if (fs.getFileStatus(inputFile).isDir()) { - throw new IOException(inputFile + " is a directory"); - } - - validate(); - - Path testOutputFile = new Path(testOutputDirectory, inputFile.getName()); - Path trainingOutputFile = new Path(trainingOutputDirectory, inputFile.getName()); - - int lineCount = countLines(fs, inputFile, charset); - - log.info("{} has {} lines", inputFile.getName(), lineCount); - - int testSplitStart = 0; - int testSplitSize = this.testSplitSize; // don't modify state - BitSet randomSel = null; - - if (testRandomSelectionPct > 0 || testRandomSelectionSize > 0) { - testSplitSize = this.testRandomSelectionSize; - - if (testRandomSelectionPct > 0) { - testSplitSize = Math.round(lineCount * testRandomSelectionPct / 100.0f); - } - log.info("{} test split size is {} based on random selection percentage {}", - inputFile.getName(), testSplitSize, testRandomSelectionPct); - long[] ridx = new long[testSplitSize]; - RandomSampler.sample(testSplitSize, lineCount - 1, testSplitSize, 0, ridx, 0, RandomUtils.getRandom()); - randomSel = new BitSet(lineCount); - for (long idx : ridx) { - randomSel.set((int) idx + 1); - } - } else { - if (testSplitPct > 0) { // calculate split size based on percentage - testSplitSize = Math.round(lineCount * testSplitPct / 100.0f); - log.info("{} test split size is {} based on percentage {}", - inputFile.getName(), testSplitSize, testSplitPct); - } else { - log.info("{} test split size is {}", inputFile.getName(), testSplitSize); - } - - if (splitLocation > 0) { // calculate start of split based on percentage - testSplitStart = Math.round(lineCount * splitLocation / 100.0f); - if (lineCount - testSplitStart < testSplitSize) { - // adjust split start downwards based on split size. - testSplitStart = lineCount - testSplitSize; - } - log.info("{} test split start is {} based on split location {}", - inputFile.getName(), testSplitStart, splitLocation); - } - - if (testSplitStart < 0) { - throw new IllegalArgumentException("test split size for " + inputFile + " is too large, it would produce an " - + "empty training set from the initial set of " + lineCount + " examples"); - } else if (lineCount - testSplitSize < testSplitSize) { - log.warn("Test set size for {} may be too large, {} is larger than the number of " - + "lines remaining in the training set: {}", - inputFile, testSplitSize, lineCount - testSplitSize); - } - } - int trainCount = 0; - int testCount = 0; - if (!useSequence) { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(inputFile), charset)); - Writer trainingWriter = new OutputStreamWriter(fs.create(trainingOutputFile), charset); - Writer testWriter = new OutputStreamWriter(fs.create(testOutputFile), charset)){ - - String line; - int pos = 0; - while ((line = reader.readLine()) != null) { - pos++; - - Writer writer; - if (testRandomSelectionPct > 0) { // Randomly choose - writer = randomSel.get(pos) ? testWriter : trainingWriter; - } else { // Choose based on location - writer = pos > testSplitStart ? 
testWriter : trainingWriter; - } - - if (writer == testWriter) { - if (testCount >= testSplitSize) { - writer = trainingWriter; - } else { - testCount++; - } - } - if (writer == trainingWriter) { - trainCount++; - } - writer.write(line); - writer.write('\n'); - } - - } - } else { - try (SequenceFileIterator<Writable, Writable> iterator = - new SequenceFileIterator<>(inputFile, false, fs.getConf()); - SequenceFile.Writer trainingWriter = SequenceFile.createWriter(fs, fs.getConf(), trainingOutputFile, - iterator.getKeyClass(), iterator.getValueClass()); - SequenceFile.Writer testWriter = SequenceFile.createWriter(fs, fs.getConf(), testOutputFile, - iterator.getKeyClass(), iterator.getValueClass())) { - - int pos = 0; - while (iterator.hasNext()) { - pos++; - SequenceFile.Writer writer; - if (testRandomSelectionPct > 0) { // Randomly choose - writer = randomSel.get(pos) ? testWriter : trainingWriter; - } else { // Choose based on location - writer = pos > testSplitStart ? testWriter : trainingWriter; - } - - if (writer == testWriter) { - if (testCount >= testSplitSize) { - writer = trainingWriter; - } else { - testCount++; - } - } - if (writer == trainingWriter) { - trainCount++; - } - Pair<Writable, Writable> pair = iterator.next(); - writer.append(pair.getFirst(), pair.getSecond()); - } - - } - } - log.info("file: {}, input: {} train: {}, test: {} starting at {}", - inputFile.getName(), lineCount, trainCount, testCount, testSplitStart); - - // testing; - if (callback != null) { - callback.splitComplete(inputFile, lineCount, trainCount, testCount, testSplitStart); - } - } - - public int getTestSplitSize() { - return testSplitSize; - } - - public void setTestSplitSize(int testSplitSize) { - this.testSplitSize = testSplitSize; - } - - public int getTestSplitPct() { - return testSplitPct; - } - - /** - * Sets the percentage of the input data to allocate to the test split - * - * @param testSplitPct a value between 0 and 100 inclusive. - */ - public void setTestSplitPct(int testSplitPct) { - this.testSplitPct = testSplitPct; - } - - /** - * Sets the percentage of the input data to keep in a map reduce split input job - * - * @param keepPct a value between 0 and 100 inclusive. - */ - public void setKeepPct(int keepPct) { - this.keepPct = keepPct; - } - - /** - * Set to true to use map reduce to split the input - * - * @param useMapRed a boolean to indicate whether map reduce should be used - */ - public void setUseMapRed(boolean useMapRed) { - this.useMapRed = useMapRed; - } - - public void setMapRedOutputDirectory(Path mapRedOutputDirectory) { - this.mapRedOutputDirectory = mapRedOutputDirectory; - } - - public int getSplitLocation() { - return splitLocation; - } - - /** - * Set the location of the start of the test/training data split. Expressed as percentage of lines, for example - * 0 indicates that the test data should be taken from the start of the file, 100 indicates that the test data - * should be taken from the end of the input file, while 25 indicates that the test data should be taken from the - * first quarter of the file. - * <p/> - * This option is only relevant in cases where random selection is not employed - * - * @param splitLocation a value between 0 and 100 inclusive. 
- */ - public void setSplitLocation(int splitLocation) { - this.splitLocation = splitLocation; - } - - public Charset getCharset() { - return charset; - } - - /** - * Set the charset used to read and write files - */ - public void setCharset(Charset charset) { - this.charset = charset; - } - - public Path getInputDirectory() { - return inputDirectory; - } - - /** - * Set the directory from which input data will be read when the {@link #splitDirectory()} method is invoked - */ - public void setInputDirectory(Path inputDir) { - this.inputDirectory = inputDir; - } - - public Path getTrainingOutputDirectory() { - return trainingOutputDirectory; - } - - /** - * Set the directory to which training data will be written. - */ - public void setTrainingOutputDirectory(Path trainingOutputDir) { - this.trainingOutputDirectory = trainingOutputDir; - } - - public Path getTestOutputDirectory() { - return testOutputDirectory; - } - - /** - * Set the directory to which test data will be written. - */ - public void setTestOutputDirectory(Path testOutputDir) { - this.testOutputDirectory = testOutputDir; - } - - public SplitCallback getCallback() { - return callback; - } - - /** - * Sets the callback used to inform the caller that an input file has been successfully split - */ - public void setCallback(SplitCallback callback) { - this.callback = callback; - } - - public int getTestRandomSelectionSize() { - return testRandomSelectionSize; - } - - /** - * Sets the number of random input samples that will be saved to the test set. - */ - public void setTestRandomSelectionSize(int testRandomSelectionSize) { - this.testRandomSelectionSize = testRandomSelectionSize; - } - - public int getTestRandomSelectionPct() { - - return testRandomSelectionPct; - } - - /** - * Sets the number of random input samples that will be saved to the test set, as a percentage of the size of the - * input set. - * - * @param randomSelectionPct a value between 0 and 100 inclusive. - */ - public void setTestRandomSelectionPct(int randomSelectionPct) { - this.testRandomSelectionPct = randomSelectionPct; - } - - /** - * Validates that the current instance is in a consistent state - * - * @throws IllegalArgumentException if settings violate class invariants. - * @throws IOException if output directories do not exist or are not directories. - */ - public void validate() throws IOException { - Preconditions.checkArgument(testSplitSize >= 1 || testSplitSize == -1, - "Invalid testSplitSize: " + testSplitSize + ". Must be: testSplitSize >= 1 or testSplitSize = -1"); - Preconditions.checkArgument(splitLocation >= 0 && splitLocation <= 100 || splitLocation == -1, - "Invalid splitLocation percentage: " + splitLocation + ". Must be: 0 <= splitLocation <= 100 or splitLocation = -1"); - Preconditions.checkArgument(testSplitPct >= 0 && testSplitPct <= 100 || testSplitPct == -1, - "Invalid testSplitPct percentage: " + testSplitPct + ". Must be: 0 <= testSplitPct <= 100 or testSplitPct = -1"); - Preconditions.checkArgument(testRandomSelectionPct >= 0 && testRandomSelectionPct <= 100 - || testRandomSelectionPct == -1,"Invalid testRandomSelectionPct percentage: " + testRandomSelectionPct + - ". 
Must be: 0 <= testRandomSelectionPct <= 100 or testRandomSelectionPct = -1"); - - Preconditions.checkArgument(trainingOutputDirectory != null || useMapRed, - "No training output directory was specified"); - Preconditions.checkArgument(testOutputDirectory != null || useMapRed, "No test output directory was specified"); - - // only one of the following may be set; exactly one must be set. - int count = 0; - if (testSplitSize > 0) { - count++; - } - if (testSplitPct > 0) { - count++; - } - if (testRandomSelectionSize > 0) { - count++; - } - if (testRandomSelectionPct > 0) { - count++; - } - - Preconditions.checkArgument(count == 1, "Exactly one of testSplitSize, testSplitPct, testRandomSelectionSize, " - + "testRandomSelectionPct should be set"); - - if (!useMapRed) { - Configuration conf = getConf(); - FileSystem fs = trainingOutputDirectory.getFileSystem(conf); - FileStatus trainingOutputDirStatus = fs.getFileStatus(trainingOutputDirectory); - Preconditions.checkArgument(trainingOutputDirStatus != null && trainingOutputDirStatus.isDir(), - "%s is not a directory", trainingOutputDirectory); - FileStatus testOutputDirStatus = fs.getFileStatus(testOutputDirectory); - Preconditions.checkArgument(testOutputDirStatus != null && testOutputDirStatus.isDir(), - "%s is not a directory", testOutputDirectory); - } - } - - /** - * Count the lines in the specified file, as returned by {@code BufferedReader.readLine()} - * - * @param fs the file system on which the input file resides - * @param inputFile the file whose lines will be counted - * @param charset the charset of the file to read - * @return the number of lines in the input file. - * @throws IOException if there is a problem opening or reading the file. - */ - public static int countLines(FileSystem fs, Path inputFile, Charset charset) throws IOException { - int lineCount = 0; - try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(inputFile), charset))){ - while (reader.readLine() != null) { - lineCount++; - } - } - return lineCount; - } - - /** - * Used to pass information back to a caller once a file has been split without the need for a data object - */ - public interface SplitCallback { - void splitComplete(Path inputFile, int lineCount, int trainCount, int testCount, int testSplitStart); - } - -}
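For reference, here is a minimal, hypothetical sketch of driving the removed SplitInput class programmatically in its sequential (non map-reduce) mode, using only the public API visible in the deleted source above. The paths and the 10 percent split are illustrative assumptions, not anything taken from this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.utils.SplitInput;

public class SplitInputSketch {
  public static void main(String[] args) throws Exception {
    SplitInput splitter = new SplitInput();
    splitter.setConf(new Configuration());            // AbstractJob extends Configured
    Path train = new Path("data/train");              // hypothetical output paths
    Path test = new Path("data/test");
    FileSystem fs = train.getFileSystem(splitter.getConf());
    fs.mkdirs(train);                                 // validate() requires both output
    fs.mkdirs(test);                                  // directories to exist already
    splitter.setTrainingOutputDirectory(train);
    splitter.setTestOutputDirectory(test);
    splitter.setTestSplitPct(10);                     // hold back 10% of each file as test data
    splitter.setSplitLocation(100);                   // take the test lines from the end of each file
    splitter.setInputDirectory(new Path("data/categories")); // hypothetical: one text file per category
    splitter.splitDirectory();                        // validates, then splits each file in the directory
  }
}

The same split could also be produced through main() and ToolRunner with the corresponding command-line options.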
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java b/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java deleted file mode 100644 index 4a1ff86..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/SplitInputJob.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.utils; - -import java.io.IOException; -import java.io.Serializable; -import java.util.Random; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.mahout.common.Pair; -import org.apache.mahout.common.RandomUtils; -import org.apache.mahout.common.iterator.sequencefile.PathFilters; -import org.apache.mahout.common.iterator.sequencefile.PathType; -import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator; - -/** - * Class which implements a map reduce version of SplitInput. - * This class takes a SequenceFile input, e.g. a set of training data - * for a learning algorithm, downsamples it, applies a random - * permutation and splits it into test and training sets - */ -public final class SplitInputJob { - - private static final String DOWNSAMPLING_FACTOR = "SplitInputJob.downsamplingFactor"; - private static final String RANDOM_SELECTION_PCT = "SplitInputJob.randomSelectionPct"; - private static final String TRAINING_TAG = "training"; - private static final String TEST_TAG = "test"; - - private SplitInputJob() {} - - /** - * Run job to downsample, randomly permute and split data into test and - * training sets. 
This job takes a SequenceFile as input and outputs two - * SequenceFiles test-r-00000 and training-r-00000 which contain the test and - * training sets respectively - * - * @param initialConf - * Initial configuration - * @param inputPath - * path to input data SequenceFile - * @param outputPath - * path for output data SequenceFiles - * @param keepPct - * percentage of key value pairs in input to keep. The rest are - * discarded - * @param randomSelectionPercent - * percentage of key value pairs to allocate to test set. Remainder - * are allocated to training set - */ - @SuppressWarnings("rawtypes") - public static void run(Configuration initialConf, Path inputPath, - Path outputPath, int keepPct, float randomSelectionPercent) - throws IOException, ClassNotFoundException, InterruptedException { - - int downsamplingFactor = (int) (100.0 / keepPct); - initialConf.setInt(DOWNSAMPLING_FACTOR, downsamplingFactor); - initialConf.setFloat(RANDOM_SELECTION_PCT, randomSelectionPercent); - - // Determine class of keys and values - FileSystem fs = FileSystem.get(initialConf); - - SequenceFileDirIterator<? extends WritableComparable, Writable> iterator = - new SequenceFileDirIterator<>(inputPath, - PathType.LIST, PathFilters.partFilter(), null, false, fs.getConf()); - Class<? extends WritableComparable> keyClass; - Class<? extends Writable> valueClass; - if (iterator.hasNext()) { - Pair<? extends WritableComparable, Writable> pair = iterator.next(); - keyClass = pair.getFirst().getClass(); - valueClass = pair.getSecond().getClass(); - } else { - throw new IllegalStateException("Couldn't determine class of the input values"); - } - - Job job = new Job(new Configuration(initialConf)); - - MultipleOutputs.addNamedOutput(job, TRAINING_TAG, SequenceFileOutputFormat.class, keyClass, valueClass); - MultipleOutputs.addNamedOutput(job, TEST_TAG, SequenceFileOutputFormat.class, keyClass, valueClass); - job.setJarByClass(SplitInputJob.class); - FileInputFormat.addInputPath(job, inputPath); - FileOutputFormat.setOutputPath(job, outputPath); - job.setNumReduceTasks(1); - job.setInputFormatClass(SequenceFileInputFormat.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setMapperClass(SplitInputMapper.class); - job.setReducerClass(SplitInputReducer.class); - job.setSortComparatorClass(SplitInputComparator.class); - job.setOutputKeyClass(keyClass); - job.setOutputValueClass(valueClass); - job.submit(); - boolean succeeded = job.waitForCompletion(true); - if (!succeeded) { - throw new IllegalStateException("Job failed!"); - } - } - - /** Mapper which downsamples the input by downsamplingFactor */ - public static class SplitInputMapper extends - Mapper<WritableComparable<?>, Writable, WritableComparable<?>, Writable> { - - private int downsamplingFactor; - - @Override - public void setup(Context ctx) { - downsamplingFactor = ctx.getConfiguration().getInt(DOWNSAMPLING_FACTOR, 1); - } - - /** Only run map() for one out of every downsampleFactor inputs */ - @Override - public void run(Context context) throws IOException, InterruptedException { - setup(context); - int i = 0; - while (context.nextKeyValue()) { - if (i % downsamplingFactor == 0) { - map(context.getCurrentKey(), context.getCurrentValue(), context); - } - i++; - } - cleanup(context); - } - - } - - /** Reducer which uses MultipleOutputs to randomly allocate key value pairs between test and training outputs */ - public static class SplitInputReducer extends - Reducer<WritableComparable<?>, Writable, WritableComparable<?>, Writable> { - - 
private MultipleOutputs multipleOutputs; - private final Random rnd = RandomUtils.getRandom(); - private float randomSelectionPercent; - - @Override - protected void setup(Context ctx) throws IOException { - randomSelectionPercent = ctx.getConfiguration().getFloat(RANDOM_SELECTION_PCT, 0); - multipleOutputs = new MultipleOutputs(ctx); - } - - /** - * Randomly allocate key value pairs between test and training sets. - * randomSelectionPercent of the pairs will go to the test set. - */ - @Override - protected void reduce(WritableComparable<?> key, Iterable<Writable> values, - Context context) throws IOException, InterruptedException { - for (Writable value : values) { - if (rnd.nextInt(100) < randomSelectionPercent) { - multipleOutputs.write(TEST_TAG, key, value); - } else { - multipleOutputs.write(TRAINING_TAG, key, value); - } - } - - } - - @Override - protected void cleanup(Context context) throws IOException { - try { - multipleOutputs.close(); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - } - - /** Randomly permute key value pairs */ - public static class SplitInputComparator extends WritableComparator implements Serializable { - - private final Random rnd = RandomUtils.getRandom(); - - protected SplitInputComparator() { - super(WritableComparable.class); - } - - @Override - public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { - if (rnd.nextBoolean()) { - return 1; - } else { - return -1; - } - } - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java deleted file mode 100644 index ac884d0..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/clustering/AbstractClusterWriter.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.utils.clustering; - -import java.io.IOException; -import java.io.Writer; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.StringUtils; -import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; -import org.apache.mahout.clustering.iterator.ClusterWritable; -import org.apache.mahout.common.Pair; -import org.apache.mahout.common.distance.DistanceMeasure; -import org.apache.mahout.math.Vector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - * Base class for implementing ClusterWriter - */ -public abstract class AbstractClusterWriter implements ClusterWriter { - - private static final Logger log = LoggerFactory.getLogger(AbstractClusterWriter.class); - - protected final Writer writer; - protected final Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints; - protected final DistanceMeasure measure; - - /** - * - * @param writer The underlying {@link java.io.Writer} to use - * @param clusterIdToPoints The map between cluster ids {@link org.apache.mahout.clustering.Cluster#getId()} and the - * points in the cluster - * @param measure The {@link org.apache.mahout.common.distance.DistanceMeasure} used to calculate the distance. - * Some writers may wish to use it for calculating weights for display. May be null. - */ - protected AbstractClusterWriter(Writer writer, Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints, - DistanceMeasure measure) { - this.writer = writer; - this.clusterIdToPoints = clusterIdToPoints; - this.measure = measure; - } - - protected Writer getWriter() { - return writer; - } - - protected Map<Integer, List<WeightedPropertyVectorWritable>> getClusterIdToPoints() { - return clusterIdToPoints; - } - - public static String getTopFeatures(Vector vector, String[] dictionary, int numTerms) { - - StringBuilder sb = new StringBuilder(100); - - for (Pair<String, Double> item : getTopPairs(vector, dictionary, numTerms)) { - String term = item.getFirst(); - sb.append("\n\t\t"); - sb.append(StringUtils.rightPad(term, 40)); - sb.append("=>"); - sb.append(StringUtils.leftPad(item.getSecond().toString(), 20)); - } - return sb.toString(); - } - - public static String getTopTerms(Vector vector, String[] dictionary, int numTerms) { - - StringBuilder sb = new StringBuilder(100); - - for (Pair<String, Double> item : getTopPairs(vector, dictionary, numTerms)) { - String term = item.getFirst(); - sb.append(term).append('_'); - } - sb.deleteCharAt(sb.length() - 1); - return sb.toString(); - } - - @Override - public long write(Iterable<ClusterWritable> iterable) throws IOException { - return write(iterable, Long.MAX_VALUE); - } - - @Override - public void close() throws IOException { - writer.close(); - } - - @Override - public long write(Iterable<ClusterWritable> iterable, long maxDocs) throws IOException { - long result = 0; - Iterator<ClusterWritable> iterator = iterable.iterator(); - while (result < maxDocs && iterator.hasNext()) { - write(iterator.next()); - result++; - } - return result; - } - - private static Collection<Pair<String, Double>> getTopPairs(Vector vector, String[] dictionary, int numTerms) { - List<TermIndexWeight> vectorTerms = Lists.newArrayList(); - - for (Vector.Element elt : vector.nonZeroes()) { - vectorTerms.add(new TermIndexWeight(elt.index(), elt.get())); - } - - // Sort 
results in reverse order (i.e., weight in descending order) - Collections.sort(vectorTerms, new Comparator<TermIndexWeight>() { - @Override - public int compare(TermIndexWeight one, TermIndexWeight two) { - return Double.compare(two.weight, one.weight); - } - }); - - Collection<Pair<String, Double>> topTerms = Lists.newLinkedList(); - - for (int i = 0; i < vectorTerms.size() && i < numTerms; i++) { - int index = vectorTerms.get(i).index; - String dictTerm = dictionary[index]; - if (dictTerm == null) { - log.error("Dictionary entry missing for {}", index); - continue; - } - topTerms.add(new Pair<>(dictTerm, vectorTerms.get(i).weight)); - } - - return topTerms; - } - - private static class TermIndexWeight { - private final int index; - private final double weight; - - TermIndexWeight(int index, double weight) { - this.index = index; - this.weight = weight; - } - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java deleted file mode 100644 index 7269016..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/clustering/CSVClusterWriter.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.utils.clustering; - -import org.apache.mahout.clustering.Cluster; -import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; -import org.apache.mahout.clustering.iterator.ClusterWritable; -import org.apache.mahout.common.distance.DistanceMeasure; -import org.apache.mahout.math.NamedVector; -import org.apache.mahout.math.Vector; - -import java.io.IOException; -import java.io.Writer; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -/** - * Format is adjacency style as put forth at http://gephi.org/users/supported-graph-formats/csv-format/, the centroid - * is the first element and all the rest of the row are the points in that cluster - * - **/ -public class CSVClusterWriter extends AbstractClusterWriter { - - private static final Pattern VEC_PATTERN = Pattern.compile("\\{|\\:|\\,|\\}"); - - public CSVClusterWriter(Writer writer, Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints, - DistanceMeasure measure) { - super(writer, clusterIdToPoints, measure); - } - - @Override - public void write(ClusterWritable clusterWritable) throws IOException { - StringBuilder line = new StringBuilder(); - Cluster cluster = clusterWritable.getValue(); - line.append(cluster.getId()); - List<WeightedPropertyVectorWritable> points = getClusterIdToPoints().get(cluster.getId()); - if (points != null) { - for (WeightedPropertyVectorWritable point : points) { - Vector theVec = point.getVector(); - line.append(','); - if (theVec instanceof NamedVector) { - line.append(((NamedVector)theVec).getName()); - } else { - String vecStr = theVec.asFormatString(); - //do some basic manipulations for display - vecStr = VEC_PATTERN.matcher(vecStr).replaceAll("_"); - line.append(vecStr); - } - } - getWriter().append(line).append("\n"); - } - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java b/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java deleted file mode 100644 index 75b5ded..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumper.java +++ /dev/null @@ -1,328 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.utils.clustering; - -import java.io.File; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import com.google.common.io.Closeables; -import com.google.common.io.Files; -import org.apache.commons.io.Charsets; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.mahout.clustering.cdbw.CDbwEvaluator; -import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; -import org.apache.mahout.clustering.evaluation.ClusterEvaluator; -import org.apache.mahout.clustering.evaluation.RepresentativePointsDriver; -import org.apache.mahout.clustering.iterator.ClusterWritable; -import org.apache.mahout.common.AbstractJob; -import org.apache.mahout.common.ClassUtils; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.Pair; -import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.common.distance.DistanceMeasure; -import org.apache.mahout.common.iterator.sequencefile.PathFilters; -import org.apache.mahout.common.iterator.sequencefile.PathType; -import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; -import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable; -import org.apache.mahout.utils.vectors.VectorHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class ClusterDumper extends AbstractJob { - - public static final String SAMPLE_POINTS = "samplePoints"; - DistanceMeasure measure; - - public enum OUTPUT_FORMAT { - TEXT, - CSV, - GRAPH_ML, - JSON, - } - - public static final String DICTIONARY_TYPE_OPTION = "dictionaryType"; - public static final String DICTIONARY_OPTION = "dictionary"; - public static final String POINTS_DIR_OPTION = "pointsDir"; - public static final String NUM_WORDS_OPTION = "numWords"; - public static final String SUBSTRING_OPTION = "substring"; - public static final String EVALUATE_CLUSTERS = "evaluate"; - - public static final String OUTPUT_FORMAT_OPT = "outputFormat"; - - private static final Logger log = LoggerFactory.getLogger(ClusterDumper.class); - private Path seqFileDir; - private Path pointsDir; - private long maxPointsPerCluster = Long.MAX_VALUE; - private String termDictionary; - private String dictionaryFormat; - private int subString = Integer.MAX_VALUE; - private int numTopFeatures = 10; - private Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints; - private OUTPUT_FORMAT outputFormat = OUTPUT_FORMAT.TEXT; - private boolean runEvaluation; - - public ClusterDumper(Path seqFileDir, Path pointsDir) { - this.seqFileDir = seqFileDir; - this.pointsDir = pointsDir; - init(); - } - - public ClusterDumper() { - setConf(new Configuration()); - } - - public static void main(String[] args) throws Exception { - new ClusterDumper().run(args); - } - - @Override - public int run(String[] args) throws Exception { - addInputOption(); - addOutputOption(); - addOption(OUTPUT_FORMAT_OPT, "of", "The optional output format for the results. 
Options: TEXT, CSV, JSON or GRAPH_ML", - "TEXT"); - addOption(SUBSTRING_OPTION, "b", "The number of chars of the asFormatString() to print"); - addOption(NUM_WORDS_OPTION, "n", "The number of top terms to print"); - addOption(POINTS_DIR_OPTION, "p", - "The directory containing points sequence files mapping input vectors to their cluster. " - + "If specified, then the program will output the points associated with a cluster"); - addOption(SAMPLE_POINTS, "sp", "Specifies the maximum number of points to include _per_ cluster. The default " - + "is to include all points"); - addOption(DICTIONARY_OPTION, "d", "The dictionary file"); - addOption(DICTIONARY_TYPE_OPTION, "dt", "The dictionary file type (text|sequencefile)", "text"); - addOption(buildOption(EVALUATE_CLUSTERS, "e", "Run ClusterEvaluator and CDbwEvaluator over the input. " - + "The output will be appended to the rest of the output at the end.", false, false, null)); - addOption(DefaultOptionCreator.distanceMeasureOption().create()); - - // output is optional, will print to System.out by default - if (parseArguments(args, false, true) == null) { - return -1; - } - - seqFileDir = getInputPath(); - if (hasOption(POINTS_DIR_OPTION)) { - pointsDir = new Path(getOption(POINTS_DIR_OPTION)); - } - outputFile = getOutputFile(); - if (hasOption(SUBSTRING_OPTION)) { - int sub = Integer.parseInt(getOption(SUBSTRING_OPTION)); - if (sub >= 0) { - subString = sub; - } - } - termDictionary = getOption(DICTIONARY_OPTION); - dictionaryFormat = getOption(DICTIONARY_TYPE_OPTION); - if (hasOption(NUM_WORDS_OPTION)) { - numTopFeatures = Integer.parseInt(getOption(NUM_WORDS_OPTION)); - } - if (hasOption(OUTPUT_FORMAT_OPT)) { - outputFormat = OUTPUT_FORMAT.valueOf(getOption(OUTPUT_FORMAT_OPT)); - } - if (hasOption(SAMPLE_POINTS)) { - maxPointsPerCluster = Long.parseLong(getOption(SAMPLE_POINTS)); - } else { - maxPointsPerCluster = Long.MAX_VALUE; - } - runEvaluation = hasOption(EVALUATE_CLUSTERS); - String distanceMeasureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION); - measure = ClassUtils.instantiateAs(distanceMeasureClass, DistanceMeasure.class); - - init(); - printClusters(null); - return 0; - } - - public void printClusters(String[] dictionary) throws Exception { - Configuration conf = new Configuration(); - - if (this.termDictionary != null) { - if ("text".equals(dictionaryFormat)) { - dictionary = VectorHelper.loadTermDictionary(new File(this.termDictionary)); - } else if ("sequencefile".equals(dictionaryFormat)) { - dictionary = VectorHelper.loadTermDictionary(conf, this.termDictionary); - } else { - throw new IllegalArgumentException("Invalid dictionary format"); - } - } - - Writer writer; - boolean shouldClose; - if (this.outputFile == null) { - shouldClose = false; - writer = new OutputStreamWriter(System.out, Charsets.UTF_8); - } else { - shouldClose = true; - if (outputFile.getName().startsWith("s3n://")) { - Path p = outputPath; - FileSystem fs = FileSystem.get(p.toUri(), conf); - writer = new OutputStreamWriter(fs.create(p), Charsets.UTF_8); - } else { - Files.createParentDirs(outputFile); - writer = Files.newWriter(this.outputFile, Charsets.UTF_8); - } - } - ClusterWriter clusterWriter = createClusterWriter(writer, dictionary); - try { - long numWritten = clusterWriter.write(new SequenceFileDirValueIterable<ClusterWritable>(new Path(seqFileDir, - "part-*"), PathType.GLOB, conf)); - - writer.flush(); - if (runEvaluation) { - HadoopUtil.delete(conf, new Path("tmp/representative")); - int numIters = 5; - 
RepresentativePointsDriver.main(new String[]{ - "--input", seqFileDir.toString(), - "--output", "tmp/representative", - "--clusteredPoints", pointsDir.toString(), - "--distanceMeasure", measure.getClass().getName(), - "--maxIter", String.valueOf(numIters) - }); - conf.set(RepresentativePointsDriver.DISTANCE_MEASURE_KEY, measure.getClass().getName()); - conf.set(RepresentativePointsDriver.STATE_IN_KEY, "tmp/representative/representativePoints-" + numIters); - ClusterEvaluator ce = new ClusterEvaluator(conf, seqFileDir); - writer.append("\n"); - writer.append("Inter-Cluster Density: ").append(String.valueOf(ce.interClusterDensity())).append("\n"); - writer.append("Intra-Cluster Density: ").append(String.valueOf(ce.intraClusterDensity())).append("\n"); - CDbwEvaluator cdbw = new CDbwEvaluator(conf, seqFileDir); - writer.append("CDbw Inter-Cluster Density: ").append(String.valueOf(cdbw.interClusterDensity())).append("\n"); - writer.append("CDbw Intra-Cluster Density: ").append(String.valueOf(cdbw.intraClusterDensity())).append("\n"); - writer.append("CDbw Separation: ").append(String.valueOf(cdbw.separation())).append("\n"); - writer.flush(); - } - log.info("Wrote {} clusters", numWritten); - } finally { - if (shouldClose) { - Closeables.close(clusterWriter, false); - } else { - if (clusterWriter instanceof GraphMLClusterWriter) { - clusterWriter.close(); - } - } - } - } - - ClusterWriter createClusterWriter(Writer writer, String[] dictionary) throws IOException { - ClusterWriter result; - - switch (outputFormat) { - case TEXT: - result = new ClusterDumperWriter(writer, clusterIdToPoints, measure, numTopFeatures, dictionary, subString); - break; - case CSV: - result = new CSVClusterWriter(writer, clusterIdToPoints, measure); - break; - case GRAPH_ML: - result = new GraphMLClusterWriter(writer, clusterIdToPoints, measure, numTopFeatures, dictionary, subString); - break; - case JSON: - result = new JsonClusterWriter(writer, clusterIdToPoints, measure, numTopFeatures, dictionary); - break; - default: - throw new IllegalStateException("Unknown outputformat: " + outputFormat); - } - return result; - } - - /** - * Convenience function to set the output format during testing. 
- */ - public void setOutputFormat(OUTPUT_FORMAT of) { - outputFormat = of; - } - - private void init() { - if (this.pointsDir != null) { - Configuration conf = new Configuration(); - // read in the points - clusterIdToPoints = readPoints(this.pointsDir, maxPointsPerCluster, conf); - } else { - clusterIdToPoints = Collections.emptyMap(); - } - } - - - public int getSubString() { - return subString; - } - - public void setSubString(int subString) { - this.subString = subString; - } - - public Map<Integer, List<WeightedPropertyVectorWritable>> getClusterIdToPoints() { - return clusterIdToPoints; - } - - public String getTermDictionary() { - return termDictionary; - } - - public void setTermDictionary(String termDictionary, String dictionaryType) { - this.termDictionary = termDictionary; - this.dictionaryFormat = dictionaryType; - } - - public void setNumTopFeatures(int num) { - this.numTopFeatures = num; - } - - public int getNumTopFeatures() { - return this.numTopFeatures; - } - - public long getMaxPointsPerCluster() { - return maxPointsPerCluster; - } - - public void setMaxPointsPerCluster(long maxPointsPerCluster) { - this.maxPointsPerCluster = maxPointsPerCluster; - } - - public static Map<Integer, List<WeightedPropertyVectorWritable>> readPoints(Path pointsPathDir, - long maxPointsPerCluster, - Configuration conf) { - Map<Integer, List<WeightedPropertyVectorWritable>> result = new TreeMap<>(); - for (Pair<IntWritable, WeightedPropertyVectorWritable> record - : new SequenceFileDirIterable<IntWritable, WeightedPropertyVectorWritable>(pointsPathDir, PathType.LIST, - PathFilters.logsCRCFilter(), conf)) { - // value is the cluster id as an int, key is the name/id of the - // vector, but that doesn't matter because we only care about printing it - //String clusterId = value.toString(); - int keyValue = record.getFirst().get(); - List<WeightedPropertyVectorWritable> pointList = result.get(keyValue); - if (pointList == null) { - pointList = new ArrayList<>(); - result.put(keyValue, pointList); - } - if (pointList.size() < maxPointsPerCluster) { - pointList.add(record.getSecond()); - } - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java deleted file mode 100644 index 31858c4..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterDumperWriter.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.utils.clustering; - -import org.apache.hadoop.io.Text; -import org.apache.mahout.clustering.AbstractCluster; -import org.apache.mahout.clustering.Cluster; -import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; -import org.apache.mahout.clustering.iterator.ClusterWritable; -import org.apache.mahout.common.distance.DistanceMeasure; - -import java.io.IOException; -import java.io.Writer; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * Implements a {@link ClusterWriter} that outputs in the format used by ClusterDumper in Mahout 0.5 - */ -public class ClusterDumperWriter extends AbstractClusterWriter { - - private final int subString; - private final String[] dictionary; - private final int numTopFeatures; - - public ClusterDumperWriter(Writer writer, Map<Integer,List<WeightedPropertyVectorWritable>> clusterIdToPoints, - DistanceMeasure measure, int numTopFeatures, String[] dictionary, int subString) { - super(writer, clusterIdToPoints, measure); - this.numTopFeatures = numTopFeatures; - this.dictionary = dictionary; - this.subString = subString; - } - - @Override - public void write(ClusterWritable clusterWritable) throws IOException { - Cluster cluster = clusterWritable.getValue(); - String fmtStr = cluster.asFormatString(dictionary); - Writer writer = getWriter(); - if (subString > 0 && fmtStr.length() > subString) { - writer.write(':'); - writer.write(fmtStr, 0, Math.min(subString, fmtStr.length())); - } else { - writer.write(fmtStr); - } - - writer.write('\n'); - - if (dictionary != null) { - String topTerms = getTopFeatures(clusterWritable.getValue().getCenter(), dictionary, numTopFeatures); - writer.write("\tTop Terms: "); - writer.write(topTerms); - writer.write('\n'); - } - - Map<Integer,List<WeightedPropertyVectorWritable>> clusterIdToPoints = getClusterIdToPoints(); - List<WeightedPropertyVectorWritable> points = clusterIdToPoints.get(clusterWritable.getValue().getId()); - if (points != null) { - writer.write("\tWeight : [props - optional]: Point:\n\t"); - for (Iterator<WeightedPropertyVectorWritable> iterator = points.iterator(); iterator.hasNext();) { - WeightedPropertyVectorWritable point = iterator.next(); - writer.write(String.valueOf(point.getWeight())); - Map<Text,Text> map = point.getProperties(); - // map can be null since empty maps when written are returned as null - writer.write(" : ["); - if (map != null) { - for (Map.Entry<Text,Text> entry : map.entrySet()) { - writer.write(entry.getKey().toString()); - writer.write("="); - writer.write(entry.getValue().toString()); - } - } - writer.write("]"); - - writer.write(": "); - - writer.write(AbstractCluster.formatVector(point.getVector(), dictionary)); - if (iterator.hasNext()) { - writer.write("\n\t"); - } - } - writer.write('\n'); - } - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java deleted file mode 100644 index 70f8f6f..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/clustering/ClusterWriter.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.utils.clustering; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.mahout.clustering.iterator.ClusterWritable; - -/** - * Writes out clusters - */ -public interface ClusterWriter extends Closeable { - - /** - * Write all values in the Iterable to the output - * - * @param iterable The {@link Iterable} to loop over - * @return the number of docs written - * @throws java.io.IOException if there was a problem writing - */ - long write(Iterable<ClusterWritable> iterable) throws IOException; - - /** - * Write out a Cluster - */ - void write(ClusterWritable clusterWritable) throws IOException; - - /** - * Write the first {@code maxDocs} to the output. - * - * @param iterable The {@link Iterable} to loop over - * @param maxDocs the maximum number of docs to write - * @return The number of docs written - * @throws IOException if there was a problem writing - */ - long write(Iterable<ClusterWritable> iterable, long maxDocs) throws IOException; -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java deleted file mode 100644 index 25e8f3b..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/clustering/GraphMLClusterWriter.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.utils.clustering; - -import java.io.IOException; -import java.io.Writer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.regex.Pattern; - -import org.apache.mahout.clustering.Cluster; -import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; -import org.apache.mahout.clustering.classify.WeightedVectorWritable; -import org.apache.mahout.clustering.iterator.ClusterWritable; -import org.apache.mahout.common.RandomUtils; -import org.apache.mahout.common.StringUtils; -import org.apache.mahout.common.distance.DistanceMeasure; -import org.apache.mahout.math.NamedVector; -import org.apache.mahout.math.Vector; - -/** - * GraphML -- see http://gephi.org/users/supported-graph-formats/graphml-format/ - */ -public class GraphMLClusterWriter extends AbstractClusterWriter { - - private static final Pattern VEC_PATTERN = Pattern.compile("\\{|\\:|\\,|\\}"); - private final Map<Integer, Color> colors = new HashMap<>(); - private Color lastClusterColor; - private float lastX; - private float lastY; - private Random random; - private int posStep; - private final String[] dictionary; - private final int numTopFeatures; - private final int subString; - - public GraphMLClusterWriter(Writer writer, Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints, - DistanceMeasure measure, int numTopFeatures, String[] dictionary, int subString) - throws IOException { - super(writer, clusterIdToPoints, measure); - this.dictionary = dictionary; - this.numTopFeatures = numTopFeatures; - this.subString = subString; - init(writer); - } - - private void init(Writer writer) throws IOException { - writer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"); - writer.append("<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\"\n" - + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" - + "xsi:schemaLocation=\"http://graphml.graphdrawing.org/xmlns\n" - + "http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd\">"); - //support rgb - writer.append("<key attr.name=\"r\" attr.type=\"int\" for=\"node\" id=\"r\"/>\n" - + "<key attr.name=\"g\" attr.type=\"int\" for=\"node\" id=\"g\"/>\n" - + "<key attr.name=\"b\" attr.type=\"int\" for=\"node\" id=\"b\"/>" - + "<key attr.name=\"size\" attr.type=\"int\" for=\"node\" id=\"size\"/>" - + "<key attr.name=\"weight\" attr.type=\"float\" for=\"edge\" id=\"weight\"/>" - + "<key attr.name=\"x\" attr.type=\"float\" for=\"node\" id=\"x\"/>" - + "<key attr.name=\"y\" attr.type=\"float\" for=\"node\" id=\"y\"/>"); - writer.append("<graph edgedefault=\"undirected\">"); - lastClusterColor = new Color(); - posStep = (int) (0.1 * clusterIdToPoints.size()) + 100; - random = RandomUtils.getRandom(); - } - - /* - <?xml version="1.0" encoding="UTF-8"?> - <graphml xmlns="http://graphml.graphdrawing.org/xmlns" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://graphml.graphdrawing.org/xmlns - http://graphml.graphdrawing.org/xmlns/1.0/graphml.xsd"> - <graph id="G" edgedefault="undirected"> - <node id="n0"/> - <node id="n1"/> - <edge id="e1" source="n0" target="n1"/> - </graph> - </graphml> - */ - - @Override - public void write(ClusterWritable clusterWritable) throws IOException { - StringBuilder line = new StringBuilder(); - Cluster cluster = clusterWritable.getValue(); - Color rgb = getColor(cluster.getId()); - - String topTerms = ""; - if (dictionary != null) { - topTerms = getTopTerms(cluster.getCenter(), dictionary, 
numTopFeatures); - } - String clusterLabel = String.valueOf(cluster.getId()) + '_' + topTerms; - //do some positioning so that items are visible and grouped together - //TODO: put in a real layout algorithm - float x = lastX + 1000; - float y = lastY; - if (x > (1000 + posStep)) { - y = lastY + 1000; - x = 0; - } - - line.append(createNode(clusterLabel, rgb, x, y)); - List<WeightedPropertyVectorWritable> points = clusterIdToPoints.get(cluster.getId()); - if (points != null) { - for (WeightedVectorWritable point : points) { - Vector theVec = point.getVector(); - double distance = 1; - if (measure != null) { - //scale the distance - distance = measure.distance(cluster.getCenter().getLengthSquared(), cluster.getCenter(), theVec) * 500; - } - String vecStr; - int angle = random.nextInt(360); //pick an angle at random and then scale along that angle - double angleRads = Math.toRadians(angle); - - float targetX = x + (float) (distance * Math.cos(angleRads)); - float targetY = y + (float) (distance * Math.sin(angleRads)); - if (theVec instanceof NamedVector) { - vecStr = ((NamedVector) theVec).getName(); - } else { - vecStr = theVec.asFormatString(); - //do some basic manipulations for display - vecStr = VEC_PATTERN.matcher(vecStr).replaceAll("_"); - } - if (subString > 0 && vecStr.length() > subString) { - vecStr = vecStr.substring(0, subString); - } - line.append(createNode(vecStr, rgb, targetX, targetY)); - line.append(createEdge(clusterLabel, vecStr, distance)); - } - } - lastClusterColor = rgb; - lastX = x; - lastY = y; - getWriter().append(line).append("\n"); - } - - private Color getColor(int clusterId) { - Color result = colors.get(clusterId); - if (result == null) { - result = new Color(); - //there is probably some better way to color a graph - int incR = 0; - int incG = 0; - int incB = 0; - if (lastClusterColor.r + 20 < 256 && lastClusterColor.g + 20 < 256 && lastClusterColor.b + 20 < 256) { - incR = 20; - incG = 0; - incB = 0; - } else if (lastClusterColor.r + 20 >= 256 && lastClusterColor.g + 20 < 256 && lastClusterColor.b + 20 < 256) { - incG = 20; - incB = 0; - } else if (lastClusterColor.r + 20 >= 256 && lastClusterColor.g + 20 >= 256 && lastClusterColor.b + 20 < 256) { - incB = 20; - } else { - incR += 3; - incG += 3; - incB += 3; - } - result.r = (lastClusterColor.r + incR) % 256; - result.g = (lastClusterColor.g + incG) % 256; - result.b = (lastClusterColor.b + incB) % 256; - colors.put(clusterId, result); - } - return result; - } - - private static String createEdge(String left, String right, double distance) { - left = StringUtils.escapeXML(left); - right = StringUtils.escapeXML(right); - return "<edge id=\"" + left + '_' + right + "\" source=\"" + left + "\" target=\"" + right + "\">" - + "<data key=\"weight\">" + distance + "</data></edge>"; - } - - private static String createNode(String s, Color rgb, float x, float y) { - return "<node id=\"" + StringUtils.escapeXML(s) + "\"><data key=\"r\">" + rgb.r - + "</data>" - + "<data key=\"g\">" + rgb.g - + "</data>" - + "<data key=\"b\">" + rgb.b - + "</data>" - + "<data key=\"x\">" + x - + "</data>" - + "<data key=\"y\">" + y - + "</data>" - + "</node>"; - } - - @Override - public void close() throws IOException { - getWriter().append("</graph>").append("</graphml>"); - super.close(); - } - - private static class Color { - int r; - int g; - int b; - } -}
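For orientation, a hedged driver for the writer above. This is a sketch under stated assumptions, not part of the original code: the output file name is arbitrary, the empty points map and null dictionary exercise only the minimal path (one node per cluster, no term labels, no edges), EuclideanDistanceMeasure is just one possible measure, and loadClusters() is a hypothetical stand-in for however the ClusterWritable instances are actually obtained.

import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.utils.clustering.GraphMLClusterWriter;

public class GraphMLSketch {
  public static void main(String[] args) throws IOException {
    Map<Integer, List<WeightedPropertyVectorWritable>> noPoints = Collections.emptyMap();
    Writer out = Files.newBufferedWriter(Paths.get("clusters.graphml"), StandardCharsets.UTF_8);
    // The constructor writes the GraphML preamble immediately, hence the IOException.
    GraphMLClusterWriter graphml = new GraphMLClusterWriter(
        out, noPoints, new EuclideanDistanceMeasure(),
        10,   // numTopFeatures (ignored while dictionary is null)
        null, // dictionary: none, so nodes are labeled by cluster id alone
        0);   // subString: 0 disables label truncation
    graphml.write(loadClusters()); // bulk form inherited from AbstractClusterWriter
    graphml.close();               // appends </graph></graphml> and closes the writer
  }

  // Hypothetical source of clusters; in a real run these would be read back
  // from the clusters-*-final SequenceFiles of a clustering job.
  private static Iterable<ClusterWritable> loadClusters() {
    return Collections.emptyList();
  }
}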
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java
---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java b/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java deleted file mode 100644 index d564a73..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/clustering/JsonClusterWriter.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.mahout.utils.clustering; - -import java.io.IOException; -import java.io.Writer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -import org.apache.mahout.clustering.AbstractCluster; -import org.apache.mahout.clustering.Cluster; -import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; -import org.apache.mahout.clustering.iterator.ClusterWritable; -import org.apache.mahout.common.distance.DistanceMeasure; -import org.apache.mahout.math.NamedVector; -import org.apache.mahout.math.Vector; -import org.codehaus.jackson.map.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Dumps cluster info to JSON-formatted lines.
Heavily inspired by - * ClusterDumperWriter.java and CSVClusterWriter.java - * - */ -public class JsonClusterWriter extends AbstractClusterWriter { - private final String[] dictionary; - private final int numTopFeatures; - private final ObjectMapper jxn; - - private static final Logger log = LoggerFactory.getLogger(JsonClusterWriter.class); - private static final Pattern VEC_PATTERN = Pattern.compile("\\{|\\:|\\,|\\}"); - - public JsonClusterWriter(Writer writer, - Map<Integer, List<WeightedPropertyVectorWritable>> clusterIdToPoints, - DistanceMeasure measure, int numTopFeatures, String[] dictionary) { - super(writer, clusterIdToPoints, measure); - this.numTopFeatures = numTopFeatures; - this.dictionary = dictionary; - jxn = new ObjectMapper(); - } - - /** - * Generate HashMap with cluster info and write as a single JSON formatted - * line - */ - @Override - public void write(ClusterWritable clusterWritable) throws IOException { - Map<String, Object> res = new HashMap<>(); - - // get top terms - if (dictionary != null) { - List<Object> topTerms = getTopFeaturesList(clusterWritable.getValue() - .getCenter(), dictionary, numTopFeatures); - res.put("top_terms", topTerms); - } else { - res.put("top_terms", new ArrayList<>()); - } - - // get human-readable cluster representation - Cluster cluster = clusterWritable.getValue(); - res.put("cluster_id", cluster.getId()); - - if (dictionary != null) { - Map<String,Object> fmtStr = cluster.asJson(dictionary); - res.put("cluster", fmtStr); - - // get points - List<Object> points = getPoints(cluster, dictionary); - res.put("points", points); - } else { - res.put("cluster", new HashMap<>()); - res.put("points", new ArrayList<>()); - } - - // write JSON - Writer writer = getWriter(); - writer.write(jxn.writeValueAsString(res) + "\n"); - } - - /** - * Create a List of HashMaps containing top terms information - * - * @return List<Object> - */ - public List<Object> getTopFeaturesList(Vector vector, String[] dictionary, - int numTerms) { - - List<TermIndexWeight> vectorTerms = new ArrayList<>(); - - for (Vector.Element elt : vector.nonZeroes()) { - vectorTerms.add(new TermIndexWeight(elt.index(), elt.get())); - } - - // Sort results in reverse order (i.e. 
weight in descending order) - Collections.sort(vectorTerms, new Comparator<TermIndexWeight>() { - @Override - public int compare(TermIndexWeight one, TermIndexWeight two) { - return Double.compare(two.weight, one.weight); - } - }); - - List<Object> topTerms = new ArrayList<>(); - - for (int i = 0; i < vectorTerms.size() && i < numTerms; i++) { - int index = vectorTerms.get(i).index; - String dictTerm = dictionary[index]; - if (dictTerm == null) { - log.error("Dictionary entry missing for {}", index); - continue; - } - Map<String, Object> termEntry = new HashMap<>(); - termEntry.put(dictTerm, vectorTerms.get(i).weight); - topTerms.add(termEntry); - } - - return topTerms; - } - - /** - * Create a List of HashMaps containing Vector point information - * - * @return List<Object> - */ - public List<Object> getPoints(Cluster cluster, String[] dictionary) { - List<Object> vectorObjs = new ArrayList<>(); - List<WeightedPropertyVectorWritable> points = getClusterIdToPoints().get( - cluster.getId()); - - if (points != null) { - for (WeightedPropertyVectorWritable point : points) { - Map<String, Object> entry = new HashMap<>(); - Vector theVec = point.getVector(); - if (theVec instanceof NamedVector) { - entry.put("vector_name", ((NamedVector) theVec).getName()); - } else { - String vecStr = theVec.asFormatString(); - // do some basic manipulations for display - vecStr = VEC_PATTERN.matcher(vecStr).replaceAll("_"); - entry.put("vector_name", vecStr); - } - entry.put("weight", String.valueOf(point.getWeight())); - try { - entry.put("point", - AbstractCluster.formatVectorAsJson(point.getVector(), dictionary)); - } catch (IOException e) { - log.error("IOException: ", e); - } - vectorObjs.add(entry); - } - } - return vectorObjs; - } - - /** - * Convenience class for sorting terms - * - */ - private static class TermIndexWeight { - private final int index; - private final double weight; - - TermIndexWeight(int index, double weight) { - this.index = index; - this.weight = weight; - } - } - -}
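Since write(ClusterWritable) emits one self-contained JSON object per line (keys cluster_id, cluster, top_terms, points), the dump can be post-processed line by line. A minimal reader sketch, using the same org.codehaus.jackson ObjectMapper the writer relies on; the file name is illustrative:

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import org.codehaus.jackson.map.ObjectMapper;

public class JsonClusterReaderSketch {
  public static void main(String[] args) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    try (BufferedReader reader =
        Files.newBufferedReader(Paths.get("clusterdump.json"), StandardCharsets.UTF_8)) {
      String line;
      while ((line = reader.readLine()) != null) {
        // One cluster per line; keys match what write(ClusterWritable) puts in.
        Map<?, ?> cluster = mapper.readValue(line, Map.class);
        System.out.println(cluster.get("cluster_id") + " top terms: " + cluster.get("top_terms"));
      }
    }
  }
}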
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java b/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java deleted file mode 100644 index 54ad43f..0000000 --- a/integration/src/main/java/org/apache/mahout/utils/email/MailOptions.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.utils.email; - -import java.io.File; -import java.nio.charset.Charset; -import java.util.Map; -import java.util.regex.Pattern; - -/** - * Configuration options to be used by {@link MailProcessor}. Includes options controlling the exact output format - * and which mail fields are included (body, to, from, subject, etc.). - */ -public class MailOptions { - - public static final String FROM = "FROM"; - public static final String TO = "TO"; - public static final String REFS = "REFS"; - public static final String SUBJECT = "SUBJECT"; - public static final Pattern DEFAULT_QUOTED_TEXT = Pattern.compile("^(\\||>)"); - - private boolean stripQuotedText; - private File input; - private String outputDir; - private String prefix; - private int chunkSize; - private Charset charset; - private String separator; - private String bodySeparator = "\n"; - private boolean includeBody; - private Pattern[] patternsToMatch; - //maps FROM, TO, REFS, SUBJECT, etc. to the order they appear in patternsToMatch. See MailToRecMapper - private Map<String, Integer> patternOrder; - - //the regular expression to use for identifying quoted text. - private Pattern quotedTextPattern = DEFAULT_QUOTED_TEXT; - - public File getInput() { - return input; - } - - public void setInput(File input) { - this.input = input; - } - - public String getOutputDir() { - return outputDir; - } - - /** - * Sets the output directory where sequence files will be written. - */ - public void setOutputDir(String outputDir) { - this.outputDir = outputDir; - } - - public String getPrefix() { - return prefix; - } - - /** - * Sets the prefix that is combined with the archive name and with message ids to create {@code SequenceFile} keys. - * @param prefix the key prefix; commonly, the name of the directory containing the mail archive is used. - */ - public void setPrefix(String prefix) { - this.prefix = prefix; - } - - public int getChunkSize() { - return chunkSize; - } - - /** - * Sets the size of each generated sequence file, in megabytes. - */ - public void setChunkSize(int chunkSize) { - this.chunkSize = chunkSize; - } - - public Charset getCharset() { - return charset; - } - - /** - * Sets the character encoding of the input. - */ - public void setCharset(Charset charset) { - this.charset = charset; - } - - public String getSeparator() { - return separator; - } - - /** - * Sets the separator to use in the output between metadata items (to, from, etc.). - */ - public void setSeparator(String separator) { - this.separator = separator; - } - - public String getBodySeparator() { - return bodySeparator; - } - - /** - * Sets the separator to use in the output between lines in the body; the default is "\n". - */ - public void setBodySeparator(String bodySeparator) { - this.bodySeparator = bodySeparator; - } - - public boolean isIncludeBody() { - return includeBody; - } - - /** - * Sets whether mail bodies are included in the output. - */ - public void setIncludeBody(boolean includeBody) { - this.includeBody = includeBody; - } - - public Pattern[] getPatternsToMatch() { - return patternsToMatch; - } - - /** - * Sets the list of patterns to be applied, in the given order, to extract metadata fields (to, from, subject, etc.)
- * from the input. - */ - public void setPatternsToMatch(Pattern[] patternsToMatch) { - this.patternsToMatch = patternsToMatch; - } - - public Map<String, Integer> getPatternOrder() { - return patternOrder; - } - - public void setPatternOrder(Map<String, Integer> patternOrder) { - this.patternOrder = patternOrder; - } - - /** - * @return true if we should strip out quoted email text - */ - public boolean isStripQuotedText() { - return stripQuotedText; - } - - /** - * Sets whether quoted text, such as lines starting with | or >, is stripped off. - */ - public void setStripQuotedText(boolean stripQuotedText) { - this.stripQuotedText = stripQuotedText; - } - - public Pattern getQuotedTextPattern() { - return quotedTextPattern; - } - - /** - * Sets the {@link java.util.regex.Pattern} used to identify lines that are quoted text. The default matches | and >. - * @see #setStripQuotedText(boolean) - */ - public void setQuotedTextPattern(Pattern quotedTextPattern) { - this.quotedTextPattern = quotedTextPattern; - } -}
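A configuration sketch for the options above, roughly as they would be filled in before handing the object to MailProcessor. The paths, prefix, chunk size, and regular expressions are illustrative assumptions, not prescribed defaults:

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.mahout.utils.email.MailOptions;

public class MailOptionsSketch {
  public static void main(String[] args) {
    MailOptions options = new MailOptions();
    options.setInput(new File("/path/to/mail-archive"));  // illustrative path
    options.setOutputDir("/path/to/sequence-files");      // illustrative path
    options.setPrefix("asf-mail-archive");                // commonly the archive directory name
    options.setChunkSize(64);                             // 64 MB per generated SequenceFile
    options.setCharset(StandardCharsets.UTF_8);
    options.setSeparator("\n");
    options.setIncludeBody(true);
    options.setStripQuotedText(true);                     // drop lines matching the quoted-text pattern

    // Illustrative header regexes extracting FROM and SUBJECT. patternOrder records
    // each field's index within patternsToMatch, which is how downstream consumers
    // such as MailToRecMapper locate a field's capture group.
    Pattern fromPattern = Pattern.compile("^From: (.*)");
    Pattern subjectPattern = Pattern.compile("^Subject: (.*)");
    options.setPatternsToMatch(new Pattern[] {fromPattern, subjectPattern});
    Map<String, Integer> order = new HashMap<>();
    order.put(MailOptions.FROM, 0);
    order.put(MailOptions.SUBJECT, 1);
    options.setPatternOrder(order);
  }
}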
