Repository: parquet-mr
Updated Branches:
  refs/heads/master 7987a544c -> 4fd34e651
PARQUET-220: Unnecessary warning in ParquetRecordReader.initialize

Rather than querying the COUNTER_METHOD up front, the counter method is resolved per object. This allows us to use the 'getCounter' method on any TaskAttemptContext with the correct signature (ignoring versions where TaskAttemptContext does not have an appropriate method/signature - preserving current behavior).

Author: Reuben Kuhnert <[email protected]>

Closes #280 from sircodesalotOfTheRound/context-utils-parquet-220 and squashes the following commits:

f118990 [Reuben Kuhnert] PARQUET-220: Unnecessary warning in ParquetRecordReader.initialize

Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/4fd34e65
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/4fd34e65
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/4fd34e65

Branch: refs/heads/master
Commit: 4fd34e6517f2c400a06e3c1d43ec56df2ff5c392
Parents: 7987a54
Author: Reuben Kuhnert <[email protected]>
Authored: Mon Dec 5 17:01:38 2016 -0800
Committer: Julien Le Dem <[email protected]>
Committed: Mon Dec 5 17:01:38 2016 -0800

----------------------------------------------------------------------
 .../parquet/hadoop/ParquetRecordReader.java | 10 ++--
 .../apache/parquet/hadoop/util/ContextUtil.java | 57 +++++++++++++++-----
 .../hadoop/util/counters/BenchmarkCounter.java | 4 +-
 .../mapreduce/MapReduceCounterLoader.java | 5 +-
 .../hadoop/example/TestInputOutputFormat.java | 1 +
 pom.xml | 1 +
 6 files changed, 58 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java index f2f656d..ebdc686 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java @@ -135,11 +135,13 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> { @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { - if (context instanceof TaskInputOutputContext<?, ?, ?, ?>) { - BenchmarkCounter.initCounterFromContext((TaskInputOutputContext<?, ?, ?, ?>) context); + + if (ContextUtil.hasCounterMethod(context)) { + BenchmarkCounter.initCounterFromContext(context); } else { - LOG.error("Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is " - + context.getClass().getCanonicalName()); + LOG.error( + String.format("Can not initialize counter because the class '%s' does not have a '.getCounterMethod'", + context.getClass().getCanonicalName())); } initializeInternalReader(toParquetSplit(inputSplit), ContextUtil.getConfiguration(context)); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java index 106fb0c..b2fec1b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java @@ -22,6 +22,8 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Counter; @@ -61,9 +63,10 @@ public class ContextUtil { private static final Field WRAPPED_CONTEXT_FIELD; private static final Method GET_CONFIGURATION_METHOD; - private static final Method GET_COUNTER_METHOD; private static final Method INCREMENT_COUNTER_METHOD; + private static final Map<Class, Method> COUNTER_METHODS_BY_CLASS = new HashMap<Class, Method>(); + static { boolean v21 = true; final String PACKAGE = "org.apache.hadoop.mapreduce"; @@ -140,15 +143,20 @@ public class ContextUtil { WRAPPED_CONTEXT_FIELD = innerMapContextCls.getDeclaredField("mapContext"); WRAPPED_CONTEXT_FIELD.setAccessible(true); - Method get_counter_method; try { - get_counter_method = Class.forName(PACKAGE + ".TaskAttemptContext").getMethod("getCounter", String.class, - String.class); - } catch (Exception e) { - get_counter_method = Class.forName(PACKAGE + ".TaskInputOutputContext").getMethod("getCounter", - String.class, String.class); + Class<?> taskAttemptContextClass = Class.forName(PACKAGE + ".TaskAttemptContext"); + Method getCounterMethodForTaskAttemptContext + = taskAttemptContextClass.getMethod("getCounter", String.class, String.class); + + COUNTER_METHODS_BY_CLASS.put(taskAttemptContextClass, getCounterMethodForTaskAttemptContext); + + } catch (ClassNotFoundException e) { + Class<?> taskInputOutputContextClass = Class.forName(PACKAGE + ".TaskInputOutputContext"); + Method getCounterMethodForTaskInputOutputContextClass = + taskInputOutputContextClass.getMethod("getCounter", String.class, String.class); + + COUNTER_METHODS_BY_CLASS.put(taskInputOutputContextClass, getCounterMethodForTaskInputOutputContextClass); } - GET_COUNTER_METHOD=get_counter_method; } else { MAP_CONTEXT_CONSTRUCTOR = innerMapContextCls.getConstructor(mapCls, @@ -161,7 +169,8 @@ public class ContextUtil { InputSplit.class); MAP_CONTEXT_IMPL_CONSTRUCTOR = null; WRAPPED_CONTEXT_FIELD = null; - 
GET_COUNTER_METHOD=taskIOContextCls.getMethod("getCounter", String.class, String.class); + + COUNTER_METHODS_BY_CLASS.put(taskIOContextCls, taskIOContextCls.getMethod("getCounter", String.class, String.class)); } MAP_CONTEXT_CONSTRUCTOR.setAccessible(true); READER_FIELD = mapContextCls.getDeclaredField("reader"); @@ -251,9 +260,33 @@ public class ContextUtil { } } - public static Counter getCounter(TaskInputOutputContext context, - String groupName, String counterName) { - return (Counter) invoke(GET_COUNTER_METHOD, context, groupName, counterName); + public static Counter getCounter(TaskAttemptContext context, String groupName, String counterName) { + Method counterMethod = findCounterMethod(context); + return (Counter)invoke(counterMethod, context, groupName, counterName); + } + + public static boolean hasCounterMethod(TaskAttemptContext context) { + return findCounterMethod(context) != null; + } + + private static Method findCounterMethod(TaskAttemptContext context) { + if (context != null) { + if (COUNTER_METHODS_BY_CLASS.containsKey(context.getClass())) { + return COUNTER_METHODS_BY_CLASS.get(context.getClass()); + } + + try { + Method method = context.getClass().getMethod("getCounter", String.class, String.class); + if (method.getReturnType().isAssignableFrom(Counter.class)) { + COUNTER_METHODS_BY_CLASS.put(context.getClass(), method); + return method; + } + } catch (NoSuchMethodException e) { + return null; + } + } + + return null; } /** http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java index e537783..b8521b3 100644 --- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java @@ -20,7 +20,7 @@ package org.apache.parquet.hadoop.util.counters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.hadoop.util.counters.mapred.MapRedCounterLoader; import org.apache.parquet.hadoop.util.counters.mapreduce.MapReduceCounterLoader; @@ -48,7 +48,7 @@ public class BenchmarkCounter { * * @param context */ - public static void initCounterFromContext(TaskInputOutputContext<?, ?, ?, ?> context) { + public static void initCounterFromContext(TaskAttemptContext context) { counterLoader = new MapReduceCounterLoader(context); loadCounters(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java index 1540f03..1bf4b97 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.hadoop.util.counters.mapreduce; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; @@ -30,9 +31,9 @@ import 
org.apache.parquet.hadoop.util.counters.ICounter; * @author Tianshuo Deng */ public class MapReduceCounterLoader implements CounterLoader { - private TaskInputOutputContext<?, ?, ?, ?> context; + private TaskAttemptContext context; - public MapReduceCounterLoader(TaskInputOutputContext<?, ?, ?, ?> context) { + public MapReduceCounterLoader(TaskAttemptContext context) { this.context = context; } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java index d1b5267..c829dc1 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java @@ -329,6 +329,7 @@ public class TestInputOutputFormat { @Test public void testReadWriteWithCounter() throws Exception { runMapReduceJob(CompressionCodecName.GZIP); + assertTrue(value(readJob, "parquet", "bytesread") > 0L); assertTrue(value(readJob, "parquet", "bytestotal") > 0L); assertTrue(value(readJob, "parquet", "bytesread") http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4fd34e65/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index beb1e93..7d6187d 100644 --- a/pom.xml +++ b/pom.xml @@ -225,6 +225,7 @@ <dumpDetails>true</dumpDetails> <previousVersion>${previous.version}</previousVersion> <excludes> + <exclude>org/apache/parquet/hadoop/util/**</exclude> <exclude>org/apache/parquet/thrift/projection/**</exclude> <exclude>org/apache/parquet/thrift/ThriftSchemaConverter</exclude> <exclude>org/apache/parquet/filter2/**</exclude>
