Repository: kylin Updated Branches: refs/heads/master 28ba1eaea -> 58224921d
refine mapper and reducer log Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/58224921 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/58224921 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/58224921 Branch: refs/heads/master Commit: 58224921d896e4479f5d034d43c044aacaf14200 Parents: 28ba1ea Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Dec 1 18:15:46 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Dec 1 18:15:46 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/engine/mr/KylinMapper.java | 12 +++++++++--- .../java/org/apache/kylin/engine/mr/KylinReducer.java | 13 ++++++++++--- .../apache/kylin/engine/mr/steps/CuboidReducer.java | 11 ++++------- .../kylin/engine/mr/steps/HiveToBaseCuboidMapper.java | 6 ------ .../kylin/engine/mr/steps/InMemCuboidMapper.java | 7 +------ .../kylin/engine/mr/steps/InMemCuboidReducer.java | 10 +++++----- .../apache/kylin/engine/mr/steps/NDCuboidMapper.java | 10 ++++------ 7 files changed, 33 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java index a01f7a2..2b564e9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +31,8 @@ import org.slf4j.LoggerFactory; public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class); + protected int mapCounter = 0; + protected void bindCurrentConfiguration(Configuration conf) { logger.info("The conf for current mapper will be " + System.identityHashCode(conf)); HadoopUtil.setCurrentConfiguration(conf); @@ -38,6 +41,9 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, @Override final public void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { + if (mapCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Accepting Mapper Key with ordinal: " + mapCounter); + } doMap(key, value, context); } catch (IOException ex) { // KYLIN-2170 logger.error("", ex); @@ -53,11 +59,11 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, throw ex; } } - + protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { super.map(key, value, context); } - + @Override final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { @@ -76,7 +82,7 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, throw ex; } } - + protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } } http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java index 2b63ce0..cb2d7a7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +30,9 @@ import org.slf4j.LoggerFactory; */ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class); - + + protected int reduceCounter = 0; + protected void bindCurrentConfiguration(Configuration conf) { HadoopUtil.setCurrentConfiguration(conf); } @@ -37,6 +40,10 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI @Override final public void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { + if (reduceCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Accepting Mapper Key with ordinal: " + reduceCounter); + } + doReduce(key, values, context); } catch (IOException ex) { // KYLIN-2170 logger.error("", ex); @@ -52,11 +59,11 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYI throw ex; } } - + protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { super.reduce(key, values, context); } - + @Override final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { try { http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index 9543f0a..b1d4aaa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -50,11 +50,11 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { private BufferedMeasureCodec codec; private MeasureAggregators aggs; - private int counter; private int cuboidLevel; private boolean[] needAggr; private Object[] input; private Object[] result; + private int vcounter; private Text outputValue = new Text(); @@ -90,6 +90,9 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { aggs.reset(); for (Text value : values) { + if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Handling value with ordinal: " + vcounter); + } codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input); if (cuboidLevel > 0) { aggs.aggregate(input, needAggr); @@ -103,11 +106,5 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { outputValue.set(valueBuf.array(), 0, valueBuf.position()); context.write(key, outputValue); - - counter++; - if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + counter + " records!"); - } } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java index 9fa20ae..428f878 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.MRUtil; -import org.apache.kylin.engine.mr.common.BatchConstants; /** * @author George Song (ysong1) @@ -39,11 +38,6 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O @Override public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException { - counter++; - if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + counter + " records!"); - } - String[] row = flatTableInputFormat.parseMapperInput(value); try { outputKV(row, context); http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 15bfd2e..116d5e0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -64,7 +64,6 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr private CubeSegment cubeSegment; private IMRTableInputFormat flatTableInputFormat; - private int counter; private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(64); private Future<?> future; @@ -120,10 +119,6 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr while (!future.isDone()) { if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) { - counter++; - if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + counter + " records!"); - } break; } } @@ -131,7 +126,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr @Override protected void doCleanup(Context context) throws IOException, InterruptedException { - logger.info("Totally handled " + counter + " records!"); + logger.info("Totally handled " + mapCounter + " records!"); while (!future.isDone()) { if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index d0a7062..04c9e90 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -46,10 +46,11 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra private BufferedMeasureCodec codec; private MeasureAggregators aggs; - private int counter; private Object[] input; private Object[] result; + private int vcounter; + private Text outputKey; private Text outputValue; @@ -78,6 +79,9 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra aggs.reset(); for (ByteArrayWritable value : values) { + if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Handling value with ordinal: " + vcounter); + } codec.decode(value.asBuffer(), input); aggs.aggregate(input); } @@ -92,10 +96,6 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra context.write(outputKey, outputValue); - counter++; - if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + counter + " records!"); - } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index 8107e52..01cdd4a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -129,18 +129,16 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { // if still empty or null if (myChildren == null || myChildren.size() == 0) { context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L); - skipCounter++; - if (skipCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Skipped " + skipCounter + " records!"); + if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Skipping record with ordinal: " + skipCounter); } return; } context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L); - handleCounter++; - if (handleCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { - logger.info("Handled " + handleCounter + " records!"); + if (handleCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { + logger.info("Handling record with ordinal: " + handleCounter); } for (Long child : myChildren) {