This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5a2fb0d [GOBBLIN-1387] add function for jobcontext to access
datasetstate failures
5a2fb0d is described below
commit 5a2fb0d1453ab425c5c379325d680d085372482a
Author: William Lo <[email protected]>
AuthorDate: Thu Feb 11 09:18:32 2021 -0800
[GOBBLIN-1387] add function for jobcontext to access datasetstate failures
Closes #3225 from Will-Lo/fix-bug-email-failures
---
.../org/apache/gobblin/runtime/JobContext.java | 15 ++++
.../listeners/EmailNotificationJobListener.java | 2 +-
.../org/apache/gobblin/runtime/JobContextTest.java | 80 ++++++++++++++++++++++
3 files changed, 96 insertions(+), 1 deletion(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index fd70141..632de28 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -547,4 +548,18 @@ public class JobContext implements Closeable {
return
Objects.toStringHelper(JobContext.class.getSimpleName()).add("jobName",
getJobName())
.add("jobId", getJobId()).add("jobState", getJobState()).toString();
}
+
+ /**
+ * Sums the job failures of every datasetState stored in the jobContext so the
+ * email listener can decide whether to notify. Previously only the jobState
+ * was consulted, which did not include failures recorded on datasetStates.
+ * Failures are tracked using {@link ConfigurationKeys#JOB_FAILURES_KEY}
+ */
+ public int getDatasetStateFailures() {
+ int totalFailures = 0;
+ for (Map.Entry<String, JobState.DatasetState> datasetState:
this.getDatasetStatesByUrns().entrySet()) {
+ totalFailures += datasetState.getValue().getJobFailures();
+ }
+ return totalFailures;
+ }
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
index feb0d92..02aa4a2 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
@@ -48,7 +48,7 @@ public class EmailNotificationJobListener extends
AbstractJobListener {
// Send out alert email if the maximum number of consecutive failures is
reached
if (jobState.getState() == JobState.RunningState.FAILED) {
- int failures = jobState.getPropAsInt(ConfigurationKeys.JOB_FAILURES_KEY,
0);
+ int failures = jobState.getPropAsInt(ConfigurationKeys.JOB_FAILURES_KEY,
0) + jobContext.getDatasetStateFailures();
int maxFailures =
jobState.getPropAsInt(ConfigurationKeys.JOB_MAX_FAILURES_KEY,
ConfigurationKeys.DEFAULT_JOB_MAX_FAILURES);
if (alertEmailEnabled && failures >= maxFailures) {
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
index a21a0d8..b149cb8 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
@@ -264,6 +264,86 @@ public class JobContextTest {
Assert.assertFalse(fs.exists(currentJobPath));
}
+ @Test
+ public void testDatasetStateFailure() throws Exception {
+ Properties jobProps = new Properties();
+
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test");
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_id_12345");
+ jobProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");
+
+ Map<String, JobState.DatasetState> datasetStateMap = Maps.newHashMap();
+ JobState.DatasetState failingDatasetState = new
JobState.DatasetState("DatasetState", "DatasetState-1");
+ // record one job failure on the first dataset state (the second gets two)
+ failingDatasetState.incrementJobFailures();
+ JobState.DatasetState failingDatasetState2 = new
JobState.DatasetState("DatasetState2", "DatasetState-2");
+ failingDatasetState2.incrementJobFailures();
+ failingDatasetState2.incrementJobFailures();
+
+ datasetStateMap.put("0", failingDatasetState);
+ datasetStateMap.put("1", failingDatasetState2);
+
+ final BlockingQueue<ControllableCallable<Void>> callables =
Queues.newLinkedBlockingQueue();
+
+ JobContext jobContext = new ControllableCommitJobContext(jobProps, log,
datasetStateMap, new Predicate<String>() {
+ @Override
+ public boolean apply(@Nullable String input) {
+ return !input.equals("1");
+ }
+ }, callables);
+
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Future future = executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ jobContext.commit();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ });
+ callables.poll(1, TimeUnit.SECONDS).unblock(); // release a callable that was blocked awaiting unblock()
+ callables.poll(1, TimeUnit.SECONDS).unblock();
+
+ // failures are summed across both dataset states: 1 + 2 = 3
+ Assert.assertEquals(jobContext.getDatasetStateFailures(), 3);
+ }
+
+ @Test
+ public void testNoDatasetStates() throws Exception {
+ Properties jobProps = new Properties();
+
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test");
+ jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_id_12345");
+ jobProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");
+
+ Map<String, JobState.DatasetState> datasetStateMap = Maps.newHashMap();
+
+ final BlockingQueue<ControllableCallable<Void>> callables =
Queues.newLinkedBlockingQueue();
+
+ JobContext jobContext = new ControllableCommitJobContext(jobProps, log,
datasetStateMap, new Predicate<String>() {
+ @Override
+ public boolean apply(@Nullable String input) {
+ return !input.equals("1");
+ }
+ }, callables);
+
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Future future = executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ jobContext.commit();
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ });
+ // no dataset states were supplied, so the expected failure total is 0
+ Assert.assertEquals(jobContext.getDatasetStateFailures(), 0);
+ }
+
/**
* A {@link Callable} that blocks until a different thread calls {@link
#unblock()}.
*/