nehsyc commented on a change in pull request #13493:
URL: https://github.com/apache/beam/pull/13493#discussion_r538631115



##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -815,18 +817,25 @@ def __init__(self, batch_size, max_buffering_duration_secs=None):
     _shard_id_prefix = uuid.uuid4().bytes
 
     def expand(self, pcoll):
+      key_type, value_type = pcoll.element_type.tuple_types
       sharded_pcoll = pcoll | Map(
           lambda key_value: (
               ShardedKey(
                   key_value[0],
                   # Use [uuid, thread id] as the shard id.
                   GroupIntoBatches.WithShardedKey._shard_id_prefix + bytes(
                       threading.get_ident().to_bytes(8, 'big'))),
-              key_value[1]))
+              key_value[1])).with_output_types(
+                  TupleConstraint([ShardedKeyType[key_type], value_type]))
       return (
           sharded_pcoll
           | GroupIntoBatches(self.batch_size, self.max_buffering_duration_secs))
 
+    def infer_output_type(self, input_type):

Review comment:
       I couldn't use the decorator because of the error I mentioned in another comment.

##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -815,18 +817,25 @@ def __init__(self, batch_size, max_buffering_duration_secs=None):
     _shard_id_prefix = uuid.uuid4().bytes
 
     def expand(self, pcoll):
+      key_type, value_type = pcoll.element_type.tuple_types
       sharded_pcoll = pcoll | Map(
           lambda key_value: (
               ShardedKey(
                   key_value[0],
                   # Use [uuid, thread id] as the shard id.
                   GroupIntoBatches.WithShardedKey._shard_id_prefix + bytes(
                       threading.get_ident().to_bytes(8, 'big'))),
-              key_value[1]))
+              key_value[1])).with_output_types(
+                  TupleConstraint([ShardedKeyType[key_type], value_type]))

Review comment:
       Why? The MapFn converts each input (K, V) into (ShardedKey, V), and the output of the composite transform is (ShardedKey, Iterable[V]).

##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -785,7 +788,6 @@ def expand(self, pcoll):
 
   @experimental()
   @typehints.with_input_types(Tuple[K, V])
-  @typehints.with_output_types(Tuple[K, Iterable[V]])

Review comment:
       Because I got an error saying:
   ```
   arg = ShardedKey[int], msg = 'Tuple[t0, t1, ...]: each t must be a type.', is_argument = True
   
       def _type_check(arg, msg, is_argument=True):
           """Check that the argument is a type, and return it (internal helper).
       
           As a special case, accept None and return type(None) instead. Also wrap strings
           into ForwardRef instances. Consider several corner cases, for example plain
           special forms like Union are not valid, while Union[int, str] is OK, etc.
           The msg argument is a human-readable error message, e.g::
       
               "Union[arg, ...]: arg should be a type."
       
           We append the repr() of the actual value (truncated to 100 chars).
           """
           invalid_generic_forms = (Generic, Protocol)
           if is_argument:
               invalid_generic_forms = invalid_generic_forms + (ClassVar, Final)
       
           if arg is None:
               return type(None)
           if isinstance(arg, str):
               return ForwardRef(arg)
           if (isinstance(arg, _GenericAlias) and
                   arg.__origin__ in invalid_generic_forms):
               raise TypeError(f"{arg} is not valid as type argument")
           if (isinstance(arg, _SpecialForm) and arg not in (Any, NoReturn) or
                   arg in (Generic, Protocol)):
               raise TypeError(f"Plain {arg} is not valid as type argument")
           if isinstance(arg, (type, TypeVar, ForwardRef)):
               return arg
           if not callable(arg):
   >           raise TypeError(f"{msg} Got {arg!r:.100}.")
   E           TypeError: Tuple[t0, t1, ...]: each t must be a type. Got ShardedKey[int].
   
   ../../../../../../.pyenv/versions/3.8.5/lib/python3.8/typing.py:149: TypeError
   
   ```
   Any insights?

##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -785,7 +788,6 @@ def expand(self, pcoll):
 
   @experimental()
   @typehints.with_input_types(Tuple[K, V])
-  @typehints.with_output_types(Tuple[K, Iterable[V]])

Review comment:
       Udi also had a similar question. I got a TypeError when trying to use `Tuple[ShardedKey, ...]`. See my reply below.

##########
File path: sdks/python/apache_beam/typehints/sharded_key_type.py
##########
@@ -25,8 +25,12 @@
 from apache_beam.typehints.typehints import match_type_variables
 from apache_beam.utils.sharded_key import ShardedKey
 
+from future.utils import with_metaclass
 
-class ShardedKeyTypeConstraint(typehints.TypeConstraint):
+
+class ShardedKeyTypeConstraint(with_metaclass(typehints.GetitemConstructor,

Review comment:
       `GetitemConstructor` basically defines `__getitem__`. I tried getting rid of it and defining `__getitem__` directly, but I got the error
   ```
   TypeError: 'type' object is not subscriptable
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to