AFAIK standard YAML config support was introduced in Flink 1.19; it just coexisted with the old solution to keep backward compatibility in the 1.x line.
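A quick illustration of what the stack trace further down actually shows: the whole Python list repr `['file:/...']` reaches the URL parser as one string. This can be reproduced with plain Python, no Flink involved (a sketch of the suspected serialization slip, not Flink's actual code path):

```python
from urllib.parse import urlparse

jar = "file:///opt/flink/opt/flink-python-1.19.2.jar"

# What should reach the URL parser: the bare URL string.
print(urlparse(jar).scheme)          # "file"

# What the stack trace shows reaching it instead: the repr of a Python
# list, brackets and quotes included, which has no parseable protocol.
stringified = str([jar])             # "['file:///opt/flink/...']"
print(urlparse(stringified).scheme)  # "" -> "no protocol" on the Java side
```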
If we backport it to 1.20, it's not much overhead to move it over to 1.19 as well, I guess. WDYT?

On Wednesday, April 2nd, 2025 at 17:23, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> I've double checked it, and the mentioned fix must be adapted to at least 1.20
> with the is_standard_yaml condition.
>
> On Wed, Apr 2, 2025 at 5:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
> > But as a general rule, if the issue exists on the 1.x line, then at least
> > 1.20 must have this fix.
> > Let me check that and act accordingly...
> >
> > BR,
> > G
> >
> > On Wed, Apr 2, 2025 at 5:00 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> >
> > > Yeah, a new version of the Kafka connector is needed to use 2.x.
> > >
> > > On Wed, Apr 2, 2025 at 4:56 PM Gulan, Jacob <jacob.gu...@duke-energy.com> wrote:
> > >
> > > > I'm able to use the 2.x line, but I see that the latest
> > > > "flink-connector-kafka" version is still 3.4.0-1.20, with no 2.0 tag yet.
> > > > Will I need to wait for the 2.x tag to be released?
> > > >
> > > > ________________________________
> > > > From: Gabor Somogyi <gabor.g.somo...@gmail.com>
> > > > Sent: Wednesday, April 2, 2025 10:13 AM
> > > > To: dev@flink.apache.org
> > > > Subject: [EXTERNAL] Re: Executing PyFlink Cluster
> > > >
> > > > I think I've already fixed this here [1], but only on the 2.x line.
> > > > Up until now I thought that only 2.x is using YAML-based configs...
> > > > [1] https://github.com/apache/flink/pull/26327
> > > >
> > > > BR,
> > > > G
> > > >
> > > > On Wed, Apr 2, 2025 at 4:08 PM Andreas Bube <ab...@toogoodtogo.com.invalid> wrote:
> > > >
> > > > > There's an open issue that looks very similar:
> > > > > https://issues.apache.org/jira/browse/FLINK-36457. No idea how to work
> > > > > around this.
> > > > >
> > > > > On Wed, 2 Apr 2025 at 15:13, Gulan, Jacob <jacob.gu...@duke-energy.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > I've been running into a wall for the past few days trying to get my
> > > > > > PyFlink cluster running in a Docker container on my local machine.
> > > > > > Whenever I get to the point in my code where I need to run
> > > > > > env.execute(), I see the following error (full stack trace at the
> > > > > > bottom of this email):
> > > > > >
> > > > > > `java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar']`
> > > > > >
> > > > > > It appears that the `file://` prefix is being shortened to `file:`,
> > > > > > but I'm not setting this anywhere in my code, which leads me to
> > > > > > believe there's underlying behavior creating this inaccurate file path.
> > > > > >
> > > > > > If this error looks familiar, I would appreciate any advice on how
> > > > > > to remediate the issue. I've included code snippets of my current
> > > > > > implementation below. I've tried both versions 1.19.2 and 1.20.1.
> > > > > >
> > > > > > Thank you for any assistance you may be able to provide.
> > > > > > CODE SNIPPET:
> > > > > > ```
> > > > > > import json
> > > > > >
> > > > > > from pyflink.common import Types, WatermarkStrategy
> > > > > > from pyflink.common.serialization import SimpleStringSchema
> > > > > > from pyflink.datastream import StreamExecutionEnvironment
> > > > > > from pyflink.datastream.connectors.base import DeliveryGuarantee
> > > > > > from pyflink.datastream.connectors.kafka import (
> > > > > >     KafkaOffsetsInitializer,
> > > > > >     KafkaRecordSerializationSchema,
> > > > > >     KafkaSink,
> > > > > >     KafkaSource,
> > > > > > )
> > > > > >
> > > > > > def parse_record(record: str) -> dict:
> > > > > >     return json.loads(record)
> > > > > >
> > > > > > def initialize_environment() -> StreamExecutionEnvironment:
> > > > > >     """Initialize the Flink environment"""
> > > > > >     env = StreamExecutionEnvironment.get_execution_environment()
> > > > > >
> > > > > >     flink_kafka_connector_path: str = str(
> > > > > >         (JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar").resolve()
> > > > > >     )
> > > > > >     flink_python_path: str = str(
> > > > > >         (JAR_DIR.parent / "opt" / "flink-python-1.19.2.jar").resolve()
> > > > > >     )
> > > > > >
> > > > > >     print(f"flink_kafka_connector_path: {flink_kafka_connector_path}")
> > > > > >     print(f"flink_python_path: {flink_python_path}")
> > > > > >
> > > > > >     env.add_jars(
> > > > > >         f"file://{flink_kafka_connector_path}",
> > > > > >         f"file://{flink_python_path}",
> > > > > >     )
> > > > > >     return env
> > > > > >
> > > > > > def main() -> None:
> > > > > >     env: StreamExecutionEnvironment = initialize_environment()
> > > > > >
> > > > > >     kafka_properties = {
> > > > > >         "bootstrap.servers": BROKERS,
> > > > > >         "security.protocol": "SASL_SSL",
> > > > > >         "sasl.mechanism": "PLAIN",
> > > > > >         "sasl.jaas.config": (
> > > > > >             "org.apache.kafka.common.security.plain.PlainLoginModule required "
> > > > > >             f"username='{USERNAME}' password='{PASSWORD}';"
> > > > > >         ),
> > > > > >     }
> > > > > >
> > > > > >     kafka_source: KafkaSource = (
> > > > > >         KafkaSource.builder()
> > > > > >         .set_topics(SOURCE_TOPIC)
> > > > > >         .set_properties(kafka_properties)
> > > > > >         .set_starting_offsets(KafkaOffsetsInitializer.earliest())
> > > > > >         .set_value_only_deserializer(SimpleStringSchema())
> > > > > >         .build()
> > > > > >     )
> > > > > >
> > > > > >     kafka_sink: KafkaSink = (
> > > > > >         KafkaSink.builder()
> > > > > >         .set_bootstrap_servers(BROKERS)
> > > > > >         .set_record_serializer(
> > > > > >             KafkaRecordSerializationSchema.builder()
> > > > > >             .set_topic(DESTINATION_TOPIC)
> > > > > >             .set_value_serialization_schema(SimpleStringSchema())
> > > > > >             .build()
> > > > > >         )
> > > > > >         .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
> > > > > >         .build()
> > > > > >     )
> > > > > >
> > > > > >     data_stream = env.from_source(
> > > > > >         kafka_source,
> > > > > >         WatermarkStrategy.no_watermarks(),
> > > > > >         "Kafka Source",
> > > > > >     )
> > > > > >     transformed_data = data_stream.map(parse_record, output_type=Types.STRING())
> > > > > >     transformed_data.sink_to(kafka_sink)
> > > > > >
> > > > > >     print("START EXECUTING!")
> > > > > >
> > > > > >     env.execute()
> > > > > >
> > > > > > if __name__ == "__main__":
> > > > > >     main()
> > > > > > ```
> > > > > >
> > > > > > DOCKERFILE:
> > > > > >
> > > > > > ```
> > > > > > ##############################################################################
> > > > > > # Stage 1 - Build uber jar using Maven
> > > > > > ##############################################################################
> > > > > >
> > > > > > # Set arguments for image versions
> > > > > > ARG JAVA_VERSION=11
> > > > > > ARG FLINK_VERSION=1.19
> > > > > > ARG SCALA_VERSION=scala_2.12
> > > > > >
> > > > > > # Source build image
> > > > > > FROM maven:3.9-amazoncorretto-${JAVA_VERSION} AS build
> > > > > >
> > > > > > # Build jars
> > > > > > WORKDIR /build
> > > > > > COPY pom.xml .
> > > > > > RUN mvn dependency:copy-dependencies -DoutputDirectory=/build/jars
> > > > > >
> > > > > > ##############################################################################
> > > > > > # Stage 2 - Create the final image
> > > > > > ##############################################################################
> > > > > >
> > > > > > # Source image
> > > > > > FROM flink:${FLINK_VERSION}-${SCALA_VERSION}-java${JAVA_VERSION}
> > > > > >
> > > > > > # Install Python
> > > > > > RUN apt-get update && apt-get install -y python3.11 python3-pip
> > > > > >
> > > > > > # Copy the built jars from the Maven stage
> > > > > > COPY --from=build /build/jars /opt/flink/lib
> > > > > >
> > > > > > # Create symlink so that "python" points to "python3.11"
> > > > > > RUN ln -s /usr/bin/python3.11 /usr/bin/python
> > > > > >
> > > > > > # Set the working directory
> > > > > > WORKDIR /opt/flink/src
> > > > > >
> > > > > > # Copy the Python project files
> > > > > > COPY pyproject.toml .
> > > > > > COPY README.md .
> > > > > > COPY dataproducts /opt/flink/src/dataproducts
> > > > > >
> > > > > > # Go get a cup of coffee, this will take a while ☕
> > > > > > RUN python3.11 -m pip install . --no-cache-dir
> > > > > > ```
> > > > > >
> > > > > > STACK TRACE:
> > > > > > ```
> > > > > > root@9e14f49ae422:/opt/flink/src/dataproducts# flink run -py main.py
> > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > > > > > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > > > > > flink_kafka_connector_path: /opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar
> > > > > > flink_python_path: /opt/flink/opt/flink-python-1.19.2.jar
> > > > > > START EXECUTING!
> > > > > > Traceback (most recent call last):
> > > > > >   File "/opt/flink/src/dataproducts/main.py", line 97, in <module>
> > > > > >     main()
> > > > > >   File "/opt/flink/src/dataproducts/main.py", line 93, in main
> > > > > >     env.execute()
> > > > > >   File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 813, in execute
> > > > > >   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
> > > > > >   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
> > > > > >   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
> > > > > > py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
> > > > > > : java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar']
> > > > > >     at java.base/java.net.URL.<init>(Unknown Source)
> > > > > >     at java.base/java.net.URL.<init>(Unknown Source)
> > > > > >     at java.base/java.net.URL.<init>(Unknown Source)
> > > > > >     at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
> > > > > >     at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
> > > > > >     at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
> > > > > >     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
> > > > > >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> > > > > >     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
> > > > > >     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
> > > > > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> > > > > >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> > > > > >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> > > > > >     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> > > > > >     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
> > > > > >     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> > > > > >     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> > > > > >     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> > > > > >     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> > > > > >     at java.base/java.lang.Thread.run(Unknown Source)
> > > > > >
> > > > > > org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
> > > > > >     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
> > > > > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > > >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> > > > > >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> > > > > >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> > > > > >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> > > > > >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> > > > > >     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
> > > > > >     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
> > > > > >     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
> > > > > >     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
> > > > > >     at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
> > > > > >     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> > > > > >     at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
> > > > > >     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
> > > > > > Caused by: java.lang.RuntimeException: Python process exits with code: 1
> > > > > >     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
> > > > > >     ... 14 more
> > > > > > ```
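One more thought on the original report above: until a backported fix is released on the 1.x line, a rough and untested workaround idea is to normalize the bad value wherever it can be intercepted, by detecting the stringified-list shape and re-joining the entries. This is a sketch only; the ';' list delimiter and the assumption that the value can be intercepted at all are mine, not verified Flink behavior:

```python
import ast

def normalize_jar_value(value: str) -> str:
    """Recover jar entries from a value that looks like a stringified
    Python list, e.g. "['file:/a.jar']", and re-join them with ';'
    (assumed legacy-style list delimiter). Other values pass through."""
    v = value.strip()
    if v.startswith("[") and v.endswith("]"):
        entries = ast.literal_eval(v)  # safely parse the list literal
        return ";".join(str(e) for e in entries)
    return value

print(normalize_jar_value("['file:/opt/flink/opt/flink-python-1.19.2.jar']"))
# file:/opt/flink/opt/flink-python-1.19.2.jar
```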