Makes sense, thank you!

 Jan

On 7/15/21 9:43 PM, Chamikara Jayalath wrote:
The PCollection you feed into WriteToKafka (or any other cross-language transform) needs to use standard coders [1]. What's the type of the PCollection before WriteToKafka? It's possible that you just need to provide a type hint to prevent Python from picking the default pickle coder that Java doesn't understand. See https://issues.apache.org/jira/browse/BEAM-11938.

Thanks,
Cham

[1] https://github.com/apache/beam/blob/3e933b55f3d2072fb0248050f9091850933f33c7/model/pipeline/src/main/proto/beam_runner_api.proto#L784

On Thu, Jul 15, 2021 at 12:16 PM Jan Lukavský <je...@seznam.cz> wrote:

    Sorry, my bad. I made two modifications to the source at once - the
    cause of the exception is not the expansion of ReadFromKafka, but
    WriteToKafka. If I remove that, the Pipeline runs. So it seems I have
    to change the coder of the PCollection that is consumed by
    WriteToKafka. Which makes sense.

    On 7/15/21 8:44 PM, Jan Lukavský wrote:
    > Hi,
    >
    > I hit an issue when using an x-lang Python pipeline (with
    > ReadFromKafka) with a subsequent WindowInto with
    > trigger=Repeatedly(AfterCount(1)). The Pipeline looks as follows:
    >
    >   (p | ReadFromKafka(
    >       consumer_config={'bootstrap.servers': bootstrapServer},
    >       topics=[inputTopic],
    >       expansion_service=get_expansion_service())
    >     | "Tokenize" >> beam.FlatMap(lambda line:
    >           re.findall(r'[A-Za-z\']+', line))
    >     | beam.WindowInto(
    >           window.GlobalWindows(),
    >           trigger=trigger.Repeatedly(trigger.AfterCount(1)),
    >           accumulation_mode=trigger.AccumulationMode.DISCARDING,
    >           allowed_lateness=window.Duration.of(0))
    >     ...
    >
    > The error is
    >
    > Caused by: java.lang.IllegalArgumentException: Unknown Coder URN
    > beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1,
    > beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1,
    > beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1,
    > beam:coder:iterable:v1, beam:coder:timer:v1,
    > beam:coder:length_prefix:v1, beam:coder:global_window:v1,
    > beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1,
    > beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1]
    >     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
    >     at org.apache.beam.runners.core.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:165)
    >     at org.apache.beam.runners.core.construction.CoderTranslation.fromProto(CoderTranslation.java:145)
    >     at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:87)
    >     at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:82)
    >     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    >     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    >     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
    >     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    >     ... 50 more
    >
    > Which looks like a missing coder, but what I do not understand is
    > why the coder is passed to the ExpansionService in the first place.
    > This happens even if I place a (map) transform between the
    > ReadFromKafka and WindowInto transforms. I think the expansion
    > service should not need to know about what happens later in the
    > Pipeline. With the default trigger the Pipeline is able to run.
    >
    > Any ideas?
    >
    >  Jan
    >
