Makes sense, thank you!
Jan
On 7/15/21 9:43 PM, Chamikara Jayalath wrote:
The PCollection you feed into WriteToKafka (or any other
cross-language transform) needs to use standard coders [1].
What's the type of the PCollection before WriteToKafka? It's possible
that you just need to provide a type hint to prevent Python from
picking the default PickleCoder, which Java doesn't understand.
See https://issues.apache.org/jira/browse/BEAM-11938.
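For example, a minimal sketch (here `words` stands for the PCollection
you feed into WriteToKafka; the byte-pair element type is an assumption
based on WriteToKafka's default byte-array serializers):

    import typing
    import apache_beam as beam

    # If this prints Any (or another non-standard type), the Python SDK
    # falls back to pickling, which the Java side can't decode.
    print(words.element_type)

    # Declaring a standard element type makes the SDK pick the standard
    # KV-of-bytes coder across the cross-language boundary.
    kvs = words | beam.Map(
        lambda w: (b'', w.encode('utf-8'))).with_output_types(
            typing.Tuple[bytes, bytes])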
Thanks,
Cham
[1]
https://github.com/apache/beam/blob/3e933b55f3d2072fb0248050f9091850933f33c7/model/pipeline/src/main/proto/beam_runner_api.proto#L784
On Thu, Jul 15, 2021 at 12:16 PM Jan Lukavský <je...@seznam.cz> wrote:
Sorry, my bad. I made two modifications to the source at once - the
cause of the exception is not the expansion of ReadFromKafka, but
WriteToKafka. If I remove that, the Pipeline runs. So it seems I have
to change the coder of the PCollection that is consumed by
WriteToKafka. Which makes sense.
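Something like this should do it, I guess - a rough sketch continuing
the pipeline from my first mail (outputTopic is a placeholder, and I'm
assuming WriteToKafka's default byte-array serializers, which expect
(key, value) pairs of bytes):

    import typing
    from apache_beam.io.kafka import WriteToKafka

    ...
    # encode to (key, value) byte pairs and declare the type so a
    # standard coder is used instead of the pickle coder
    | beam.Map(lambda w: (b'', w.encode('utf-8'))).with_output_types(
        typing.Tuple[bytes, bytes])
    | WriteToKafka(
        producer_config={'bootstrap.servers': bootstrapServer},
        topic=outputTopic,
        expansion_service=get_expansion_service()))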
On 7/15/21 8:44 PM, Jan Lukavský wrote:
> Hi,
>
> I hit an issue when using an x-lang Python pipeline (with ReadFromKafka)
> and a subsequent WindowInto with trigger=Repeatedly(AfterCount(1)). The
> Pipeline looks as follows:
>
> (p | ReadFromKafka(
>          consumer_config={'bootstrap.servers': bootstrapServer},
>          topics=[inputTopic],
>          expansion_service=get_expansion_service())
>    | "Tokenize" >> beam.FlatMap(lambda line: re.findall(r'[A-Za-z\']+', line))
>    | beam.WindowInto(
>          window.GlobalWindows(),
>          trigger=trigger.Repeatedly(trigger.AfterCount(1)),
>          accumulation_mode=trigger.AccumulationMode.DISCARDING,
>          allowed_lateness=window.Duration.of(0))
>    ...
>
> The error is
>
> Caused by: java.lang.IllegalArgumentException: Unknown Coder URN
> beam:coder:pickled_python:v1. Known URNs: [beam:coder:avro:generic:v1,
> beam:coder:bytes:v1, beam:coder:bool:v1, beam:coder:string_utf8:v1,
> beam:coder:kv:v1, beam:coder:varint:v1, beam:coder:interval_window:v1,
> beam:coder:iterable:v1, beam:coder:timer:v1,
> beam:coder:length_prefix:v1, beam:coder:global_window:v1,
> beam:coder:windowed_value:v1, beam:coder:param_windowed_value:v1,
> beam:coder:double:v1, beam:coder:row:v1, beam:coder:sharded_key:v1]
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
>     at org.apache.beam.runners.core.construction.CoderTranslation.fromKnownCoder(CoderTranslation.java:165)
>     at org.apache.beam.runners.core.construction.CoderTranslation.fromProto(CoderTranslation.java:145)
>     at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:87)
>     at org.apache.beam.runners.core.construction.RehydratedComponents$2.load(RehydratedComponents.java:82)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
> ... 50 more
>
> This looks like a missing coder, but what I do not understand is why
> the coder is passed to the ExpansionService in the first place. This
> happens even if I place a (map) transform between the ReadFromKafka
> and WindowInto transforms. I think the expansion service should not
> need to know about what happens later in the Pipeline. With the
> default trigger the Pipeline is able to run.
>
> Any ideas?
>
> Jan
>