kennknowles opened a new issue, #19445:
URL: https://github.com/apache/beam/issues/19445
My working environment:
* Apache Beam Java SDK version: works with 2.9.0 but fails with 2.11.0
* Runner: fails with both the Direct Runner and the Dataflow Runner
* Application code: Scala (note: I did not use Scio)
I tried to upgrade the Apache Beam Java SDK from 2.9.0 to 2.11.0 and
deploy to Dataflow, but I got the error below. Everything works with 2.9.0.
```
Exception in thread "main" java.lang.IncompatibleClassChangeError: Class org.apache.beam.model.pipeline.v1.RunnerApi$StandardPTransforms$Primitives does not implement the requested interface org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum
    at org.apache.beam.runners.core.construction.BeamUrns.getUrn(BeamUrns.java:27)
    at org.apache.beam.runners.core.construction.PTransformTranslation.<clinit>(PTransformTranslation.java:61)
    at org.apache.beam.runners.core.construction.UnconsumedReads$1.visitValue(UnconsumedReads.java:48)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:674)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.runners.core.construction.UnconsumedReads.ensureAllReadsConsumed(UnconsumedReads.java:39)
    at org.apache.beam.runners.dataflow.DataflowRunner.replaceTransforms(DataflowRunner.java:979)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:707)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:179)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
```
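As a sanity check, a small classpath diagnostic can show which jar each of the classes in the error is loaded from, since an `IncompatibleClassChangeError` at class-load time usually means two incompatible versions of a library ended up on the classpath. This is only a sketch; `ClasspathCheck` and `codeSourceOf` are my own names, not Beam APIs.

```scala
// Sketch of a classpath diagnostic (ClasspathCheck / codeSourceOf are my own
// names, not Beam APIs). Printing each class's code source shows which jar it
// was loaded from, which helps spot mixed Beam artifact versions.
object ClasspathCheck {
  def codeSourceOf(className: String): String =
    try {
      val cls = Class.forName(className)
      Option(cls.getProtectionDomain.getCodeSource)
        .map(_.getLocation.toString)
        .getOrElse("(bootstrap class, no code source)")
    } catch {
      case _: ClassNotFoundException => "(not on classpath)"
    }

  def main(args: Array[String]): Unit = {
    // The two classes named in the error message, plus the top stack frame.
    Seq(
      "org.apache.beam.model.pipeline.v1.RunnerApi$StandardPTransforms$Primitives",
      "org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum",
      "org.apache.beam.runners.core.construction.BeamUrns"
    ).foreach(n => println(s"$n -> ${codeSourceOf(n)}"))
  }
}
```

If the three classes resolve to jars of different Beam versions, the mismatch is the likely cause.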
My code is in Scala; it works with Beam 2.9.0:
```
val p = Pipeline.create(options)
val bqDestTable = s"$projectId:$dataset.${table}_${bu.name}"
val topicName = s"${options.getKafkaTopic}_${bu.name}"

p.apply(s"${bu.name}_ReadFromKafka",
    KafkaIO.read()
      .withBootstrapServers(options.getBootstreapServers)
      .updateConsumerProperties(config)
      .withTopics(util.Arrays.asList(topicName))
      .withKeyDeserializer(classOf[LongDeserializer])
      .withValueDeserializer(classOf[StringDeserializer])
      .withConsumerFactoryFn(
        new KafkaTLSConsumerFactory(
          projectId, options.getSourceBucket, options.getTrustStoreGCSKey,
          options.getKeyStoreGCSKey)))
  .apply(s"${bu.name}_Convert",
    ParDo.of(new ConvertJSONTextToEPCTransaction(bu)))
  .apply(s"${bu.name}_WriteToBQ",
    BigQueryIO.write()
      .to(bqDestTable)
      .withSchema(schema)
      .withFormatFunction(new ConvertMessageToTable())
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND))

p.run()
```
The error comes from this part of Beam:
```
package org.apache.beam.runners.core.construction;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum;

/** Returns the standard URN of a given enum annotated with [(standard_urn)]. */
public class BeamUrns {
  /** Returns the standard URN of a given enum annotated with [(standard_urn)]. */
  public static String getUrn(ProtocolMessageEnum value) {
    return value.getValueDescriptor().getOptions().getExtension(RunnerApi.beamUrn);
  }
}
```
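Since `RunnerApi$StandardPTransforms$Primitives` is generated code compiled against a specific vendored protobuf, a stale 2.9.0 jar sitting next to 2.11.0 ones would produce exactly this interface mismatch. Assuming an sbt build, one way to rule that out is to pin every Beam artifact to a single version; the artifact list below is a sketch matching the transforms used in my pipeline, not a complete or authoritative set:

```scala
// build.sbt sketch (assumption: sbt build): pin all Beam artifacts to one
// version so mixed 2.9.0/2.11.0 jars cannot coexist on the classpath.
val beamVersion = "2.11.0"

libraryDependencies ++= Seq(
  "org.apache.beam" % "beam-sdks-java-core"                     % beamVersion,
  "org.apache.beam" % "beam-runners-direct-java"                % beamVersion,
  "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion,
  "org.apache.beam" % "beam-sdks-java-io-kafka"                 % beamVersion,
  "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion
)
```

Running `sbt dependencyTree` (or your build tool's equivalent) afterwards confirms no other Beam version is pulled in transitively.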
Imported from Jira
[BEAM-6885](https://issues.apache.org/jira/browse/BEAM-6885). Original Jira may
contain additional context.
Reported by: yohei.