This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c6d6c1ba1 [GOBBLIN-1747] add job.name and job.id to kafka and compaction workunits (#3607)
c6d6c1ba1 is described below

commit c6d6c1ba10357d4689c0d5ba1e29846e613deaaf
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Nov 23 17:46:29 2022 -0600

    [GOBBLIN-1747] add job.name and job.id to kafka and compaction workunits (#3607)
    
    * add job.name and job.id to kafka workunits
    
    * fix unit test
    
    * add job.name and job.id to compaction source workunits also
    
    Co-authored-by: umustafi <[email protected]>
---
 .../gobblin/compaction/source/CompactionSource.java       | 15 +++++++++++----
 .../source/extractor/extract/kafka/KafkaSource.java       |  6 ++++++
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 824d7b541..ab9f2f0a6 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -165,7 +165,7 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
                   Iterators.transform (datasets.iterator(), new 
Function<Dataset, Callable<VerifiedDataset>>() {
                     @Override
                     public Callable<VerifiedDataset> apply(Dataset dataset) {
-                      return new DatasetVerifier (dataset, workUnitIterator, 
suite.getDatasetsFinderVerifiers());
+                      return new DatasetVerifier (dataset, workUnitIterator, 
suite.getDatasetsFinderVerifiers(), state);
                     }
                   });
 
@@ -312,6 +312,7 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
     private Dataset dataset;
     private CompactionWorkUnitIterator workUnitIterator;
     private List<CompactionVerifier> verifiers;
+    private State state;
 
     /**
      * {@link VerifiedDataset} wraps original {@link Dataset} because if 
verification failed, we are able get original
@@ -321,7 +322,7 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
       try {
         VerifiedResult result = this.verify(dataset);
         if (result.allVerificationPassed) {
-          this.workUnitIterator.addWorkUnit(createWorkUnit(dataset));
+          this.workUnitIterator.addWorkUnit(createWorkUnit(dataset, state));
         }
         return new VerifiedDataset(dataset, result);
       } catch (Exception e) {
@@ -423,11 +424,17 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
     }
   }
 
-  protected WorkUnit createWorkUnit(Dataset dataset) throws IOException {
-    WorkUnit workUnit = new WorkUnit();
+  protected WorkUnit createWorkUnit(Dataset dataset, State state) throws 
IOException {
+    WorkUnit workUnit = WorkUnit.createEmpty();
     TaskUtils.setTaskFactoryClass(workUnit, MRCompactionTaskFactory.class);
     suite.save(dataset, workUnit);
     workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
+    if (state.contains(ConfigurationKeys.JOB_NAME_KEY)) {
+      workUnit.setProp(ConfigurationKeys.JOB_NAME_KEY, 
state.getProp(ConfigurationKeys.JOB_NAME_KEY));
+    }
+    if (state.contains(ConfigurationKeys.JOB_ID_KEY)) {
+      workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, 
state.getProp(ConfigurationKeys.JOB_ID_KEY));
+    }
     return workUnit;
   }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index fa98350a5..4454559e3 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -583,6 +583,12 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
       workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, 
state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD));
       workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, 
state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, 
TimeUnit.MILLISECONDS.name()));
     }
+    if (state.contains(ConfigurationKeys.JOB_NAME_KEY)) {
+      workUnit.setProp(ConfigurationKeys.JOB_NAME_KEY, 
state.getProp(ConfigurationKeys.JOB_NAME_KEY));
+    }
+    if (state.contains(ConfigurationKeys.JOB_ID_KEY)) {
+      workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, 
state.getProp(ConfigurationKeys.JOB_ID_KEY));
+    }
   }
 
   private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition 
partition, SourceState state) {

Reply via email to