Hi everyone, I am very excited about the 2.36 release, especially the stopReadOffset addition to the KafkaSourceDescriptors. With it, I can read sections of a topic and create state, effectively having a bounded Kafka source, before reading new items that need processing.
Unfortunately, running the pipeline from the Flink CLI produces the following error:

Pretty printing Flink args:
--detached --class=namespace.pipeline.App /opt/bns/etf/data-platform/user_code/pipelines/cerberus-etl-pipelines.jar --configFilePath=/path/to/config.properties --runner=FlinkRunner --streaming --checkpointingInterval=30000 --stateBackend=filesystem --stateBackendStoragePath=file:///path/to/state --numberOfExecutionRetries=2 --fasterCopy --debugThrowExceptions

java.lang.IncompatibleClassChangeError: Class org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum does not implement the requested interface org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum
    at org.apache.beam.sdk.transforms.resourcehints.ResourceHints.getUrn(ResourceHints.java:50)
    at org.apache.beam.sdk.transforms.resourcehints.ResourceHints.<clinit>(ResourceHints.java:54)
    at org.apache.beam.sdk.Pipeline.<init>(Pipeline.java:523)
    at org.apache.beam.sdk.Pipeline.create(Pipeline.java:157)
    at lines containing Pipeline.create(options) <--- my code
    at namespace.pipeline.App.main(App.java:42) <-- my code
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Any advice would be appreciated.

Thank you,
Cristian
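
P.S. One possibility (an assumption on my part, not something I have confirmed) is that classes from two different Beam versions end up on the same classpath, since an IncompatibleClassChangeError generally means a class was compiled against a different definition than the one loaded at runtime. The sketch below is a hypothetical helper, not part of my pipeline: it uses only the JDK to print which jar each class named in the error was loaded from. The class name ClassOrigin and the method originOf are made up for illustration.

```java
import java.security.CodeSource;

public class ClassOrigin {

    /** Returns the location a class was loaded from, or a short note if that is unavailable. */
    static String originOf(String className) {
        try {
            // Load without initializing, so static initializers (such as the failing one) do not run.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "(bootstrap or unknown source)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the error message and stack trace above.
        String[] names = {
            "org.apache.beam.model.pipeline.v1.RunnerApi$StandardResourceHints$Enum",
            "org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ProtocolMessageEnum",
            "org.apache.beam.sdk.Pipeline"
        };
        for (String name : names) {
            System.out.println(name + " -> " + originOf(name));
        }
    }
}
```

Calling the same originOf logic from the pipeline's main method, just before Pipeline.create(options), would show whether these classes come from the fat jar or from a jar elsewhere on the Flink classpath.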