This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 27c9e0e [GOBBLIN-1590] Add low/high watermark information in event
emitted by Gobblin cluster (#3443)
27c9e0e is described below
commit 27c9e0e0fab8eee0bffb88d0d9c48375c545a727
Author: Zihan Li <[email protected]>
AuthorDate: Tue Dec 14 15:21:17 2021 -0800
[GOBBLIN-1590] Add low/high watermark information in event emitted by
Gobblin cluster (#3443)
---
.../cluster/ClusterEventMetadataGenerator.java | 27 ++++++++++++++--------
1 file changed, 18 insertions(+), 9 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
index 05f7fb6..5ad0fb4 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
@@ -17,13 +17,13 @@
package org.apache.gobblin.cluster;
+import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
-import com.google.common.collect.ImmutableMap;
-
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.metrics.event.EventName;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.EventMetadataUtils;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.TaskState;
@@ -43,17 +43,26 @@ public class ClusterEventMetadataGenerator implements
EventMetadataGenerator{
List<TaskState> taskStates = jobContext.getJobState().getTaskStates();
String taskException =
EventMetadataUtils.getTaskFailureExceptions(taskStates);
String jobException =
EventMetadataUtils.getJobFailureExceptions(jobContext.getJobState());
+ ImmutableMap.Builder<String,String> metadataBuilder =
ImmutableMap.builder();
+ if
(jobContext.getJobState().contains(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD))
{
+
metadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD,
+
jobContext.getJobState().getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD));
+ }
+ if
(jobContext.getJobState().contains(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD))
{
+ metadataBuilder.put(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD,
+
jobContext.getJobState().getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD));
+ }
switch (eventName) {
- case JOB_COMPLETE:
- return ImmutableMap.of(PROCESSED_COUNT_KEY,
Long.toString(EventMetadataUtils.getProcessedCount(taskStates)));
- case JOB_FAILED:
- return ImmutableMap.of(MESSAGE_KEY, taskException.length() != 0 ?
taskException : jobException);
- default:
- break;
+ case JOB_COMPLETE:
+ return metadataBuilder.put(PROCESSED_COUNT_KEY,
Long.toString(EventMetadataUtils.getProcessedCount(taskStates))).build();
+ case JOB_FAILED:
+ return metadataBuilder.put(MESSAGE_KEY, taskException.length() !=
0 ? taskException : jobException).build();
+ default:
+ break;
}
- return ImmutableMap.of();
+ return metadataBuilder.build();
}
}