This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 00d0ab99dd [GOBBLIN-2239] Introduce pushMessagesSync for sync GTE emission (#4156)
00d0ab99dd is described below
commit 00d0ab99dd7e3f3620c1c7b11efaf2d8a6b29652
Author: Vivek Rai <[email protected]>
AuthorDate: Thu Nov 20 11:59:40 2025 +0530
[GOBBLIN-2239] Introduce pushMessagesSync for sync GTE emission (#4156)
* introduce pushMessagesSync for GTE emission
* add missing java doc
* some refactoring
* updated java doc
---
.../gobblin/metrics/reporter/EventReporter.java | 17 ++++++++++
.../org/apache/gobblin/metrics/GobblinMetrics.java | 12 +++++++
.../metrics/kafka/KafkaEventKeyValueReporter.java | 27 +++++++++++++--
.../org/apache/gobblin/metrics/kafka/Pusher.java | 11 ++++++-
.../temporal/GobblinTemporalConfigurationKeys.java | 4 +++
.../cluster/GobblinTemporalTaskRunner.java | 6 ++--
.../workflows/metrics/SubmitGTEActivityImpl.java | 38 +++++++++++++++++++---
7 files changed, 104 insertions(+), 11 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
index 94b8c4be4c..09eadbd6e0 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
@@ -163,12 +163,29 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
reportEventQueue(this.reportingQueue);
}
+ /**
+ * Report all {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s in
the queue synchronously.
+ */
+ public void reportSynchronously() {
+ reportEventQueueSynchronously(this.reportingQueue);
+ }
+
/**
* Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue.
* @param queue {@link java.util.Queue} containing {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
*/
public abstract void reportEventQueue(Queue<GobblinTrackingEvent> queue);
+ /**
+ * Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue
synchronously.
+ * Default implementation provides no synchronous guarantees and just calls
{@link #reportEventQueue(Queue)}.
+ * Subclasses should override this method to implement true synchronous
reporting behavior.
+ * @param queue {@link java.util.Queue} containing {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
+ */
+ public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue)
{
+ reportEventQueue(queue);
+ }
+
/**
* NOOP because {@link com.codahale.metrics.ScheduledReporter} requires this
method implemented.
*/
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index dfb6e7841a..def33d8fa2 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -350,6 +350,18 @@ public class GobblinMetrics {
return this.metricContext.timer(MetricRegistry.name(prefix, suffixes));
}
+ /**
+ * Get the first {@link ScheduledReporter} of the given class if it exists.
+ * @param clazz the class of the ScheduledReporter to get
+ * @return an Optional containing the first ScheduledReporter of the given
class if it exists, else an absent Optional
+ */
+ public <T extends com.codahale.metrics.ScheduledReporter>
java.util.Optional<T> getScheduledReporter(Class<T> clazz) {
+ if (clazz == null) {
+ throw new IllegalArgumentException("Class argument cannot be null");
+ }
+ return
this.codahaleScheduledReporters.stream().filter(clazz::isInstance).map(clazz::cast).findFirst();
+ }
+
/**
* Starts metric reporting and appends the given metrics file suffix to the
current value of
* {@link ConfigurationKeys#METRICS_FILE_SUFFIX}.
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
index 8d2122601a..ce1e72ff11 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java
@@ -54,6 +54,29 @@ public class KafkaEventKeyValueReporter extends KafkaEventReporter {
@Override
public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+ reportEventQueueInternal(queue, false);
+ }
+
+ @Override
+ public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue)
{
+ reportEventQueueInternal(queue, true);
+ }
+
+ private void reportEventQueueInternal(Queue<GobblinTrackingEvent> queue,
boolean sync) {
+ List<Pair<String, byte[]>> events = getEventsFromQueue(queue);
+ if (!events.isEmpty()) {
+ log.info("Pushing {} Gobblin Tracking Events to Kafka", events.size());
+ if (sync) {
+ this.kafkaPusher.pushMessagesSync(events);
+ } else {
+ this.kafkaPusher.pushMessages(events);
+ }
+ } else {
+ log.debug("No GTE to push.");
+ }
+ }
+
+ private List<Pair<String, byte[]>>
getEventsFromQueue(Queue<GobblinTrackingEvent> queue) {
GobblinTrackingEvent nextEvent;
List<Pair<String, byte[]>> events = Lists.newArrayList();
@@ -75,9 +98,7 @@ public class KafkaEventKeyValueReporter extends KafkaEventReporter {
events.add(Pair.of(key, this.serializer.serializeRecord(nextEvent)));
}
- if (!events.isEmpty()) {
- this.kafkaPusher.pushMessages(events);
- }
+ return events;
}
private static class BuilderImpl extends Builder<BuilderImpl> {
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
index 9024a88a02..57f088f5b2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
@@ -27,7 +27,16 @@ import java.util.List;
public interface Pusher<M> extends Closeable {
/**
* Push all byte array messages to the Kafka topic.
- * @param messages List of byte array messages to push to Kakfa.
+ * @param messages List of byte array messages to push to Kafka.
*/
void pushMessages(List<M> messages);
+
+ /**
+ * Synchronous version of {@link #pushMessages(List)}.
+ * Default implementation just calls {@link #pushMessages(List)}.
+ * @param messages List of byte array messages to push to Kafka.
+ */
+ default void pushMessagesSync(List<M> messages) {
+ pushMessages(messages);
+ }
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index cdd5728fdc..94819c9c85 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -46,6 +46,10 @@ public interface GobblinTemporalConfigurationKeys {
String GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = PREFIX +
"work.dir.cleanup.enabled";
String DEFAULT_GOBBLIN_TEMPORAL_WORK_DIR_CLEANUP_ENABLED = "true";
+ String GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX = PREFIX +
"container.metrics.";
+ String GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME =
GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX + "application.name";
+ String GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID =
GOBBLIN_TEMPORAL_CONTAINER_METRICS_PREFIX + "task.runner.id";
+
/**
* Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing
collisions with prod jobs
* during testing
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
index c8091068a9..e7cd4315f0 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java
@@ -208,8 +208,10 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge {
private Config saveConfigToFile(Config config)
throws IOException {
- Config newConf = config
- .withValue(CLUSTER_APP_WORK_DIR,
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
+ // Save the application name and task runner id in the config for getting
ContainerMetrics instance later
+ Config newConf = config.withValue(CLUSTER_APP_WORK_DIR,
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()))
+
.withValue(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME,
ConfigValueFactory.fromAnyRef(this.applicationName))
+
.withValue(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID,
ConfigValueFactory.fromAnyRef(this.taskRunnerId));
ConfigUtils configUtils = new ConfigUtils(new FileUtils());
configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
return newConf;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
index 097e279c5f..8b6d9be9d6 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java
@@ -19,22 +19,50 @@ package org.apache.gobblin.temporal.workflows.metrics;
import java.util.Map;
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.cluster.ContainerMetrics;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
+@Slf4j
public class SubmitGTEActivityImpl implements SubmitGTEActivity {
- private static Logger log =
Workflow.getLogger(SubmitGTEActivityImpl.class);
@Override
public void submitGTE(GobblinEventBuilder eventBuilder,
EventSubmitterContext eventSubmitterContext) {
- log.info("submitting GTE - {}",
summarizeEventMetadataForLogging(eventBuilder));
+ log.info("Submitting GTE - {}",
summarizeEventMetadataForLogging(eventBuilder));
eventSubmitterContext.create().submit(eventBuilder);
+ Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
+ if
(config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID)
&&
+
config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME))
{
+ String containerMetricsApplicationName =
config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+ String containerMetricsTaskRunnerId =
config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID);
+ ContainerMetrics containerMetrics =
ContainerMetrics.get(ConfigUtils.configToState(config),
containerMetricsApplicationName, containerMetricsTaskRunnerId);
+ log.info("Fetched container metrics instance {} for taskRunnerId:
{} and applicationName: {}", containerMetrics.toString(),
+ containerMetricsTaskRunnerId, containerMetricsApplicationName);
+ Optional<KafkaAvroEventKeyValueReporter> kafkaReporterOptional =
containerMetrics.getScheduledReporter(KafkaAvroEventKeyValueReporter.class);
+ if (kafkaReporterOptional.isPresent()) {
+ log.info("Submitting GTE in synchronous manner to Kafka
reporter");
+ kafkaReporterOptional.get().reportSynchronously();
+ log.info("Submitted GTE to Kafka reporter");
+ } else {
+ log.warn("No KafkaAvroEventKeyValueReporter found in container
metrics for taskRunnerId: {} and applicationName: {}",
+ containerMetricsTaskRunnerId,
containerMetricsApplicationName);
+ }
+ } else {
+ log.warn("Both {} and {} should be set to fetch container metrics
instance for synchronous GTE submission",
+
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID,
+
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+ }
}
private static String summarizeEventMetadataForLogging(GobblinEventBuilder
eventBuilder) {