Yohei Onishi created BEAM-6885:
----------------------------------
Summary: java.lang.IncompatibleClassChangeError when deploying
Beam 2.11.0 to Dataflow
Key: BEAM-6885
URL: https://issues.apache.org/jira/browse/BEAM-6885
Project: Beam
Issue Type: Bug
Components: beam-model
Affects Versions: 2.11.0
Environment: Dataflow
Reporter: Yohei Onishi
When I am trying to my code to Dataflow I got this error.
{code}
Exception in thread "main" java.lang.IncompatibleClassChangeError: Class
org.apache.beam.model.pipeline.v1.RunnerApi$StandardPTransforms$Primitives does
not implement the requested interface
org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ProtocolMessageEnum
at
org.apache.beam.runners.core.construction.BeamUrns.getUrn(BeamUrns.java:27)
at
org.apache.beam.runners.core.construction.PTransformTranslation.<clinit>(PTransformTranslation.java:61)
at
org.apache.beam.runners.core.construction.UnconsumedReads$1.visitValue(UnconsumedReads.java:48)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:674)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
at
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at
org.apache.beam.runners.core.construction.UnconsumedReads.ensureAllReadsConsumed(UnconsumedReads.java:39)
at
org.apache.beam.runners.dataflow.DataflowRunner.replaceTransforms(DataflowRunner.java:979)
at
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:707)
at
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:179)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
{code}
my code is in Scala but it works with Beam 2.9.0.
{code}
val p = Pipeline.create(options)
val bqDestTable = s"$projectId:$dataset.${table}_${bu.name}"
val topicName = s"${options.getKafkaTopic}_${bu.name}"
p.apply(s"${bu.name}_ReadFromKafka", KafkaIO.read()
.withBootstrapServers(options.getBootstreapServers)
.updateConsumerProperties(config)
.withTopics(util.Arrays.asList(topicName))
.withKeyDeserializer(classOf[LongDeserializer])
.withValueDeserializer(classOf[StringDeserializer])
.withConsumerFactoryFn(
new KafkaTLSConsumerFactory(
projectId, options.getSourceBucket, options.getTrustStoreGCSKey,
options.getKeyStoreGCSKey)))
.apply(s"${bu.name}_Convert", ParDo.of(new
ConvertJSONTextToEPCTransaction(bu)))
.apply(s"${bu.name}_WriteToBQ", BigQueryIO.write()
.to(bqDestTable)
.withSchema(schema)
.withFormatFunction(new ConvertMessageToTable())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND))
p.run
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)