This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 00d0ab99dd [GOBBLIN-2239] Introduce pushMessagesSync for sync GTE emission (#4156)
00d0ab99dd is described below

commit 00d0ab99dd7e3f3620c1c7b11efaf2d8a6b29652
Author: Vivek Rai <[email protected]>
AuthorDate: Thu Nov 20 11:59:40 2025 +0530

    [GOBBLIN-2239] Introduce pushMessagesSync for sync GTE emission (#4156)
    
    * introduce pushMessagesSync for GTE emission
    
    * add missing java doc
    
    * some refactoring
    
    * updated java doc
---
 .../gobblin/metrics/reporter/EventReporter.java    | 17 ++++++++++
 .../org/apache/gobblin/metrics/GobblinMetrics.java | 12 +++++++
 .../metrics/kafka/KafkaEventKeyValueReporter.java  | 27 +++++++++++++--
 .../org/apache/gobblin/metrics/kafka/Pusher.java   | 11 ++++++-
 .../temporal/GobblinTemporalConfigurationKeys.java |  4 +++
 .../cluster/GobblinTemporalTaskRunner.java         |  6 ++--
 .../workflows/metrics/SubmitGTEActivityImpl.java   | 38 +++++++++++++++++++---
 7 files changed, 104 insertions(+), 11 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
index 94b8c4be4c..09eadbd6e0 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
@@ -163,12 +163,29 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
     reportEventQueue(this.reportingQueue);
   }
 
+  /**
+   * Report all {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s in the queue synchronously.
+   */
+  public void reportSynchronously() {
+    reportEventQueueSynchronously(this.reportingQueue);
+  }
+
   /**
    * Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue.
    * @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
    */
   public abstract void reportEventQueue(Queue<GobblinTrackingEvent> queue);
 
+  /**
+   * Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue synchronously.
+   * Default implementation provides no synchronous guarantees and just calls {@link #reportEventQueue(Queue)}.
+   * Subclasses should override this method to implement true synchronous reporting behavior.
+   * @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
+   */
+  public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue) {
+    reportEventQueue(queue);
+  }
+
   /**
    * NOOP because {@link com.codahale.metrics.ScheduledReporter} requires this method implemented.
    */
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index dfb6e7841a..def33d8fa2 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -350,6 +350,18 @@ public class GobblinMetrics {
     return this.metricContext.timer(MetricRegistry.name(prefix, suffixes));
   }
 
+  /**
+   * Get the first {@link ScheduledReporter} of the given class if it exists.
+   * @param clazz the class of the ScheduledReporter to get
+   * @return an Optional containing the first ScheduledReporter of the given class if it exists, else an absent Optional
+   */
+  public <T extends com.codahale.metrics.ScheduledReporter> java.util.Optional<T> getScheduledReporter(Class<T> clazz) {
+    if (clazz == null) {
+      throw new IllegalArgumentException("Class argument cannot be null");
+    }
+    return this.codahaleScheduledReporters.stream().filter(clazz::isInstance).map(clazz::cast).findFirst();
+  }
+
   /**
    * Starts metric reporting and appends the given metrics file suffix to the current value of
    * {@link ConfigurationKeys#METRICS_FILE_SUFFIX}.
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
index 8d2122601a..ce1e72ff11 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
@@ -54,6 +54,29 @@ public class KafkaEventKeyValueReporter extends KafkaEventReporter {
 
   @Override
   public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+    reportEventQueueInternal(queue, false);
+  }
+
+  @Override
+  public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue) {
+    reportEventQueueInternal(queue, true);
+  }
+
+  private void reportEventQueueInternal(Queue<GobblinTrackingEvent> queue, boolean sync) {
+    List<Pair<String, byte[]>> events = getEventsFromQueue(queue);
+    if (!events.isEmpty()) {
+      log.info("Pushing {} Gobblin Tracking Events to Kafka", events.size());
+      if (sync) {
+        this.kafkaPusher.pushMessagesSync(events);
+      } else {
+        this.kafkaPusher.pushMessages(events);
+      }
+    } else {
+      log.debug("No GTE to push.");
+    }
+  }
+
+  private List<Pair<String, byte[]>> getEventsFromQueue(Queue<GobblinTrackingEvent> queue) {
     GobblinTrackingEvent nextEvent;
     List<Pair<String, byte[]>> events = Lists.newArrayList();
 
@@ -75,9 +98,7 @@ public class KafkaEventKeyValueReporter extends KafkaEventReporter {
       events.add(Pair.of(key, this.serializer.serializeRecord(nextEvent)));
     }
 
-    if (!events.isEmpty()) {
-      this.kafkaPusher.pushMessages(events);
-    }
+    return events;
   }
 
   private static class BuilderImpl extends Builder<BuilderImpl> {
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
index 9024a88a02..57f088f5b2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
@@ -27,7 +27,16 @@ import java.util.List;
 public interface Pusher<M> extends Closeable {
   /**
    * Push all byte array messages to the Kafka topic.
-   * @param messages List of byte array messages to push to Kakfa.
+   * @param messages List of byte array messages to push to Kafka.
    */
   void pushMessages(List<M> messages);
+
+  /**
+   * Synchronous version of {@link #pushMessages(List)}.
+   * Default implementation just calls {@link #pushMessages(List)}.
+   * @param messages List of byte array messages to push to Kafka.
+   */
+  default void pushMessagesSync(List<M> messages) {
+    pushMessages(messages);
+  }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index cdd5728fdc..94819c9c85 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -46,6 +46,10 @@ public interface GobblinTemporalConfigurationKeys {
   String GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = PREFIX + "work.dir.cleanup.enabled";
   String DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = "true";
 
+  String GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX = PREFIX + "container.metrics.";
+  String GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME = GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX + "application.name";
+  String GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID = GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX + "task.runner.id";
+
   /**
    * Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing collisions with prod jobs
    * during testing
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index c8091068a9..e7cd4315f0 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -208,8 +208,10 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
 
   private Config saveConfigToFile(Config config)
       throws IOException {
-    Config newConf = config
-        .withValue(CLUSTER_APP_WORK_DIR, ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
+    // Save the application name and task runner id in the config for getting ContainerMetrics instance later
+    Config newConf = config.withValue(CLUSTER_APP_WORK_DIR, ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()))
+        .withValue(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME, ConfigValueFactory.fromAnyRef(this.applicationName))
+        .withValue(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID, ConfigValueFactory.fromAnyRef(this.taskRunnerId));
     ConfigUtils configUtils = new ConfigUtils(new FileUtils());
     configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
     return newConf;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
index 097e279c5f..8b6d9be9d6 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
@@ -19,22 +19,50 @@ package org.apache.gobblin.temporal.workflows.metrics;
 
 import java.util.Map;
 
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.cluster.ContainerMetrics;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
 
 
+@Slf4j
 public class SubmitGTEActivityImpl implements SubmitGTEActivity {
-    private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
 
     @Override
     public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
-        log.info("submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
+        log.info("Submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
         eventSubmitterContext.create().submit(eventBuilder);
+        Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
+        if (config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID) &&
+            config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME)) {
+            String containerMetricsApplicationName = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+            String containerMetricsTaskRunnerId = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID);
+            ContainerMetrics containerMetrics = ContainerMetrics.get(ConfigUtils.configToState(config), containerMetricsApplicationName, containerMetricsTaskRunnerId);
+            log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics.toString(),
+                containerMetricsTaskRunnerId, containerMetricsApplicationName);
+            Optional<KafkaAvroEventKeyValueReporter> kafkaReporterOptional = containerMetrics.getScheduledReporter(KafkaAvroEventKeyValueReporter.class);
+            if (kafkaReporterOptional.isPresent()) {
+                log.info("Submitting GTE in synchronous manner to Kafka reporter");
+                kafkaReporterOptional.get().reportSynchronously();
+                log.info("Submitted GTE to Kafka reporter");
+            } else {
+                log.warn("No KafkaAvroEventKeyValueReporter found in container metrics for taskRunnerId: {} and applicationName: {}",
+                    containerMetricsTaskRunnerId, containerMetricsApplicationName);
+            }
+        } else {
+            log.warn("Both {} and {} should be set to fetch container metrics instance for synchronous GTE submission",
+                GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID,
+                GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+        }
     }
 
     private static String summarizeEventMetadataForLogging(GobblinEventBuilder eventBuilder) {

Reply via email to