There's an open issue that looks very similar:
https://issues.apache.org/jira/browse/FLINK-36457. No idea how to work
around this.
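One hint, though: note the brackets and quotes inside the error ("no protocol:
['file:...']"). That looks like the repr of the whole Python list being handed
to java.net.URL as a single string, rather than one URL per jar, which would
match the ticket above. You can reproduce the offending string with plain
Python (no Flink needed):

```python
# The error message complains about this exact string:
#   no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar']
# The leading "[" and the quotes are the repr of a Python list, not a
# bare URL, so the whole list appears to be serialized as one config
# value instead of one entry per jar.
jars = ["file:/opt/flink/opt/flink-python-1.19.2.jar"]
print(str(jars))  # -> ['file:/opt/flink/opt/flink-python-1.19.2.jar']

# java.net.URL reads the scheme up to the first ":"; a string starting
# with "['file:..." has no valid scheme, hence
# "MalformedURLException: no protocol".
```

If that is what's happening, one untested workaround: your Dockerfile already
copies the connector jars into /opt/flink/lib, and everything in lib/ is on
the cluster classpath automatically, so you could try dropping the
env.add_jars() call entirely and submitting again.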

On Wed, 2 Apr 2025 at 15:13, Gulan, Jacob <jacob.gu...@duke-energy.com>
wrote:

> Hello,
>
> I've been running into a wall for the past few days trying to get my
> PyFlink cluster running in a Docker container on my local machine.
>
> However, whenever I get to the point in my code where I need to run
> env.execute(), I see the following error (full stack trace at bottom of
> email):
>
> ```
> java.net.MalformedURLException: no protocol:
> ['file:/opt/flink/opt/flink-python-1.19.2.jar']
> ```
>
> It appears that the `file://` prefix is being shortened to `file:`, but
> I'm not setting this anywhere in my code, leading me to believe there's
> underlying behavior that's creating this inaccurate file path.
>
> If this error is familiar, I would appreciate any advice on how to
> remediate it. I've included code snippets of my current implementation
> below. I've tried both versions 1.19.2 and 1.20.1.
>
> Thank you for any assistance you may be able to provide.
>
>
> CODE SNIPPET:
> ```
> import json
>
> from pyflink.common import Types, WatermarkStrategy
> from pyflink.common.serialization import SimpleStringSchema
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors.base import DeliveryGuarantee
> from pyflink.datastream.connectors.kafka import (
>     KafkaOffsetsInitializer,
>     KafkaRecordSerializationSchema,
>     KafkaSink,
>     KafkaSource,
> )
>
> # JAR_DIR, BROKERS, USERNAME, PASSWORD, SOURCE_TOPIC and
> # DESTINATION_TOPIC are defined elsewhere in the module.
>
>
> def parse_record(record: str) -> str:
>     # The map below declares Types.STRING(), so return a string,
>     # not a dict.
>     return json.dumps(json.loads(record))
>
>
> def initialize_environment() -> StreamExecutionEnvironment:
>     """Initialize the Flink environment"""
>     env = StreamExecutionEnvironment.get_execution_environment()
>
>     flink_kafka_connector_path: str = str(
>         (JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar").resolve()
>     )
>     flink_python_path: str = str(
>         (JAR_DIR.parent / "opt" / "flink-python-1.19.2.jar").resolve()
>     )
>
>     print(f"flink_kafka_connector_path: {flink_kafka_connector_path}")
>     print(f"flink_python_path: {flink_python_path}")
>
>     env.add_jars(f"file://{flink_kafka_connector_path}",
> f"file://{flink_python_path}")
>     return env
>
>
> def main() -> None:
>     env: StreamExecutionEnvironment = initialize_environment()
>
>     kafka_properties = {
>         "bootstrap.servers": BROKERS,
>         "security.protocol": "SASL_SSL",
>         "sasl.mechanism": "PLAIN",
>         "sasl.jaas.config":
> f"org.apache.kafka.common.security.plain.PlainLoginModule required
> username='{USERNAME}' password='{PASSWORD}';",
>     }
>
>     kafka_source: KafkaSource = (
>         KafkaSource.builder()
>         .set_topics(SOURCE_TOPIC)
>         .set_properties(kafka_properties)
>         .set_starting_offsets(KafkaOffsetsInitializer.earliest())
>         .set_value_only_deserializer(SimpleStringSchema())
>         .build()
>     )
>
>     kafka_sink: KafkaSink = (
>         KafkaSink.builder()
>         .set_bootstrap_servers(BROKERS)
>         .set_record_serializer(
>             KafkaRecordSerializationSchema.builder()
>             .set_topic(DESTINATION_TOPIC)
>             .set_value_serialization_schema(SimpleStringSchema())
>             .build()
>         )
>         .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .build()
>     )
>
>     data_stream = env.from_source(
>         kafka_source,
>         WatermarkStrategy.no_watermarks(),
>         "Kafka Source",
>     )
>     transformed_data = data_stream.map(parse_record,
> output_type=Types.STRING())
>     transformed_data.sink_to(kafka_sink)
>
>     print("START EXECUTING!")
>
>     env.execute()
>
>
> if __name__ == "__main__":
>     main()
> ```
>
>
>
> DOCKERFILE:
>
> ```
>
> ##############################################################################
> # Stage 1 - Build uber jar using Maven
>
> ##############################################################################
>
> # Set arguments for image versions
> ARG JAVA_VERSION=11
> ARG FLINK_VERSION=1.19
> ARG SCALA_VERSION=scala_2.12
>
> # Source build image
> FROM maven:3.9-amazoncorretto-${JAVA_VERSION} AS build
>
> # Build jars
> WORKDIR /build
> COPY pom.xml .
> RUN mvn dependency:copy-dependencies -DoutputDirectory=/build/jars
>
>
>
> ##############################################################################
> # Stage 2 - Create the final image
>
> ##############################################################################
>
> # Source image
> FROM flink:${FLINK_VERSION}-${SCALA_VERSION}-java${JAVA_VERSION}
>
> # Install Python
> RUN apt-get update && apt-get install -y python3.11 python3-pip
>
> # Copy the built jars from the Maven stage
> COPY --from=build /build/jars /opt/flink/lib
>
> # Create symlink so that "python" points to "python3.11"
> RUN ln -s /usr/bin/python3.11 /usr/bin/python
>
> # Set the working directory
> WORKDIR /opt/flink/src
>
> # Copy the Python project files
> COPY pyproject.toml .
> COPY README.md .
> COPY dataproducts /opt/flink/src/dataproducts
>
> # Go get a cup of coffee, this will take a while ☕
> RUN python3.11 -m pip install . \
>     --no-cache-dir
> ```
>
> STACK TRACE:
> ```
> root@9e14f49ae422:/opt/flink/src/dataproducts# flink run -py main.py
> WARNING: Unknown module: jdk.compiler specified to --add-exports
> WARNING: Unknown module: jdk.compiler specified to --add-exports
> WARNING: Unknown module: jdk.compiler specified to --add-exports
> WARNING: Unknown module: jdk.compiler specified to --add-exports
> WARNING: Unknown module: jdk.compiler specified to --add-exports
> flink_kafka_connector_path:
> /opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar
> flink_python_path: /opt/flink/opt/flink-python-1.19.2.jar
> START EXECUTING!
> Traceback (most recent call last):
>   File "/opt/flink/src/dataproducts/main.py", line 97, in <module>
>     main()
>   File "/opt/flink/src/dataproducts/main.py", line 93, in main
>     env.execute()
>   File
> "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
> line 813, in execute
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
> line 1322, in __call__
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 146, in deco
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py",
> line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
> : java.net.MalformedURLException: no protocol:
> ['file:/opt/flink/opt/flink-python-1.19.2.jar']
>         at java.base/java.net.URL.<init>(Unknown Source)
>         at java.base/java.net.URL.<init>(Unknown Source)
>         at java.base/java.net.URL.<init>(Unknown Source)
>         at
> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
>         at
> org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
>         at
> org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
>         at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
>         at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
>         at
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
>         at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
>         at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>         at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.base/java.lang.Thread.run(Unknown Source)
>
> org.apache.flink.client.program.ProgramAbortException:
> java.lang.RuntimeException: Python process exits with code: 1
>         at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
>         at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>         at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
>         at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
>         at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
>         at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
>         at
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
>         at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>         at
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
>         at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
>         at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
>         ... 14 more
> ```
>
