[ 
https://issues.apache.org/jira/browse/MAHOUT-1498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey updated MAHOUT-1498:
---------------------------

    Labels: patch  (was: )
    Status: Patch Available  (was: Open)

>From 4c95084229830b88f0bdec63b0967e5d6fceb58d Mon Sep 17 00:00:00 2001
From: seregasheypak <[email protected]>
Date: Sun, 18 May 2014 20:20:16 +0400
Subject: [PATCH] MAHOUT-1498 Do not reset app jars stored in
 DistributedCache. Now you can run it as oozie Java action.
 Just bundle dependent jars in workflow/lib folder.

---
 .../mahout/util/DistributedCacheFileLocator.java   |   47 ++++++++++++++++++++
 .../mahout/vectorizer/DictionaryVectorizer.java    |   21 ++++-----
 .../vectorizer/term/TFPartialVectorReducer.java    |   14 +++---
 .../mahout/vectorizer/tfidf/TFIDFConverter.java    |   11 +++--
 .../tfidf/TFIDFPartialVectorReducer.java           |    6 ++-
 .../util/DistributedCacheFileLocatorTest.java      |   38 ++++++++++++++++
 6 files changed, 109 insertions(+), 28 deletions(-)
 create mode 100644 
mrlegacy/src/main/java/org/apache/mahout/util/DistributedCacheFileLocator.java
 create mode 100644 
mrlegacy/src/test/java/org/apache/mahout/util/DistributedCacheFileLocatorTest.java

diff --git 
a/mrlegacy/src/main/java/org/apache/mahout/util/DistributedCacheFileLocator.java
 
b/mrlegacy/src/main/java/org/apache/mahout/util/DistributedCacheFileLocator.java
new file mode 100644
index 0000000..8a59908
--- /dev/null
+++ 
b/mrlegacy/src/main/java/org/apache/mahout/util/DistributedCacheFileLocator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.util;
+
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+public class DistributedCacheFileLocator {
+
+    private static final Logger log = 
LoggerFactory.getLogger(DistributedCacheFileLocator.class);
+
+    /**
+     * Finds a file in Distributed cache
+     * @param aPartOfName is a substring in file name
+     * @param localFiles holds references to files stored in distributed cache
+     * @return Path instance to first matched file or null
+     * */
+    public Path findByContainsInName(String aPartOfName, URI[] localFiles){
+        for(URI distCacheFile : localFiles){
+            log.debug("find a file in distributed cache by part of name {}", 
aPartOfName);
+            if(distCacheFile!=null && 
distCacheFile.toString().contains(aPartOfName)){
+                log.debug("found a file [{}] using a part of name[{}]", 
distCacheFile.toString(), aPartOfName);
+                return new Path(distCacheFile.getPath());
+            }
+        }
+        return null;
+    }
+
+}
diff --git 
a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java 
b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
index 99ef019..64e5a67 100644
--- 
a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
+++ 
b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/DictionaryVectorizer.java
@@ -17,11 +17,6 @@
 
 package org.apache.mahout.vectorizer;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.List;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
@@ -29,11 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -59,6 +50,10 @@ import org.apache.mahout.vectorizer.term.TermCountReducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
 /**
  * This class converts a set of input documents in the sequence file format to 
vectors. The Sequence file
  * input should have a {@link Text} key containing the unique document 
identifier and a {@link StringTuple}
@@ -73,7 +68,7 @@ public final class DictionaryVectorizer extends AbstractJob 
implements Vectorize
   public static final String MAX_NGRAMS = "max.ngrams";
   public static final int DEFAULT_MIN_SUPPORT = 2;
   
-  private static final String DICTIONARY_FILE = "dictionary.file-";
+  public static final String DICTIONARY_FILE = "dictionary.file-";
   private static final int MAX_CHUNKSIZE = 10000;
   private static final int MIN_CHUNKSIZE = 100;
   private static final String OUTPUT_FILES_PATTERN = "part-*";
@@ -301,8 +296,8 @@ public final class DictionaryVectorizer extends AbstractJob 
implements Vectorize
     conf.setInt(PartialVectorMerger.DIMENSION, dimension);
     conf.setBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, sequentialAccess);
     conf.setBoolean(PartialVectorMerger.NAMED_VECTOR, namedVectors);
-    conf.setInt(MAX_NGRAMS, maxNGramSize);   
-    DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, 
conf);
+    conf.setInt(MAX_NGRAMS, maxNGramSize);
+    DistributedCache.addCacheFile(dictionaryFilePath.toUri(), conf);
     
     Job job = new Job(conf);
     job.setJobName("DictionaryVectorizer::MakePartialVectors: input-folder: " 
+ input
diff --git 
a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
 
b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
index 634b335..6b62a23 100644
--- 
a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
+++ 
b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/term/TFPartialVectorReducer.java
@@ -19,6 +19,7 @@ package org.apache.mahout.vectorizer.term;
 
 import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -26,21 +27,18 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.lucene.analysis.shingle.ShingleFilter;
 import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
-import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.StringTuple;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
 import org.apache.mahout.common.lucene.IteratorTokenStream;
-import org.apache.mahout.math.NamedVector;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.SequentialAccessSparseVector;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.*;
 import org.apache.mahout.math.map.OpenObjectIntHashMap;
+import org.apache.mahout.util.DistributedCacheFileLocator;
 import org.apache.mahout.vectorizer.DictionaryVectorizer;
 import org.apache.mahout.vectorizer.common.PartialVectorMerger;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Iterator;
 
 /**
@@ -120,8 +118,8 @@ public class TFPartialVectorReducer extends Reducer<Text, 
StringTuple, Text, Vec
     namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
     maxNGramSize = conf.getInt(DictionaryVectorizer.MAX_NGRAMS, maxNGramSize);
 
-    //MAHOUT-1247
-    Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
+    URI[] localFiles = DistributedCache.getCacheFiles(conf);
+    Path dictionaryFile = new 
DistributedCacheFileLocator().findByContainsInName(DictionaryVectorizer.DICTIONARY_FILE,
 localFiles);
     // key is word value is id
     for (Pair<Writable, IntWritable> record
             : new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, 
true, conf)) {
diff --git 
a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java 
b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
index db602dd..4586d77 100644
--- 
a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
+++ 
b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFConverter.java
@@ -17,10 +17,6 @@
 
 package org.apache.mahout.vectorizer.tfidf;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
@@ -47,6 +43,9 @@ import 
org.apache.mahout.vectorizer.common.PartialVectorMerger;
 import org.apache.mahout.vectorizer.term.TermDocumentCountMapper;
 import org.apache.mahout.vectorizer.term.TermDocumentCountReducer;
 
+import java.io.IOException;
+import java.util.List;
+
 /**
  * This class converts a set of input vectors with term frequencies to TfIdf 
vectors. The Sequence file input
  * should have a {@link org.apache.hadoop.io.WritableComparable} key 
containing and a
@@ -64,7 +63,7 @@ public final class TFIDFConverter {
   //public static final String TFIDF_OUTPUT_FOLDER = "tfidf";
 
   private static final String DOCUMENT_VECTOR_OUTPUT_FOLDER = "tfidf-vectors";
-  private static final String FREQUENCY_FILE = "frequency.file-";
+  public static final String FREQUENCY_FILE = "frequency.file-";
   private static final int MAX_CHUNKSIZE = 10000;
   private static final int MIN_CHUNKSIZE = 100;
   private static final String OUTPUT_FILES_PATTERN = "part-*";
@@ -299,7 +298,7 @@ public final class TFIDFConverter {
     conf.setLong(MAX_DF, maxDF);
     conf.setBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, sequentialAccess);
     conf.setBoolean(PartialVectorMerger.NAMED_VECTOR, namedVector);
-    DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, 
conf);
+    DistributedCache.addCacheFile(dictionaryFilePath.toUri(), conf);
 
     Job job = new Job(conf);
     job.setJobName(": MakePartialVectors: input-folder: " + input + ", 
dictionary-file: "
diff --git 
a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
 
b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
index ebb4d2b..1be630e 100644
--- 
a/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
+++ 
b/mrlegacy/src/main/java/org/apache/mahout/vectorizer/tfidf/TFIDFPartialVectorReducer.java
@@ -18,9 +18,11 @@
 package org.apache.mahout.vectorizer.tfidf;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -35,6 +37,7 @@ import org.apache.mahout.math.SequentialAccessSparseVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
 import org.apache.mahout.math.map.OpenIntLongHashMap;
+import org.apache.mahout.util.DistributedCacheFileLocator;
 import org.apache.mahout.vectorizer.TFIDF;
 import org.apache.mahout.vectorizer.common.PartialVectorMerger;
 
@@ -106,7 +109,8 @@ public class TFIDFPartialVectorReducer extends
     sequentialAccess = conf.getBoolean(PartialVectorMerger.SEQUENTIAL_ACCESS, 
false);
     namedVector = conf.getBoolean(PartialVectorMerger.NAMED_VECTOR, false);
 
-    Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);
+    URI[] localFiles = DistributedCache.getCacheFiles(conf);
+    Path dictionaryFile = new 
DistributedCacheFileLocator().findByContainsInName(TFIDFConverter.FREQUENCY_FILE,
 localFiles);
     // key is feature, value is the document frequency
     for (Pair<IntWritable,LongWritable> record 
          : new SequenceFileIterable<IntWritable,LongWritable>(dictionaryFile, 
true, conf)) {
diff --git 
a/mrlegacy/src/test/java/org/apache/mahout/util/DistributedCacheFileLocatorTest.java
 
b/mrlegacy/src/test/java/org/apache/mahout/util/DistributedCacheFileLocatorTest.java
new file mode 100644
index 0000000..d9cd7d1
--- /dev/null
+++ 
b/mrlegacy/src/test/java/org/apache/mahout/util/DistributedCacheFileLocatorTest.java
@@ -0,0 +1,38 @@
+package org.apache.mahout.util;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * User: sergey.sheypak
+ * Date: 18.05.14
+ * Time: 16:50
+ */
+public class DistributedCacheFileLocatorTest extends org.junit.Assert {
+
+    public static final File FILE_I_WANT_TO_FIND = new 
File("file/i_want_to_find.txt");
+
+    @Test
+    public void testFindNothing(){
+        Path nullPath = aSut().findByContainsInName("there is no such file", 
distributedCacheFiles());
+        assertNull(nullPath);
+    }
+
+    @Test
+    public void testFindFile(){
+        Path path = aSut().findByContainsInName("want_to_find", 
distributedCacheFiles());
+        assertNotNull(path);
+        assertEquals(FILE_I_WANT_TO_FIND.getName(), path.getName());
+    }
+
+    private DistributedCacheFileLocator aSut(){
+        return new DistributedCacheFileLocator();
+    }
+
+    private URI[] distributedCacheFiles(){
+        return new URI[]{new File("/first/file").toURI(), new 
File("/second/file").toURI(), FILE_I_WANT_TO_FIND.toURI()};
+    }
+}
-- 
1.7.10.4


> DistributedCache.setCacheFiles in DictionaryVectorizer overwrites jars pushed 
> using oozie
> -----------------------------------------------------------------------------------------
>
>                 Key: MAHOUT-1498
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1498
>             Project: Mahout
>          Issue Type: Bug
>    Affects Versions: 0.7
>         Environment: mahout-core-0.7-cdh4.4.0.jar
>            Reporter: Sergey
>              Labels: patch
>             Fix For: 1.0
>
>
> Hi, I get exception 
> {code}
> <<< Invocation of Main class completed <<<
> Failing Oozie Launcher, Main class 
> [org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles], main() threw 
> exception, Job failed!
> java.lang.IllegalStateException: Job failed!
> at 
> org.apache.mahout.vectorizer.DictionaryVectorizer.makePartialVectors(DictionaryVectorizer.java:329)
> at 
> org.apache.mahout.vectorizer.DictionaryVectorizer.createTermFrequencyVectors(DictionaryVectorizer.java:199)
> at 
> org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles.run(SparseVectorsFromSequenceFiles.java:271)
> {code}
> The root cause is:
> {code}
> Error: java.lang.ClassNotFoundException: org.apache.mahout.math.Vector
> at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:247
> {code}
> Looks like it happens because of 
> DictionaryVectorizer.makePartialVectors method.
> It has code:
> {code}
> DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);
> {code}
> which overrides jars pushed with job by oozie:
> {code}
> public static void More ...setCacheFiles(URI[] files, Configuration conf) {
>          String sfiles = StringUtils.uriToString(files);
>          conf.set("mapred.cache.files", sfiles);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to