[
https://issues.apache.org/jira/browse/BEAM-10475?focusedWorklogId=521452&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-521452
]
ASF GitHub Bot logged work on BEAM-10475:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Dec/20 23:25
Start Date: 07/Dec/20 23:25
Worklog Time Spent: 10m
Work Description: udim commented on a change in pull request #13493:
URL: https://github.com/apache/beam/pull/13493#discussion_r537894802
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -72,6 +72,9 @@
from apache_beam.transforms.window import NonMergingWindowFn
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.transforms.window import TimestampedValue
+from apache_beam.typehints.sharded_key_type import ShardedKeyType
+from apache_beam.typehints.typehints import IterableTypeConstraint
+from apache_beam.typehints.typehints import TupleConstraint
Review comment:
Please use:
```python
from apache_beam.typehints import ShardedKeyType
```
And use the `Tuple` and `Iterable` types from the typing module (they will get
converted to the ones in typehints). In general, use either `typing.Tuple` or
`apache_beam.typehints.Tuple`; the constraint classes are internal to the
typehints module.
For the `ShardedKeyType` import to work, you can add the corresponding
import statement to `apache_beam/typehints/__init__.py`. This also avoids the
cyclic import you mentioned, which happened with my suggestion to put
ShardedKeyType in typehints.py.
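For illustration, a minimal sketch of the suggested style (hedged: it assumes
the `ShardedKeyType` re-export described above is in place, and the `str`/`int`
parameters are only placeholders):
```python
from apache_beam import typehints
from apache_beam.typehints import ShardedKeyType  # via the suggested re-export

# Build hints with the public Tuple/Iterable factories; their typing-module
# counterparts work too and are converted internally. The constraint classes
# (TupleConstraint, IterableTypeConstraint) never need to be imported directly.
hint = typehints.Tuple[ShardedKeyType[str], typehints.Iterable[int]]
```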
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -785,7 +788,6 @@ def expand(self, pcoll):
@experimental()
@typehints.with_input_types(Tuple[K, V])
- @typehints.with_output_types(Tuple[K, Iterable[V]])
Review comment:
Any reason why this isn't:
```python
@typehints.with_output_types(Tuple[ShardedKeyType[K], Iterable[V]])
```
?
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -815,18 +817,25 @@ def __init__(self, batch_size,
max_buffering_duration_secs=None):
_shard_id_prefix = uuid.uuid4().bytes
def expand(self, pcoll):
+ key_type, value_type = pcoll.element_type.tuple_types
sharded_pcoll = pcoll | Map(
lambda key_value: (
ShardedKey(
key_value[0],
# Use [uuid, thread id] as the shard id.
GroupIntoBatches.WithShardedKey._shard_id_prefix + bytes(
threading.get_ident().to_bytes(8, 'big'))),
- key_value[1]))
+ key_value[1])).with_output_types(
+ TupleConstraint([ShardedKeyType[key_type], value_type]))
Review comment:
If you keep this line, I believe `value_type` should be wrapped as
`Iterable[value_type]`.
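A hedged sketch of how that suggestion could read, combined with the import
style recommended earlier (`key_type` and `value_type` stand in for the values
`expand` derives from `pcoll.element_type.tuple_types`):
```python
from apache_beam import typehints
from apache_beam.typehints import ShardedKeyType  # via the suggested re-export

key_type, value_type = str, int  # illustrative stand-ins

# The internal TupleConstraint([...]) expression above would then become:
hint = typehints.Tuple[ShardedKeyType[key_type],
                       typehints.Iterable[value_type]]
```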
##########
File path: sdks/python/apache_beam/typehints/sharded_key_type.py
##########
@@ -25,8 +25,12 @@
from apache_beam.typehints.typehints import match_type_variables
from apache_beam.utils.sharded_key import ShardedKey
+from future.utils import with_metaclass
-class ShardedKeyTypeConstraint(typehints.TypeConstraint):
+
+class ShardedKeyTypeConstraint(with_metaclass(typehints.GetitemConstructor,
Review comment:
I don't know what `with_metaclass` does. You could add the `__getitem__`
implementation directly if it's causing issues.
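For reference, a rough sketch of both alternatives (hedged: validation and
matching logic are omitted, and `GetitemConstructor` is assumed to be the
metaclass the diff references, whose `__getitem__` constructs the class):
```python
from apache_beam.typehints import typehints

# Python-3-native equivalent of with_metaclass(GetitemConstructor, ...):
class ShardedKeyTypeConstraint(
    typehints.TypeConstraint, metaclass=typehints.GetitemConstructor):
  def __init__(self, key_type):
    self.key_type = key_type

# Or, with no metaclass at all (Python 3.7+), implement __getitem__
# support directly via __class_getitem__:
class ShardedKeyTypeConstraintAlt(typehints.TypeConstraint):
  def __init__(self, key_type):
    self.key_type = key_type

  def __class_getitem__(cls, key_type):
    # Makes ShardedKeyTypeConstraintAlt[K] an alias for
    # ShardedKeyTypeConstraintAlt(K).
    return cls(key_type)
```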
##########
File path: sdks/python/apache_beam/transforms/util.py
##########
@@ -815,18 +817,25 @@ def __init__(self, batch_size,
max_buffering_duration_secs=None):
_shard_id_prefix = uuid.uuid4().bytes
def expand(self, pcoll):
+ key_type, value_type = pcoll.element_type.tuple_types
sharded_pcoll = pcoll | Map(
lambda key_value: (
ShardedKey(
key_value[0],
# Use [uuid, thread id] as the shard id.
GroupIntoBatches.WithShardedKey._shard_id_prefix + bytes(
threading.get_ident().to_bytes(8, 'big'))),
- key_value[1]))
+ key_value[1])).with_output_types(
+ TupleConstraint([ShardedKeyType[key_type], value_type]))
return (
sharded_pcoll
| GroupIntoBatches(self.batch_size,
self.max_buffering_duration_secs))
+ def infer_output_type(self, input_type):
Review comment:
If you keep the with_output_types decorator, please remove this.
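To make the two options concrete, a hedged sketch (class names and type
variables are illustrative; only the hint mechanics are the point):
```python
import apache_beam as beam
from apache_beam import typehints
from apache_beam.typehints import ShardedKeyType  # via the suggested re-export

K = typehints.TypeVariable('K')
V = typehints.TypeVariable('V')

# Option A (what this review asks for): keep the class-level decorator
# and delete infer_output_type.
@typehints.with_input_types(typehints.Tuple[K, V])
@typehints.with_output_types(
    typehints.Tuple[ShardedKeyType[K], typehints.Iterable[V]])
class WithShardedKey(beam.PTransform):
  ...

# Option B: no decorator; derive the hint dynamically instead.
class WithShardedKeyAlt(beam.PTransform):
  def infer_output_type(self, input_type):
    key_type, value_type = input_type.tuple_types
    return typehints.Tuple[ShardedKeyType[key_type],
                           typehints.Iterable[value_type]]
```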
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 521452)
Time Spent: 23h (was: 22h 50m)
> GroupIntoBatches with Runner-determined Sharding
> ------------------------------------------------
>
> Key: BEAM-10475
> URL: https://issues.apache.org/jira/browse/BEAM-10475
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow
> Reporter: Siyuan Chen
> Assignee: Siyuan Chen
> Priority: P2
> Labels: GCP, performance
> Time Spent: 23h
> Remaining Estimate: 0h
>
> [https://s.apache.org/sharded-group-into-batches]
> Improve the existing Beam transform, GroupIntoBatches, to allow runners to
> choose different sharding strategies depending on how the data needs to be
> grouped. The goal is to help in situations where the elements to process
> need to be co-located, reducing the per-element overhead that would
> otherwise be incurred, without losing the ability to scale parallelism. The
> essential idea is to build a stateful DoFn with shardable state.
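> A minimal usage sketch (batch size and input data are illustrative; the
> API shown is the `GroupIntoBatches.WithShardedKey` transform this issue
> adds):
> ```python
> import apache_beam as beam
>
> with beam.Pipeline() as p:
>   _ = (
>       p
>       | beam.Create([('key', i) for i in range(100)])
>       # The runner may split the same key across shards; each shard's
>       # elements are batched independently.
>       | beam.GroupIntoBatches.WithShardedKey(batch_size=10))
> ```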
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)