Copilot commented on code in PR #4156:
URL: https://github.com/apache/gobblin/pull/4156#discussion_r2544295654


##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java:
##########
@@ -27,7 +27,16 @@
 public interface Pusher<M> extends Closeable {
   /**
    * Push all byte array messages to the Kafka topic.
-   * @param messages List of byte array messages to push to Kakfa.
+   * @param messages List of byte array messages to push to Kafka.
    */
   void pushMessages(List<M> messages);
+
+  /**
+   * Synchronous version of {@link #pushMessages(List)}.
+   * Default implementation just calls {@link #pushMessages(List)}.
+   * @param messages List of byte array messages to push to Kafka.
+   */
+  default void pushMessagesSync(List<M> messages) {
+    pushMessages(messages);
+  }

Review Comment:
   The default implementation of `pushMessagesSync` only delegates to 
`pushMessages`, so it provides no synchronous behavior. None of the concrete 
`Pusher` implementations (`KafkaKeyValueProducerPusher` for Kafka 08, 09, and 1) 
overrides this method to call `producer.flush()`, so the synchronous flushing 
will not work as intended. Each `KafkaKeyValueProducerPusher` implementation 
should override `pushMessagesSync` to call `this.producer.flush()` after 
sending the messages.
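
   A minimal sketch of the kind of override suggested above. It is illustrative 
only: `FakeProducer` and `FlushingPusher` are hypothetical stand-ins rather 
than Gobblin or Kafka classes, and the `Pusher` interface is a simplified copy 
of the one in the diff (without `Closeable`). The point is that the 
synchronous variant sends and then flushes, while the asynchronous variant 
leaves records buffered.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: FakeProducer and FlushingPusher are hypothetical, not Gobblin classes. */
class SyncPusherSketch {

  /** Simplified copy of the interface from the diff (Closeable omitted for brevity). */
  interface Pusher<M> {
    void pushMessages(List<M> messages);

    /** Default gives no synchronous guarantee; it only delegates. */
    default void pushMessagesSync(List<M> messages) {
      pushMessages(messages);
    }
  }

  /** Hypothetical producer that buffers records until flush() is called. */
  static class FakeProducer {
    final List<String> buffered = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();

    void send(String record) {
      buffered.add(record);
    }

    void flush() {
      delivered.addAll(buffered);
      buffered.clear();
    }
  }

  /** Pusher whose synchronous variant flushes after sending. */
  static class FlushingPusher implements Pusher<String> {
    private final FakeProducer producer;

    FlushingPusher(FakeProducer producer) {
      this.producer = producer;
    }

    @Override
    public void pushMessages(List<String> messages) {
      for (String message : messages) {
        producer.send(message);
      }
    }

    @Override
    public void pushMessagesSync(List<String> messages) {
      pushMessages(messages);
      producer.flush(); // returns only once buffered records are delivered
    }
  }
}
```

With a real Kafka producer the flush call would block until outstanding sends 
complete; whether every supported Kafka version exposes such a call should be 
checked per implementation.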



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java:
##########
@@ -19,22 +19,50 @@
 
 import java.util.Map;
 
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.cluster.ContainerMetrics;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
 
 
+@Slf4j
 public class SubmitGTEActivityImpl implements SubmitGTEActivity {
-    private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
 
     @Override
     public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
-        log.info("submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
+        log.info("Submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
         eventSubmitterContext.create().submit(eventBuilder);
+        Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());

Review Comment:
   The config lookup happens on every GTE submission. Consider caching the 
config and ContainerMetrics instance as instance variables (initialized once, 
possibly lazily) to avoid repeated lookups, especially since 
SubmitGTEActivityImpl is registered as a singleton activity implementation and 
the config should remain constant throughout the activity's lifecycle.
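
   One way the lazy caching described above could be sketched, assuming the 
config really is constant for the activity's lifetime. `Lazy` and `Activity` 
are hypothetical names, and the string returned by the supplier is a 
placeholder for the real config lookup; none of this is taken from the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Sketch only: Lazy and Activity are hypothetical, not Gobblin classes. */
class CachedConfigSketch {

  /** Thread-safe memoizing supplier; the delegate must not return null. */
  static final class Lazy<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private volatile T value;

    Lazy(Supplier<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public T get() {
      T result = value;
      if (result == null) {
        synchronized (this) {
          result = value;
          if (result == null) {
            result = delegate.get(); // runs at most once
            value = result;
          }
        }
      }
      return result;
    }
  }

  /** Hypothetical stand-in for the singleton activity implementation. */
  static final class Activity {
    final AtomicInteger lookups = new AtomicInteger();

    // Resolved on first use rather than at construction time.
    private final Lazy<String> config = new Lazy<>(() -> {
      lookups.incrementAndGet();
      return "config"; // placeholder for the real config lookup
    });

    String submit(String event) {
      return event + " using " + config.get();
    }
  }
}
```

Deferring the lookup to first use matters if the worker config is not yet 
registered when the activity object is constructed; that ordering is an 
assumption to verify.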



##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java:
##########
@@ -163,12 +163,28 @@ public void report() {
     reportEventQueue(this.reportingQueue);
   }
 
+  /**
+   * Report all {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s in the queue synchronously.
+   */
+  public void reportSynchronously() {
+    reportEventQueueSynchronously(this.reportingQueue);
+  }
+
   /**
    * Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue.
    * @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
    */
   public abstract void reportEventQueue(Queue<GobblinTrackingEvent> queue);
 
+  /**
+   * Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue synchronously.
+   * Default implementation just calls {@link #reportEventQueue(Queue)} for backward compatibility.
+   * @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
+   */
+  public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue) {
+    reportEventQueue(queue);
+  }

Review Comment:
   The Javadoc for the default implementation of `reportEventQueueSynchronously` 
should state that it provides no actual synchronous guarantees and that 
subclasses must override it to get true synchronous behavior. Update the 
comment on line 181 to read: 'Default implementation provides no synchronous 
guarantees and just calls {@link #reportEventQueue(Queue)}. Subclasses should 
override this method to implement true synchronous reporting behavior.'
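
   For illustration, a subclass override with a real synchronous guarantee 
might look like the sketch below. `Reporter` is a simplified, hypothetical 
stand-in for `EventReporter` (events are plain strings), and `AckingReporter` 
simulates asynchronous delivery with `CompletableFuture`; neither reflects 
the actual Gobblin reporters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Sketch only: Reporter and AckingReporter are hypothetical, not Gobblin classes. */
class SyncReporterSketch {

  /** Simplified stand-in for EventReporter. */
  abstract static class Reporter {
    abstract void reportEventQueue(Queue<String> queue);

    /** Default provides no synchronous guarantee; it only delegates. */
    void reportEventQueueSynchronously(Queue<String> queue) {
      reportEventQueue(queue);
    }
  }

  /** Reporter whose synchronous variant waits for every send to complete. */
  static class AckingReporter extends Reporter {
    final List<String> acknowledged = Collections.synchronizedList(new ArrayList<>());

    // Simulated asynchronous delivery on the common fork-join pool.
    private CompletableFuture<Void> sendAsync(String event) {
      return CompletableFuture.runAsync(() -> acknowledged.add(event));
    }

    // Drains the queue and returns one future per event sent.
    private List<CompletableFuture<Void>> drain(Queue<String> queue) {
      List<CompletableFuture<Void>> pending = new ArrayList<>();
      String event;
      while ((event = queue.poll()) != null) {
        pending.add(sendAsync(event));
      }
      return pending;
    }

    @Override
    void reportEventQueue(Queue<String> queue) {
      drain(queue); // fire and forget
    }

    @Override
    void reportEventQueueSynchronously(Queue<String> queue) {
      // Block until every outstanding send has completed.
      CompletableFuture.allOf(drain(queue).toArray(new CompletableFuture[0])).join();
    }
  }
}
```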



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
