damccorm opened a new issue, #21093:
URL: https://github.com/apache/beam/issues/21093
When defining a SqlTransform joining two PCollections with the same Row
schema, the expansion service throws below exception:
```
RuntimeError: java.lang.IllegalArgumentException: key already present:
SchemaCoder<Schema: Fields:
Field{name=number,
description=, type=INT64 NOT NULL, options={{}}}
Encoding positions:
{number=0}
Options:{{}}UUID:
7380513f-09ca-4209-a770-50bcfb82c3e4 UUID:
7380513f-09ca-4209-a770-50bcfb82c3e4 delegateCoder:
org.apache.beam.sdk.coders.Coder$ByteBuddy$6yeAkNko@2dd884d
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.putInverse(HashBiMap.java:315)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.access$800(HashBiMap.java:59)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap$Inverse.put(HashBiMap.java:590)
at
java.util.AbstractMap.putAll(AbstractMap.java:281)
at
org.apache.beam.runners.core.construction.SdkComponents.create(SdkComponents.java:99)
at
org.apache.beam.runners.core.construction.RehydratedComponents.getSdkComponents(RehydratedComponents.java:184)
at
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:479)
at
org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:546)
at
org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:219)
at
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:797)
at
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at
java.lang.Thread.run(Thread.java:748)
```
The code to produce the error:
```
p = beam.Pipeline()
numbers = (p | beam.Create(range(10))
| beam.Map(lambda
x: beam.Row(number=x)))
numbers2 = (p | beam.Create(range(5,15))
| beam.Map(lambda
x: beam.Row(number=x)))
common_numbers = {'numbers': numbers, 'numbers2': numbers2} |
beam.transforms.sql.SqlTransform("""
SELECT * FROM numbers JOIN numbers2 ON numbers.number =
numbers2.number
""")
```
However, if we define the Row schema of `numbers2` to a different schema
such as `beam.Row(number2=x)`. Then make the join `ON numbers.number =
numbers2.number2`. The transform can be successfully applied. The pipeline can
also be executed successfully later.
Imported from Jira
[BEAM-12674](https://issues.apache.org/jira/browse/BEAM-12674). Original Jira
may contain additional context.
Reported by: ningk.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]