[
https://issues.apache.org/jira/browse/MAHOUT-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sergey updated MAHOUT-1498:
---------------------------
Status: Patch Available (was: Open)
From 4c95084229830b88f0bdec63b0967e5d6fceb58d Mon Sep 17 00:00:00 2001
From: seregasheypak <[email protected]>
Date: Sun, 18 May 2014 20:20:16 +0400
Subject: [PATCH] MAHOUT-1498 Do not reset app jars stored in
 DistributedCache. Now you can run it as an Oozie Java action:
 just bundle the dependent jars in the workflow/lib folder.
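
Background: DistributedCache.setCacheFiles() replaces the whole
mapred.cache.files property, while addCacheFile() appends to it, so
jars that Oozie has already registered for the job survive. A minimal
sketch of the difference (an illustration only, not part of the patch;
the names are the ones used in DictionaryVectorizer):

    // Before: setCacheFiles overwrote mapred.cache.files, dropping the
    // app jars Oozie had placed in the cache for the job.
    // DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);

    // After: addCacheFile appends, so the Oozie-provided jars survive.
    DistributedCache.addCacheFile(dictionaryFilePath.toUri(), conf);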
---
.../mahout/util/DistributedCacheFileLocator.java | 47 ++++++++++++++++++++
.../mahout/vectorizer/DictionaryVectorizer.java | 21 ++++-----
.../vectorizer/term/TFPartialVectorReducer.java | 14 +++---
.../mahout/vectorizer/tfidf/TFIDFConverter.java | 11 +++--
.../tfidf/TFIDFPartialVectorReducer.java | 6 ++-
.../util/DistributedCacheFileLocatorTest.java | 38 ++++++++++++++++
6 files changed, 109 insertions(+), 28 deletions(-)
create mode 100644 mrlegacy/src/main/java/org/apache/mahout/util/DistributedCacheFileLocator.java
create mode 100644 mrlegacy/src/test/java/org/apache/mahout/util/DistributedCacheFileLocatorTest.java
diff --git a/mrlegacy/src/main/java/org/apache/mahout/util/DistributedCacheFileLocator.java b/mrlegacy/src/main/java/org/apache/mahout/util/DistributedCacheFileLocator.java
new file mode 100644
index 0000000..8a59908
--- /dev/null
+++ b/mrlegacy/src/main/java/org/apache/mahout/util/DistributedCacheFileLocator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.util;
+
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+public class DistributedCacheFileLocator {
+
+  private static final Logger log = LoggerFactory.getLogger(DistributedCacheFileLocator.class);
+
+  /**
+   * Finds a file in the distributed cache.
+   * @param aPartOfName a substring of the file name to match
+   * @param localFiles references to the files stored in the distributed cache
+   * @return a Path to the first matching file, or null if none matches
+   */
+  public Path findByContainsInName(String aPartOfName, URI[] localFiles){
+    for(URI distCacheFile : localFiles){
+      log.debug("find a file in distributed cache by part of name {}", aPartOfName);
+      if(distCacheFile!=null && distCacheFile.toString().contains(aPartOfName)){
+        log.debug("found a file [{}] using a part of name[{}]", distCacheFile.toString(), aPartOfName);
+        return new Path(distCacheFile.getPath());
+      }
+    }
+    return null;
+  }
+
+}
diff --git a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
index 99ef019..64e5a67 100644
--- a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
+++ b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
@@ -17,11 +17,6 @@
package org.apache.mahout.vectorizer;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.List;
-
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
@@ -29,11 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -59,6 +50,10 @@ import org.apache.mahout.vectorizer.term.TermCountReducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
/**
  * This class converts a set of input documents in the sequence file format to vectors. The Sequence file
  * input should have a {@link Text} key containing the unique document identifier and a {@link StringTuple}
@@ -73,7 +68,7 @@ public final class DictionaryVectorizer extends AbstractJob implements Vectorize
public static final String MAX_NGRAMS = "max.ngrams";
public static final int DEFAULT_MIN_SUPPORT = 2;
- private static final String DICTIONARY_FILE = "dictionary.file-";
+ public static final String DICTIONARY_FILE = "dictionary.file-";
private static final int MAX_CHUNKSIZE = 10000;
private static final int MIN_CHUNKSIZE = 100;
private static final String OUTPUT_FILES_PATTERN = "part-*";
@@ -301,8 +296,8 @@ public final class DictionaryVectorizer extends AbstractJob implements Vectorize
conf.setInt(PartialVectorMerger.DIMENSION, dimension);
conf.setBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, sequentialAccess);
conf.setBoolean(PartialVectorMerger.NAMED_VECTOR, namedVectors);
- conf.setInt(MAX_NGRAMS, maxNGramSize);
-    DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
+ conf.setInt(MAX_NGRAMS, maxNGramSize);
+ DistributedCache.addCacheFile(dictionaryFilePath.toUri(), conf);
Job job = new Job(conf);
job.setJobName("DictionaryVectorizer::MakePartialVectors: input-folder: "
+ input
diff --git a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
index 634b335..6b62a23 100644
--- a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
+++ b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
@@ -19,6 +19,7 @@ package org.apache.mahout.vectorizer.term;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@@ -26,21 +27,18 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.lucene.analysis.shingle.ShingleFilter;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
-import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.StringTuple;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.common.lucene.IteratorTokenStream;
-import org.apache.mahout.math.NamedVector;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.SequentialAccessSparseVector;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.*;
import org.apache.mahout.math.map.OpenObjectIntHashMap;
+import org.apache.mahout.util.DistributedCacheFileLocator;
import org.apache.mahout.vectorizer.DictionaryVectorizer;
import org.apache.mahout.vectorizer.common.PartialVectorMerger;
import java.io.IOException;
+import java.net.URI;
import java.util.Iterator;
/**
@@ -120,8 +118,8 @@ public class TFPartialVectorReducer extends Reducer<Text, StringTuple, Text, Vec
namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
maxNGramSize = conf.getInt(DictionaryVectorizer.MAX_NGRAMS, maxNGramSize);
- //MAHOUT-1247
- Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
+ URI[] localFiles = DistributedCache.getCacheFiles(conf);
+    Path dictionaryFile = new DistributedCacheFileLocator().findByContainsInName(DictionaryVectorizer.DICTIONARY_FILE, localFiles);
// key is word value is id
for (Pair<Writable, IntWritable> record
: new SequenceFileIterable<Writable, IntWritable>(dictionaryFile,
true, conf)) {
diff --git a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
index db602dd..4586d77 100644
--- a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
+++ b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
@@ -17,10 +17,6 @@
package org.apache.mahout.vectorizer.tfidf;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
@@ -47,6 +43,9 @@ import org.apache.mahout.vectorizer.common.PartialVectorMerger;
import org.apache.mahout.vectorizer.term.TermDocumentCountMapper;
import org.apache.mahout.vectorizer.term.TermDocumentCountReducer;
+import java.io.IOException;
+import java.util.List;
+
/**
  * This class converts a set of input vectors with term frequencies to TfIdf vectors. The Sequence file input
  * should have a {@link org.apache.hadoop.io.WritableComparable} key containing and a
@@ -64,7 +63,7 @@ public final class TFIDFConverter {
//public static final String TFIDF_OUTPUT_FOLDER = "tfidf";
private static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "tfidf-vectors";
- private static final String FREQUENCY_FILE = "frequency.file-";
+ public static final String FREQUENCY_FILE = "frequency.file-";
private static final int MAX_CHUNKSIZE = 10000;
private static final int MIN_CHUNKSIZE = 100;
private static final String OUTPUT_FILES_PATTERN = "part-*";
@@ -299,7 +298,7 @@ public final class TFIDFConverter {
conf.setLong(MAX_DF, maxDF);
conf.setBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, sequentialAccess);
conf.setBoolean(PartialVectorMerger.NAMED_VECTOR, namedVector);
-    DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
+ DistributedCache.addCacheFile(dictionaryFilePath.toUri(), conf);
Job job = new Job(conf);
job.setJobName(": MakePartialVectors: input-folder: " + input + ",
dictionary-file: "
diff --git a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
--- a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
+++ b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
@@ -18,9 +18,11 @@
package org.apache.mahout.vectorizer.tfidf;
import java.io.IOException;
+import java.net.URI;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -35,6 +37,7 @@ import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.map.OpenIntLongHashMap;
+import org.apache.mahout.util.DistributedCacheFileLocator;
import org.apache.mahout.vectorizer.TFIDF;
import org.apache.mahout.vectorizer.common.PartialVectorMerger;
@@ -106,7 +109,8 @@ public class TFIDFPartialVectorReducer extends
    sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, false);
namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
- Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
+ URI[] localFiles = DistributedCache.getCacheFiles(conf);
+    Path dictionaryFile = new DistributedCacheFileLocator().findByContainsInName(TFIDFConverter.FREQUENCY_FILE, localFiles);
// key is feature, value is the document frequency
for (Pair<IntWritable,LongWritable> record
: new SequenceFileIterable<IntWritable,LongWritable>(dictionaryFile,
true, conf)) {
diff --git a/mrlegacy/src/test/java/org/apache/mahout/util/DistributedCacheFileLocatorTest.java b/mrlegacy/src/test/java/org/apache/mahout/util/DistributedCacheFileLocatorTest.java
new file mode 100644
index 0000000..d9cd7d1
--- /dev/null
+++ b/mrlegacy/src/test/java/org/apache/mahout/util/DistributedCacheFileLocatorTest.java
@@ -0,0 +1,38 @@
+package org.apache.mahout.util;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * User: sergey.sheypak
+ * Date: 18.05.14
+ * Time: 16:50
+ */
+public class DistributedCacheFileLocatorTest extends org.junit.Assert {
+
+  public static final File FILE_I_WANT_TO_FIND = new File("file/i_want_to_find.txt");
+
+ @Test
+ public void testFindNothing(){
+    Path nullPath = aSut().findByContainsInName("there is no such file", distributedCacheFiles());
+ assertNull(nullPath);
+ }
+
+ @Test
+ public void testFindFile(){
+    Path path = aSut().findByContainsInName("want_to_find", distributedCacheFiles());
+ assertNotNull(path);
+ assertEquals(FILE_I_WANT_TO_FIND.getName(), path.getName());
+ }
+
+ private DistributedCacheFileLocator aSut(){
+ return new DistributedCacheFileLocator();
+ }
+
+ private URI[] distributedCacheFiles(){
+    return new URI[]{new File("/first/file").toURI(), new File("/second/file").toURI(), FILE_I_WANT_TO_FIND.toURI()};
+ }
+}
--
1.7.10.4
> DistributedCache.setCacheFiles in DictionaryVectorizer overwrites jars pushed using oozie
> -----------------------------------------------------------------------------------------
>
> Key: MAHOUT-1498
> URL: https://issues.apache.org/jira/browse/MAHOUT-1498
> Project: Mahout
> Issue Type: Bug
> Affects Versions: 0.7
> Environment: mahout-core-0.7-cdh4.4.0.jar
> Reporter: Sergey
> Labels: patch
> Fix For: 1.0
>
> Attachments: MAHOUT-1498.patch
>
>
> Hi, I get this exception:
> {code}
> <<< Invocation of Main class completed <<<
> Failing Oozie Launcher, Main class [org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles], main() threw exception, Job failed!
> java.lang.IllegalStateException: Job failed!
> at org.apache.mahout.vectorizer.DictionaryVectorizer.makePartialVectors(DictionaryVectorizer.java:329)
> at org.apache.mahout.vectorizer.DictionaryVectorizer.createTermFrequencyVectors(DictionaryVectorizer.java:199)
> at org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles.run(SparseVectorsFromSequenceFiles.java:271)
> {code}
> The root cause is:
> {code}
> Error: java.lang.ClassNotFoundException: org.apache.mahout.math.Vector
> at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:247)
> {code}
> It looks like this happens because of the
> DictionaryVectorizer.makePartialVectors method, which has this code:
> {code}
> DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
> {code}
> which overwrites the jars that Oozie pushed with the job:
> {code}
> public static void setCacheFiles(URI[] files, Configuration conf) {
> String sfiles = StringUtils.uriToString(files);
> conf.set("mapred.cache.files", sfiles);
> }
> {code}
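> The attached patch therefore replaces setCacheFiles with
> DistributedCache.addCacheFile, which appends to mapred.cache.files instead
> of overwriting it, and then locates the dictionary chunk by name on the
> reducer side. A minimal sketch of that approach (the helper and constant
> names are the ones the patch introduces):
> {code}
> // driver side: append the dictionary chunk, keeping the Oozie jars
> DistributedCache.addCacheFile(dictionaryFilePath.toUri(), conf);
>
> // reducer side: pick the dictionary chunk out of all cached files
> URI[] localFiles = DistributedCache.getCacheFiles(conf);
> Path dictionaryFile = new DistributedCacheFileLocator()
>     .findByContainsInName(DictionaryVectorizer.DICTIONARY_FILE, localFiles);
> {code}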
--
This message was sent by Atlassian JIRA
(v6.2#6252)