This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 19467a0  [GOBBLIN-827] Add more events
19467a0 is described below

commit 19467a0120cf792bba5b0a98820f88c65d8b30ad
Author: zhchen <[email protected]>
AuthorDate: Thu Jul 18 12:02:36 2019 -0700

    [GOBBLIN-827] Add more events
    
    Closes #2688 from zxcware/metrics
---
 .../gobblin/metrics/event/CountEventBuilder.java   |  3 +-
 .../metrics/event/EntityMissingEventBuilder.java   | 86 ++++++++++++++++++++
 .../gobblin/metrics/event/GobblinEventBuilder.java |  5 +-
 .../org/apache/gobblin/metrics/event/JobEvent.java |  2 +-
 .../metrics/event/JobStateEventBuilder.java        | 95 ++++++++++++++++++++++
 .../gobblin/metrics/event/GobblinEventTest.java    | 84 +++++++++++++++++++
 .../gobblin/runtime/mapreduce/MRJobLauncher.java   | 10 +++
 7 files changed, 281 insertions(+), 4 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
index bcaa732..79f0ca5 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
@@ -41,7 +41,7 @@ public class CountEventBuilder extends GobblinEventBuilder {
   private int count;
 
   public CountEventBuilder(String name, int count) {
-    this(name, NAMESPACE, count);
+    this(name, null, count);
   }
 
   public CountEventBuilder(String name, String namespace, int count) {
@@ -82,7 +82,6 @@ public class CountEventBuilder extends GobblinEventBuilder {
     metadata.forEach((key, value) -> {
       switch (key) {
         case COUNT_KEY:
-          countEventBuilder.setCount(Integer.parseInt(value));
           break;
         default:
           countEventBuilder.addMetadata(key, value);
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java
new file mode 100644
index 0000000..6b43f97
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import lombok.Getter;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+
+/**
+ * An event reporting that an entity of any kind is missing, e.g. a WorkUnit, a record or a Kafka topic
+ */
+public class EntityMissingEventBuilder extends GobblinEventBuilder {
+
+  private static final String ENTITY_MISSING_EVENT_TYPE = "EntityMissingEvent";
+  private static final String INSTANCE_KEY = "entityInstance";
+
+  /**
+   * The missing instance of this entity, e.g record id, topic name
+   */
+  @Getter
+  private final String instance;
+
+  public EntityMissingEventBuilder(String name, String instance) {
+    this(name, null, instance);
+  }
+
+  public EntityMissingEventBuilder(String name, String namespace, String 
instance) {
+    super(name, namespace);
+    this.instance = instance;
+    this.metadata.put(EVENT_TYPE, ENTITY_MISSING_EVENT_TYPE);
+  }
+
+  @Override
+  public GobblinTrackingEvent build() {
+    if (instance != null) {
+      this.metadata.put(INSTANCE_KEY, instance);
+    }
+    return super.build();
+  }
+
+  public static boolean isEntityMissingEvent(GobblinTrackingEvent event) {
+    String eventType = (event.getMetadata() == null) ? "" : 
event.getMetadata().get(EVENT_TYPE);
+    return StringUtils.isNotEmpty(eventType) && 
eventType.equals(ENTITY_MISSING_EVENT_TYPE);
+  }
+
+  public static EntityMissingEventBuilder fromEvent(GobblinTrackingEvent 
event) {
+    if(!isEntityMissingEvent(event)) {
+      return null;
+    }
+
+    Map<String, String> metadata = event.getMetadata();
+    String instance = metadata.get(INSTANCE_KEY);
+    EntityMissingEventBuilder eventBuilder = new 
EntityMissingEventBuilder(event.getName(), instance);
+    metadata.forEach((key, value) -> {
+      switch (key) {
+        case INSTANCE_KEY:
+          break;
+        default:
+          eventBuilder.addMetadata(key, value);
+          break;
+      }
+    });
+
+    return eventBuilder;
+  }
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
index 0f03e93..6b4ae94 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -30,7 +30,10 @@ import org.apache.gobblin.metrics.MetricContext;
 
 
 /**
- * A general gobblin event builder which builds a {@link GobblinTrackingEvent}
+ * This class supports semi-typed Gobblin events. Instead of representing all events as plain
+ * instances of {@link GobblinTrackingEvent}, different types of events can be defined from {@link GobblinEventBuilder},
+ * where each can define its own attributes. In this way, one can inspect a Gobblin event with the corresponding event
+ * builder instead of looking into the metadata map, whose keys could be changed without control.
  *
  * Note: a {@link GobblinEventBuilder} instance is not reusable
  */
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
index d9310fb..3f35b73 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
@@ -27,7 +27,7 @@ import org.apache.gobblin.metrics.GobblinTrackingEvent;
  */
 public class JobEvent {
 
-  public static final String JOB_STATE = "JobStateEvent";
+  public static final String JOB_STATE = "JobStateEvent"; // TODO: Migrate to 
JobStateEventBuilder
   public static final String LOCK_IN_USE = "LockInUse";
   public static final String WORK_UNITS_MISSING = "WorkUnitsMissing";
   public static final String WORK_UNITS_EMPTY = "WorkUnitsEmpty";
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java
new file mode 100644
index 0000000..103d3c1
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+
+/**
+ * An event reporting job state. It can be used to report the overall Gobblin job state or
+ * the state of an internal job, e.g. an MR job
+ */
+public class JobStateEventBuilder extends GobblinEventBuilder {
+
+  private static final String JOB_STATE_EVENT_TYPE = "JobStateEvent";
+  private static final String STATUS_KEY = "jobState";
+  private static final String JOB_URL_KEY = "jobTrackingURL";
+
+  public enum Status {
+    SUCCEEDED,
+    FAILED
+  }
+
+  public Status status;
+  public String jobTrackingURL;
+
+  public JobStateEventBuilder(String name) {
+    this(name, null);
+  }
+
+  public JobStateEventBuilder(String name, String namespace) {
+    super(name, namespace);
+    this.metadata.put(EVENT_TYPE, JOB_STATE_EVENT_TYPE);
+  }
+
+  @Override
+  public GobblinTrackingEvent build() {
+    if (status != null) {
+      this.metadata.put(STATUS_KEY, status.name());
+    }
+    this.metadata.put(JOB_URL_KEY, jobTrackingURL);
+    return super.build();
+  }
+
+  public static boolean isJobStateEvent(GobblinTrackingEvent event) {
+    String eventType = (event.getMetadata() == null) ? "" : 
event.getMetadata().get(EVENT_TYPE);
+    return StringUtils.isNotEmpty(eventType) && 
eventType.equals(JOB_STATE_EVENT_TYPE);
+  }
+
+  public static JobStateEventBuilder fromEvent(GobblinTrackingEvent event) {
+    if(!isJobStateEvent(event)) {
+      return null;
+    }
+
+    Map<String, String> metadata = event.getMetadata();
+    JobStateEventBuilder eventBuilder = new 
JobStateEventBuilder(event.getName());
+    metadata.forEach((key, value) -> {
+      switch (key) {
+        case STATUS_KEY:
+          eventBuilder.status = Status.valueOf(value);
+          break;
+        case JOB_URL_KEY:
+          eventBuilder.jobTrackingURL = value;
+          break;
+        default:
+          eventBuilder.addMetadata(key, value);
+          break;
+      }
+    });
+
+    return eventBuilder;
+  }
+
+  public static final class MRJobState {
+    public static final String MR_JOB_STATE = "MRJobState";
+  }
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/GobblinEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/GobblinEventTest.java
new file mode 100644
index 0000000..c4440b8
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/GobblinEventTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+
+/**
+ * Test all {@link GobblinEventBuilder}s
+ */
+public class GobblinEventTest {
+
+  @Test
+  public void testJobStateEvent() {
+    // Test build JobStateEvent
+    /// build without status
+    String jobUrl = "jobUrl";
+    JobStateEventBuilder eventBuilder = new 
JobStateEventBuilder(JobStateEventBuilder.MRJobState.MR_JOB_STATE);
+    eventBuilder.jobTrackingURL = jobUrl;
+    GobblinTrackingEvent event = eventBuilder.build();
+    Assert.assertEquals(event.getName(), "MRJobState");
+    Assert.assertNull(event.getNamespace());
+    Map<String, String> metadata = event.getMetadata();
+    Assert.assertEquals(metadata.size(), 2);
+    Assert.assertEquals(metadata.get("eventType"), "JobStateEvent");
+    Assert.assertEquals(metadata.get("jobTrackingURL"), jobUrl);
+    Assert.assertNull(metadata.get("jobState"));
+
+    /// build with status
+    eventBuilder.status = JobStateEventBuilder.Status.FAILED;
+    event = eventBuilder.build();
+    metadata = event.getMetadata();
+    Assert.assertEquals(metadata.size(), 3);
+    Assert.assertEquals(metadata.get("jobState"), "FAILED");
+
+    // Test parse from GobblinTrackingEvent
+    JobStateEventBuilder parsedEvent = JobStateEventBuilder.fromEvent(event);
+    Assert.assertEquals(parsedEvent.status, 
JobStateEventBuilder.Status.FAILED);
+    Assert.assertEquals(parsedEvent.jobTrackingURL, jobUrl);
+    Assert.assertEquals(parsedEvent.getMetadata().size(), 1);
+  }
+
+  @Test
+  public void testEntityMissingEvent() {
+    // Test build EntityMissingEvent
+    String instance = "mytopic";
+    String eventClass = "TopicMissing";
+    EntityMissingEventBuilder eventBuilder = new 
EntityMissingEventBuilder(eventClass, instance);
+    GobblinTrackingEvent event = eventBuilder.build();
+    Assert.assertEquals(event.getName(), eventClass);
+    Assert.assertNull(event.getNamespace());
+    Map<String, String> metadata = event.getMetadata();
+    Assert.assertEquals(metadata.size(), 2);
+    Assert.assertEquals(metadata.get("eventType"), "EntityMissingEvent");
+    Assert.assertEquals(metadata.get("entityInstance"), instance);
+
+    // Test parse from GobblinTrackingEvent
+    Assert.assertNull(JobStateEventBuilder.fromEvent(event));
+    EntityMissingEventBuilder parsedEvent = 
EntityMissingEventBuilder.fromEvent(event);
+    Assert.assertEquals(parsedEvent.getName(), eventClass);
+    Assert.assertEquals(parsedEvent.getInstance(), instance);
+    Assert.assertEquals(parsedEvent.getMetadata().size(), 1);
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 529fe11..0a2a284 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.gobblin.fsm.FiniteStateMachine;
 import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.metrics.event.JobStateEventBuilder;
 import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -301,6 +303,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
       // The metrics set is to be persisted to the metrics store later.
       countersToMetrics(JobMetrics.get(jobName, 
this.jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY)));
     } finally {
+      JobStateEventBuilder eventBuilder = new 
JobStateEventBuilder(JobStateEventBuilder.MRJobState.MR_JOB_STATE);
+      eventBuilder.jobTrackingURL = this.job.getTrackingURL();
+      eventBuilder.status = JobStateEventBuilder.Status.SUCCEEDED;
+      if (this.job.getJobState() != JobStatus.State.SUCCEEDED) {
+        eventBuilder.status = JobStateEventBuilder.Status.FAILED;
+      }
+      this.eventSubmitter.submit(eventBuilder);
+
       // The last iteration of output TaskState collecting will run when the collector service gets stopped
       this.taskStateCollectorService.stopAsync().awaitTerminated();
       cleanUpWorkingDirectory();

Reply via email to