Hi everyone,

I am very excited about the 2.36 release, especially the stopReadOffset
addition to the KafkaSourceDescriptors. With it, I can read sections of a
topic and build state, effectively having a bounded Kafka source, before
reading new items that need processing.

Unfortunately, running the pipeline from the Flink CLI produces the
following error:

Pretty printing Flink args:
--detached
--class=namespace.pipeline.App
/opt/bns/etf/data-platform/user_code/pipelines/cerberus-etl-pipelines.jar
--configFilePath=/path/to/config.properties
--runner=FlinkRunner
--streaming
--checkpointingInterval=30000
--stateBackend=filesystem
--stateBackendStoragePath=file:///path/to/state
--numberOfExecutionRetries=2
--fasterCopy
--debugThrowExceptions
java.lang.IncompatibleClassChangeError: Class org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum does not implement the requested interface org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum
        at org.apache.beam.sdk.transforms.resourcehints.ResourceHints.getUrn(ResourceHints.java:50)
        at org.apache.beam.sdk.transforms.resourcehints.ResourceHints.<clinit>(ResourceHints.java:54)
        at org.apache.beam.sdk.Pipeline.<init>(Pipeline.java:523)
        at org.apache.beam.sdk.Pipeline.create(Pipeline.java:157)
        at lines containing Pipeline.create(options) <--- my code
        at namespace.pipeline.App.main(App.java:42) <-- my code
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
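
One possible reading of the error, which is an assumption and not something
confirmed here, is that Beam classes from two different versions end up on
the same classpath. A minimal sketch of a check that prints where each class
named in the trace is loaded from, using only the JDK (the JarLocator class
and its locate method are hypothetical names introduced for illustration):

```java
import java.security.CodeSource;

public class JarLocator {

    /** Returns the location a class was loaded from, or a short note if unavailable. */
    public static String locate(String className) {
        try {
            // initialize=false: load the class without running its static initializers
            Class<?> cls = Class.forName(className, false, JarLocator.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null) {
                // Classes loaded by the bootstrap loader have no code source
                return "(no code source, likely a JDK class)";
            }
            return String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    public static void main(String[] args) {
        // Class names copied from the stack trace above
        String[] names = {
            "org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum",
            "org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum",
            "org.apache.beam.sdk.transforms.resourcehints.ResourceHints"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

To be meaningful, this would have to run with the same classpath as the job,
for example by calling JarLocator.locate temporarily at the top of App.main
before Pipeline.create(options).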

Any advice would be appreciated.

Thank you,
Cristian
