Repository: kylin
Updated Branches:
  refs/heads/master 28ba1eaea -> 58224921d


refine mapper and reducer log


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/58224921
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/58224921
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/58224921

Branch: refs/heads/master
Commit: 58224921d896e4479f5d034d43c044aacaf14200
Parents: 28ba1ea
Author: Hongbin Ma <mahong...@apache.org>
Authored: Thu Dec 1 18:15:46 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Thu Dec 1 18:15:46 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/engine/mr/KylinMapper.java   | 12 +++++++++---
 .../java/org/apache/kylin/engine/mr/KylinReducer.java  | 13 ++++++++++---
 .../apache/kylin/engine/mr/steps/CuboidReducer.java    | 11 ++++-------
 .../kylin/engine/mr/steps/HiveToBaseCuboidMapper.java  |  6 ------
 .../kylin/engine/mr/steps/InMemCuboidMapper.java       |  7 +------
 .../kylin/engine/mr/steps/InMemCuboidReducer.java      | 10 +++++-----
 .../apache/kylin/engine/mr/steps/NDCuboidMapper.java   | 10 ++++------
 7 files changed, 33 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
index a01f7a2..2b564e9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +31,8 @@ import org.slf4j.LoggerFactory;
 public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends 
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     private static final Logger logger = 
LoggerFactory.getLogger(KylinMapper.class);
 
+    protected int mapCounter = 0;
+
     protected void bindCurrentConfiguration(Configuration conf) {
         logger.info("The conf for current mapper will be " + 
System.identityHashCode(conf));
         HadoopUtil.setCurrentConfiguration(conf);
@@ -38,6 +41,9 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Mapper<KEYIN,
     @Override
     final public void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, 
KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
         try {
+            if (mapCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 
0) {
+                logger.info("Accepting Mapper Key with ordinal: " + 
mapCounter);
+            }
             doMap(key, value, context);
         } catch (IOException ex) { // KYLIN-2170
             logger.error("", ex);
@@ -53,11 +59,11 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Mapper<KEYIN,
             throw ex;
         }
     }
-    
+
     protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, 
KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
         super.map(key, value, context);
     }
-    
+
     @Override
     final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, 
VALUEOUT>.Context context) throws IOException, InterruptedException {
         try {
@@ -76,7 +82,7 @@ public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Mapper<KEYIN,
             throw ex;
         }
     }
-    
+
     protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context 
context) throws IOException, InterruptedException {
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
index 2b63ce0..cb2d7a7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,7 +30,9 @@ import org.slf4j.LoggerFactory;
  */
 public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends 
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     private static final Logger logger = 
LoggerFactory.getLogger(KylinReducer.class);
-    
+
+    protected int reduceCounter = 0;
+
     protected void bindCurrentConfiguration(Configuration conf) {
         HadoopUtil.setCurrentConfiguration(conf);
     }
@@ -37,6 +40,10 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Reducer<KEYI
     @Override
     final public void reduce(KEYIN key, Iterable<VALUEIN> values, 
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, 
InterruptedException {
         try {
+            if (reduceCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD 
== 0) {
+                logger.info("Accepting Mapper Key with ordinal: " + 
reduceCounter);
+            }
+
             doReduce(key, values, context);
         } catch (IOException ex) { // KYLIN-2170
             logger.error("", ex);
@@ -52,11 +59,11 @@ public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
extends Reducer<KEYI
             throw ex;
         }
     }
-    
+
     protected void doReduce(KEYIN key, Iterable<VALUEIN> values, 
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, 
InterruptedException {
         super.reduce(key, values, context);
     }
-    
+
     @Override
     final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, 
VALUEOUT>.Context context) throws IOException, InterruptedException {
         try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index 9543f0a..b1d4aaa 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -50,11 +50,11 @@ public class CuboidReducer extends KylinReducer<Text, Text, 
Text, Text> {
     private BufferedMeasureCodec codec;
     private MeasureAggregators aggs;
 
-    private int counter;
     private int cuboidLevel;
     private boolean[] needAggr;
     private Object[] input;
     private Object[] result;
+    private int vcounter;
 
     private Text outputValue = new Text();
 
@@ -90,6 +90,9 @@ public class CuboidReducer extends KylinReducer<Text, Text, 
Text, Text> {
         aggs.reset();
 
         for (Text value : values) {
+            if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+                logger.info("Handling value with ordinal: " + vcounter);
+            }
             codec.decode(ByteBuffer.wrap(value.getBytes(), 0, 
value.getLength()), input);
             if (cuboidLevel > 0) {
                 aggs.aggregate(input, needAggr);
@@ -103,11 +106,5 @@ public class CuboidReducer extends KylinReducer<Text, 
Text, Text, Text> {
 
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
         context.write(key, outputValue);
-
-        counter++;
-        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index 9fa20ae..428f878 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.BatchConstants;
 
 /**
  * @author George Song (ysong1)
@@ -39,11 +38,6 @@ public class HiveToBaseCuboidMapper<KEYIN> extends 
BaseCuboidMapperBase<KEYIN, O
 
     @Override
     public void doMap(KEYIN key, Object value, Context context) throws 
IOException, InterruptedException {
-        counter++;
-        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-
         String[] row = flatTableInputFormat.parseMapperInput(value);
         try {
             outputKV(row, context);

http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 15bfd2e..116d5e0 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -64,7 +64,6 @@ public class InMemCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Object, ByteArr
     private CubeSegment cubeSegment;
     private IMRTableInputFormat flatTableInputFormat;
 
-    private int counter;
     private BlockingQueue<List<String>> queue = new 
ArrayBlockingQueue<List<String>>(64);
     private Future<?> future;
 
@@ -120,10 +119,6 @@ public class InMemCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Object, ByteArr
 
         while (!future.isDone()) {
             if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
-                counter++;
-                if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) 
{
-                    logger.info("Handled " + counter + " records!");
-                }
                 break;
             }
         }
@@ -131,7 +126,7 @@ public class InMemCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Object, ByteArr
 
     @Override
     protected void doCleanup(Context context) throws IOException, 
InterruptedException {
-        logger.info("Totally handled " + counter + " records!");
+        logger.info("Totally handled " + mapCounter + " records!");
 
         while (!future.isDone()) {
             if (queue.offer(Collections.<String> emptyList(), 1, 
TimeUnit.SECONDS)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index d0a7062..04c9e90 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -46,10 +46,11 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
     private BufferedMeasureCodec codec;
     private MeasureAggregators aggs;
 
-    private int counter;
     private Object[] input;
     private Object[] result;
 
+    private int vcounter;
+
     private Text outputKey;
     private Text outputValue;
 
@@ -78,6 +79,9 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
         aggs.reset();
 
         for (ByteArrayWritable value : values) {
+            if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
+                logger.info("Handling value with ordinal: " + vcounter);
+            }
             codec.decode(value.asBuffer(), input);
             aggs.aggregate(input);
         }
@@ -92,10 +96,6 @@ public class InMemCuboidReducer extends 
KylinReducer<ByteArrayWritable, ByteArra
 
         context.write(outputKey, outputValue);
 
-        counter++;
-        if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/58224921/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 8107e52..01cdd4a 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -129,18 +129,16 @@ public class NDCuboidMapper extends KylinMapper<Text, 
Text, Text, Text> {
         // if still empty or null
         if (myChildren == null || myChildren.size() == 0) {
             context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, 
"Skipped records").increment(1L);
-            skipCounter++;
-            if (skipCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) 
{
-                logger.info("Skipped " + skipCounter + " records!");
+            if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 
0) {
+                logger.info("Skipping record with ordinal: " + skipCounter);
             }
             return;
         }
 
         context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, 
"Processed records").increment(1L);
 
-        handleCounter++;
-        if (handleCounter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-            logger.info("Handled " + handleCounter + " records!");
+        if (handleCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) 
{
+            logger.info("Handling record with ordinal: " + handleCounter);
         }
 
         for (Long child : myChildren) {

Reply via email to