This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7c2a402 [GOBBLIN-1268] track WORK_UNITS_PREPARATION timer event also in gaas
7c2a402 is described below
commit 7c2a40230368fbd1d604f212b70ef36b07ce004f
Author: Arjun <[email protected]>
AuthorDate: Tue Sep 22 15:51:19 2020 -0700
[GOBBLIN-1268] track WORK_UNITS_PREPARATION timer event also in gaas
Closes #3108 from arjun4084346/trackWU
---
.../gobblin/runtime/AbstractJobLauncher.java | 8 +++----
.../gobblin/runtime/JobLauncherTestHelper.java | 22 +++++++++++++++++--
.../runtime/mapreduce/MRJobLauncherTest.java | 25 ++++++++++++++++++++++
.../java/org/apache/gobblin/test/TestSource.java | 8 +++++++
.../monitoring/KafkaAvroJobStatusMonitor.java | 1 +
5 files changed, 58 insertions(+), 6 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 595864b..242daf8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -446,10 +446,6 @@ public abstract class AbstractJobLauncher implements JobLauncher {
return;
}
- // If it is a streaming source, workunits cannot be counted
- this.jobContext.getJobState().setProp(NUM_WORKUNITS,
- workUnitStream.isSafeToMaterialize() ?
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
-
//Initialize writer and converter(s)
closer.register(WriterInitializerFactory.newInstace(jobState,
workUnitStream)).initialize();
closer.register(ConverterInitializerFactory.newInstance(jobState,
workUnitStream)).initialize();
@@ -493,6 +489,10 @@ public abstract class AbstractJobLauncher implements JobLauncher {
}
});
+ // If it is a streaming source, workunits cannot be counted
+ this.jobContext.getJobState().setProp(NUM_WORKUNITS,
+ workUnitStream.isSafeToMaterialize() ?
workUnitStream.getMaterializedWorkUnitCollection().size() : 0);
+
// dump the work unit if tracking logs are enabled
if
(jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
workUnitStream = workUnitStream.transform(new Function<WorkUnit,
WorkUnit>() {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
index 89249b3..fff6c77 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/JobLauncherTestHelper.java
@@ -63,7 +63,7 @@ public class JobLauncherTestHelper {
this.datasetStateStore = datasetStateStore;
}
- public void runTest(Properties jobProps) throws Exception {
+ public JobContext runTest(Properties jobProps) throws Exception {
String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
String jobId = JobLauncherUtils.newJobId(jobName);
jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, jobId);
@@ -87,10 +87,17 @@ public class JobLauncherTestHelper {
DatasetState datasetState = datasetStateList.get(0);
Assert.assertEquals(datasetState.getState(),
JobState.RunningState.COMMITTED);
- Assert.assertEquals(datasetState.getCompletedTasks(), 4);
Assert.assertEquals(datasetState.getJobFailures(), 0);
+ int skippedWorkunits = 0;
+
for (TaskState taskState : datasetState.getTaskStates()) {
+ if
(Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.WORK_UNIT_SKIP_KEY,
Boolean.FALSE.toString()))
+ && taskState.getWorkingState() ==
WorkUnitState.WorkingState.SKIPPED) {
+ skippedWorkunits++;
+ continue;
+ }
+
Assert.assertEquals(taskState.getWorkingState(),
WorkUnitState.WorkingState.COMMITTED);
Assert.assertEquals(taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN),
TestExtractor.TOTAL_RECORDS);
@@ -115,6 +122,17 @@ public class JobLauncherTestHelper {
Assert.assertTrue(Long.valueOf(m.group(2)) < currentTime);
}
}
+
+ if
(Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.WORK_UNIT_SKIP_KEY,
+ Boolean.FALSE.toString()))) {
+ Assert.assertEquals(skippedWorkunits, 2);
+ Assert.assertEquals(datasetState.getCompletedTasks(), 2);
+ } else {
+ Assert.assertEquals(skippedWorkunits, 0);
+ Assert.assertEquals(datasetState.getCompletedTasks(), 4);
+ }
+
+ return jobContext;
}
public void runTestWithPullLimit(Properties jobProps, long limit) throws
Exception {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
index e8e996e..11d4df9 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncherTest.java
@@ -51,6 +51,8 @@ import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobLauncherTestHelper;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.util.limiter.BaseLimiterType;
@@ -116,6 +118,29 @@ public class MRJobLauncherTest extends BMNGRunner {
}
@Test
+ public void testNumOfWorkunits() throws Exception {
+ Properties jobProps = loadJobProps();
+ JobContext jobContext;
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY,
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
"-testNumOfWorkunits");
+ try {
+ jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+ } finally {
+
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+
Assert.assertEquals(jobContext.getJobState().getPropAsInt(AbstractJobLauncher.NUM_WORKUNITS),
4);
+
+ jobProps.setProperty(ConfigurationKeys.WORK_UNIT_SKIP_KEY,
Boolean.TRUE.toString());
+ try {
+ jobContext = this.jobLauncherTestHelper.runTest(jobProps);
+ } finally {
+
this.jobLauncherTestHelper.deleteStateStore(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
+
+
Assert.assertEquals(jobContext.getJobState().getPropAsInt(AbstractJobLauncher.NUM_WORKUNITS),
2);
+ }
+
+ @Test
public void testLaunchJobWithConcurrencyLimit() throws Exception {
final Logger log = LoggerFactory.getLogger(getClass().getName() +
".testLaunchJobWithConcurrencyLimit");
log.info("in");
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestSource.java b/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestSource.java
index aa1b548..35498dd 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestSource.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/test/TestSource.java
@@ -61,6 +61,14 @@ public class TestSource extends AbstractSource<String, String> {
workUnits.add(workUnit);
}
+ if (state.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_SKIP_KEY, false)) {
+ for (int i = 0; i < list.size(); i++) {
+ if (i % 2 == 0) {
+ workUnits.get(i).setProp(ConfigurationKeys.WORK_UNIT_SKIP_KEY, true);
+ }
+ }
+ }
+
if (state.getPropAsBoolean("use.multiworkunit", false)) {
MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty();
multiWorkUnit.addWorkUnits(workUnits);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index e4ead68..a4250d5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -132,6 +132,7 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.ORCHESTRATED.name());
properties.put(TimingEvent.JOB_ORCHESTRATED_TIME,
properties.getProperty(TimingEvent.METADATA_END_TIME));
break;
+ case TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION:
case TimingEvent.LauncherTimings.JOB_PREPARE:
case TimingEvent.LauncherTimings.JOB_START:
properties.put(JobStatusRetriever.EVENT_NAME_FIELD,
ExecutionStatus.RUNNING.name());