nehsyc commented on a change in pull request #13493:
URL: https://github.com/apache/beam/pull/13493#discussion_r538631115
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -815,18 +817,25 @@ def __init__(self, batch_size,
max_buffering_duration_secs=None):
_shard_id_prefix = uuid.uuid4().bytes
def expand(self, pcoll):
+ key_type, value_type = pcoll.element_type.tuple_types
sharded_pcoll = pcoll | Map(
lambda key_value: (
ShardedKey(
key_value[0],
# Use [uuid, thread id] as the shard id.
GroupIntoBatches.WithShardedKey._shard_id_prefix + bytes(
threading.get_ident().to_bytes(8, 'big'))),
- key_value[1]))
+ key_value[1])).with_output_types(
+ TupleConstraint([ShardedKeyType[key_type], value_type]))
return (
sharded_pcoll
| GroupIntoBatches(self.batch_size,
self.max_buffering_duration_secs))
+ def infer_output_type(self, input_type):
Review comment:
I couldn't use the decorator because of the error I mentioned in another
comment.
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -815,18 +817,25 @@ def __init__(self, batch_size,
max_buffering_duration_secs=None):
_shard_id_prefix = uuid.uuid4().bytes
def expand(self, pcoll):
+ key_type, value_type = pcoll.element_type.tuple_types
sharded_pcoll = pcoll | Map(
lambda key_value: (
ShardedKey(
key_value[0],
# Use [uuid, thread id] as the shard id.
GroupIntoBatches.WithShardedKey._shard_id_prefix + bytes(
threading.get_ident().to_bytes(8, 'big'))),
- key_value[1]))
+ key_value[1])).with_output_types(
+ TupleConstraint([ShardedKeyType[key_type], value_type]))
Review comment:
Why? The Map fn converts each input `Tuple[K, V]` into
`Tuple[ShardedKey[K], V]`, and the output of the composite transform is
`Tuple[ShardedKey[K], Iterable[V]]`.
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -785,7 +788,6 @@ def expand(self, pcoll):
@experimental()
@typehints.with_input_types(Tuple[K, V])
- @typehints.with_output_types(Tuple[K, Iterable[V]])
Review comment:
Because I got an error saying:
```
arg = ShardedKey[int], msg = 'Tuple[t0, t1, ...]: each t must be a type.',
is_argument = True
def _type_check(arg, msg, is_argument=True):
"""Check that the argument is a type, and return it (internal
helper).
As a special case, accept None and return type(None) instead. Also
wrap strings
into ForwardRef instances. Consider several corner cases, for
example plain
special forms like Union are not valid, while Union[int, str] is OK,
etc.
The msg argument is a human-readable error message, e.g::
"Union[arg, ...]: arg should be a type."
We append the repr() of the actual value (truncated to 100 chars).
"""
invalid_generic_forms = (Generic, Protocol)
if is_argument:
invalid_generic_forms = invalid_generic_forms + (ClassVar, Final)
if arg is None:
return type(None)
if isinstance(arg, str):
return ForwardRef(arg)
if (isinstance(arg, _GenericAlias) and
arg.__origin__ in invalid_generic_forms):
raise TypeError(f"{arg} is not valid as type argument")
if (isinstance(arg, _SpecialForm) and arg not in (Any, NoReturn) or
arg in (Generic, Protocol)):
raise TypeError(f"Plain {arg} is not valid as type argument")
if isinstance(arg, (type, TypeVar, ForwardRef)):
return arg
if not callable(arg):
> raise TypeError(f"{msg} Got {arg!r:.100}.")
E TypeError: Tuple[t0, t1, ...]: each t must be a type. Got
ShardedKey[int].
../../../../../../.pyenv/versions/3.8.5/lib/python3.8/typing.py:149:
TypeError
```
Any insights?
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -785,7 +788,6 @@ def expand(self, pcoll):
@experimental()
@typehints.with_input_types(Tuple[K, V])
- @typehints.with_output_types(Tuple[K, Iterable[V]])
Review comment:
Udi also had a similar question. I got a TypeError when trying to use
`Tuple[ShardedKey, ...]`. See my reply below.
##########
File path: sdks/python/apache_beam/typehints/sharded_key_type.py
##########
@@ -25,8 +25,12 @@
from apache_beam.typehints.typehints import match_type_variables
from apache_beam.utils.sharded_key import ShardedKey
+from future.utils import with_metaclass
-class ShardedKeyTypeConstraint(typehints.TypeConstraint):
+
+class ShardedKeyTypeConstraint(with_metaclass(typehints.GetitemConstructor,
Review comment:
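`GetitemConstructor` is a metaclass that essentially just defines
`__getitem__`. I tried removing it and defining `__getitem__` directly on
the class, but I got the error
```
TypeError: 'type' object is not subscriptable
```
This is expected: a `__getitem__` defined in the class body only applies
to instances. Subscripting the class itself requires `__getitem__` on the
metaclass (which is what `GetitemConstructor` provides), or
`__class_getitem__` on Python 3.7+.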
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org