This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f469b11f0 [GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
f469b11f0 is described below
commit f469b11f088611c8b5da0e13a7ebf3b9f5bf0a14
Author: umustafi <[email protected]>
AuthorDate: Fri Sep 9 11:49:54 2022 -0700
[GOBBLIN-1699] Log progress of reducer task for visibility with slow compaction jobs #3552
* before starting to reduce the first key
* after the first record is reduced
* after every 1000 records reduced
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../compaction/mapreduce/orc/OrcKeyDedupReducer.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
index 8539a99a0..675c51859 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
@@ -31,15 +31,18 @@ import org.apache.orc.mapred.OrcValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import lombok.extern.slf4j.Slf4j;
/**
* Check record duplicates in reducer-side.
*/
+@Slf4j
public class OrcKeyDedupReducer extends RecordKeyDedupReducerBase<OrcKey,
OrcValue, NullWritable, OrcValue> {
@VisibleForTesting
public static final String ORC_DELTA_SCHEMA_PROVIDER =
"org.apache.gobblin.compaction." +
OrcKeyDedupReducer.class.getSimpleName() + ".deltaFieldsProvider";
public static final String USING_WHOLE_RECORD_FOR_COMPARE =
"usingWholeRecordForCompareInReducer";
+ private int recordCounter = 0;
@Override
protected void setOutValue(OrcValue valueToRetain) {
@@ -60,7 +63,19 @@ public class OrcKeyDedupReducer extends
RecordKeyDedupReducerBase<OrcKey, OrcVal
Map<Integer, Integer> valuesToRetain = new HashMap<>();
int valueHash = 0;
+ if (recordCounter == 0) {
+ log.info("Starting to reduce values for the first key {}", key);
+ }
+
for (OrcValue value : values) {
+ if (recordCounter == 1) {
+ log.info("Reduced first value");
+ }
+ recordCounter++;
+ if (recordCounter % 1000 == 0) {
+ log.info("Reduced {} values so far", recordCounter);
+ }
+
valueHash = ((OrcStruct) value.value).hashCode();
if (valuesToRetain.containsKey(valueHash)) {
valuesToRetain.put(valueHash, valuesToRetain.get(valueHash) + 1);