damccorm commented on code in PR #23830:
URL: https://github.com/apache/beam/pull/23830#discussion_r1004757497


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -337,6 +337,7 @@ def __init__(self, namespace: str):
     # Metrics
     self._inference_counter = beam.metrics.Metrics.counter(
         namespace, 'num_inferences')
+    self.failed_batches_counter = 0

Review Comment:
   This should probably be initialized with 
`beam.metrics.Metrics.counter(namespace, 'failed_batches_counter')` so that we 
correctly capture the namespace. As written it is a plain `int`, so the 
`.inc()` call in `process` will fail.
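
To illustrate the point, here is a minimal sketch that uses a hypothetical stand-in counter instead of Beam's real metric classes. `StubCounter`, `CollectorWithInt`, `CollectorWithCounter`, and `record_failure` are made-up names for illustration only; the real fix would use the `Metrics.counter` call quoted above.

```python
class StubCounter:
  """Hypothetical stand-in for a Beam counter; it only mimics inc()."""
  def __init__(self, namespace, name):
    self.namespace = namespace
    self.name = name
    self.value = 0

  def inc(self, n=1):
    self.value += n


class CollectorWithInt:
  def __init__(self, namespace):
    # Mirrors the line under review: a plain int, not a counter object.
    self.failed_batches_counter = 0


class CollectorWithCounter:
  def __init__(self, namespace):
    # In base.py this would instead be
    # beam.metrics.Metrics.counter(namespace, 'failed_batches_counter').
    self.failed_batches_counter = StubCounter(
        namespace, 'failed_batches_counter')


def record_failure(collector):
  # Same call shape as the except branch in process().
  collector.failed_batches_counter.inc()


good = CollectorWithCounter('my_namespace')
record_failure(good)

try:
  record_failure(CollectorWithInt('my_namespace'))
  int_error = None
except AttributeError as e:
  # int has no inc() method, so the plain-int version fails here.
  int_error = e
```

With the counter object the increment is recorded under the given namespace; with the plain `0` the same call raises `AttributeError`.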



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -426,17 +427,22 @@ def setup(self):
 
   def process(self, batch, inference_args):
     start_time = _to_microseconds(self._clock.time_ns())
-    result_generator = self._model_handler.run_inference(
+    try:
+      result_generator = self._model_handler.run_inference(
         batch, self._model, inference_args)
-    predictions = list(result_generator)
+    except BaseException:
+      self._metrics_collector.failed_batches_counter.inc()
+      raise
+    else:

Review Comment:
   We don't need to nest this clause in an `else` if we're re-raising the 
exception; code placed after the `try`/`except` only runs when nothing was 
raised.
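
A rough sketch of the flatter shape, under the assumption that the failing call raises immediately rather than lazily. `StubCounter`, `ok`, and `boom` are hypothetical helpers, and this `process` is a simplified stand-in, not the method in `base.py`.

```python
class StubCounter:
  """Hypothetical stand-in for a Beam counter; it only mimics inc()."""
  def __init__(self):
    self.value = 0

  def inc(self):
    self.value += 1


def process(batch, run_inference, failed_batches):
  try:
    result = run_inference(batch)
  except BaseException:
    failed_batches.inc()
    raise
  # No else needed: this line is reached only if run_inference returned.
  return list(result)


def ok(batch):
  return [x * 2 for x in batch]


def boom(batch):
  raise ValueError('bad inference args')


counter = StubCounter()
predictions = process([1, 2], ok, counter)

try:
  process([1, 2], boom, counter)
  raised = False
except ValueError:
  raised = True
```

Because the `except` branch always re-raises, control can only fall through to the `return` on success, which is exactly what the `else` was guarding.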



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -171,6 +171,38 @@ def test_unexpected_inference_args_passed(self):
             FakeModelHandlerFailsOnInferenceArgs(),
             inference_args=inference_args)
 
+  def test_increment_failed_batches_counter(self):
+    with self.assertRaises(ValueError, FakeModelHandlerFailsOnInferenceArgs):
+      with TestPipeline() as pipeline:
+        examples = [1, 5, 3, 10]
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        inference_args = {'key': True}
+        _ = pcoll | base.RunInference(FakeModelHandlerFailsOnInferenceArgs(),
+                inference_args=inference_args)
+        run_result = pipeline.run()
+        run_result.wait_until_finish()
+
+        metric_results = (
+            run_result.metrics().query(MetricsFilter().with_name('failed_batches_counter')))
+        num_failed_batches_counter = metric_results['counters'][0]
+        self.assertEqual(num_failed_batches_counter.committed, 1)

Review Comment:
   Is this guaranteed to be 1, or could it be larger if the examples got 
batched differently (e.g. if our batch size is 1)? Could we pass through 
`min_batch_size` and `max_batch_size` to force the batching to a reasonable 
size?
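
To make the concern concrete, here is a small sketch of how the expected count depends on batch size. It assumes fixed-size batches, that every batch is attempted once, and that every batch fails; it ignores retries and is not Beam's actual batching logic. `batches` and `count_failed_batches` are hypothetical helpers.

```python
def batches(examples, batch_size):
  """Split examples into consecutive fixed-size batches."""
  for start in range(0, len(examples), batch_size):
    yield examples[start:start + batch_size]


def count_failed_batches(examples, batch_size):
  failed = 0
  for _ in batches(examples, batch_size):
    # Assumption: every batch fails, as it would with a handler that
    # always rejects the inference args.
    failed += 1
  return failed


examples = [1, 5, 3, 10]
one_per_batch = count_failed_batches(examples, 1)
single_batch = count_failed_batches(examples, len(examples))
```

Under these assumptions, the same four examples give 4 failures with a batch size of 1 and only 1 failure when they all land in a single batch, so the assertion on `1` only holds if the batching is pinned.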



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -337,6 +337,7 @@ def __init__(self, namespace: str):
     # Metrics
     self._inference_counter = beam.metrics.Metrics.counter(
         namespace, 'num_inferences')
+    self.failed_batches_counter = 0

Review Comment:
   FWIW, it looks like this is causing the tests to fail to run: 
https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/25468/consoleFull



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -171,6 +171,38 @@ def test_unexpected_inference_args_passed(self):
             FakeModelHandlerFailsOnInferenceArgs(),
             inference_args=inference_args)
 
+  def test_increment_failed_batches_counter(self):
+    with self.assertRaises(ValueError, FakeModelHandlerFailsOnInferenceArgs):
+      with TestPipeline() as pipeline:
+        examples = [1, 5, 3, 10]
+        pcoll = pipeline | 'start' >> beam.Create(examples)
+        inference_args = {'key': True}
+        _ = pcoll | base.RunInference(FakeModelHandlerFailsOnInferenceArgs(),
+                inference_args=inference_args)
+        run_result = pipeline.run()
+        run_result.wait_until_finish()
+
+        metric_results = (
+            run_result.metrics().query(MetricsFilter().with_name('failed_batches_counter')))
+        num_failed_batches_counter = metric_results['counters'][0]
+        self.assertEqual(num_failed_batches_counter.committed, 1)

Review Comment:
   FWIW, this whole setup may just cause the job to fail and retry a few times 
(in which case the failed counter would probably be 3, since we'd do that many 
retries). One testing alternative would be to optionally track whether we've 
already failed an inference in `FakeModelHandlerFailsOnInferenceArgs` and 
succeed on retry. That would simulate a transient failure.
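
A minimal sketch of the transient-failure idea, written without Beam. `FlakyHandler`, `StubCounter`, and `run_with_retries` are hypothetical names, and the retry loop only imitates a runner retrying a failed bundle. Whether instance state like `has_failed` survives a retry on a real runner is an assumption that would need checking.

```python
class StubCounter:
  """Hypothetical stand-in for a Beam counter; it only mimics inc()."""
  def __init__(self):
    self.value = 0

  def inc(self):
    self.value += 1


class FlakyHandler:
  """Hypothetical handler that fails its first call, then succeeds."""
  def __init__(self):
    self.has_failed = False

  def run_inference(self, batch):
    if not self.has_failed:
      self.has_failed = True
      raise ValueError('simulated transient failure')
    return [x + 1 for x in batch]


def run_with_retries(handler, batch, failed_batches, max_attempts=3):
  for attempt in range(max_attempts):
    try:
      return handler.run_inference(batch)
    except ValueError:
      failed_batches.inc()
      # Give up once the last allowed attempt has also failed.
      if attempt == max_attempts - 1:
        raise


counter = StubCounter()
result = run_with_retries(FlakyHandler(), [1, 5, 3, 10], counter)
```

Because the handler fails exactly once, the counter ends at a known value regardless of how many retries are allowed, which makes the test assertion deterministic.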



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

