Repository: mahout Updated Branches: refs/heads/master 53e5adac2 -> 85f9ece66
http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java index d5f8d64..dcd4062 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java +++ b/mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java @@ -18,9 +18,11 @@ package org.apache.mahout.clustering.classify; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Locale; +import com.google.common.io.Closeables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -41,19 +43,16 @@ import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterab import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; -import com.google.common.collect.Lists; -import com.google.common.io.Closeables; - /** * This classifier works with any ClusteringPolicy and its associated Clusters. * It is initialized with a policy and a list of compatible clusters and * thereafter it can classify any new Vector into one or more of the clusters * based upon the pdf() function which each cluster supports. - * + * <p/> * In addition, it is an OnlineLearner and can be trained. Training amounts to * asking the actual model to observe the vector and closing the classifier * causes all the models to computeParameters. - * + * <p/> * Because a ClusterClassifier implements Writable, it can be written-to and * read-from a sequence file as a single entity. For sequential and MapReduce * clustering in conjunction with a ClusterIterator; however, it utilizes an @@ -63,42 +62,41 @@ import com.google.common.io.Closeables; * produce them. */ public class ClusterClassifier extends AbstractVectorClassifier implements OnlineLearner, Writable { - + private static final String POLICY_FILE_NAME = "_policy"; - + private List<Cluster> models; - + private String modelClass; - + private ClusteringPolicy policy; - + /** * The public constructor accepts a list of clusters to become the models - * - * @param models - * a List<Cluster> - * @param policy - * a ClusteringPolicy + * + * @param models a List<Cluster> + * @param policy a ClusteringPolicy */ public ClusterClassifier(List<Cluster> models, ClusteringPolicy policy) { this.models = models; modelClass = models.get(0).getClass().getName(); this.policy = policy; } - + // needed for serialization/De-serialization - public ClusterClassifier() {} - + public ClusterClassifier() { + } + // only used by MR ClusterIterator protected ClusterClassifier(ClusteringPolicy policy) { this.policy = policy; } - + @Override public Vector classify(Vector instance) { return policy.classify(instance, this); } - + @Override public double classifyScalar(Vector instance) { if (models.size() == 2) { @@ -108,12 +106,12 @@ public class ClusterClassifier extends AbstractVectorClassifier implements Onlin } throw new IllegalStateException(); } - + @Override public int numCategories() { return models.size(); } - + @Override public void write(DataOutput out) throws IOException { out.writeInt(models.size()); @@ -123,12 +121,12 @@ public class ClusterClassifier extends AbstractVectorClassifier implements Onlin cluster.write(out); } } - + @Override public void readFields(DataInput in) throws IOException { int size = in.readInt(); modelClass = in.readUTF(); - models = Lists.newArrayList(); + models = new ArrayList<>(); ClusteringPolicyWritable clusteringPolicyWritable = new ClusteringPolicyWritable(); clusteringPolicyWritable.readFields(in); policy = clusteringPolicyWritable.getValue(); @@ -138,73 +136,66 @@ public class ClusterClassifier extends AbstractVectorClassifier implements Onlin models.add(element); } } - + @Override public void train(int actual, Vector instance) { models.get(actual).observe(new VectorWritable(instance)); } - + /** * Train the models given an additional weight. Unique to ClusterClassifier - * - * @param actual - * the int index of a model - * @param data - * a data Vector - * @param weight - * a double weighting factor + * + * @param actual the int index of a model + * @param data a data Vector + * @param weight a double weighting factor */ public void train(int actual, Vector data, double weight) { models.get(actual).observe(new VectorWritable(data), weight); } - + @Override public void train(long trackingKey, String groupKey, int actual, Vector instance) { models.get(actual).observe(new VectorWritable(instance)); } - + @Override public void train(long trackingKey, int actual, Vector instance) { models.get(actual).observe(new VectorWritable(instance)); } - + @Override public void close() { policy.close(this); } - + public List<Cluster> getModels() { return models; } - + public ClusteringPolicy getPolicy() { return policy; } - + public void writeToSeqFiles(Path path) throws IOException { writePolicy(policy, path); Configuration config = new Configuration(); FileSystem fs = FileSystem.get(path.toUri(), config); - SequenceFile.Writer writer = null; ClusterWritable cw = new ClusterWritable(); for (int i = 0; i < models.size(); i++) { - try { + try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, config, + new Path(path, "part-" + String.format(Locale.ENGLISH, "%05d", i)), IntWritable.class, + ClusterWritable.class)) { Cluster cluster = models.get(i); cw.setValue(cluster); - writer = new SequenceFile.Writer(fs, config, - new Path(path, "part-" + String.format(Locale.ENGLISH, "%05d", i)), IntWritable.class, - ClusterWritable.class); Writable key = new IntWritable(i); writer.append(key, cw); - } finally { - Closeables.close(writer, false); } } } - + public void readFromSeqFiles(Configuration conf, Path path) throws IOException { Configuration config = new Configuration(); - List<Cluster> clusters = Lists.newArrayList(); + List<Cluster> clusters = new ArrayList<>(); for (ClusterWritable cw : new SequenceFileDirValueIterable<ClusterWritable>(path, PathType.LIST, PathFilters.logsCRCFilter(), config)) { Cluster cluster = cw.getValue(); @@ -215,7 +206,7 @@ public class ClusterClassifier extends AbstractVectorClassifier implements Onlin modelClass = models.get(0).getClass().getName(); this.policy = readPolicy(path); } - + public static ClusteringPolicy readPolicy(Path path) throws IOException { Path policyPath = new Path(path, POLICY_FILE_NAME); Configuration config = new Configuration(); @@ -227,7 +218,7 @@ public class ClusterClassifier extends AbstractVectorClassifier implements Onlin Closeables.close(reader, true); return cpw.getValue(); } - + public static void writePolicy(ClusteringPolicy policy, Path path) throws IOException { Path policyPath = new Path(path, POLICY_FILE_NAME); Configuration config = new Configuration(); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java index c6c8427..98eb944 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java +++ b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java @@ -18,9 +18,9 @@ package org.apache.mahout.clustering.fuzzykmeans; import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.ToolRunner; @@ -265,7 +265,7 @@ public class FuzzyKMeansDriver extends AbstractJob { boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException { - List<Cluster> clusters = Lists.newArrayList(); + List<Cluster> clusters = new ArrayList<>(); FuzzyKMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters); if (conf == null) { http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java b/mr/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java index bf42eb1..ca63b0f 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java +++ b/mr/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java @@ -18,10 +18,10 @@ package org.apache.mahout.clustering.iterator; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; @@ -43,7 +43,7 @@ public class CIReducer extends Reducer<IntWritable,ClusterWritable,IntWritable,C Cluster cluster = iter.next().getValue(); first.observe(cluster); } - List<Cluster> models = Lists.newArrayList(); + List<Cluster> models = new ArrayList<>(); models.add(first); classifier = new ClusterClassifier(models, policy); classifier.close(); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java b/mr/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java index bc91f24..b4e41b6 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java +++ b/mr/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java @@ -19,6 +19,7 @@ package org.apache.mahout.clustering.iterator; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -28,8 +29,6 @@ import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansClusterer; import org.apache.mahout.clustering.fuzzykmeans.SoftCluster; import org.apache.mahout.math.Vector; -import com.google.common.collect.Lists; - /** * This is a probability-weighted clustering policy, suitable for fuzzy k-means * clustering @@ -55,8 +54,8 @@ public class FuzzyKMeansClusteringPolicy extends AbstractClusteringPolicy { @Override public Vector classify(Vector data, ClusterClassifier prior) { - Collection<SoftCluster> clusters = Lists.newArrayList(); - List<Double> distances = Lists.newArrayList(); + Collection<SoftCluster> clusters = new ArrayList<>(); + List<Double> distances = new ArrayList<>(); for (Cluster model : prior.getModels()) { SoftCluster sc = (SoftCluster) model; clusters.add(sc); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java b/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java index 13f6b46..3b9094e 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java +++ b/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java @@ -17,9 +17,9 @@ package org.apache.mahout.clustering.kmeans; import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.ToolRunner; @@ -206,7 +206,7 @@ public class KMeansDriver extends AbstractJob { InterruptedException, ClassNotFoundException { double convergenceDelta = Double.parseDouble(delta); - List<Cluster> clusters = Lists.newArrayList(); + List<Cluster> clusters = new ArrayList<>(); KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters); if (clusters.isEmpty()) { http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java b/mr/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java index cc9e4cd..fbbabc5 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java +++ b/mr/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java @@ -18,12 +18,11 @@ package org.apache.mahout.clustering.kmeans; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Random; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.io.Closeables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -85,12 +84,11 @@ public final class RandomSeedGenerator { } FileStatus[] inputFiles = fs.globStatus(inputPathPattern, PathFilters.logsCRCFilter()); - SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outFile, Text.class, ClusterWritable.class); Random random = (seed != null) ? RandomUtils.getRandom(seed) : RandomUtils.getRandom(); - List<Text> chosenTexts = Lists.newArrayListWithCapacity(k); - List<ClusterWritable> chosenClusters = Lists.newArrayListWithCapacity(k); + List<Text> chosenTexts = new ArrayList<>(k); + List<ClusterWritable> chosenClusters = new ArrayList<>(k); int nextClusterId = 0; int index = 0; @@ -123,13 +121,12 @@ public final class RandomSeedGenerator { } } - try { + try (SequenceFile.Writer writer = + SequenceFile.createWriter(fs, conf, outFile, Text.class, ClusterWritable.class)){ for (int i = 0; i < chosenTexts.size(); i++) { writer.append(chosenTexts.get(i), chosenClusters.get(i)); } log.info("Wrote {} Klusters to {}", k, outFile); - } finally { - Closeables.close(writer, false); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java index 3eee446..31c0d60 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java +++ b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/CVB0Driver.java @@ -16,9 +16,13 @@ */ package org.apache.mahout.clustering.lda.cvb; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; @@ -45,10 +49,6 @@ import org.apache.mahout.math.VectorWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; -import java.util.List; - /** * See {@link CachingCVB0Mapper} for more details on scalability and room for improvement. * To try out this LDA implementation without using Hadoop, check out @@ -274,7 +274,7 @@ public class CVB0Driver extends AbstractJob { conf.set(MODEL_WEIGHT, "1"); // TODO conf.set(TEST_SET_FRACTION, String.valueOf(testFraction)); - List<Double> perplexities = Lists.newArrayList(); + List<Double> perplexities = new ArrayList<>(); for (int i = 1; i <= iterationNumber; i++) { // form path to model Path modelPath = modelPath(topicModelStateTempPath, i); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/InMemoryCollapsedVariationalBayes0.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/InMemoryCollapsedVariationalBayes0.java b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/InMemoryCollapsedVariationalBayes0.java index 07ae100..d7d09c5 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/InMemoryCollapsedVariationalBayes0.java +++ b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/InMemoryCollapsedVariationalBayes0.java @@ -16,8 +16,12 @@ */ package org.apache.mahout.clustering.lda.cvb; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.commons.cli2.CommandLine; import org.apache.commons.cli2.Group; import org.apache.commons.cli2.Option; @@ -44,17 +48,13 @@ import org.apache.mahout.math.DenseMatrix; import org.apache.mahout.math.DenseVector; import org.apache.mahout.math.DistributedRowMatrixWriter; import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.NamedVector; import org.apache.mahout.math.SparseRowMatrix; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; -import org.apache.mahout.math.NamedVector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; -import java.util.Map; - /** * Runs the same algorithm as {@link CVB0Driver}, but sequentially, in memory. Memory requirements * are currently: the entire corpus is read into RAM, two copies of the model (each of size @@ -111,7 +111,7 @@ public class InMemoryCollapsedVariationalBayes0 extends AbstractJob { this.terms = terms; this.initialModelCorpusFraction = modelCorpusFraction; numTerms = terms != null ? terms.length : corpus.numCols(); - Map<String, Integer> termIdMap = Maps.newHashMap(); + Map<String, Integer> termIdMap = new HashMap<>(); if (terms != null) { for (int t = 0; t < terms.length; t++) { termIdMap.put(terms[t], t); @@ -414,35 +414,12 @@ public class InMemoryCollapsedVariationalBayes0 extends AbstractJob { return 0; } - /* - private static Map<Integer, Map<String, Integer>> loadCorpus(String path) throws IOException { - List<String> lines = Resources.readLines(Resources.getResource(path), Charsets.UTF_8); - Map<Integer, Map<String, Integer>> corpus = Maps.newHashMap(); - for (int i=0; i<lines.size(); i++) { - String line = lines.get(i); - Map<String, Integer> doc = Maps.newHashMap(); - for (String s : line.split(" ")) { - s = s.replaceAll("\\W", "").toLowerCase().trim(); - if (s.length() == 0) { - continue; - } - if (!doc.containsKey(s)) { - doc.put(s, 0); - } - doc.put(s, doc.get(s) + 1); - } - corpus.put(i, doc); - } - return corpus; - } - */ - private static String[] loadDictionary(String dictionaryPath, Configuration conf) { if (dictionaryPath == null) { return null; } Path dictionaryFile = new Path(dictionaryPath); - List<Pair<Integer, String>> termList = Lists.newArrayList(); + List<Pair<Integer, String>> termList = new ArrayList<>(); int maxTermId = 0; // key is word value is id for (Pair<Writable, IntWritable> record @@ -467,7 +444,7 @@ public class InMemoryCollapsedVariationalBayes0 extends AbstractJob { throws IOException { Path vectorPath = new Path(vectorPathString); FileSystem fs = vectorPath.getFileSystem(conf); - List<Path> subPaths = Lists.newArrayList(); + List<Path> subPaths = new ArrayList<>(); if (fs.isFile(vectorPath)) { subPaths.add(vectorPath); } else { @@ -475,7 +452,7 @@ public class InMemoryCollapsedVariationalBayes0 extends AbstractJob { subPaths.add(fileStatus.getPath()); } } - List<Pair<Integer, Vector>> rowList = Lists.newArrayList(); + List<Pair<Integer, Vector>> rowList = new ArrayList<>(); int numRows = Integer.MIN_VALUE; int numCols = -1; boolean sequentialAccess = false; http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/ModelTrainer.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/ModelTrainer.java b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/ModelTrainer.java index 912b6d5..c3f2bc0 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/ModelTrainer.java +++ b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/ModelTrainer.java @@ -16,19 +16,10 @@ */ package org.apache.mahout.clustering.lda.cvb; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.hadoop.fs.Path; -import org.apache.mahout.math.Matrix; -import org.apache.mahout.math.MatrixSlice; -import org.apache.mahout.math.SparseRowMatrix; -import org.apache.mahout.math.Vector; -import org.apache.mahout.math.VectorIterable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -38,6 +29,15 @@ import java.util.concurrent.Callable; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.MatrixSlice; +import org.apache.mahout.math.SparseRowMatrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Multithreaded LDA model trainer class, which primarily operates by running a "map/reduce" * operation, all in memory locally (ie not a hadoop job!) : the "map" operation is to take @@ -141,7 +141,7 @@ public class ModelTrainer { long startTime = System.nanoTime(); int i = 0; double[] times = new double[100]; - Map<Vector, Vector> batch = Maps.newHashMap(); + Map<Vector, Vector> batch = new HashMap<>(); int numTokensInBatch = 0; long batchStart = System.nanoTime(); while (docIterator.hasNext() && docTopicIterator.hasNext()) { @@ -185,7 +185,7 @@ public class ModelTrainer { public void batchTrain(Map<Vector, Vector> batch, boolean update, int numDocTopicsIters) { while (true) { try { - List<TrainerRunnable> runnables = Lists.newArrayList(); + List<TrainerRunnable> runnables = new ArrayList<>(); for (Map.Entry<Vector, Vector> entry : batch.entrySet()) { runnables.add(new TrainerRunnable(readModel, null, entry.getKey(), entry.getValue(), new SparseRowMatrix(numTopics, numTerms, true), http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java index 7b7816c..9ba77c1 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java +++ b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java @@ -16,7 +16,18 @@ */ package org.apache.mahout.clustering.lda.cvb; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -39,17 +50,6 @@ import org.apache.mahout.math.stats.Sampler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - /** * Thin wrapper around a {@link Matrix} of counts of occurrences of (topic, term) pairs. Dividing * {code topicTermCount.viewRow(topic).get(term)} by the sum over the values for all terms in that @@ -205,7 +205,7 @@ public class TopicModel implements Configurable, Iterable<MatrixSlice> { throws IOException { int numTopics = -1; int numTerms = -1; - List<Pair<Integer, Vector>> rows = Lists.newArrayList(); + List<Pair<Integer, Vector>> rows = new ArrayList<>(); for (Path modelPath : modelPaths) { for (Pair<IntWritable, VectorWritable> row : new SequenceFileIterable<IntWritable, VectorWritable>(modelPath, true, conf)) { @@ -414,7 +414,7 @@ public class TopicModel implements Configurable, Iterable<MatrixSlice> { } public static String vectorToSortedString(Vector vector, String[] dictionary) { - List<Pair<String,Double>> vectorValues = Lists.newArrayListWithCapacity(vector.getNumNondefaultElements()); + List<Pair<String,Double>> vectorValues = new ArrayList<>(vector.getNumNondefaultElements()); for (Element e : vector.nonZeroes()) { vectorValues.add(Pair.of(dictionary != null ? dictionary[e.index()] : String.valueOf(e.index()), e.get())); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java index 60e0a2e..4ec8149 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.URI; import java.util.Arrays; -import com.google.common.io.Closeables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; @@ -70,12 +69,9 @@ public final class VectorCache { DistributedCache.setCacheFiles(new URI[]{output.toUri()}, conf); // set up the writer - SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output, - IntWritable.class, VectorWritable.class); - try { + try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output, + IntWritable.class, VectorWritable.class)){ writer.append(key, new VectorWritable(vector)); - } finally { - Closeables.close(writer, false); } if (deleteOnExit) { @@ -112,12 +108,9 @@ public final class VectorCache { */ public static Vector load(Configuration conf, Path input) throws IOException { log.info("Loading vector from: {}", input); - SequenceFileValueIterator<VectorWritable> iterator = - new SequenceFileValueIterator<>(input, true, conf); - try { + try (SequenceFileValueIterator<VectorWritable> iterator = + new SequenceFileValueIterator<>(input, true, conf)){ return iterator.next().get(); - } finally { - Closeables.close(iterator, true); } } } http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java index 5f9c1a6..3ce94dc 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java @@ -18,6 +18,7 @@ package org.apache.mahout.clustering.spectral.kmeans; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -39,9 +40,6 @@ import org.apache.mahout.math.VectorWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; -import com.google.common.io.Closeables; - /** * Given an Input Path containing a {@link org.apache.hadoop.io.SequenceFile}, select k vectors and write them to the * output file as a {@link org.apache.mahout.clustering.kmeans.Kluster} representing the initial centroid to use. The @@ -72,15 +70,14 @@ public final class EigenSeedGenerator { } FileStatus[] inputFiles = fs.globStatus(inputPathPattern, PathFilters.logsCRCFilter()); - SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outFile, Text.class, ClusterWritable.class); - Map<Integer,Double> maxEigens = Maps.newHashMapWithExpectedSize(k); // store + Map<Integer,Double> maxEigens = new HashMap<>(k); // store // max // value // of // each // column - Map<Integer,Text> chosenTexts = Maps.newHashMapWithExpectedSize(k); - Map<Integer,ClusterWritable> chosenClusters = Maps.newHashMapWithExpectedSize(k); + Map<Integer,Text> chosenTexts = new HashMap<>(k); + Map<Integer,ClusterWritable> chosenClusters = new HashMap<>(k); for (FileStatus fileStatus : inputFiles) { if (!fileStatus.isDir()) { @@ -108,13 +105,12 @@ public final class EigenSeedGenerator { } } - try { + try (SequenceFile.Writer writer = + SequenceFile.createWriter(fs, conf, outFile, Text.class, ClusterWritable.class)){ for (Integer key : maxEigens.keySet()) { writer.append(chosenTexts.get(key), chosenClusters.get(key)); } log.info("EigenSeedGenerator:: Wrote {} Klusters to {}", chosenTexts.size(), outFile); - } finally { - Closeables.close(writer, false); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/BallKMeans.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/BallKMeans.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/BallKMeans.java index 25a4022..25806fe 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/BallKMeans.java +++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/BallKMeans.java @@ -17,6 +17,7 @@ package org.apache.mahout.clustering.streaming.cluster; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -26,7 +27,6 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import org.apache.mahout.clustering.ClusteringUtils; import org.apache.mahout.common.Pair; import org.apache.mahout.common.RandomUtils; @@ -169,7 +169,7 @@ public class BallKMeans implements Iterable<Centroid> { // If there will be no points assigned to the test set, return now. if (testProbability == 0) { return new Pair<List<? extends WeightedVector>, List<? extends WeightedVector>>(datapoints, - Lists.<WeightedVector>newArrayList()); + new ArrayList<WeightedVector>()); } int numTest = (int) (testProbability * datapoints.size()); @@ -190,7 +190,7 @@ public class BallKMeans implements Iterable<Centroid> { */ public UpdatableSearcher cluster(List<? extends WeightedVector> datapoints) { Pair<List<? extends WeightedVector>, List<? extends WeightedVector>> trainTestSplit = splitTrainTest(datapoints); - List<Vector> bestCentroids = Lists.newArrayList(); + List<Vector> bestCentroids = new ArrayList<>(); double cost = Double.POSITIVE_INFINITY; double bestCost = Double.POSITIVE_INFINITY; for (int i = 0; i < numRuns; ++i) { @@ -377,11 +377,11 @@ public class BallKMeans implements Iterable<Centroid> { DistanceMeasure distanceMeasure = centroids.getDistanceMeasure(); // closestClusterDistances.get(i) is the distance from the i'th cluster to its closest // neighboring cluster. - List<Double> closestClusterDistances = Lists.newArrayListWithExpectedSize(numClusters); + List<Double> closestClusterDistances = new ArrayList<>(numClusters); // clusterAssignments[i] == j means that the i'th point is assigned to the j'th cluster. When // these don't change, we are done. // Each point is assigned to the invalid "-1" cluster initially. - List<Integer> clusterAssignments = Lists.newArrayList(Collections.nCopies(datapoints.size(), -1)); + List<Integer> clusterAssignments = new ArrayList<>(Collections.nCopies(datapoints.size(), -1)); boolean changed = true; for (int i = 0; changed && i < maxNumIterations; i++) { @@ -398,7 +398,7 @@ public class BallKMeans implements Iterable<Centroid> { // Copies the current cluster centroids to newClusters and sets their weights to 0. This is // so we calculate the new centroids as we go through the datapoints. - List<Centroid> newCentroids = Lists.newArrayList(); + List<Centroid> newCentroids = new ArrayList<>(); for (Vector centroid : centroids) { // need a deep copy because we will mutate these values Centroid newCentroid = (Centroid)centroid.clone(); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java index 0e3f068..604bc9d 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java +++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java @@ -17,6 +17,7 @@ package org.apache.mahout.clustering.streaming.cluster; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -25,7 +26,6 @@ import java.util.Random; import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import org.apache.mahout.common.RandomUtils; import org.apache.mahout.common.distance.DistanceMeasure; import org.apache.mahout.math.Centroid; @@ -323,7 +323,7 @@ public class StreamingKMeans implements Iterable<Centroid> { if (!collapseClusters && centroids.size() > clusterOvershoot * numClusters) { numClusters = (int) Math.max(numClusters, clusterLogFactor * Math.log(numProcessedDatapoints)); - List<Centroid> shuffled = Lists.newArrayList(); + List<Centroid> shuffled = new ArrayList<>(); for (Vector vector : centroids) { shuffled.add((Centroid) vector); } http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java index 73776b9..0f6f7f2 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java +++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java @@ -18,6 +18,7 @@ package org.apache.mahout.clustering.streaming.mapreduce; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -26,7 +27,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -425,13 +425,13 @@ public final class StreamingKMeansDriver extends AbstractJob { long start = System.currentTimeMillis(); // Run StreamingKMeans step in parallel by spawning 1 thread per input path to process. ExecutorService pool = Executors.newCachedThreadPool(); - List<Future<Iterable<Centroid>>> intermediateCentroidFutures = Lists.newArrayList(); + List<Future<Iterable<Centroid>>> intermediateCentroidFutures = new ArrayList<>(); for (FileStatus status : HadoopUtil.listStatus(FileSystem.get(conf), input, PathFilters.logsCRCFilter())) { intermediateCentroidFutures.add(pool.submit(new StreamingKMeansThread(status.getPath(), conf))); } log.info("Finished running Mappers"); // Merge the resulting "mapper" centroids. - List<Centroid> intermediateCentroids = Lists.newArrayList(); + List<Centroid> intermediateCentroids = new ArrayList<>(); for (Future<Iterable<Centroid>> futureIterable : intermediateCentroidFutures) { for (Centroid centroid : futureIterable.get()) { intermediateCentroids.add(centroid); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java index ced11ea..f12a876 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java +++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java @@ -18,9 +18,9 @@ package org.apache.mahout.clustering.streaming.mapreduce; import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; @@ -59,7 +59,7 @@ public class StreamingKMeansMapper extends Mapper<Writable, VectorWritable, IntW StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF); if (estimatedDistanceCutoff == StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF) { estimateDistanceCutoff = true; - estimatePoints = Lists.newArrayList(); + estimatePoints = new ArrayList<>(); } // There is no way of estimating the distance cutoff unless we have some data. clusterer = new StreamingKMeans(searcher, numClusters, estimatedDistanceCutoff); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java index acb2b56..24cc1db 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java +++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java @@ -17,11 +17,11 @@ package org.apache.mahout.clustering.streaming.mapreduce; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.mahout.clustering.ClusteringUtils; @@ -61,7 +61,7 @@ public class StreamingKMeansThread implements Callable<Iterable<Centroid>> { Iterator<Centroid> dataPointsIterator = dataPoints.iterator(); if (estimateDistanceCutoff == StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF) { - List<Centroid> estimatePoints = Lists.newArrayListWithExpectedSize(NUM_ESTIMATE_POINTS); + List<Centroid> estimatePoints = new ArrayList<>(NUM_ESTIMATE_POINTS); while (dataPointsIterator.hasNext() && estimatePoints.size() < NUM_ESTIMATE_POINTS) { Centroid centroid = dataPointsIterator.next(); estimatePoints.add(centroid); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java index 4bffb2b..f00cf56 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java +++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java @@ -22,7 +22,6 @@ import java.io.IOException; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -import com.google.common.io.Closeables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -133,31 +132,23 @@ public final class StreamingKMeansUtilsMR { */ public static void writeCentroidsToSequenceFile(Iterable<Centroid> centroids, Path path, Configuration conf) throws IOException { - SequenceFile.Writer writer = null; - try { - writer = SequenceFile.createWriter(FileSystem.get(conf), conf, - path, IntWritable.class, CentroidWritable.class); + try (SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(conf), conf, + path, IntWritable.class, CentroidWritable.class)) { int i = 0; for (Centroid centroid : centroids) { writer.append(new IntWritable(i++), new CentroidWritable(centroid)); } - } finally { - Closeables.close(writer, true); } } public static void writeVectorsToSequenceFile(Iterable<? extends Vector> datapoints, Path path, Configuration conf) throws IOException { - SequenceFile.Writer writer = null; - try { - writer = SequenceFile.createWriter(FileSystem.get(conf), conf, - path, IntWritable.class, VectorWritable.class); + try (SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(conf), conf, + path, IntWritable.class, VectorWritable.class)){ int i = 0; for (Vector vector : datapoints) { writer.append(new IntWritable(i++), new VectorWritable(vector)); } - } finally { - Closeables.close(writer, true); } } } http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java index 55b7848..d7ca554 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java +++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java @@ -22,7 +22,6 @@ import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.util.Iterator; -import com.google.common.base.Charsets; import com.google.common.collect.Iterables; import org.apache.commons.cli2.CommandLine; import org.apache.commons.cli2.Group; @@ -32,6 +31,7 @@ import org.apache.commons.cli2.builder.DefaultOptionBuilder; import org.apache.commons.cli2.builder.GroupBuilder; import org.apache.commons.cli2.commandline.Parser; import org.apache.commons.cli2.util.HelpFormatter; +import org.apache.commons.io.Charsets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -71,7 +71,7 @@ public class ResplitSequenceFiles { private void run(PrintWriter printWriter) throws IOException { conf = new Configuration(); SequenceFileDirIterable<Writable, Writable> inputIterable = new - SequenceFileDirIterable<Writable, Writable>(new Path(inputFile), PathType.LIST, conf); + SequenceFileDirIterable<>(new Path(inputFile), PathType.LIST, conf); fs = FileSystem.get(conf); int numEntries = Iterables.size(inputIterable); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java index 44a944d..ded76ad 100644 --- a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java +++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java @@ -17,7 +17,10 @@ package org.apache.mahout.clustering.topdown.postprocessor; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,9 +37,6 @@ import org.apache.mahout.common.iterator.sequencefile.PathType; import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; import org.apache.mahout.math.VectorWritable; -import java.io.IOException; -import java.util.Map; - /** * This class reads the output of any clustering algorithm, and, creates separate directories for different * clusters. Each cluster directory's name is its clusterId. Each and every point is written in the cluster @@ -53,7 +53,7 @@ public final class ClusterOutputPostProcessor { private final FileSystem fileSystem; private final Configuration conf; private final Path clusterPostProcessorOutput; - private final Map<String, Path> postProcessedClusterDirectories = Maps.newHashMap(); + private final Map<String, Path> postProcessedClusterDirectories = new HashMap<>(); private long uniqueVectorId = 0L; private final Map<String, SequenceFile.Writer> writersForClusters; @@ -63,7 +63,7 @@ public final class ClusterOutputPostProcessor { this.clusterPostProcessorOutput = output; this.clusteredPoints = PathDirectory.getClusterOutputClusteredPoints(clusterOutputToBeProcessed); this.conf = hadoopConfiguration; - this.writersForClusters = Maps.newHashMap(); + this.writersForClusters = new HashMap<>(); fileSystem = clusteredPoints.getFileSystem(conf); } http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/common/AbstractJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/AbstractJob.java b/mr/src/main/java/org/apache/mahout/common/AbstractJob.java index ec77749..b732078 100644 --- a/mr/src/main/java/org/apache/mahout/common/AbstractJob.java +++ b/mr/src/main/java/org/apache/mahout/common/AbstractJob.java @@ -19,13 +19,14 @@ package org.apache.mahout.common; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.collect.Lists; -import com.google.common.io.Closeables; +import com.google.common.base.Preconditions; import org.apache.commons.cli2.CommandLine; import org.apache.commons.cli2.Group; import org.apache.commons.cli2.Option; @@ -50,15 +51,13 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.mahout.common.commandline.DefaultOptionCreator; import org.apache.mahout.common.lucene.AnalyzerUtils; import org.apache.mahout.math.VectorWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import org.apache.lucene.analysis.standard.StandardAnalyzer; - /** * <p>Superclass of many Mahout Hadoop "jobs". A job drives configuration and launch of one or * more maps and reduces in order to accomplish some task.</p> @@ -113,7 +112,7 @@ public abstract class AbstractJob extends Configured implements Tool { private Group group; protected AbstractJob() { - options = Lists.newLinkedList(); + options = new LinkedList<>(); } /** Returns the input path established by a call to {@link #parseArguments(String[])}. @@ -451,24 +450,15 @@ public abstract class AbstractJob extends Configured implements Tool { * @return the cardinality of the vector */ public int getDimensions(Path matrix) throws IOException { - - SequenceFile.Reader reader = null; - try { - reader = new SequenceFile.Reader(FileSystem.get(getConf()), matrix, getConf()); - + try (SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(getConf()), matrix, getConf())){ Writable row = ClassUtils.instantiateAs(reader.getKeyClass().asSubclass(Writable.class), Writable.class); - Preconditions.checkArgument(reader.getValueClass().equals(VectorWritable.class), "value type of sequencefile must be a VectorWritable"); VectorWritable vectorWritable = new VectorWritable(); boolean hasAtLeastOneRow = reader.next(row, vectorWritable); Preconditions.checkState(hasAtLeastOneRow, "matrix must have at least one row"); - return vectorWritable.get().size(); - - } finally { - Closeables.close(reader, true); } } @@ -523,7 +513,7 @@ public abstract class AbstractJob extends Configured implements Tool { // nulls are ok, for cases where options are simple flags. List<?> vo = cmdLine.getValues(o); if (vo != null && !vo.isEmpty()) { - List<String> vals = Lists.newArrayList(); + List<String> vals = new ArrayList<>(); for (Object o1 : vo) { vals.add(o1.toString()); } http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java b/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java index 0cc93ba..ac4ab88 100644 --- a/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java +++ b/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java @@ -21,10 +21,10 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; -import com.google.common.base.Charsets; import org.apache.commons.cli2.Group; import org.apache.commons.cli2.OptionException; import org.apache.commons.cli2.util.HelpFormatter; +import org.apache.commons.io.Charsets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.GenericOptionsParser; http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java b/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java index f693821..27e5686 100644 --- a/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java +++ b/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.Iterator; @@ -28,8 +29,6 @@ import java.util.List; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.io.Closeables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataInputStream; @@ -326,21 +325,15 @@ public final class HadoopUtil { public static void writeInt(int value, Path path, Configuration configuration) throws IOException { FileSystem fs = FileSystem.get(path.toUri(), configuration); - FSDataOutputStream out = fs.create(path); - try { + try (FSDataOutputStream out = fs.create(path)) { out.writeInt(value); - } finally { - Closeables.close(out, false); } } public static int readInt(Path path, Configuration configuration) throws IOException { FileSystem fs = FileSystem.get(path.toUri(), configuration); - FSDataInputStream in = fs.open(path); - try { + try (FSDataInputStream in = fs.open(path)) { return in.readInt(); - } finally { - Closeables.close(in, true); } } @@ -353,7 +346,7 @@ public final class HadoopUtil { */ public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException { boolean containsFiles = false; - List<String> directoriesList = Lists.newArrayList(); + List<String> directoriesList = new ArrayList<>(); for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) { if (childFileStatus.isDir()) { String subDirectoryList = buildDirList(fs, childFileStatus); @@ -379,7 +372,7 @@ public final class HadoopUtil { */ public static String buildDirList(FileSystem fs, FileStatus fileStatus, PathFilter pathFilter) throws IOException { boolean containsFiles = false; - List<String> directoriesList = Lists.newArrayList(); + List<String> directoriesList = new ArrayList<>(); for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath(), pathFilter)) { if (childFileStatus.isDir()) { String subDirectoryList = buildDirList(fs, childFileStatus); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java index a8fa091..17ee714 100644 --- a/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java +++ b/mr/src/main/java/org/apache/mahout/common/distance/MahalanobisDistanceMeasure.java @@ -17,9 +17,14 @@ package org.apache.mahout.common.distance; +import java.io.DataInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.io.Closeables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -37,12 +42,6 @@ import org.apache.mahout.math.SingularValueDecomposition; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; -import java.io.DataInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collection; -import java.util.List; - //See http://en.wikipedia.org/wiki/Mahalanobis_distance for details public class MahalanobisDistanceMeasure implements DistanceMeasure { @@ -77,11 +76,8 @@ public class MahalanobisDistanceMeasure implements DistanceMeasure { if (!fs.exists(inverseCovarianceFile.get())) { throw new FileNotFoundException(inverseCovarianceFile.get().toString()); } - DataInputStream in = fs.open(inverseCovarianceFile.get()); - try { + try (DataInputStream in = fs.open(inverseCovarianceFile.get())){ inverseCovarianceMatrix.readFields(in); - } finally { - Closeables.close(in, true); } this.inverseCovarianceMatrix = inverseCovarianceMatrix.get(); Preconditions.checkArgument(this.inverseCovarianceMatrix != null, "inverseCovarianceMatrix not initialized"); @@ -94,11 +90,8 @@ public class MahalanobisDistanceMeasure implements DistanceMeasure { if (!fs.exists(meanVectorFile.get())) { throw new FileNotFoundException(meanVectorFile.get().toString()); } - DataInputStream in = fs.open(meanVectorFile.get()); - try { + try (DataInputStream in = fs.open(meanVectorFile.get())){ meanVector.readFields(in); - } finally { - Closeables.close(in, true); } this.meanVector = meanVector.get(); Preconditions.checkArgument(this.meanVector != null, "meanVector not initialized"); @@ -116,7 +109,7 @@ public class MahalanobisDistanceMeasure implements DistanceMeasure { @Override public void createParameters(String prefix, Configuration jobConf) { - parameters = Lists.newArrayList(); + parameters = new ArrayList<>(); inverseCovarianceFile = new PathParameter(prefix, "inverseCovarianceFile", jobConf, null, "Path on DFS to a file containing the inverse covariance matrix."); parameters.add(inverseCovarianceFile); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java index 3a57f2f..c3a48cb 100644 --- a/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java +++ b/mr/src/main/java/org/apache/mahout/common/distance/MinkowskiDistanceMeasure.java @@ -17,10 +17,10 @@ package org.apache.mahout.common.distance; +import java.util.ArrayList; import java.util.Collection; import java.util.List; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.mahout.common.parameters.DoubleParameter; import org.apache.mahout.common.parameters.Parameter; @@ -50,7 +50,7 @@ public class MinkowskiDistanceMeasure implements DistanceMeasure { @Override public void createParameters(String prefix, Configuration conf) { - parameters = Lists.newArrayList(); + parameters = new ArrayList<>(); Parameter<?> param = new DoubleParameter(prefix, "exponent", conf, EXPONENT, "Exponent for Fractional Lagrange distance"); parameters.add(param); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java index 0c1d2cd..1acbe86 100644 --- a/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java +++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedDistanceMeasure.java @@ -20,11 +20,10 @@ package org.apache.mahout.common.distance; import java.io.DataInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; -import com.google.common.collect.Lists; -import com.google.common.io.Closeables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -46,7 +45,7 @@ public abstract class WeightedDistanceMeasure implements DistanceMeasure { @Override public void createParameters(String prefix, Configuration jobConf) { - parameters = Lists.newArrayList(); + parameters = new ArrayList<>(); weightsFile = new PathParameter(prefix, "weightsFile", jobConf, null, "Path on DFS to a file containing the weights."); parameters.add(weightsFile); @@ -73,11 +72,8 @@ public abstract class WeightedDistanceMeasure implements DistanceMeasure { if (!fs.exists(weightsFile.get())) { throw new FileNotFoundException(weightsFile.get().toString()); } - DataInputStream in = fs.open(weightsFile.get()); - try { + try (DataInputStream in = fs.open(weightsFile.get())){ weights.readFields(in); - } finally { - Closeables.close(in, true); } this.weights = weights.get(); } http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java index c6889e2..4c78d9f 100644 --- a/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java +++ b/mr/src/main/java/org/apache/mahout/common/distance/WeightedEuclideanDistanceMeasure.java @@ -17,7 +17,6 @@ package org.apache.mahout.common.distance; - import org.apache.mahout.math.Vector; import org.apache.mahout.math.Vector.Element; http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/driver/MahoutDriver.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/driver/MahoutDriver.java b/mr/src/main/java/org/apache/mahout/driver/MahoutDriver.java index 1fd5506..5c5b8a4 100644 --- a/mr/src/main/java/org/apache/mahout/driver/MahoutDriver.java +++ b/mr/src/main/java/org/apache/mahout/driver/MahoutDriver.java @@ -19,13 +19,13 @@ package org.apache.mahout.driver; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.io.Closeables; import org.apache.hadoop.util.ProgramDriver; import org.slf4j.Logger; @@ -135,10 +135,10 @@ public final class MahoutDriver { mainProps = new Properties(); } - Map<String,String[]> argMap = Maps.newHashMap(); + Map<String,String[]> argMap = new HashMap<>(); int i = 0; while (i < args.length && args[i] != null) { - List<String> argValues = Lists.newArrayList(); + List<String> argValues = new ArrayList<>(); String arg = args[i]; i++; if (arg.startsWith("-D")) { // '-Dkey=value' or '-Dkey=value1,value2,etc' case @@ -170,7 +170,7 @@ public final class MahoutDriver { } // Now add command-line args - List<String> argsList = Lists.newArrayList(); + List<String> argsList = new ArrayList<>(); argsList.add(progName); for (Map.Entry<String,String[]> entry : argMap.entrySet()) { String arg = entry.getKey(); http://git-wip-us.apache.org/repos/asf/mahout/blob/85f9ece6/mr/src/main/java/org/apache/mahout/ep/EvolutionaryProcess.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/ep/EvolutionaryProcess.java b/mr/src/main/java/org/apache/mahout/ep/EvolutionaryProcess.java index b744287..4b2eea1 100644 --- a/mr/src/main/java/org/apache/mahout/ep/EvolutionaryProcess.java +++ b/mr/src/main/java/org/apache/mahout/ep/EvolutionaryProcess.java @@ -17,14 +17,11 @@ package org.apache.mahout.ep; -import com.google.common.collect.Lists; -import org.apache.hadoop.io.Writable; -import org.apache.mahout.classifier.sgd.PolymorphicWritable; - import java.io.Closeable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -35,6 +32,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.classifier.sgd.PolymorphicWritable; + /** * Allows evolutionary optimization where the state function can't be easily * packaged for the optimizer to execute. A good example of this is with @@ -82,7 +83,7 @@ public class EvolutionaryProcess<T extends Payload<U>, U> implements Writable, C private int populationSize; public EvolutionaryProcess() { - population = Lists.newArrayList(); + population = new ArrayList<>(); } /** @@ -119,7 +120,7 @@ public class EvolutionaryProcess<T extends Payload<U>, U> implements Writable, C Collections.sort(population); // we copy here to avoid concurrent modification - List<State<T, U>> parents = Lists.newArrayList(population.subList(0, survivors)); + List<State<T, U>> parents = new ArrayList<>(population.subList(0, survivors)); population.subList(survivors, population.size()).clear(); // fill out the population with offspring from the survivors @@ -140,7 +141,7 @@ public class EvolutionaryProcess<T extends Payload<U>, U> implements Writable, C * and rethrown nested in an ExecutionException. */ public State<T, U> parallelDo(final Function<Payload<U>> fn) throws InterruptedException, ExecutionException { - Collection<Callable<State<T, U>>> tasks = Lists.newArrayList(); + Collection<Callable<State<T, U>>> tasks = new ArrayList<>(); for (final State<T, U> state : population) { tasks.add(new Callable<State<T, U>>() { @Override @@ -219,7 +220,7 @@ public class EvolutionaryProcess<T extends Payload<U>, U> implements Writable, C public void readFields(DataInput input) throws IOException { setThreadCount(input.readInt()); int n = input.readInt(); - population = Lists.newArrayList(); + population = new ArrayList<>(); for (int i = 0; i < n; i++) { State<T, U> state = (State<T, U>) PolymorphicWritable.read(input, State.class); population.add(state);
