[ https://issues.apache.org/jira/browse/BEAM-12674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494274#comment-17494274 ]
Brian Hulette commented on BEAM-12674:
--------------------------------------
Looked at this offline with Andy. The problem is that Java considers two
schemas equal if they have the exact same field names and types, regardless of
the UUID (which Python sets, and which differs between the two schemas):
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
So the expansion service fails to rehydrate these coders: they compare equal,
and thus can't both be inserted into a BiMap:
https://github.com/apache/beam/blob/6c04c3c523e7b60523f41d617582ca901fc7a733/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java#L99
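To make the failure mode concrete, here is a toy Python model of it. It is a simplified sketch, not Beam's or Guava's code: `ToySchema`, `ToyBiMap` and `put_all_inverse` are hypothetical names, equality that ignores the UUID is assumed to match the Java behavior described above, and `put_all_inverse` only loosely imitates loading id -> coder entries through the inverse view of a BiMap keyed by coder, which is what the stack trace suggests `SdkComponents.create` does.
{code:python}
import uuid
from dataclasses import dataclass, field
from typing import Dict, Tuple


@dataclass(frozen=True)
class ToySchema:
    """Hypothetical schema: equality/hash use the fields only, not the UUID."""
    fields: Tuple[Tuple[str, str], ...]  # (name, type) pairs
    # compare=False keeps the UUID out of __eq__ and __hash__.
    schema_id: str = field(default_factory=lambda: str(uuid.uuid4()),
                           compare=False)


class ToyBiMap:
    """Hypothetical bidirectional map keyed by schema (schema <-> coder id)."""

    def __init__(self) -> None:
        self._forward: Dict[ToySchema, str] = {}
        self._inverse: Dict[str, ToySchema] = {}

    def put_inverse(self, coder_id: str, schema: ToySchema) -> None:
        # Adding id -> schema makes the schema a forward key, so an *equal*
        # schema already registered under another id is rejected.
        existing = self._forward.get(schema)
        if existing is not None and existing != coder_id:
            raise ValueError(f"key already present: {schema}")
        self._forward[schema] = coder_id
        self._inverse[coder_id] = schema

    def put_all_inverse(self, entries: Dict[str, ToySchema]) -> None:
        for coder_id, schema in entries.items():
            self.put_inverse(coder_id, schema)


# Two schemas created independently: same field names and types,
# different UUIDs.
numbers = ToySchema(fields=(("number", "INT64"),))
numbers2 = ToySchema(fields=(("number", "INT64"),))
assert numbers == numbers2                      # the UUID is ignored
assert numbers.schema_id != numbers2.schema_id  # but the UUIDs differ

try:
    ToyBiMap().put_all_inverse({"coder-1": numbers, "coder-2": numbers2})
except ValueError as err:
    print(err)  # the second, "equal" schema collides with the first
{code}
In this model, giving the second schema a different field name (e.g. `number2`) makes the two schemas compare unequal, so both entries load without a collision, which is consistent with the workaround described in the issue.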
I'm wondering if this is actually a bug in Java as well, since I don't know of
any logic in Java that de-dupes Schemas created with the same set of fields. Do
we do that anywhere, [~reuvenlax]?
It would be interesting to see if we can reproduce this bug in a Java-only
context. One approach might be to build an equivalent Java pipeline (a
SqlTransform that joins two PCollections with equivalent schemas), then do a
proto round trip with it using
[PipelineTranslation.toProto|https://github.com/apache/beam/blob/6c04c3c523e7b60523f41d617582ca901fc7a733/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java#L42]
and
[RehydratedComponents|https://github.com/apache/beam/blob/6c04c3c523e7b60523f41d617582ca901fc7a733/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L478].
> SqlTransform IllegalArgumentException: key already present: SchemaCoder
> -----------------------------------------------------------------------
>
> Key: BEAM-12674
> URL: https://issues.apache.org/jira/browse/BEAM-12674
> Project: Beam
> Issue Type: Improvement
> Components: cross-language, dsl-sql, sdk-py-core
> Reporter: Ning
> Assignee: Andy Ye
> Priority: P3
>
> When defining a SqlTransform that joins two PCollections with the same Row
> schema, the expansion service throws the exception below:
> {code:java}
> RuntimeError: java.lang.IllegalArgumentException: key already present:
> SchemaCoder<Schema: Fields:
> Field{name=number, description=, type=INT64 NOT NULL, options={{}}}
> Encoding positions:
> {number=0}
> Options:{{}}UUID: 7380513f-09ca-4209-a770-50bcfb82c3e4 UUID:
> 7380513f-09ca-4209-a770-50bcfb82c3e4 delegateCoder:
> org.apache.beam.sdk.coders.Coder$ByteBuddy$6yeAkNko@2dd884d
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.putInverse(HashBiMap.java:315)
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.access$800(HashBiMap.java:59)
> at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap$Inverse.put(HashBiMap.java:590)
> at java.util.AbstractMap.putAll(AbstractMap.java:281)
> at org.apache.beam.runners.core.construction.SdkComponents.create(SdkComponents.java:99)
> at org.apache.beam.runners.core.construction.RehydratedComponents.getSdkComponents(RehydratedComponents.java:184)
> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:479)
> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:546)
> at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:219)
> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:797)
> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> Code to reproduce the error:
> {code:python}
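> import apache_beam as beam
> from apache_beam.transforms.sql import SqlTransform
>
> p = beam.Pipeline()
> # Two PCollections whose rows share one schema: a single int field "number".
> # Unique labels are required when the same transform type is applied twice.
> numbers = (p | 'CreateNumbers' >> beam.Create(range(10))
>            | 'ToRows' >> beam.Map(lambda x: beam.Row(number=x)))
> numbers2 = (p | 'CreateNumbers2' >> beam.Create(range(5, 15))
>             | 'ToRows2' >> beam.Map(lambda x: beam.Row(number=x)))
> # Expanding this SqlTransform fails with the exception above.
> common_numbers = ({'numbers': numbers, 'numbers2': numbers2}
>                   | SqlTransform("""
>                       SELECT * FROM numbers JOIN numbers2
>                       ON numbers.number = numbers2.number
>                       """))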
> {code}
> However, if `numbers2` is given a different Row schema, such as
> `beam.Row(number2=x)`, and the join condition is changed to
> `ON numbers.number = numbers2.number2`, the transform is applied
> successfully, and the pipeline also executes successfully later.