This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2991f42  [GOBBLIN-1045] Emit more events in compaction job
2991f42 is described below

commit 2991f42873ebb626a849c54e7b101d50771ff716
Author: zhchen <[email protected]>
AuthorDate: Tue Feb 18 13:44:57 2020 -0800

    [GOBBLIN-1045] Emit more events in compaction job
    
    Closes #2885 from zxcware/compact-metrics
---
 .../action/CompactionHiveRegistrationAction.java   | 24 +++++-
 .../mapreduce/CompactorOutputCommitter.java        |  6 ++
 .../compaction/mapreduce/MRCompactionTask.java     | 32 ++++++-
 .../CompactionHiveRegistrationActionTest.java      | 99 ++++++++++++++++++++++
 .../gobblin/metrics/event/CountEventBuilder.java   | 12 +--
 .../metrics/event/EntityMissingEventBuilder.java   |  3 +-
 .../gobblin/metrics/event/FailureEventBuilder.java |  2 +-
 .../metrics/event/JobStateEventBuilder.java        |  2 +-
 .../apache/gobblin/runtime/mapreduce/MRTask.java   |  2 +-
 9 files changed, 169 insertions(+), 13 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index b1a4faa..ca32ea2 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -30,16 +30,17 @@ import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
+import org.apache.gobblin.compaction.mapreduce.MRCompactionTask;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.metrics.event.CountEventBuilder;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 
 
@@ -48,8 +49,15 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
  */
 @Slf4j
 public class CompactionHiveRegistrationAction implements CompactionCompleteAction<FileSystemDataset> {
+
+  public static final String NUM_OUTPUT_FILES = "numOutputFiles";
+  public static final String RECORD_COUNT = "recordCount";
+  public static final String BYTE_COUNT = "byteCount";
+  public static final String DATASET_URN = "datasetUrn";
+
   private final State state;
   private EventSubmitter eventSubmitter;
+
   public CompactionHiveRegistrationAction (State state) {
     if (!(state instanceof WorkUnitState)) {
       throw new UnsupportedOperationException(this.getClass().getName() + " only supports workunit state");
@@ -62,10 +70,22 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
       return;
     }
 
+    CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
+
+    long numFiles = state.getPropAsLong(MRCompactionTask.FILE_COUNT, -1);
+    CountEventBuilder fileCountEvent = new CountEventBuilder(NUM_OUTPUT_FILES, numFiles);
+    fileCountEvent.addMetadata(DATASET_URN, result.getDstAbsoluteDir());
+    fileCountEvent.addMetadata(RECORD_COUNT, state.getProp(MRCompactionTask.RECORD_COUNT, "-1"));
+    fileCountEvent.addMetadata(BYTE_COUNT, state.getProp(MRCompactionTask.BYTE_COUNT, "-1"));
+    if (this.eventSubmitter != null) {
+      this.eventSubmitter.submit(fileCountEvent);
+    } else {
+      log.warn("Will not emit events in {} as EventSubmitter is null", getClass().getName());
+    }
+
     if (state.contains(ConfigurationKeys.HIVE_REGISTRATION_POLICY)) {
       HiveRegister hiveRegister = HiveRegister.get(state);
       HiveRegistrationPolicy hiveRegistrationPolicy = HiveRegistrationPolicyBase.getPolicy(state);
-      CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset);
 
       List<String> paths = new ArrayList<>();
       for (HiveSpec spec : hiveRegistrationPolicy.getHiveSpecs(new Path(result.getDstAbsoluteDir()))) {
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java
index 5968095..4e86348 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactorOutputCommitter.java
@@ -41,6 +41,11 @@ import org.slf4j.LoggerFactory;
  * {recordCount}.{timestamp}.<extensionName>(avro, orc, etc.).
  */
 public class CompactorOutputCommitter extends FileOutputCommitter {
+
+  public enum EVENT_COUNTER {
+    OUTPUT_FILE_COUNT
+  }
+
   /**
    * Note that the value of this key doesn't have dot.
    */
@@ -93,6 +98,7 @@ public class CompactorOutputCommitter extends FileOutputCommitter {
         Path newPath = new Path(status.getPath().getParent(), fileName);
         LOG.info(String.format("Renaming %s to %s", status.getPath(), newPath));
         fs.rename(status.getPath(), newPath);
+        context.getCounter(EVENT_COUNTER.OUTPUT_FILE_COUNT).increment(1);
       }
     }
 
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index ceb5108..952e46a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -32,11 +33,11 @@ import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
 import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.mapreduce.MRTask;
 
 
@@ -49,6 +50,11 @@ import org.apache.gobblin.runtime.mapreduce.MRTask;
  */
 @Slf4j
 public class MRCompactionTask extends MRTask {
+
+  public static final String RECORD_COUNT = "counter.recordCount";
+  public static final String FILE_COUNT = "counter.fileCount";
+  public static final String BYTE_COUNT = "counter.byteCount";
+
   protected final CompactionSuite suite;
   protected final Dataset dataset;
   protected final EventSubmitter eventSubmitter;
@@ -95,6 +101,8 @@ public class MRCompactionTask extends MRTask {
   public void onMRTaskComplete (boolean isSuccess, Throwable throwable) {
     if (isSuccess) {
       try {
+        setCounterInfo(taskContext.getTaskState());
+
         List<CompactionCompleteAction> actions = this.suite.getCompactionCompleteActions();
         for (CompactionCompleteAction action: actions) {
           action.addEventSubmitter(eventSubmitter);
@@ -112,6 +120,28 @@ public class MRCompactionTask extends MRTask {
     }
   }
 
+  private void setCounterInfo(TaskState taskState)
+      throws IOException {
+
+    if (mrJob == null) {
+      return;
+    }
+
+    long recordCount = getCounterValue(mrJob, RecordKeyDedupReducerBase.EVENT_COUNTER.RECORD_COUNT);
+    if (recordCount == 0) {
+      // map only job
+      recordCount = getCounterValue(mrJob, RecordKeyMapperBase.EVENT_COUNTER.RECORD_COUNT);
+    }
+    taskState.setProp(RECORD_COUNT, recordCount);
+    taskState.setProp(FILE_COUNT, getCounterValue(mrJob, CompactorOutputCommitter.EVENT_COUNTER.OUTPUT_FILE_COUNT));
+    taskState.setProp(BYTE_COUNT, getCounterValue(mrJob, FileOutputFormatCounter.BYTES_WRITTEN));
+  }
+
+  private long getCounterValue(Job job, Enum<?> key)
+      throws IOException {
+    return job.getCounters().findCounter(key).getValue();
+  }
+
   private void submitEvent(String eventName) {
     Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, this.dataset.datasetURN());
     this.eventSubmitter.submit(eventName, eventMetadataMap);
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationActionTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationActionTest.java
new file mode 100644
index 0000000..86fdd21
--- /dev/null
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationActionTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.action;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.compaction.mapreduce.MRCompactionTask;
+import org.apache.gobblin.compaction.mapreduce.MRCompactor;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.SimpleFileSystemDataset;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.context.NameConflictException;
+import org.apache.gobblin.metrics.event.CountEventBuilder;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+
+
+public class CompactionHiveRegistrationActionTest {
+
+  @Test
+  public void testEvents()
+      throws Exception {
+
+    WorkUnitState state = new WorkUnitState();
+    String inputDir = "/data/tracking";
+    String inputSubDir = "hourly";
+    String destSubDir = "daily";
+    String pathPattern = "%s/myTopic/%s/2019/12/20";
+
+    String datasetPath = String.format(pathPattern, inputDir, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_DIR, inputDir);
+    state.setProp(MRCompactor.COMPACTION_INPUT_SUBDIR, inputSubDir);
+    state.setProp(MRCompactor.COMPACTION_DEST_SUBDIR, destSubDir);
+
+    state.setProp(MRCompactionTask.FILE_COUNT, "10");
+    state.setProp(MRCompactionTask.RECORD_COUNT, "100");
+
+    CompactionHiveRegistrationAction action = new CompactionHiveRegistrationAction(state);
+    MockMetricContext mockMetricContext = new MockMetricContext(getClass().getName());
+    String namespace = "compaction.tracking.events";
+    EventSubmitter eventSubmitter = new EventSubmitter.Builder(mockMetricContext, namespace).build();
+    action.addEventSubmitter(eventSubmitter);
+
+    FileSystemDataset dataset = new SimpleFileSystemDataset(new Path(datasetPath));
+    action.onCompactionJobComplete(dataset);
+
+    String destinationPath = String.format(pathPattern, inputDir, destSubDir);
+
+    Assert.assertEquals(mockMetricContext.events.size(), 1);
+    CountEventBuilder fileCountEvent = CountEventBuilder.fromEvent(mockMetricContext.events.get(0));
+    Assert.assertEquals(fileCountEvent.getNamespace(), namespace);
+    Assert.assertEquals(fileCountEvent.getName(), CompactionHiveRegistrationAction.NUM_OUTPUT_FILES);
+    Assert.assertEquals(fileCountEvent.getCount(), 10);
+    Map<String, String> metadata = fileCountEvent.getMetadata();
+    Assert.assertEquals(metadata.get(CompactionHiveRegistrationAction.DATASET_URN), destinationPath);
+    Assert.assertEquals(metadata.get(CompactionHiveRegistrationAction.RECORD_COUNT), "100");
+    Assert.assertEquals(metadata.get(CompactionHiveRegistrationAction.BYTE_COUNT), "-1");
+  }
+
+  private class MockMetricContext extends MetricContext {
+
+    List<GobblinTrackingEvent> events;
+
+    MockMetricContext(String name)
+        throws NameConflictException {
+      super(name,null, Lists.newArrayList(), false);
+      events = Lists.newArrayList();
+    }
+
+    @Override
+    public void submitEvent(GobblinTrackingEvent nonReusableEvent) {
+      events.add(nonReusableEvent);
+    }
+  }
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
index 79f0ca5..b663d50 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
@@ -38,13 +38,13 @@ public class CountEventBuilder extends GobblinEventBuilder {
   public static final String COUNT_KEY = "count";
   @Setter
   @Getter
-  private int count;
+  private long count;
 
-  public CountEventBuilder(String name, int count) {
+  public CountEventBuilder(String name, long count) {
     this(name, null, count);
   }
 
-  public CountEventBuilder(String name, String namespace, int count) {
+  public CountEventBuilder(String name, String namespace, long count) {
     super(name, namespace);
     this.metadata.put(EVENT_TYPE, COUNT_EVENT_TYPE);
     this.count = count;
@@ -56,7 +56,7 @@ public class CountEventBuilder extends GobblinEventBuilder {
    */
   @Override
   public GobblinTrackingEvent build() {
-    this.metadata.put(COUNT_KEY, Integer.toString(count));
+    this.metadata.put(COUNT_KEY, Long.toString(count));
     return super.build();
   }
 
@@ -77,8 +77,8 @@ public class CountEventBuilder extends GobblinEventBuilder {
     }
 
     Map<String, String> metadata = event.getMetadata();
-    int count = Integer.parseInt(metadata.getOrDefault(COUNT_KEY, "0"));
-    CountEventBuilder countEventBuilder = new CountEventBuilder(event.getName(), count);
+    long count = Long.parseLong(metadata.getOrDefault(COUNT_KEY, "0"));
+    CountEventBuilder countEventBuilder = new CountEventBuilder(event.getName(), event.getNamespace(), count);
     metadata.forEach((key, value) -> {
       switch (key) {
         case COUNT_KEY:
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java
index 6b43f97..8d8e199 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java
@@ -70,7 +70,8 @@ public class EntityMissingEventBuilder extends GobblinEventBuilder {
 
     Map<String, String> metadata = event.getMetadata();
     String instance = metadata.get(INSTANCE_KEY);
-    EntityMissingEventBuilder eventBuilder = new EntityMissingEventBuilder(event.getName(), instance);
+    EntityMissingEventBuilder eventBuilder = new EntityMissingEventBuilder(
+        event.getName(), event.getNamespace(), instance);
     metadata.forEach((key, value) -> {
       switch (key) {
         case INSTANCE_KEY:
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
index dd6cf38..a66d9f7 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
@@ -99,7 +99,7 @@ public class FailureEventBuilder extends GobblinEventBuilder {
     }
 
     Map<String, String> metadata = event.getMetadata();
-    FailureEventBuilder failureEvent = new FailureEventBuilder(event.getName());
+    FailureEventBuilder failureEvent = new FailureEventBuilder(event.getName(), event.getNamespace());
 
     metadata.forEach((key, value) -> {
       switch (key) {
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java
index 103d3c1..de9e77f 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java
@@ -71,7 +71,7 @@ public class JobStateEventBuilder extends GobblinEventBuilder {
     }
 
     Map<String, String> metadata = event.getMetadata();
-    JobStateEventBuilder eventBuilder = new JobStateEventBuilder(event.getName());
+    JobStateEventBuilder eventBuilder = new JobStateEventBuilder(event.getName(), event.getNamespace());
     metadata.forEach((key, value) -> {
       switch (key) {
         case STATUS_KEY:
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
index af12004..05211f7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTask.java
@@ -61,7 +61,7 @@ public class MRTask extends BaseAbstractTask {
     }
   }
 
-  private final TaskContext taskContext;
+  protected final TaskContext taskContext;
   private final EventSubmitter eventSubmitter;
   protected Job mrJob;
 

Reply via email to