damccorm opened a new issue, #21093:
URL: https://github.com/apache/beam/issues/21093

   When defining a SqlTransform that joins two PCollections with the same Row
schema, the expansion service throws the following exception:
   
   ```
   RuntimeError: java.lang.IllegalArgumentException: key already present: SchemaCoder<Schema: Fields:
   Field{name=number, description=, type=INT64 NOT NULL, options={{}}}
   Encoding positions:
   {number=0}
   Options:{{}}UUID: 7380513f-09ca-4209-a770-50bcfb82c3e4  UUID: 7380513f-09ca-4209-a770-50bcfb82c3e4 delegateCoder: org.apache.beam.sdk.coders.Coder$ByteBuddy$6yeAkNko@2dd884d
           at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.putInverse(HashBiMap.java:315)
           at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap.access$800(HashBiMap.java:59)
           at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBiMap$Inverse.put(HashBiMap.java:590)
           at java.util.AbstractMap.putAll(AbstractMap.java:281)
           at org.apache.beam.runners.core.construction.SdkComponents.create(SdkComponents.java:99)
           at org.apache.beam.runners.core.construction.RehydratedComponents.getSdkComponents(RehydratedComponents.java:184)
           at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:479)
           at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:546)
           at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:219)
           at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
           at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
           at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:797)
           at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
           at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
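
The trace shows `SdkComponents.create` filling a `HashBiMap` through its inverse view, with the insert rejected because a `SchemaCoder` key is already present. One possible reading, which this report does not confirm, is that the two inputs arrive under different component ids but rehydrate to coders that compare equal. The pure-Python sketch below only illustrates that kind of collision. `FakeSchemaCoder`, `BiMap`, `register_coders`, and the ids `coder_1` and `coder_2` are hypothetical names for illustration, not Beam or Guava APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeSchemaCoder:
    """Hypothetical stand-in for a coder whose equality is decided by schema UUID."""
    schema_uuid: str


class BiMap:
    """Minimal bidirectional map: each key and each value may appear only once."""

    def __init__(self):
        self.forward = {}   # coder -> component id
        self.backward = {}  # component id -> coder

    def put_inverse(self, value, key):
        """Insert via the inverse view (value -> key), rejecting duplicate keys."""
        if self.backward.get(value) == key:
            return  # this exact binding is already stored
        if key in self.forward:
            raise ValueError(f"key already present: {key!r}")
        # Drop any older binding for this value before storing the new one.
        old_key = self.backward.pop(value, None)
        if old_key is not None:
            del self.forward[old_key]
        self.forward[key] = value
        self.backward[value] = key


def register_coders(coders_by_id):
    """Load an id -> coder mapping into a BiMap through its inverse view."""
    bimap = BiMap()
    for component_id, coder in coders_by_id.items():
        bimap.put_inverse(component_id, coder)
    return bimap


# Assumption: two ids that carry coders with the same schema UUID.
shared_uuid = "7380513f-09ca-4209-a770-50bcfb82c3e4"
try:
    register_coders({
        "coder_1": FakeSchemaCoder(shared_uuid),
        "coder_2": FakeSchemaCoder(shared_uuid),
    })
except ValueError as err:
    print(err)  # the second insert is rejected
```

In this model, giving the two coders different UUIDs lets both inserts succeed.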
   
   
   The code to produce the error:
   
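   ```
   import apache_beam as beam
   from apache_beam.transforms.sql import SqlTransform

   p = beam.Pipeline()
   # Explicit labels keep the two Create steps from sharing a default label.
   # Both inputs use the same Row schema: a single field named `number`.
   numbers = (p | 'Create1' >> beam.Create(range(10))
                | 'ToRow1' >> beam.Map(lambda x: beam.Row(number=x)))
   numbers2 = (p | 'Create2' >> beam.Create(range(5, 15))
                 | 'ToRow2' >> beam.Map(lambda x: beam.Row(number=x)))
   # Applying the SqlTransform is the step that raises the exception.
   common_numbers = ({'numbers': numbers, 'numbers2': numbers2}
                     | SqlTransform("""
       SELECT * FROM numbers JOIN numbers2 ON numbers.number = numbers2.number
       """))
   ```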
   
   
   However, if we give `numbers2` a different Row schema, such as
`beam.Row(number2=x)`, and change the join condition to `ON numbers.number =
numbers2.number2`, the transform is applied successfully. The pipeline can
also be executed successfully afterwards.
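
The workaround described above might look like the following sketch. It assumes the Beam Python SDK is installed and a Java expansion service is available when the transform is applied. The function name `build_join_pipeline`, the constant `JOIN_QUERY`, and the step labels are made up for illustration. The Beam imports sit inside the function only so that the snippet can be loaded without Beam present.

```python
# Join condition from the workaround: the second input uses `number2`.
JOIN_QUERY = """
    SELECT * FROM numbers JOIN numbers2
    ON numbers.number = numbers2.number2
"""


def build_join_pipeline():
    """Build the pipeline with distinct Row schemas for the two inputs."""
    # Imported lazily; applying SqlTransform needs Beam and a Java expansion service.
    import apache_beam as beam
    from apache_beam.transforms.sql import SqlTransform

    p = beam.Pipeline()
    numbers = (p | "CreateNumbers" >> beam.Create(range(10))
                 | "ToRow" >> beam.Map(lambda x: beam.Row(number=x)))
    # Renaming the field gives this PCollection a different Row schema.
    numbers2 = (p | "CreateNumbers2" >> beam.Create(range(5, 15))
                  | "ToRow2" >> beam.Map(lambda x: beam.Row(number2=x)))
    common_numbers = ({"numbers": numbers, "numbers2": numbers2}
                      | SqlTransform(JOIN_QUERY))
    return p, common_numbers
```

The only changes from the failing code are the renamed field and the matching join condition. The explicit step labels are there to avoid label clashes and play no part in the workaround itself.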
   
   
   
   
   
   Imported from Jira 
[BEAM-12674](https://issues.apache.org/jira/browse/BEAM-12674). Original Jira 
may contain additional context.
   Reported by: ningk.

