Author: ssc
Date: Tue Sep 17 05:43:04 2013
New Revision: 1523901
URL: http://svn.apache.org/r1523901
Log:
MAHOUT-1335 MultithreadedSharingMapper fails on Hadoop 2
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java?rev=1523901&r1=1523900&r2=1523901&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/MultithreadedSharingMapper.java Tue Sep 17 05:43:04 2013
@@ -17,7 +17,10 @@
package org.apache.mahout.cf.taste.hadoop.als;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.util.ReflectionUtils;
@@ -34,17 +37,21 @@ import java.io.IOException;
*/
public class MultithreadedSharingMapper<K1, V1, K2, V2> extends
MultithreadedMapper<K1, V1, K2, V2> {
- private static final String MAPPER_CLASS = "mapred.map.multithreadedrunner.class";
-
@Override
public void run(Context ctx) throws IOException, InterruptedException {
+ Class<Mapper<K1, V1, K2, V2>> mapperClass =
+ MultithreadedSharingMapper.getMapperClass((JobContext) ctx);
+ Preconditions.checkNotNull(mapperClass, "Could not find Multithreaded Mapper class.");
Configuration conf = ctx.getConfiguration();
-
- Class<? extends SharingMapper<K1,V1,K2,V2, ?>> mapperClass =
- (Class<SharingMapper<K1,V1,K2,V2, ?>>) conf.getClass(MAPPER_CLASS, SharingMapper.class);
// instantiate the mapper
- SharingMapper<K1,V1,K2,V2, ?> mapper = ReflectionUtils.newInstance(mapperClass, conf);
+ Mapper<K1, V1, K2, V2> mapper1 = ReflectionUtils.newInstance(mapperClass, conf);
+ SharingMapper<K1, V1, K2, V2, ?> mapper = null;
+ if (mapper1 instanceof SharingMapper) {
+ mapper = (SharingMapper<K1, V1, K2, V2, ?>) mapper1;
+ }
+ Preconditions.checkNotNull(mapper, "Could not instantiate SharingMapper. Class was: %s",
+ mapper1.getClass().getName());
// single threaded call to setup the sharing mapper
mapper.setupSharedInstance(ctx);