kennknowles commented on code in PR #26477:
URL: https://github.com/apache/beam/pull/26477#discussion_r1185260230
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1402,31 @@ public void close() {
// Blocks while executing work.
executionState.getWorkExecutor().execute();
+ // Reports source bytes processed to the WorkItemCommitRequest if available.
+ try {
+ long sourceBytesProcessed = 0;
+ List<ElementCounter> counters =
+ ((DataflowMapTaskExecutor) executionState.getWorkExecutor())
+ .getReadOperation()
+ .receivers[0]
+ .getOutputCounters();
+ for (ElementCounter counter : counters) {
+ try {
+ Counter<Long, Long> baseCounter = ((OutputObjectAndByteCounter) counter).getByteCount();
+ if (!baseCounter.getName().name().equals(counterName)) continue;
+ sourceBytesProcessed = (long) baseCounter.getAndReset();
+ } catch (Exception e) {
+ // Ignoring because most counters will crash, spamming the logs.
Review Comment:
What kind of crash? Can you do this without catching all exceptions? For
example, when looping over the counters, just use an `if` statement to check
whether the counter is the one we want. Or, even better, put the counters into
a map so you can fetch the one you are interested in directly. Or it should be
safe to downcast to a Dataflow-specific `OutputReceiver` that exposes a
specific method for the counter you are interested in. A rough sketch of the
`if`-check version is below.
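For illustration only, a minimal sketch of the `if`-check approach, assuming
the types shown in the diff above (`ElementCounter`,
`OutputObjectAndByteCounter`, and its `getByteCount()` accessor) and that only
byte-count counters matter here:

```java
// Sketch only: replaces the catch-all with an explicit type check,
// so non-matching counters are skipped instead of throwing.
long sourceBytesProcessed = 0;
for (ElementCounter counter : counters) {
  if (!(counter instanceof OutputObjectAndByteCounter)) {
    continue; // not a byte-counting counter; nothing to read here
  }
  Counter<Long, Long> byteCount = ((OutputObjectAndByteCounter) counter).getByteCount();
  if (byteCount.getName().name().equals(counterName)) {
    sourceBytesProcessed = byteCount.getAndReset();
  }
}
```

That keeps the control flow explicit and lets any genuinely unexpected
exception propagate instead of being silently swallowed.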
Overall, I just want to avoid this pattern of using an intentionally thrown
exception for control flow, and to avoid "catch every exception" blocks. You
never know what exceptions might be added in the future, and they shouldn't
all be caught, except when that is the specific design goal.