This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c2a402  [GOBBLIN-1268] track WORK_UNITS_PREPARATION timer event also in gaas
7c2a402 is described below

commit 7c2a40230368fbd1d604f212b70ef36b07ce004f
Author: Arjun <[email protected]>
AuthorDate: Tue Sep 22 15:51:19 2020 -0700

    [GOBBLIN-1268] track WORK_UNITS_PREPARATION timer event also in gaas
    
    Closes #3108 from arjun4084346/trackWU
---
 .../gobblin/runtime/AbstractJobLauncher.java       |  8 +++----
 .../gobblin/runtime/JobLauncherTestHelper.java     | 22 +++++++++++++++++--
 .../runtime/mapreduce/MRJobLauncherTest.java       | 25 ++++++++++++++++++++++
 .../java/org/apache/gobblin/test/TestSource.java   |  8 +++++++
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  1 +
 5 files changed, 58 insertions(+), 6 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 595864b..242daf8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -446,10 +446,6 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           return;
         }
 
-        // If it is a streaming source, workunits cannot be counted
-        this.jobContext.getJobState().setProp(NUM_WORKUNITS,
-            workUnitStream.isSafeToMaterialize() ? 
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
-
         //Initialize writer and converter(s)
         closer.register(WriterInitializerFactory.newInstace(jobState, 
workUnitStream)).initialize();
         closer.register(ConverterInitializerFactory.newInstance(jobState, 
workUnitStream)).initialize();
@@ -493,6 +489,10 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
             }
           });
 
+          // If it is a streaming source, workunits cannot be counted
+          this.jobContext.getJobState().setProp(NUM_WORKUNITS,
+              workUnitStream.isSafeToMaterialize() ? 
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
+
           // dump the work unit if tracking logs are enabled
           if 
(jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
             workUnitStream = workUnitStream.transform(new Function<WorkUnit, 
WorkUnit>() {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 89249b3..fff6c77 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -63,7 +63,7 @@ public class JobLauncherTestHelper {
     this.datasetStateStore = datasetStateStore;
   }
 
-  public void runTest(Properties jobProps) throws Exception {
+  public JobContext runTest(Properties jobProps) throws Exception {
     String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
     String jobId = JobLauncherUtils.newJobId(jobName);
     jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
@@ -87,10 +87,17 @@ public class JobLauncherTestHelper {
     DatasetState datasetState = datasetStateList.get(0);
 
     Assert.assertEquals(datasetState.getState(), 
JobState.RunningState.COMMITTED);
-    Assert.assertEquals(datasetState.getCompletedTasks(), 4);
     Assert.assertEquals(datasetState.getJobFailures(), 0);
 
+    int skippedWorkunits = 0;
+
     for (TaskState taskState : datasetState.getTaskStates()) {
+      if 
(Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.WORK_UNIT_SKIP_KEY,
 Boolean.FALSE.toString()))
+          && taskState.getWorkingState() == 
WorkUnitState.WorkingState.SKIPPED) {
+        skippedWorkunits++;
+        continue;
+      }
+
       Assert.assertEquals(taskState.getWorkingState(), 
WorkUnitState.WorkingState.COMMITTED);
       
Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
           TestExtractor.TOTAL_RECORDS);
@@ -115,6 +122,17 @@ public class JobLauncherTestHelper {
         Assert.assertTrue(Long.valueOf(m.group(2)) < currentTime);
       }
     }
+
+    if 
(Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.WORK_UNIT_SKIP_KEY,
+        Boolean.FALSE.toString()))) {
+      Assert.assertEquals(skippedWorkunits, 2);
+      Assert.assertEquals(datasetState.getCompletedTasks(), 2);
+    } else {
+      Assert.assertEquals(skippedWorkunits, 0);
+      Assert.assertEquals(datasetState.getCompletedTasks(), 4);
+    }
+
+    return jobContext;
   }
 
   public void runTestWithPullLimit(Properties jobProps, long limit) throws 
Exception {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index e8e996e..11d4df9 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -51,6 +51,8 @@ import 
org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.JobLauncherTestHelper;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.util.limiter.BaseLimiterType;
@@ -116,6 +118,29 @@ public class MRJobLauncherTest extends BMNGRunner {
   }
 
   @Test
+  public void testNumOfWorkunits() throws Exception {
+    Properties jobProps = loadJobProps();
+    JobContext jobContext;
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+        jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) + 
"-testNumOfWorkunits");
+    try {
+      jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+    } finally {
+      
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+    
Assert.assertEquals(jobContext.getJobState().getPropAsInt(AbstractJobLauncher.NUM_WORKUNITS),
 4);
+
+    jobProps.setProperty(ConfigurationKeys.WORK_UNIT_SKIP_KEY, 
Boolean.TRUE.toString());
+    try {
+      jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+    } finally {
+      
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    }
+
+    
Assert.assertEquals(jobContext.getJobState().getPropAsInt(AbstractJobLauncher.NUM_WORKUNITS),
 2);
+  }
+
+  @Test
   public void testLaunchJobWithConcurrencyLimit() throws Exception {
     final Logger log = LoggerFactory.getLogger(getClass().getName() + 
".testLaunchJobWithConcurrencyLimit");
     log.info("in");
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestSource.java b/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestSource.java
index aa1b548..35498dd 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestSource.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestSource.java
@@ -61,6 +61,14 @@ public class TestSource extends AbstractSource<String, 
String> {
       workUnits.add(workUnit);
     }
 
+    if (state.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_SKIP_KEY, false)) {
+      for (int i = 0; i < list.size(); i++) {
+        if (i % 2 == 0) {
+          workUnits.get(i).setProp(ConfigurationKeys.WORK_UNIT_SKIP_KEY, true);
+        }
+      }
+    }
+
     if (state.getPropAsBoolean("use.multiworkunit", false)) {
       MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
       multiWorkUnit.addWorkUnits(workUnits);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index e4ead68..a4250d5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -132,6 +132,7 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.ORCHESTRATED.name());
         properties.put(TimingEvent.JOB_ORCHESTRATED_TIME, 
properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
+      case TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION:
       case TimingEvent.LauncherTimings.JOB_PREPARE:
       case TimingEvent.LauncherTimings.JOB_START:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.RUNNING.name());

Reply via email to