scwhittle commented on code in PR #33503:
URL: https://github.com/apache/beam/pull/33503#discussion_r1962369386
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -146,14 +154,31 @@ private void recordRpcLatencyMetrics() {
}
}
- private void recordBacklogBytes() {
+ private void recordBacklogBytesInternal() {
    for (Map.Entry<String, Long> backlogs : perTopicPartitionBacklogs().entrySet()) {
      Gauge gauge =
          KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));
      gauge.set(backlogs.getValue());
    }
  }
+ /**
+ * This is for recording backlog bytes on the current thread.
+ *
+ * @param topicName topicName
+ * @param partitionId partitionId for the topic. Only included in the metric key if
+ *     'supportsMetricsDeletion' is enabled.
+ * @param backlogBytes backlog for the topic. Only included in the metric key if
+ *     'supportsMetricsDeletion' is enabled.
+ */
+ @Override
+ public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
+   Gauge perPartion =
+       Metrics.gauge(
Review Comment:
i.e. can we create the gauge here with the getPerWorkerGauge method, and then ensure that
those metrics have the per-worker metadata populated appropriately?
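Roughly what I have in mind, as a sketch only: it assumes a getPerWorkerGauge helper on
KafkaSinkMetrics and a particular metric-name format, neither of which exists today.
```java
// Sketch under the assumption that KafkaSinkMetrics grows a getPerWorkerGauge helper
// that attaches the per-worker metadata when the gauge is created, so nothing downstream
// has to match on the namespace string.
@Override
public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
  // Hypothetical key format; the real key layout is whatever this PR settles on.
  MetricName name = MetricName.named("KafkaSink", topicName + "-" + partitionId + "-backlogBytes");
  Gauge perPartition = KafkaSinkMetrics.getPerWorkerGauge(name); // hypothetical helper
  perPartition.set(backlogBytes);
}
```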
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java:
##########
@@ -293,6 +293,13 @@ public MetricUpdates getUpdates() {
        .setLabel(MonitoringInfoConstants.Labels.NAME, metricKey.metricName().getName())
        .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName());
}
+
+ // Based on namespace, add per worker metrics label to enable separate runner based sink based
+ // processing.
+ if (metricName.getNamespace().equals("BigQuerySink")
Review Comment:
It seems like this would be cleaner if it was done at the call-site when creating the metric.
That would make it more obvious there and less reliant on magic constants, and it would let us
support some Kafka metrics per-worker but others aggregated, or add per-worker metrics to a
namespace that already has metrics.
It seems like we could either add it to MetricKey, or plumb it into the cell and then update
the function where we generate the metadata to base it upon that instead.
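As a rough illustration of the call-site idea (none of the perWorker plumbing below exists
yet; the overload, accessor, and label constant are placeholders):
```java
// At the sink, the metric itself is marked per-worker when it is created:
MetricName backlogName =
    MetricName.named(
        "KafkaSink",
        topicName + "-" + partitionId + "-backlogBytes",
        /* perWorker= */ true); // hypothetical overload
Gauge gauge = Metrics.gauge(backlogName);

// In MetricsContainerImpl.getUpdates(), metadata generation then keys off that bit
// instead of comparing namespace strings:
if (metricKey.metricName().isPerWorker()) { // hypothetical accessor
  builder.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true"); // placeholder label constant
}
```
Either way, the decision about which metrics are per-worker would live where the metric is
declared rather than in MetricsContainerImpl.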
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -332,7 +332,6 @@ public long getSplitBacklogBytes() {
        if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
          return UnboundedReader.BACKLOG_UNKNOWN;
        }
-       backlogBytes += pBacklog;
Review Comment:
revert? (otherwise getSplitBacklogBytes() no longer accumulates the per-partition backlog
into the total it returns)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]