This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 19467a0 [GOBBLIN-827] Add more events
19467a0 is described below
commit 19467a0120cf792bba5b0a98820f88c65d8b30ad
Author: zhchen <[email protected]>
AuthorDate: Thu Jul 18 12:02:36 2019 -0700
[GOBBLIN-827] Add more events
Closes #2688 from zxcware/metrics
---
.../gobblin/metrics/event/CountEventBuilder.java | 3 +-
.../metrics/event/EntityMissingEventBuilder.java | 86 ++++++++++++++++++++
.../gobblin/metrics/event/GobblinEventBuilder.java | 5 +-
.../org/apache/gobblin/metrics/event/JobEvent.java | 2 +-
.../metrics/event/JobStateEventBuilder.java | 95 ++++++++++++++++++++++
.../gobblin/metrics/event/GobblinEventTest.java | 84 +++++++++++++++++++
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 10 +++
7 files changed, 281 insertions(+), 4 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
index bcaa732..79f0ca5 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
@@ -41,7 +41,7 @@ public class CountEventBuilder extends GobblinEventBuilder {
private int count;
public CountEventBuilder(String name, int count) {
- this(name, NAMESPACE, count);
+ this(name, null, count);
}
public CountEventBuilder(String name, String namespace, int count) {
@@ -82,7 +82,6 @@ public class CountEventBuilder extends GobblinEventBuilder {
metadata.forEach((key, value) -> {
switch (key) {
case COUNT_KEY:
- countEventBuilder.setCount(Integer.parseInt(value));
break;
default:
countEventBuilder.addMetadata(key, value);
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java
new file mode 100644
index 0000000..6b43f97
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EntityMissingEventBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import lombok.Getter;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+
+/**
+ * An event reporting missing any kind of entity, e.g. WorkUnit, Record, kafka topic
+ */
+public class EntityMissingEventBuilder extends GobblinEventBuilder {
+
+ private static final String ENTITY_MISSING_EVENT_TYPE = "EntityMissingEvent";
+ private static final String INSTANCE_KEY = "entityInstance";
+
+ /**
+ * The missing instance of this entity, e.g record id, topic name
+ */
+ @Getter
+ private final String instance;
+
+ public EntityMissingEventBuilder(String name, String instance) {
+ this(name, null, instance);
+ }
+
+ public EntityMissingEventBuilder(String name, String namespace, String
instance) {
+ super(name, namespace);
+ this.instance = instance;
+ this.metadata.put(EVENT_TYPE, ENTITY_MISSING_EVENT_TYPE);
+ }
+
+ @Override
+ public GobblinTrackingEvent build() {
+ if (instance != null) {
+ this.metadata.put(INSTANCE_KEY, instance);
+ }
+ return super.build();
+ }
+
+ public static boolean isEntityMissingEvent(GobblinTrackingEvent event) {
+ String eventType = (event.getMetadata() == null) ? "" :
event.getMetadata().get(EVENT_TYPE);
+ return StringUtils.isNotEmpty(eventType) &&
eventType.equals(ENTITY_MISSING_EVENT_TYPE);
+ }
+
+ public static EntityMissingEventBuilder fromEvent(GobblinTrackingEvent
event) {
+ if(!isEntityMissingEvent(event)) {
+ return null;
+ }
+
+ Map<String, String> metadata = event.getMetadata();
+ String instance = metadata.get(INSTANCE_KEY);
+ EntityMissingEventBuilder eventBuilder = new
EntityMissingEventBuilder(event.getName(), instance);
+ metadata.forEach((key, value) -> {
+ switch (key) {
+ case INSTANCE_KEY:
+ break;
+ default:
+ eventBuilder.addMetadata(key, value);
+ break;
+ }
+ });
+
+ return eventBuilder;
+ }
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
index 0f03e93..6b4ae94 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -30,7 +30,10 @@ import org.apache.gobblin.metrics.MetricContext;
/**
- * A general gobblin event builder which builds a {@link GobblinTrackingEvent}
+ * This class is to support semi-typed Gobblin event. Instead of all events represented as
+ * instances of {@link GobblinTrackingEvent}. Different types of events can be defined from {@link GobblinEventBuilder},
+ * where each can define its own attributes. In this way, one can inspect a Gobblin event with the corresponding event
+ * builder instead of looking into the metadata maps, whose keys could be changed without control
*
* Note: a {@link GobblinEventBuilder} instance is not reusable
*/
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
index d9310fb..3f35b73 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobEvent.java
@@ -27,7 +27,7 @@ import org.apache.gobblin.metrics.GobblinTrackingEvent;
*/
public class JobEvent {
- public static final String JOB_STATE = "JobStateEvent";
+ public static final String JOB_STATE = "JobStateEvent"; // TODO: Migrate to JobStateEventBuilder
public static final String LOCK_IN_USE = "LockInUse";
public static final String WORK_UNITS_MISSING = "WorkUnitsMissing";
public static final String WORK_UNITS_EMPTY = "WorkUnitsEmpty";
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java
new file mode 100644
index 0000000..103d3c1
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/JobStateEventBuilder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+
+/**
+ * An event reporting job state. It can be used to report the overall Gobblin job state or
+ * the state of an internal job, e.g MR job
+ */
+public class JobStateEventBuilder extends GobblinEventBuilder {
+
+ private static final String JOB_STATE_EVENT_TYPE = "JobStateEvent";
+ private static final String STATUS_KEY = "jobState";
+ private static final String JOB_URL_KEY = "jobTrackingURL";
+
+ public enum Status {
+ SUCCEEDED,
+ FAILED
+ }
+
+ public Status status;
+ public String jobTrackingURL;
+
+ public JobStateEventBuilder(String name) {
+ this(name, null);
+ }
+
+ public JobStateEventBuilder(String name, String namespace) {
+ super(name, namespace);
+ this.metadata.put(EVENT_TYPE, JOB_STATE_EVENT_TYPE);
+ }
+
+ @Override
+ public GobblinTrackingEvent build() {
+ if (status != null) {
+ this.metadata.put(STATUS_KEY, status.name());
+ }
+ this.metadata.put(JOB_URL_KEY, jobTrackingURL);
+ return super.build();
+ }
+
+ public static boolean isJobStateEvent(GobblinTrackingEvent event) {
+ String eventType = (event.getMetadata() == null) ? "" :
event.getMetadata().get(EVENT_TYPE);
+ return StringUtils.isNotEmpty(eventType) &&
eventType.equals(JOB_STATE_EVENT_TYPE);
+ }
+
+ public static JobStateEventBuilder fromEvent(GobblinTrackingEvent event) {
+ if(!isJobStateEvent(event)) {
+ return null;
+ }
+
+ Map<String, String> metadata = event.getMetadata();
+ JobStateEventBuilder eventBuilder = new
JobStateEventBuilder(event.getName());
+ metadata.forEach((key, value) -> {
+ switch (key) {
+ case STATUS_KEY:
+ eventBuilder.status = Status.valueOf(value);
+ break;
+ case JOB_URL_KEY:
+ eventBuilder.jobTrackingURL = value;
+ break;
+ default:
+ eventBuilder.addMetadata(key, value);
+ break;
+ }
+ });
+
+ return eventBuilder;
+ }
+
+ public static final class MRJobState {
+ public static final String MR_JOB_STATE = "MRJobState";
+ }
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/GobblinEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/GobblinEventTest.java
new file mode 100644
index 0000000..c4440b8
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/GobblinEventTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+
+/**
+ * Test all {@link GobblinEventBuilder}s
+ */
+public class GobblinEventTest {
+
+ @Test
+ public void testJobStateEvent() {
+ // Test build JobStateEvent
+ /// build without status
+ String jobUrl = "jobUrl";
+ JobStateEventBuilder eventBuilder = new
JobStateEventBuilder(JobStateEventBuilder.MRJobState.MR_JOB_STATE);
+ eventBuilder.jobTrackingURL = jobUrl;
+ GobblinTrackingEvent event = eventBuilder.build();
+ Assert.assertEquals(event.getName(), "MRJobState");
+ Assert.assertNull(event.getNamespace());
+ Map<String, String> metadata = event.getMetadata();
+ Assert.assertEquals(metadata.size(), 2);
+ Assert.assertEquals(metadata.get("eventType"), "JobStateEvent");
+ Assert.assertEquals(metadata.get("jobTrackingURL"), jobUrl);
+ Assert.assertNull(metadata.get("jobState"));
+
+ /// build with status
+ eventBuilder.status = JobStateEventBuilder.Status.FAILED;
+ event = eventBuilder.build();
+ metadata = event.getMetadata();
+ Assert.assertEquals(metadata.size(), 3);
+ Assert.assertEquals(metadata.get("jobState"), "FAILED");
+
+ // Test parse from GobblinTrackingEvent
+ JobStateEventBuilder parsedEvent = JobStateEventBuilder.fromEvent(event);
+ Assert.assertEquals(parsedEvent.status,
JobStateEventBuilder.Status.FAILED);
+ Assert.assertEquals(parsedEvent.jobTrackingURL, jobUrl);
+ Assert.assertEquals(parsedEvent.getMetadata().size(), 1);
+ }
+
+ @Test
+ public void testEntityMissingEvent() {
+ // Test build EntityMissingEvent
+ String instance = "mytopic";
+ String eventClass = "TopicMissing";
+ EntityMissingEventBuilder eventBuilder = new
EntityMissingEventBuilder(eventClass, instance);
+ GobblinTrackingEvent event = eventBuilder.build();
+ Assert.assertEquals(event.getName(), eventClass);
+ Assert.assertNull(event.getNamespace());
+ Map<String, String> metadata = event.getMetadata();
+ Assert.assertEquals(metadata.size(), 2);
+ Assert.assertEquals(metadata.get("eventType"), "EntityMissingEvent");
+ Assert.assertEquals(metadata.get("entityInstance"), instance);
+
+ // Test parse from GobblinTrackingEvent
+ Assert.assertNull(JobStateEventBuilder.fromEvent(event));
+ EntityMissingEventBuilder parsedEvent =
EntityMissingEventBuilder.fromEvent(event);
+ Assert.assertEquals(parsedEvent.getName(), eventClass);
+ Assert.assertEquals(parsedEvent.getInstance(), instance);
+ Assert.assertEquals(parsedEvent.getMetadata().size(), 1);
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index 529fe11..0a2a284 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.gobblin.fsm.FiniteStateMachine;
import org.apache.gobblin.fsm.StateWithCallbacks;
+import org.apache.gobblin.metrics.event.JobStateEventBuilder;
import org.apache.gobblin.runtime.job.GobblinJobFiniteStateMachine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -301,6 +303,14 @@ public class MRJobLauncher extends AbstractJobLauncher {
// The metrics set is to be persisted to the metrics store later.
countersToMetrics(JobMetrics.get(jobName,
this.jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY)));
} finally {
+ JobStateEventBuilder eventBuilder = new
JobStateEventBuilder(JobStateEventBuilder.MRJobState.MR_JOB_STATE);
+ eventBuilder.jobTrackingURL = this.job.getTrackingURL();
+ eventBuilder.status = JobStateEventBuilder.Status.SUCCEEDED;
+ if (this.job.getJobState() != JobStatus.State.SUCCEEDED) {
+ eventBuilder.status = JobStateEventBuilder.Status.FAILED;
+ }
+ this.eventSubmitter.submit(eventBuilder);
+
// The last iteration of output TaskState collecting will run when the collector service gets stopped
this.taskStateCollectorService.stopAsync().awaitTerminated();
cleanUpWorkingDirectory();