This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c6d6c1ba1 [GOBBLIN-1747] add job.name and job.id to kafka and
compaction workunits (#3607)
c6d6c1ba1 is described below
commit c6d6c1ba10357d4689c0d5ba1e29846e613deaaf
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Nov 23 17:46:29 2022 -0600
[GOBBLIN-1747] add job.name and job.id to kafka and compaction workunits
(#3607)
* add job.name and job.id to kafka workunits
* fix unit test
* add job.name and job.id to compaction source workunits also
Co-authored-by: umustafi <[email protected]>
---
.../gobblin/compaction/source/CompactionSource.java | 15 +++++++++++----
.../source/extractor/extract/kafka/KafkaSource.java | 6 ++++++
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 824d7b541..ab9f2f0a6 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -165,7 +165,7 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
Iterators.transform (datasets.iterator(), new
Function<Dataset, Callable<VerifiedDataset>>() {
@Override
public Callable<VerifiedDataset> apply(Dataset dataset) {
- return new DatasetVerifier (dataset, workUnitIterator,
suite.getDatasetsFinderVerifiers());
+ return new DatasetVerifier (dataset, workUnitIterator,
suite.getDatasetsFinderVerifiers(), state);
}
});
@@ -312,6 +312,7 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
private Dataset dataset;
private CompactionWorkUnitIterator workUnitIterator;
private List<CompactionVerifier> verifiers;
+ private State state;
/**
* {@link VerifiedDataset} wraps original {@link Dataset} because if
verification failed, we are able get original
@@ -321,7 +322,7 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
try {
VerifiedResult result = this.verify(dataset);
if (result.allVerificationPassed) {
- this.workUnitIterator.addWorkUnit(createWorkUnit(dataset));
+ this.workUnitIterator.addWorkUnit(createWorkUnit(dataset, state));
}
return new VerifiedDataset(dataset, result);
} catch (Exception e) {
@@ -423,11 +424,17 @@ public class CompactionSource implements
WorkUnitStreamSource<String, String> {
}
}
- protected WorkUnit createWorkUnit(Dataset dataset) throws IOException {
- WorkUnit workUnit = new WorkUnit();
+ protected WorkUnit createWorkUnit(Dataset dataset, State state) throws
IOException {
+ WorkUnit workUnit = WorkUnit.createEmpty();
TaskUtils.setTaskFactoryClass(workUnit, MRCompactionTaskFactory.class);
suite.save(dataset, workUnit);
workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
+ if (state.contains(ConfigurationKeys.JOB_NAME_KEY)) {
+ workUnit.setProp(ConfigurationKeys.JOB_NAME_KEY,
state.getProp(ConfigurationKeys.JOB_NAME_KEY));
+ }
+ if (state.contains(ConfigurationKeys.JOB_ID_KEY)) {
+ workUnit.setProp(ConfigurationKeys.JOB_ID_KEY,
state.getProp(ConfigurationKeys.JOB_ID_KEY));
+ }
return workUnit;
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index fa98350a5..4454559e3 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -583,6 +583,12 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD,
state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD));
workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT,
state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT,
TimeUnit.MILLISECONDS.name()));
}
+ if (state.contains(ConfigurationKeys.JOB_NAME_KEY)) {
+ workUnit.setProp(ConfigurationKeys.JOB_NAME_KEY,
state.getProp(ConfigurationKeys.JOB_NAME_KEY));
+ }
+ if (state.contains(ConfigurationKeys.JOB_ID_KEY)) {
+ workUnit.setProp(ConfigurationKeys.JOB_ID_KEY,
state.getProp(ConfigurationKeys.JOB_ID_KEY));
+ }
}
private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition
partition, SourceState state) {