I've double-checked it, and the mentioned fix must be backported to at least 1.20 with the `is_standard_yaml` condition.
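For readers following along, the failure mode under discussion can be sketched in a few lines of plain Python. This is only an illustration of the suspected list-serialization round trip (a list of jar URLs flattened into a single bracketed string), not Flink's actual code:

```python
from urllib.parse import urlparse

# Illustration only (not Flink internals): if a list of jar URLs is
# flattened to a single string somewhere on the config round trip, the
# brackets of the list representation survive into the stored value.
jars = ["file:///opt/flink/opt/flink-python-1.19.2.jar"]
flattened = str(jars)  # "['file:///opt/flink/opt/flink-python-1.19.2.jar']"

# A URL parser then finds no scheme at all, because the value starts
# with "[" instead of "file:" -- hence "no protocol: ['file:/...']".
assert urlparse(flattened).scheme == ""
# The bare list entry, by contrast, parses fine.
assert urlparse(jars[0]).scheme == "file"
```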
On Wed, Apr 2, 2025 at 5:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> But as a general rule, if the issue exists on the 1.x line, then at least
> 1.20 must have this fix. Let me check that and act accordingly...
>
> BR,
> G
>
>
> On Wed, Apr 2, 2025 at 5:00 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> Yeah, a new version of the Kafka connector is needed to use 2.x.
>>
>>
>> On Wed, Apr 2, 2025 at 4:56 PM Gulan, Jacob <jacob.gu...@duke-energy.com> wrote:
>>
>>> I'm able to use the 2.x line, but I see that the latest
>>> "flink-connector-kafka" version is still 3.4.0-1.20, with no 2.0 tag
>>> yet. Will I need to wait for the 2.x tag to be released?
>>>
>>> ________________________________
>>> From: Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> Sent: Wednesday, April 2, 2025 10:13 AM
>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>> Subject: [EXTERNAL] Re: Executing PyFlink Cluster
>>>
>>> I think I've already fixed this here [1], but only on the 2.x line.
>>> Up until now I thought that only 2.x was using YAML-based configs...
>>>
>>> [1] https://github.com/apache/flink/pull/26327
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Wed, Apr 2, 2025 at 4:08 PM Andreas Bube <ab...@toogoodtogo.com.invalid> wrote:
>>>
>>> > There's an open issue that looks very similar:
>>> > https://issues.apache.org/jira/browse/FLINK-36457. No idea how to
>>> > work around this.
>>> >
>>> > On Wed, 2 Apr 2025 at 15:13, Gulan, Jacob <jacob.gu...@duke-energy.com> wrote:
>>> >
>>> > > Hello,
>>> > >
>>> > > I've been running into a wall for the past few days trying to get
>>> > > my PyFlink cluster running in a Docker container on my local
>>> > > machine.
>>> > >
>>> > > However, whenever I get to the point in my code where I need to run
>>> > > env.execute(), I see the following error (full stack trace at the
>>> > > bottom of this email):
>>> > >
>>> > > ```
>>> > > java.net.MalformedURLException: no protocol:
>>> > > ['file:/opt/flink/opt/flink-python-1.19.2.jar']
>>> > > ```
>>> > >
>>> > > It appears that the `file://` prefix is being shortened to `file:`,
>>> > > but I'm not setting this anywhere in my code, leading me to believe
>>> > > there's underlying behavior that's creating this inaccurate file
>>> > > path.
>>> > >
>>> > > If this error is familiar, I would appreciate any advice on how to
>>> > > remediate the issue. I've attached code snippets of my current
>>> > > implementation below. I've tried both versions 1.19.2 and 1.20.1.
>>> > >
>>> > > Thank you for any assistance you may be able to provide.
>>> > >
>>> > >
>>> > > CODE SNIPPET:
>>> > > ```
>>> > > def parse_record(record: str) -> dict:
>>> > >     return json.loads(record)
>>> > >
>>> > >
>>> > > def initialize_environment() -> StreamExecutionEnvironment:
>>> > >     """Initialize the Flink environment"""
>>> > >     env = StreamExecutionEnvironment.get_execution_environment()
>>> > >
>>> > >     flink_kafka_connector_path: str = str(
>>> > >         (JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar").resolve()
>>> > >     )
>>> > >     flink_python_path: str = str(
>>> > >         (JAR_DIR.parent / "opt" / "flink-python-1.19.2.jar").resolve()
>>> > >     )
>>> > >
>>> > >     print(f"flink_kafka_connector_path: {flink_kafka_connector_path}")
>>> > >     print(f"flink_python_path: {flink_python_path}")
>>> > >
>>> > >     env.add_jars(f"file://{flink_kafka_connector_path}",
>>> > >                  f"file://{flink_python_path}")
>>> > >     return env
>>> > >
>>> > >
>>> > > def main() -> None:
>>> > >     env: StreamExecutionEnvironment = initialize_environment()
>>> > >
>>> > >     kafka_properties = {
>>> > >         "bootstrap.servers": BROKERS,
>>> > >         "security.protocol": "SASL_SSL",
>>> > >         "sasl.mechanism": "PLAIN",
>>> > >         "sasl.jaas.config": (
>>> > >             "org.apache.kafka.common.security.plain.PlainLoginModule required "
>>> > >             f"username='{USERNAME}' password='{PASSWORD}';"
>>> > >         ),
>>> > >     }
>>> > >
>>> > >     kafka_source: KafkaSource = (
>>> > >         KafkaSource.builder()
>>> > >         .set_topics(SOURCE_TOPIC)
>>> > >         .set_properties(kafka_properties)
>>> > >         .set_starting_offsets(KafkaOffsetsInitializer.earliest())
>>> > >         .set_value_only_deserializer(SimpleStringSchema())
>>> > >         .build()
>>> > >     )
>>> > >
>>> > >     kafka_sink: KafkaSink = (
>>> > >         KafkaSink.builder()
>>> > >         .set_bootstrap_servers(BROKERS)
>>> > >         .set_record_serializer(
>>> > >             KafkaRecordSerializationSchema.builder()
>>> > >             .set_topic(DESTINATION_TOPIC)
>>> > >             .set_value_serialization_schema(SimpleStringSchema())
>>> > >             .build()
>>> > >         )
>>> > >         .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>>> > >         .build()
>>> > >     )
>>> > >
>>> > >     data_stream = env.from_source(
>>> > >         kafka_source,
>>> > >         WatermarkStrategy.no_watermarks(),
>>> > >         "Kafka Source",
>>> > >     )
>>> > >     transformed_data = data_stream.map(parse_record,
>>> > >                                        output_type=Types.STRING())
>>> > >     transformed_data.sink_to(kafka_sink)
>>> > >
>>> > >     print("START EXECUTING!")
>>> > >
>>> > >     env.execute()
>>> > >
>>> > >
>>> > > if __name__ == "__main__":
>>> > >     main()
>>> > > ```
>>> > >
>>> > >
>>> > > DOCKERFILE:
>>> > > ```
>>> > > ##############################################################################
>>> > > # Stage 1 - Build uber jar using Maven
>>> > > ##############################################################################
>>> > >
>>> > > # Set arguments for image versions
>>> > > ARG JAVA_VERSION=11
>>> > > ARG FLINK_VERSION=1.19
>>> > > ARG SCALA_VERSION=scala_2.12
>>> > >
>>> > > # Source build image
>>> > > FROM maven:3.9-amazoncorretto-${JAVA_VERSION} AS build
>>> > >
>>> > > # Build jars
>>> > > WORKDIR /build
>>> > > COPY pom.xml .
>>> > > RUN mvn dependency:copy-dependencies -DoutputDirectory=/build/jars
>>> > >
>>> > >
>>> > > ##############################################################################
>>> > > # Stage 2 - Create the final image
>>> > > ##############################################################################
>>> > >
>>> > > # Source image
>>> > > FROM flink:${FLINK_VERSION}-${SCALA_VERSION}-java${JAVA_VERSION}
>>> > >
>>> > > # Install Python
>>> > > RUN apt-get update && apt-get install -y python3.11 python3-pip
>>> > >
>>> > > # Copy the built jars from the Maven stage
>>> > > COPY --from=build /build/jars /opt/flink/lib
>>> > >
>>> > > # Create symlink so that "python" points to "python3.11"
>>> > > RUN ln -s /usr/bin/python3.11 /usr/bin/python
>>> > >
>>> > > # Set the working directory
>>> > > WORKDIR /opt/flink/src
>>> > >
>>> > > # Copy the Python project files
>>> > > COPY pyproject.toml .
>>> > > COPY README.md .
>>> > > COPY dataproducts /opt/flink/src/dataproducts
>>> > >
>>> > > # Go get a cup of coffee, this will take a while ☕
>>> > > RUN python3.11 -m pip install . \
>>> > >     --no-cache-dir
>>> > > ```
>>> > >
>>> > > STACK TRACE:
>>> > > ```
>>> > > root@9e14f49ae422:/opt/flink/src/dataproducts# flink run -py main.py
>>> > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>>> > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>>> > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>>> > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>>> > > WARNING: Unknown module: jdk.compiler specified to --add-exports
>>> > > flink_kafka_connector_path: /opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar
>>> > > flink_python_path: /opt/flink/opt/flink-python-1.19.2.jar
>>> > > START EXECUTING!
>>> > > Traceback (most recent call last):
>>> > >   File "/opt/flink/src/dataproducts/main.py", line 97, in <module>
>>> > >     main()
>>> > >   File "/opt/flink/src/dataproducts/main.py", line 93, in main
>>> > >     env.execute()
>>> > >   File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 813, in execute
>>> > >   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>>> > >   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
>>> > >   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
>>> > > py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
>>> > > : java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar']
>>> > >     at java.base/java.net.URL.<init>(Unknown Source)
>>> > >     at java.base/java.net.URL.<init>(Unknown Source)
>>> > >     at java.base/java.net.URL.<init>(Unknown Source)
>>> > >     at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
>>> > >     at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
>>> > >     at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
>>> > >     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
>>> > >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
>>> > >     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
>>> > >     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
>>> > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>> > >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>> > >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>>> > >     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>> > >     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>>> > >     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>> > >     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>> > >     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> > >     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>> > >     at java.base/java.lang.Thread.run(Unknown Source)
>>> > >
>>> > > org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
>>> > >     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
>>> > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>>> > >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>>> > >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>>> > >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>>> > >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>>> > >     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
>>> > >     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
>>> > >     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>>> > >     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
>>> > >     at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
>>> > >     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>> > >     at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
>>> > >     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
>>> > > Caused by: java.lang.RuntimeException: Python process exits with code: 1
>>> > >     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
>>> > >     ... 14 more
>>> > > ```
>>> >
>>>
>>
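Until a fix lands on the 1.x line, a defensive guard on the job side can at least make the intended URLs explicit. The helper below is a hypothetical sketch: `to_jar_url` is not a PyFlink API, and it cannot repair mangling that happens inside Flink's own config serialization; it only normalizes the values a job hands to `env.add_jars()`:

```python
from pathlib import Path

def to_jar_url(path_str: str) -> str:
    """Hypothetical helper (not part of PyFlink): strip accidental
    brackets/quotes and ensure a proper file:// scheme on a jar path."""
    cleaned = path_str.strip("[]'\" ")
    if cleaned.startswith("file:"):
        # Re-anchor to an absolute path so "file:/x" becomes "file:///x".
        cleaned = cleaned.split(":", 1)[1].lstrip("/")
        return "file:///" + cleaned
    # Plain filesystem path: let pathlib build the URL.
    return Path(cleaned).resolve().as_uri()

print(to_jar_url("['file:/opt/flink/opt/flink-python-1.19.2.jar']"))
# file:///opt/flink/opt/flink-python-1.19.2.jar
```

The jars would then be registered as `env.add_jars(*map(to_jar_url, jar_paths))`, so every entry reaches `pipeline.jars` in the same canonical `file:///...` form.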