tvalentyn commented on code in PR #28182:
URL: https://github.com/apache/beam/pull/28182#discussion_r1307650919


##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -319,6 +326,24 @@ def mult_two(example: str) -> int:
     with self.assertRaises(ValueError):
       base.KeyedModelHandler(mhs)
 
+  def test_keyed_model_handler_get_num_bytes(self):
+    mh = base.KeyedModelHandler(FakeModelHandler(num_bytes_per_element=10))
+    batch = [('key1', 1), ('key2', 2), ('key1', 3)]
+    expected = len(pickle.dumps(('key1', 'key2', 'key1'))) + 30
+    actual = mh.get_num_bytes(batch)
+    self.assertEqual(expected, actual)
+
+  def test_keyed_model_handler_multiple_models_get_num_bytes(self):
+    mhs = [
+        base.KeyMhMapping(['key1'], 
FakeModelHandler(num_bytes_per_element=10)),

Review Comment:
   do you still plan to change the name of KeyMhMapping?



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -477,11 +477,19 @@ def run_inference(
     return predictions
 
   def get_num_bytes(self, batch: Sequence[Tuple[KeyT, ExampleT]]) -> int:
+    keys, unkeyed_batch = zip(*batch)
+    batch_bytes = len(pickle.dumps(keys))
     if self._single_model:
-      keys, unkeyed_batch = zip(*batch)
-      return len(
-          pickle.dumps(keys)) + self._unkeyed.get_num_bytes(unkeyed_batch)
-    return len(pickle.dumps(batch))
+      return batch_bytes + self._unkeyed.get_num_bytes(unkeyed_batch)
+
+    batch_by_key = defaultdict(list)
+    for pair in batch:

Review Comment:
   nit: consider using name readability if pair can be unpacked: 
   
   ```
       for key, examples in batch:
         batch_by_key[key].append(examples)
   ```



##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -477,11 +477,19 @@ def run_inference(
     return predictions
 
   def get_num_bytes(self, batch: Sequence[Tuple[KeyT, ExampleT]]) -> int:
+    keys, unkeyed_batch = zip(*batch)
+    batch_bytes = len(pickle.dumps(keys))
     if self._single_model:
-      keys, unkeyed_batch = zip(*batch)
-      return len(
-          pickle.dumps(keys)) + self._unkeyed.get_num_bytes(unkeyed_batch)
-    return len(pickle.dumps(batch))
+      return batch_bytes + self._unkeyed.get_num_bytes(unkeyed_batch)
+
+    batch_by_key = defaultdict(list)
+    for pair in batch:

Review Comment:
   also there is conflation of batch of keyed prediction inputs vs a batch of 
elements for a single key.  not sure how to clarify, could use a courtesy 
variable like:
   
   ```    
       if self._single_model:
         return batch_bytes + self._unkeyed.get_num_bytes(unkeyed_batch)
       else:
         keyed_batches = batch
   
       for key, examples in keyed_batches:
         ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to