This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a2fb0d  [GOBBLIN-1387] add function for jobcontext to access 
datasetstate failures
5a2fb0d is described below

commit 5a2fb0d1453ab425c5c379325d680d085372482a
Author: William Lo <[email protected]>
AuthorDate: Thu Feb 11 09:18:32 2021 -0800

    [GOBBLIN-1387] add function for jobcontext to access datasetstate failures
    
    Closes #3225 from Will-Lo/fix-bug-email-failures
---
 .../org/apache/gobblin/runtime/JobContext.java     | 15 ++++
 .../listeners/EmailNotificationJobListener.java    |  2 +-
 .../org/apache/gobblin/runtime/JobContextTest.java | 80 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index fd70141..632de28 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
@@ -547,4 +548,18 @@ public class JobContext implements Closeable {
     return 
Objects.toStringHelper(JobContext.class.getSimpleName()).add("jobName", 
getJobName())
         .add("jobId", getJobId()).add("jobState", getJobState()).toString();
   }
+
+  /**
+   * Get all of the failures from the datasetStates stored in the jobContext 
to determine if
+   * email notification should be sent or not. Previously only the jobState 
was consulted, and
+   * failures from datasetStates were not propagated to it.
+   * Failures are tracked using {@link ConfigurationKeys#JOB_FAILURES_KEY}.
+   */
+  public int getDatasetStateFailures() {
+    int totalFailures = 0;
+    for (Map.Entry<String, JobState.DatasetState> datasetState: 
this.getDatasetStatesByUrns().entrySet()) {
+      totalFailures += datasetState.getValue().getJobFailures();
+    }
+    return totalFailures;
+  }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
index feb0d92..02aa4a2 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/listeners/EmailNotificationJobListener.java
@@ -48,7 +48,7 @@ public class EmailNotificationJobListener extends 
AbstractJobListener {
 
     // Send out alert email if the maximum number of consecutive failures is 
reached
     if (jobState.getState() == JobState.RunningState.FAILED) {
-      int failures = jobState.getPropAsInt(ConfigurationKeys.JOB_FAILURES_KEY, 
0);
+      int failures = jobState.getPropAsInt(ConfigurationKeys.JOB_FAILURES_KEY, 
0) + jobContext.getDatasetStateFailures();
       int maxFailures =
           jobState.getPropAsInt(ConfigurationKeys.JOB_MAX_FAILURES_KEY, 
ConfigurationKeys.DEFAULT_JOB_MAX_FAILURES);
       if (alertEmailEnabled && failures >= maxFailures) {
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
index a21a0d8..b149cb8 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobContextTest.java
@@ -264,6 +264,86 @@ public class JobContextTest {
     Assert.assertFalse(fs.exists(currentJobPath));
   }
 
+  @Test
+  public void testDatasetStateFailure() throws Exception {
+    Properties jobProps = new Properties();
+
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test");
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_id_12345");
+    jobProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");
+
+    Map<String, JobState.DatasetState> datasetStateMap = Maps.newHashMap();
+    JobState.DatasetState failingDatasetState = new 
JobState.DatasetState("DatasetState", "DatasetState-1");
+    // mark dataset state as a failing job
+    failingDatasetState.incrementJobFailures();
+    JobState.DatasetState failingDatasetState2 = new 
JobState.DatasetState("DatasetState2", "DatasetState-2");
+    failingDatasetState2.incrementJobFailures();
+    failingDatasetState2.incrementJobFailures();
+
+    datasetStateMap.put("0", failingDatasetState);
+    datasetStateMap.put("1", failingDatasetState2);
+
+    final BlockingQueue<ControllableCallable<Void>> callables = 
Queues.newLinkedBlockingQueue();
+
+    JobContext jobContext = new ControllableCommitJobContext(jobProps, log, 
datasetStateMap, new Predicate<String>() {
+      @Override
+      public boolean apply(@Nullable String input) {
+        return !input.equals("1");
+      }
+    }, callables);
+
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    Future future = executorService.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          jobContext.commit();
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe);
+        }
+      }
+    });
+    callables.poll(1, TimeUnit.SECONDS).unblock();
+    callables.poll(1, TimeUnit.SECONDS).unblock();
+
+    // the total should be the sum of failures across both dataset states 
(1 + 2 = 3)
+    Assert.assertEquals(jobContext.getDatasetStateFailures(), 3);
+  }
+
+  @Test
+  public void testNoDatasetStates() throws Exception {
+    Properties jobProps = new Properties();
+
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "test");
+    jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_id_12345");
+    jobProps.setProperty(ConfigurationKeys.METRICS_ENABLED_KEY, "false");
+
+    Map<String, JobState.DatasetState> datasetStateMap = Maps.newHashMap();
+
+    final BlockingQueue<ControllableCallable<Void>> callables = 
Queues.newLinkedBlockingQueue();
+
+    JobContext jobContext = new ControllableCommitJobContext(jobProps, log, 
datasetStateMap, new Predicate<String>() {
+      @Override
+      public boolean apply(@Nullable String input) {
+        return !input.equals("1");
+      }
+    }, callables);
+
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    Future future = executorService.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          jobContext.commit();
+        } catch (IOException ioe) {
+          throw new RuntimeException(ioe);
+        }
+      }
+    });
+    // with no dataset states present, the total number of failures should 
be zero
+    Assert.assertEquals(jobContext.getDatasetStateFailures(), 0);
+  }
+
   /**
    * A {@link Callable} that blocks until a different thread calls {@link 
#unblock()}.
    */

Reply via email to