This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f469b11f0 [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
f469b11f0 is described below

commit f469b11f088611c8b5da0e13a7ebf3b9f5bf0a14
Author: umustafi <[email protected]>
AuthorDate: Fri Sep 9 11:49:54 2022 -0700

    [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
    
    * before starting reduce
    * after first record is reduced
    * after reducing every 1000 records
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../compaction/mapreduce/orc/OrcKeyDedupReducer.java      | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
index 8539a99a0..675c51859 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
@@ -31,15 +31,18 @@ import org.apache.orc.mapred.OrcValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Check record duplicates in reducer-side.
  */
+@Slf4j
 public class OrcKeyDedupReducer extends RecordKeyDedupReducerBase<OrcKey, 
OrcValue, NullWritable, OrcValue> {
   @VisibleForTesting
   public static final String ORC_DELTA_SCHEMA_PROVIDER =
       "org.apache.gobblin.compaction." + 
OrcKeyDedupReducer.class.getSimpleName() + ".deltaFieldsProvider";
   public static final String USING_WHOLE_RECORD_FOR_COMPARE = 
"usingWholeRecordForCompareInReducer";
+  private int recordCounter = 0;
 
   @Override
   protected void setOutValue(OrcValue valueToRetain) {
@@ -60,7 +63,19 @@ public class OrcKeyDedupReducer extends 
RecordKeyDedupReducerBase<OrcKey, OrcVal
     Map<Integer, Integer> valuesToRetain = new HashMap<>();
     int valueHash = 0;
 
+    if (recordCounter == 0) {
+      log.info("Starting to reduce values for the first key {}", key);
+    }
+
     for (OrcValue value : values) {
+      if (recordCounter == 1) {
+        log.info("Reduced first value");
+      }
+      recordCounter++;
+      if (recordCounter % 1000 == 0) {
+        log.info("Reduced {} values so far", recordCounter);
+      }
+
       valueHash = ((OrcStruct) value.value).hashCode();
       if (valuesToRetain.containsKey(valueHash)) {
         valuesToRetain.put(valueHash, valuesToRetain.get(valueHash) + 1);

Reply via email to