[
https://issues.apache.org/jira/browse/BEAM-6695?focusedWorklogId=242993&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242993
]
ASF GitHub Bot logged work on BEAM-6695:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/May/19 00:02
Start Date: 16/May/19 00:02
Worklog Time Spent: 10m
Work Description: robinyqiu commented on issue #8206: [BEAM-6695] Latest
PTransform for Python SDK
URL: https://github.com/apache/beam/pull/8206#issuecomment-492860487
Hi @ttanay, I spent some time investigating the issues. I am sharing
my findings here and hope they will be helpful.
1) I agree with what @aaltay said here.
> In your example, the inputs you are passing are not in type Tuple[K, V].
For example, whatever test you come up with, it should be valid to do
Create(elem_list) | GroupByKey(), and that input will work on PerKey transform.
In other words, the action item is that we should add type annotations here
```python
@with_input_types(T)
@with_output_types(T)
class Globally(ptransform.PTransform): ...
```
and here
```python
@with_input_types(KV[K, V])
@with_output_types(KV[K, V])
class PerKey(ptransform.PTransform): ...
```
This is the intended behavior for these transforms. I know that the current
tests will break once we add these annotations, but I think the right thing to
do is to make these changes and fix the tests (specifically, how we create a
PCollection of timestamped values/KVs using `Create`).
2) So how can we create a PCollection of KVs that have timestamps for
testing? A very simple fix is to add a `Map(lambda x: x)` transform
after `Create`. With this change you can also use `TimestampedValue` instead of
`WindowedValue` here.
I have tried it and the following test passed:
```python
def test_per_key(self):
  l = [window.TimestampedValue(('a', 1), 300),
       window.TimestampedValue(('b', 3), 100),
       window.TimestampedValue(('a', 2), 200)]
  with TestPipeline() as p:
    pc = p | Create(l) | Map(lambda x: x)
    latest = pc | combine.Latest.PerKey()
    assert_that(latest, equal_to([('a', 1), ('b', 3)]))
```
3) The reason why this hack makes the type checking work is related to
some underlying implementation details. To put it simply, both `WindowedValue`
and `TimestampedValue` are special objects that are properly handled by a
`DoFn`, but `Create` does not treat them differently from any other object
type. By inserting a `Map` (based on `DoFn` under the hood) after `Create`, we
force the data to go through a `DoFn`, so the output will be a PCollection of
only the `element` type, instead of the `TimestampedValue(element, timestamp)`
type.
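To illustrate the unwrapping described in 3), here is a minimal toy model in plain Python. It is not Beam code: `ToyTimestampedValue`, `toy_create`, and `toy_map` are hypothetical stand-ins invented for this sketch, and the default timestamp of 0 is an assumption made only to keep the example small.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple


@dataclass(frozen=True)
class ToyTimestampedValue:
    """Hypothetical stand-in for a timestamped wrapper (not Beam's class)."""
    value: Any
    timestamp: int


def toy_create(elements: Iterable[Any]) -> List[Tuple[Any, int]]:
    """Toy 'Create': emits every object unchanged, wrappers included.

    Each element is paired with an assumed default timestamp of 0.
    """
    return [(element, 0) for element in elements]


def toy_map(fn: Callable[[Any], Any],
            pcoll: List[Tuple[Any, int]]) -> List[Tuple[Any, int]]:
    """Toy 'Map': applies fn, then unwraps any timestamped result."""
    out = []
    for element, timestamp in pcoll:
        result = fn(element)
        if isinstance(result, ToyTimestampedValue):
            # The wrapper is consumed: its value becomes the element and
            # its timestamp becomes the element's timestamp.
            out.append((result.value, result.timestamp))
        else:
            out.append((result, timestamp))
    return out


raw = toy_create([ToyTimestampedValue(('a', 1), 300),
                  ToyTimestampedValue(('b', 3), 100)])
unwrapped = toy_map(lambda x: x, raw)
```

In this sketch the elements of `raw` are still wrapper objects, while the elements of `unwrapped` are plain `(key, value)` tuples that carry their timestamps separately, which is the shape a per-key transform expects.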
Hope this explanation makes sense to you. Let me know if you have questions.
Issue Time Tracking
-------------------
Worklog Id: (was: 242993)
Time Spent: 9h 20m (was: 9h 10m)
> Latest transform for Python SDK
> -------------------------------
>
> Key: BEAM-6695
> URL: https://issues.apache.org/jira/browse/BEAM-6695
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Ahmet Altay
> Assignee: Tanay Tummalapalli
> Priority: Minor
> Time Spent: 9h 20m
> Remaining Estimate: 0h
>
> Add a PTransform and Combine.CombineFn for computing the latest element in a
> PCollection.
> It should offer the same API as its Java counterpart:
> https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java