This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 27c9e0e  [GOBBLIN-1590] Add low/high watermark information in event emitted by Gobblin cluster (#3443)
27c9e0e is described below

commit 27c9e0e0fab8eee0bffb88d0d9c48375c545a727
Author: Zihan Li <[email protected]>
AuthorDate: Tue Dec 14 15:21:17 2021 -0800

    [GOBBLIN-1590] Add low/high watermark information in event emitted by Gobblin cluster (#3443)
---
 .../cluster/ClusterEventMetadataGenerator.java     | 27 ++++++++++++++--------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
index 05f7fb6..5ad0fb4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java
@@ -17,13 +17,13 @@
 
 package org.apache.gobblin.cluster;
 
+import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
-
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.metrics.event.EventName;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.EventMetadataUtils;
 import org.apache.gobblin.runtime.JobContext;
 import org.apache.gobblin.runtime.TaskState;
@@ -43,17 +43,26 @@ public class ClusterEventMetadataGenerator implements EventMetadataGenerator{
     List<TaskState> taskStates = jobContext.getJobState().getTaskStates();
     String taskException = 
EventMetadataUtils.getTaskFailureExceptions(taskStates);
     String jobException = 
EventMetadataUtils.getJobFailureExceptions(jobContext.getJobState());
+    ImmutableMap.Builder<String,String> metadataBuilder = 
ImmutableMap.builder();
+    if 
(jobContext.getJobState().contains(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD))
 {
+        
metadataBuilder.put(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD,
+            
jobContext.getJobState().getProp(TimingEvent.FlowEventConstants.HIGH_WATERMARK_FIELD));
+    }
+    if 
(jobContext.getJobState().contains(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD))
 {
+        metadataBuilder.put(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD,
+            
jobContext.getJobState().getProp(TimingEvent.FlowEventConstants.LOW_WATERMARK_FIELD));
+    }
 
     switch (eventName) {
-      case JOB_COMPLETE:
-        return ImmutableMap.of(PROCESSED_COUNT_KEY, 
Long.toString(EventMetadataUtils.getProcessedCount(taskStates)));
-      case JOB_FAILED:
-        return ImmutableMap.of(MESSAGE_KEY, taskException.length() != 0 ? 
taskException : jobException);
-      default:
-        break;
+        case JOB_COMPLETE:
+            return metadataBuilder.put(PROCESSED_COUNT_KEY, 
Long.toString(EventMetadataUtils.getProcessedCount(taskStates))).build();
+        case JOB_FAILED:
+            return metadataBuilder.put(MESSAGE_KEY, taskException.length() != 
0 ? taskException : jobException).build();
+        default:
+            break;
     }
 
-    return ImmutableMap.of();
+    return metadataBuilder.build();
   }
 }
 

Reply via email to