[ 
https://issues.apache.org/jira/browse/BEAM-6695?focusedWorklogId=242993&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-242993
 ]

ASF GitHub Bot logged work on BEAM-6695:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/May/19 00:02
            Start Date: 16/May/19 00:02
    Worklog Time Spent: 10m 
      Work Description: robinyqiu commented on issue #8206: [BEAM-6695] Latest 
PTransform for Python SDK
URL: https://github.com/apache/beam/pull/8206#issuecomment-492860487
 
 
   Hi @ttanay, I spent some time investigating the issues. I am sharing my 
findings here and I hope they will be helpful.
   
   1) I agree with what @aaltay said here.
   > In your example, the inputs you are passing are not in type Tuple[K, V]. 
For example, whatever test you come up with, it should be valid to do 
Create(elem_list) | GroupByKey(), and that input will work on PerKey transform.
   
   In other words, the action item is that we should add type annotations here
   ```python
   @with_input_types(T)
   @with_output_types(T)
   class Globally(ptransform.PTransform): ...
   ```
   and here
   ```python
   @with_input_types(KV[K, V])
   @with_output_types(KV[K, V])
   class PerKey(ptransform.PTransform): ...
   ```
   This is the intended behavior for these transforms. I know the current 
tests will break once we add these annotations, but I think the right thing to 
do is to make these changes and fix the tests (specifically, how we create a 
PCollection of timestamped values/KVs using `Create`).
   
   2) So how can we create a PCollection of KVs that have timestamps for 
testing? A very simple fix is to add a `Map(lambda x: x)` transform after 
`Create`. With this change you can also use `TimestampedValue` instead of 
`WindowedValue` here.
   
   I have tried it and the following test passed:
   ```python
     def test_per_key(self):
       l = [window.TimestampedValue(('a', 1), 300),
            window.TimestampedValue(('b', 3), 100),
            window.TimestampedValue(('a', 2), 200)]
       with TestPipeline() as p:
         pc = p | Create(l) | Map(lambda x: x)
         latest = pc | combine.Latest.PerKey()
         assert_that(latest, equal_to([('a', 1), ('b', 3)]))
   ```
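
   For reference, the result this test expects can be sketched in plain 
Python, without Beam. This is only an illustration of the per-key "latest" 
semantics (keep the value with the greatest timestamp for each key); the 
helper name `latest_per_key` is hypothetical and is not part of the SDK or of 
this PR.

```python
from typing import Any, Dict, Hashable, Iterable, Tuple


def latest_per_key(
    timestamped_kvs: Iterable[Tuple[Tuple[Hashable, Any], float]]
) -> Dict[Hashable, Any]:
    """Hypothetical helper: keep, per key, the value with the greatest timestamp."""
    best: Dict[Hashable, Tuple[float, Any]] = {}
    for (key, value), timestamp in timestamped_kvs:
        # Replace the stored value only when a strictly later timestamp is seen,
        # so on a tie the first element encountered is kept.
        if key not in best or timestamp > best[key][0]:
            best[key] = (timestamp, value)
    return {key: value for key, (_, value) in best.items()}


# Same data as the test above, written as ((key, value), timestamp) pairs.
result = latest_per_key([(('a', 1), 300), (('b', 3), 100), (('a', 2), 200)])
```

   Here `('a', 1)` wins over `('a', 2)` because its timestamp (300) is later 
than 200, which matches the `equal_to([('a', 1), ('b', 3)])` assertion.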
   
   3) The reason this hack makes the type checking work comes down to some 
underlying implementation details. To put it simply, both `WindowedValue` and 
`TimestampedValue` are special objects that a `DoFn` handles properly, but 
`Create` does not treat them differently from any other object type. By 
inserting a `Map` (based on `DoFn` under the hood) after `Create`, we force the 
data to go through a `DoFn`, so the output will be a PCollection of only the 
`element` type, instead of the `TimestampedValue(element, timestamp)` type. 
Hope this explanation makes sense to you. Let me know if you have questions. 
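
   The explanation in 3) can be pictured with a toy model in plain Python. 
This is a sketch of the idea only, not Beam's implementation: the 
`TimestampedValue` class, `toy_create`, `toy_map`, and the `MIN_TIMESTAMP` 
default below are all hypothetical stand-ins chosen for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple

# Assumption: a placeholder default timestamp for elements with none assigned.
MIN_TIMESTAMP = float("-inf")


@dataclass(frozen=True)
class TimestampedValue:
    """Toy stand-in for a timestamped wrapper (not the Beam class)."""
    value: Any
    timestamp: float


def toy_create(items: Iterable[Any]) -> List[Tuple[Any, float]]:
    """Emit every item unchanged, wrappers included, at the default timestamp."""
    return [(item, MIN_TIMESTAMP) for item in items]


def toy_map(fn: Callable[[Any], Any],
            pcoll: List[Tuple[Any, float]]) -> List[Tuple[Any, float]]:
    """Apply fn, then unwrap a TimestampedValue result into (element, timestamp)."""
    out = []
    for element, timestamp in pcoll:
        result = fn(element)
        if isinstance(result, TimestampedValue):
            # The wrapper is consumed here: only the inner element remains.
            out.append((result.value, result.timestamp))
        else:
            out.append((result, timestamp))
    return out


created = toy_create([TimestampedValue(('a', 1), 300)])
mapped = toy_map(lambda x: x, created)
```

   In this model, `created` still holds the wrapper object as its element, 
while `mapped` holds the plain `('a', 1)` tuple with timestamp 300, which is 
the shape a `KV[K, V]` type check expects.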
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 242993)
    Time Spent: 9h 20m  (was: 9h 10m)

> Latest transform for Python SDK
> -------------------------------
>
>                 Key: BEAM-6695
>                 URL: https://issues.apache.org/jira/browse/BEAM-6695
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Ahmet Altay
>            Assignee: Tanay Tummalapalli
>            Priority: Minor
>          Time Spent: 9h 20m
>  Remaining Estimate: 0h
>
> Add a PTransform and Combine.CombineFn for computing the latest element in a 
> PCollection.
> It should offer the same API as its Java counterpart: 
> https://github.com/apache/beam/blob/11a977b8b26eff2274d706541127c19dc93131a2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java



