Copilot commented on code in PR #4156:
URL: https://github.com/apache/gobblin/pull/4156#discussion_r2542767484


##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java:
##########
@@ -30,4 +30,13 @@ public interface Pusher<M> extends Closeable {
    * @param messages List of byte array messages to push to Kakfa.
    */
   void pushMessages(List<M> messages);
+
+  /**
+   * Synchronous version of {@link #pushMessages(List)}.
+   * Default implementation just calls {@link #pushMessages(List)}.
+   * @param messages

Review Comment:
   Missing JavaDoc parameter description. The `@param messages` tag should
   describe the messages being pushed, as the existing `@param messages` tag
   at line 30 does.
   ```suggestion
      * @param messages List of byte array messages to push to Kafka.
   ```
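For context, here is a minimal sketch of how the documented default could look once the JavaDoc is complete. The interface shape (`Pusher<M> extends Closeable`, `pushMessages`) comes from the diff, and the name `pushMessagesSync` comes from the reporter hunk in this PR. The default body is only inferred from the JavaDoc text, and `InMemoryPusher` is a hypothetical implementation added for illustration.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PusherDefaultSketch {

  /** Reduced copy of the interface under review; only the push methods are shown. */
  public interface Pusher<M> extends Closeable {
    /**
     * @param messages List of messages to push to Kafka.
     */
    void pushMessages(List<M> messages);

    /**
     * Synchronous version of {@link #pushMessages(List)}.
     * Default implementation just calls {@link #pushMessages(List)}.
     * @param messages List of messages to push to Kafka.
     */
    default void pushMessagesSync(List<M> messages) {
      // Assumed body: the JavaDoc says the default simply delegates.
      pushMessages(messages);
    }
  }

  /** Hypothetical pusher that keeps messages in memory instead of sending them. */
  public static class InMemoryPusher implements Pusher<String> {
    public final List<String> pushed = new ArrayList<>();

    @Override
    public void pushMessages(List<String> messages) {
      pushed.addAll(messages);
    }

    @Override
    public void close() {
      // Nothing to release in this sketch.
    }
  }

  public static void main(String[] args) {
    InMemoryPusher pusher = new InMemoryPusher();
    // InMemoryPusher does not override pushMessagesSync, so the default runs.
    pusher.pushMessagesSync(Arrays.asList("a", "b"));
    System.out.println(pusher.pushed);
  }
}
```

Implementations that can actually block until delivery would override `pushMessagesSync`. Existing implementations keep compiling because the new method has a default body.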



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java:
##########
@@ -54,6 +54,27 @@ protected KafkaEventKeyValueReporter(Builder<?> builder) throws IOException {
 
   @Override
   public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+    List<Pair<String, byte[]>> events = getEventsFromQueue(queue);
+    if (!events.isEmpty()) {
+      log.info("Pushing {} Gobblin Tracking Events to Kafka", events.size());
+      this.kafkaPusher.pushMessages(events);
+    } else {
+      log.debug("No GTE to push.");
+    }
+  }
+
+  @Override
+  public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue) {
+    List<Pair<String, byte[]>> events = getEventsFromQueue(queue);
+    if (!events.isEmpty()) {
+      log.info("Pushing {} Gobblin Tracking Events to Kafka", events.size());
+      this.kafkaPusher.pushMessagesSync(events);
+    } else {
+      log.debug("No GTE to push.");
+    }
+  }

Review Comment:
   Code duplication: `reportEventQueue()` and `reportEventQueueSynchronously()`
   are identical except for the call to `pushMessages()` versus
   `pushMessagesSync()`. Consider having both methods delegate to a common
   helper that takes the push method as a parameter, or to a shared method with
   a boolean flag that selects sync or async mode.
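The first option could be sketched roughly as follows. This is a simplified, self-contained illustration and not the PR's code: `Pusher` is a reduced stand-in, events are plain strings instead of `Pair<String, byte[]>`, the queue-draining loop stands in for `getEventsFromQueue`, and `drainAndPush` and `RecordingPusher` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class ReporterRefactorSketch {

  /** Hypothetical stand-in for Pusher<M>, reduced to the two push methods. */
  public interface Pusher<M> {
    void pushMessages(List<M> messages);
    void pushMessagesSync(List<M> messages);
  }

  /** Simplified reporter: events are plain strings rather than Pair<String, byte[]>. */
  public static class Reporter {
    private final Pusher<String> kafkaPusher;

    public Reporter(Pusher<String> kafkaPusher) {
      this.kafkaPusher = kafkaPusher;
    }

    public void reportEventQueue(Queue<String> queue) {
      drainAndPush(queue, this.kafkaPusher::pushMessages);
    }

    public void reportEventQueueSynchronously(Queue<String> queue) {
      drainAndPush(queue, this.kafkaPusher::pushMessagesSync);
    }

    /** Shared logic; the only difference between callers is the push method passed in. */
    private void drainAndPush(Queue<String> queue, Consumer<List<String>> push) {
      List<String> events = new ArrayList<>();
      String event;
      while ((event = queue.poll()) != null) {
        events.add(event);
      }
      if (!events.isEmpty()) {
        push.accept(events);
      }
    }
  }

  /** Hypothetical test double that records which push method was called. */
  public static class RecordingPusher implements Pusher<String> {
    public final List<String> calls = new ArrayList<>();

    @Override
    public void pushMessages(List<String> messages) {
      calls.add("async:" + messages.size());
    }

    @Override
    public void pushMessagesSync(List<String> messages) {
      calls.add("sync:" + messages.size());
    }
  }

  public static void main(String[] args) {
    RecordingPusher pusher = new RecordingPusher();
    Reporter reporter = new Reporter(pusher);
    reporter.reportEventQueue(new ArrayDeque<>(Arrays.asList("a", "b")));
    reporter.reportEventQueueSynchronously(new ArrayDeque<>(Arrays.asList("c")));
    System.out.println(pusher.calls);
  }
}
```

Passing the push method as a `Consumer` keeps the two public entry points as one-liners and avoids a boolean flag whose meaning is not obvious at the call site.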



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java:
##########
@@ -19,22 +19,50 @@
 
 import java.util.Map;
 
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.cluster.ContainerMetrics;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
 
 
+@Slf4j
 public class SubmitGTEActivityImpl implements SubmitGTEActivity {
-    private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
 
     @Override
     public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
-        log.info("submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
+        log.info("Submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
         eventSubmitterContext.create().submit(eventBuilder);
+        Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
+        if (config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID) &&
+            config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME)) {
+            String containerMetricsApplicationName = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+            String containerMetricsTaskRunnerId = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID);
+            ContainerMetrics containerMetrics = ContainerMetrics.get(ConfigUtils.configToState(config), containerMetricsApplicationName, containerMetricsTaskRunnerId);
+            log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics.toString(),

Review Comment:
   Redundant use of `toString()`. SLF4J calls the argument's `toString()`
   itself when it fills a `{}` placeholder, so the explicit `.toString()` call
   is unnecessary and should be removed.
   ```suggestion
               log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics,
   ```
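To illustrate why the explicit call is more than a style issue, the sketch below uses a hypothetical `TinyLogger` that only mimics `{}` substitution. It is not SLF4J and makes no claim about SLF4J internals beyond the documented behaviour that arguments are formatted only when the message is actually logged. `Counted` is likewise a made-up class that counts `toString()` invocations.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LazyToStringSketch {

  /** Hypothetical value object that counts how often toString() runs. */
  public static class Counted {
    public final AtomicInteger calls = new AtomicInteger();

    @Override
    public String toString() {
      calls.incrementAndGet();
      return "Counted";
    }
  }

  /** Hypothetical stand-in for a parameterized logger; NOT the SLF4J implementation. */
  public static class TinyLogger {
    private final boolean infoEnabled;
    public final StringBuilder out = new StringBuilder();

    public TinyLogger(boolean infoEnabled) {
      this.infoEnabled = infoEnabled;
    }

    public void info(String format, Object... args) {
      if (!infoEnabled) {
        return; // Level disabled: arguments are never converted to strings here.
      }
      StringBuilder sb = new StringBuilder();
      int from = 0;
      for (Object arg : args) {
        int at = format.indexOf("{}", from);
        if (at < 0) {
          break;
        }
        // String.valueOf handles null, unlike an explicit arg.toString().
        sb.append(format, from, at).append(String.valueOf(arg));
        from = at + 2;
      }
      sb.append(format.substring(from));
      out.append(sb.toString()).append('\n');
    }
  }

  public static void main(String[] args) {
    Counted metrics = new Counted();
    TinyLogger disabled = new TinyLogger(false);

    disabled.info("Fetched container metrics instance {}", metrics);
    System.out.println("toString calls, object passed: " + metrics.calls.get());

    disabled.info("Fetched container metrics instance {}", metrics.toString());
    System.out.println("toString calls, explicit toString(): " + metrics.calls.get());
  }
}
```

Passing the object itself defers the conversion to the logger, which skips it when the level is disabled and tolerates `null`. The explicit `.toString()` runs eagerly at the call site in both cases.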



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java:
##########
@@ -163,12 +163,28 @@ public void report() {
     reportEventQueue(this.reportingQueue);
   }
 
+  /**
+   * Report all {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s in the queue synchronously.
+   */
+  public void reportSynchronously() {
+    reportEventQueueSynchronously(this.reportingQueue);
+  }
+
   /**
    * Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue.
    * @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
    */
   public abstract void reportEventQueue(Queue<GobblinTrackingEvent> queue);
 
+  /**
+   * Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue synchronously.
+   * Default implementation just calls {@link #reportEventQueue(Queue)} for backward compatibility.
+   * @param queue

Review Comment:
   Missing JavaDoc parameter description. The `@param queue` tag should
   describe the queue parameter, as the existing `@param queue` tag at line 175
   does.
   ```suggestion
      * @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted synchronously.
   ```



##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java:
##########
@@ -30,4 +30,13 @@ public interface Pusher<M> extends Closeable {
    * @param messages List of byte array messages to push to Kakfa.

Review Comment:
   Typo in JavaDoc: "Kakfa" should be "Kafka".
   ```suggestion
      * @param messages List of byte array messages to push to Kafka.
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java:
##########
@@ -19,22 +19,50 @@
 
 import java.util.Map;
 
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.cluster.ContainerMetrics;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
 
 
+@Slf4j
 public class SubmitGTEActivityImpl implements SubmitGTEActivity {
-    private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
 
     @Override
     public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
-        log.info("submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
+        log.info("Submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
         eventSubmitterContext.create().submit(eventBuilder);
+        Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
+        if (config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID) &&
+            config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME)) {
+            String containerMetricsApplicationName = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+            String containerMetricsTaskRunnerId = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID);
+            ContainerMetrics containerMetrics = ContainerMetrics.get(ConfigUtils.configToState(config), containerMetricsApplicationName, containerMetricsTaskRunnerId);
+            log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics.toString(),
+                containerMetricsTaskRunnerId, containerMetricsApplicationName);
+            Optional<KafkaAvroEventKeyValueReporter> kafkaReporterOptional = containerMetrics.getScheduledReporter(KafkaAvroEventKeyValueReporter.class);
+            if (kafkaReporterOptional.isPresent()) {
+                log.info("Submitting GTE in synchronous manner to Kafka reporter");
+                kafkaReporterOptional.get().reportSynchronously();
+                log.info("Submitted GTE to Kafka reporter");
+            } else {
+                log.error("No KafkaAvroEventKeyValueReporter found in container metrics for taskRunnerId: {} and applicationName: {}",

Review Comment:
   Logging at ERROR level may not be appropriate here. If the
   `KafkaAvroEventKeyValueReporter` is optional, its absence should be logged at
   WARN or INFO level. ERROR should be reserved for actual failures that prevent
   the operation from completing successfully.
   ```suggestion
                   log.warn("No KafkaAvroEventKeyValueReporter found in container metrics for taskRunnerId: {} and applicationName: {}",
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java:
##########
@@ -19,22 +19,50 @@
 
 import java.util.Map;
 
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.cluster.ContainerMetrics;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
 
 
+@Slf4j
 public class SubmitGTEActivityImpl implements SubmitGTEActivity {
-    private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
 
     @Override
     public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
-        log.info("submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
+        log.info("Submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
         eventSubmitterContext.create().submit(eventBuilder);
+        Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
+        if (config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID) &&
+            config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME)) {
+            String containerMetricsApplicationName = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+            String containerMetricsTaskRunnerId = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID);
+            ContainerMetrics containerMetrics = ContainerMetrics.get(ConfigUtils.configToState(config), containerMetricsApplicationName, containerMetricsTaskRunnerId);
+            log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics.toString(),
+                containerMetricsTaskRunnerId, containerMetricsApplicationName);
+            Optional<KafkaAvroEventKeyValueReporter> kafkaReporterOptional = containerMetrics.getScheduledReporter(KafkaAvroEventKeyValueReporter.class);
+            if (kafkaReporterOptional.isPresent()) {
+                log.info("Submitting GTE in synchronous manner to Kafka reporter");
+                kafkaReporterOptional.get().reportSynchronously();
+                log.info("Submitted GTE to Kafka reporter");
+            } else {
+                log.error("No KafkaAvroEventKeyValueReporter found in container metrics for taskRunnerId: {} and applicationName: {}",
+                    containerMetricsTaskRunnerId, containerMetricsApplicationName);
+            }
+        } else {
+            log.error("Both {} and {} must be set to fetch container metrics instance for synchronous GTE submission",

Review Comment:
   Logging at ERROR level may not be appropriate here. If these configuration
   keys are optional (as the conditional check suggests), their absence should
   be logged at WARN or INFO level instead. ERROR typically indicates a problem
   that prevents normal operation, whereas this code continues without these
   values.
   ```suggestion
               log.warn("Both {} and {} should be set to fetch container metrics instance for synchronous GTE submission",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
