Copilot commented on code in PR #4156:
URL: https://github.com/apache/gobblin/pull/4156#discussion_r2544295654
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java:
##########
@@ -27,7 +27,16 @@
public interface Pusher<M> extends Closeable {
/**
* Push all byte array messages to the Kafka topic.
- * @param messages List of byte array messages to push to Kakfa.
+ * @param messages List of byte array messages to push to Kafka.
*/
void pushMessages(List<M> messages);
+
+ /**
+ * Synchronous version of {@link #pushMessages(List)}.
+ * Default implementation just calls {@link #pushMessages(List)}.
+ * @param messages List of byte array messages to push to Kafka.
+ */
+ default void pushMessagesSync(List<M> messages) {
+ pushMessages(messages);
+ }
Review Comment:
The default implementation of `pushMessagesSync` only calls `pushMessages`,
which doesn't actually provide synchronous behavior. None of the concrete
`Pusher` implementations (KafkaKeyValueProducerPusher for Kafka 08, 09, and 1)
override this method to call `producer.flush()`. This means the synchronous
flushing functionality will not work as intended. Each
KafkaKeyValueProducerPusher implementation should override `pushMessagesSync`
to call `this.producer.flush()` after sending messages.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java:
##########
@@ -19,22 +19,50 @@
import java.util.Map;
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.cluster.ContainerMetrics;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
+@Slf4j
public class SubmitGTEActivityImpl implements SubmitGTEActivity {
- private static Logger log =
Workflow.getLogger(SubmitGTEActivityImpl.class);
@Override
public void submitGTE(GobblinEventBuilder eventBuilder,
EventSubmitterContext eventSubmitterContext) {
- log.info("submitting GTE - {}",
summarizeEventMetadataForLogging(eventBuilder));
+ log.info("Submitting GTE - {}",
summarizeEventMetadataForLogging(eventBuilder));
eventSubmitterContext.create().submit(eventBuilder);
+ Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
Review Comment:
The config lookup happens on every GTE submission. Consider caching the
config and ContainerMetrics instance as instance variables (initialized once,
possibly lazily) to avoid repeated lookups, especially since
SubmitGTEActivityImpl is registered as a singleton activity implementation and
the config should remain constant throughout the activity's lifecycle.
##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java:
##########
@@ -163,12 +163,28 @@ public void report() {
reportEventQueue(this.reportingQueue);
}
+ /**
+ * Report all {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s in
the queue synchronously.
+ */
+ public void reportSynchronously() {
+ reportEventQueueSynchronously(this.reportingQueue);
+ }
+
/**
* Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue.
* @param queue {@link java.util.Queue} containing {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
*/
public abstract void reportEventQueue(Queue<GobblinTrackingEvent> queue);
+ /**
+ * Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue
synchronously.
+ * Default implementation just calls {@link #reportEventQueue(Queue)} for
backward compatibility.
+ * @param queue {@link java.util.Queue} containing {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
+ */
+ public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue)
{
+ reportEventQueue(queue);
+ }
Review Comment:
The default implementation of `reportEventQueueSynchronously` should clarify
in the Javadoc that it provides no actual synchronous guarantees and subclasses
must override it to implement true synchronous behavior. Update the comment on
line 181 to state: 'Default implementation provides no synchronous guarantees
and just calls {@link #reportEventQueue(Queue)}. Subclasses should override
this method to implement true synchronous reporting behavior.'
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]