I have realized that the |collect| method is declared in an interface.
Could that be the reason the call is not intercepted? Or am I missing
something in the configuration?
    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.classification.InterfaceAudience.Public;
    import org.apache.hadoop.classification.InterfaceStability.Stable;

    @Public
    @Stable
    public interface OutputCollector<K, V> {
        void collect(K var1, V var2) throws IOException;
    }
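For comparison, a method being declared in an interface is not by itself a barrier to interception. Here is a minimal sketch using a JDK dynamic proxy. It does not use the Hadoop classes: |Collector|, |ListCollector|, |intercept| and |demo| are hypothetical stand-ins invented for this illustration. It suggests that what matters is whether the call goes through an object that the interception mechanism controls:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CollectInterceptionSketch {

    /** Hypothetical stand-in with the same shape as OutputCollector. */
    public interface Collector<K, V> {
        void collect(K key, V value) throws IOException;
    }

    /** Hypothetical implementation, playing the role of the framework's collector. */
    public static class ListCollector<K, V> implements Collector<K, V> {
        public final List<String> records = new ArrayList<>();

        @Override
        public void collect(K key, V value) {
            records.add(key + "=" + value);
        }
    }

    /** Wraps a collector in a proxy that logs each collect() call, then delegates. */
    @SuppressWarnings("unchecked")
    public static <K, V> Collector<K, V> intercept(Collector<K, V> target, List<String> log) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("collect")) {
                log.add("before collect: " + Arrays.toString(args));
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow what the real method threw
            }
        };
        return (Collector<K, V>) Proxy.newProxyInstance(
                Collector.class.getClassLoader(),
                new Class<?>[] {Collector.class},
                handler);
    }

    /** Makes one call through the proxy and one on the raw object; returns the log. */
    public static List<String> demo() {
        ListCollector<String, Integer> real = new ListCollector<>();
        List<String> log = new ArrayList<>();
        Collector<String, Integer> proxied = intercept(real, log);
        try {
            proxied.collect("hello", 1); // goes through the proxy: logged
            real.collect("world", 1);    // bypasses the proxy: not logged
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [before collect: [hello, 1]]
    }
}
```

Both records reach the real collector, but only the call made through the proxy is seen by the handler. If the aspect in this setup is applied by Spring's proxy-based AOP, a similar limitation may hold, since the collector passed to |map| is created by Hadoop and not by the Spring container. That is an assumption about this configuration, not a verified diagnosis.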
On 09/09/2015 11:04 AM, xeonmailinglist wrote:
Hi,
I am trying to use AspectJ with a MapReduce example, but there is one
thing I do not understand. First, let me show you the code that
I have.
[1] Wordcount example
    package org.apache.hadoop.mapred.examples;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;

    /**
     * Common Wordcount example
     */
    public class WordCount {

        public static class Map extends MapReduceBase
                implements Mapper<LongWritable, Text, Text, IntWritable> {

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(LongWritable key, Text value,
                            OutputCollector<Text, IntWritable> output,
                            Reporter reporter) throws IOException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    output.collect(word, one);
                }
            }
        }

        public static class Reduce extends MapReduceBase
                implements Reducer<Text, IntWritable, Text, IntWritable> {

            public void reduce(Text key, Iterator<IntWritable> values,
                               OutputCollector<Text, IntWritable> output,
                               Reporter reporter) throws IOException {
                int sum = 0;
                while (values.hasNext()) {
                    sum += values.next().get();
                }
                output.collect(key, new IntWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(WordCount.class);
            conf.setJobName("wordcount");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);
            conf.setNumReduceTasks(2);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
        }
    }
[2] My mapreduce aspects
    package org.apache.hadoop.mapred.aspects;

    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;

    @Aspect
    public class MapReduceAspects {

        @Before("execution(* map(..))")
        public void mymap(JoinPoint joinPoint) {
            System.out.println("My Map Execution: " + joinPoint.getArgs() + ":" + joinPoint.getTarget());
            Object[] obj = joinPoint.getArgs();
            for (Object o : obj) {
                System.out.println(o.toString());
            }
        }

        @Before("execution(* reduce(..))")
        public void myreduce() {
            System.out.println("My Reduce Execution");
        }

        @Before("execution(* collect(..))")
        public void updatehash(JoinPoint joinPoint) {
            System.out.println("Output collect: Args: " + joinPoint.getArgs());
        }
    }
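A side note on the advice in [2]: concatenating the result of |joinPoint.getArgs()| directly into a string prints the array's type and identity hash, not its elements. The sketch below is plain Java with no AspectJ involved. The class |ArgsPrintingSketch| and the method |describe| are hypothetical names used only for illustration, and the sample array stands in for the join point's arguments:

```java
import java.util.Arrays;

public class ArgsPrintingSketch {

    /** Formats an argument array by its elements rather than its identity. */
    public static String describe(Object[] args) {
        return "Output collect: Args: " + Arrays.toString(args);
    }

    public static void main(String[] unused) {
        Object[] args = {"hello", 1}; // stand-in for joinPoint.getArgs()

        // Direct concatenation uses Object.toString() on the array itself,
        // which yields a type tag plus a hash code, not the contents.
        System.out.println("Output collect: Args: " + args);

        // Arrays.toString prints the elements instead.
        System.out.println(describe(args)); // prints "Output collect: Args: [hello, 1]"
    }
}
```

The same substitution would apply to the |mymap| advice, where the arguments are also concatenated directly.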
[3] bean-aspects.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:p="http://www.springframework.org/schema/p"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:jee="http://www.springframework.org/schema/jee"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation="http://www.springframework.org/schema/aop
               http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
               http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
               http://www.springframework.org/schema/context
               http://www.springframework.org/schema/context/spring-context-3.2.xsd
               http://www.springframework.org/schema/jee
               http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
               http://www.springframework.org/schema/tx
               http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
               http://www.springframework.org/schema/task
               http://www.springframework.org/schema/task/spring-task-3.2.xsd">

        <aop:aspectj-autoproxy proxy-target-class="true">
            <aop:include name="mapreduceAspect"/>
        </aop:aspectj-autoproxy>

        <bean id="mapreduceAspect"
              class="org.apache.hadoop.mapred.aspects.MapReduceAspects"/>
    </beans>
In [1], I have a wordcount example with |map| and |reduce| functions.
When I launch my application in the MapReduce framework, the framework
creates a job that executes the |map| and |reduce| functions.
The |map| function takes its input from the input dir, and the |reduce|
function outputs the result.
I can intercept the |map| and |reduce| function calls with AspectJ,
but I can't intercept the |collect| call in the statement
|output.collect(word, one)| inside the |map| function. Why does this
happen? Did I not configure the aspects correctly?
I would love it if anyone could explain to me why AspectJ is behaving like this.
Thanks,
_______________________________________________
aspectj-users mailing list
aspectj-users@eclipse.org