[ 
https://issues.apache.org/jira/browse/BEAM-5674?focusedWorklogId=166930&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-166930
 ]

ASF GitHub Bot logged work on BEAM-5674:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Nov/18 16:13
            Start Date: 16/Nov/18 16:13
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #7054: [BEAM-5674] Add 
withKeyType to DataflowRunner.Deduplicate internals
URL: https://github.com/apache/beam/pull/7054
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 680c348967d..9eb0cfc9ab2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -153,6 +153,7 @@
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.DateTimeUtils;
@@ -1494,7 +1495,11 @@ public void translate(ReadWithIds<?> transform, TranslationContext context) {
     @Override
     public PCollection<T> expand(PCollection<ValueWithRecordId<T>> input) {
       return input
-          .apply(WithKeys.of(value -> Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS))
+          .apply(
+              WithKeys.of(
+                      (ValueWithRecordId<T> value) ->
+                          Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS)
+                  .withKeyType(TypeDescriptors.integers()))
           // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through
           // WindmillSink.
           .apply(Reshuffle.of())
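
For context, a minimal self-contained sketch of the same pattern the fix uses (not part of the PR; the class name, pipeline, and sample data are invented for illustration): when WithKeys.of is given a Java 8 lambda, withKeyType must supply the key's TypeDescriptor so a coder for the KV output can be inferred.

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class WithKeysLambdaExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> words = p.apply(Create.of("alpha", "beta", "gamma"));

    // A lambda erases the key type K at runtime, so the CoderRegistry cannot
    // infer a coder for KV<K, V> on its own. withKeyType(...) supplies the
    // missing TypeDescriptor explicitly, as the merged change above does.
    PCollection<KV<Integer, String>> keyed =
        words.apply(
            WithKeys.of((String word) -> word.hashCode())
                .withKeyType(TypeDescriptors.integers()));

    p.run().waitUntilFinish();
  }
}
{code}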


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 166930)
    Time Spent: 50m  (was: 40m)

> DataflowRunner.Deduplicate/WithKeys cannot infer Coder for K when running 
> with experiment "enable_custom_pubsub_source"
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-5674
>                 URL: https://issues.apache.org/jira/browse/BEAM-5674
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.7.0
>         Environment: OS: Linux 64bit,
> Platform: Dataflow
>            Reporter: Duy Le
>            Assignee: Sam Rohde
>            Priority: Major
>              Labels: easyfix, patch
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> *Bug*: DataflowRunner.Deduplicate/WithKeys cannot infer Coder for K when 
> running with experiment "enable_custom_pubsub_source"
> *Steps*
>  # Start a Beam pipeline on DataflowRunner with the "enable_custom_pubsub_source" experiment enabled via ExperimentalOptions.
>  # Observe the error thrown while the pipeline is being constructed.
> *Actual*: An error is thrown:
> {color:#FF0000}Unable to return a default Coder for 
> PubsubIO.Read/PubsubUnboundedSource/Read(PubsubSource)/DataflowRunner.Deduplicate/WithKeys/AddKeys/Map/ParMultiDo(Anonymous).output
>  [PCollection]. Correct one of the following root causes:{color}
> {color:#FF0000} No Coder has been manually specified; you may do so using 
> .setCoder().{color}
> {color:#FF0000} Inferring a Coder from the CoderRegistry failed: Cannot 
> provide coder for parameterized type org.apache.beam.sdk.values.KV<K, V>: 
> Unable to provide a Coder for K.{color}
> {color:#FF0000} Building a Coder using a registered CoderProvider 
> failed.{color}
> *Expected*: The pipeline should be constructed successfully.
> *Root cause*:
> The
> {noformat}
> DataflowRunner.Deduplicate{noformat}
> transform applies
> {code:java}
> WithKeys.of(){code}
> to its input of *ValueWithRecordId* elements, using a Java 8 lambda as the
> key function.
>  
>  
> As the Javadoc states:
>  
> {code:java}
> If using a lambda in Java 8, {@link #withKeyType(TypeDescriptor)} must be 
> called on the result {@link PTransform}{code}
>  
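> For illustration only (a simplified sketch mirroring the pre-fix line in the diff above, not standalone code), the failing shape is a lambda-keyed WithKeys with no explicit key type, which leaves K unresolvable for the CoderRegistry:
> {code:java}
> // K is erased from the lambda, so no coder can be inferred for KV<K, V>.
> input.apply(WithKeys.of(value -> Arrays.hashCode(value.getId()) % NUM_RESHARD_KEYS));
> {code}
>  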
> *Suggested Solution*:
> Since the lambda function returns the hash code (_int_) of
> {code:java}
> value.getId(){code}
> (_byte[]_), can we just use
> {code:java}
> withKeyType(TypeDescriptors.integers()){code}
> right after the
> {code:java}
> WithKeys.of(){code}
> method?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
