There's an open issue that looks very similar: https://issues.apache.org/jira/browse/FLINK-36457. Unfortunately, I don't know of a workaround for this.
On Wed, 2 Apr 2025 at 15:13, Gulan, Jacob <jacob.gu...@duke-energy.com> wrote: > Hello, > > I've been running into a wall for the past few days trying to get my > PyFlink cluster running in a Docker container on my local machine. > > However, whenever I get to the point in my code where I need to run > env.execute(), I see the following error (full stack trace at bottom of > email): > > ``` > java.net.MalformedURLException: no protocol: > ['file:/opt/flink/opt/flink-python-1.19.2.jar'] > ``` > > It appears that the `file://` prefix is being shortened to `file:`, but > I'm not setting this anywhere in my code, leading me to believe there's > underlying behavior that's creating this inaccurate file path. > > If this error is familiar, I would appreciate any advice as to how to > remediate the issue. I've attached code snippets to this file of my current > implementation. I've tried using both versions 1.19.2 and 1.20.1. > > Thank you for any assistance you may be able to provide. > > > CODE SNIPPET: > ``` > def parse_record(record: str) -> dict: > return json.loads(record) > > > def initialize_environment() -> StreamExecutionEnvironment: > """Initialize the Flink environment""" > env = StreamExecutionEnvironment.get_execution_environment() > > flink_kafka_connector_path: str = str( > (JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar").resolve() > ) > flink_python_path: str = str( > (JAR_DIR.parent / "opt" / "flink-python-1.19.2.jar").resolve() > ) > > print(f"flink_kafka_connector_path: {flink_kafka_connector_path}") > print(f"flink_python_path: {flink_python_path}") > > env.add_jars(f"file://{flink_kafka_connector_path}", > f"file://{flink_python_path}") > return env > > > def main() -> None: > env: StreamExecutionEnvironment = initialize_environment() > > kafka_properties = { > "bootstrap.servers": BROKERS, > "security.protocol": "SASL_SSL", > "sasl.mechanism": "PLAIN", > "sasl.jaas.config": > f"org.apache.kafka.common.security.plain.PlainLoginModule required > 
username='{USERNAME}' password='{PASSWORD}';", > } > > kafka_source: KafkaSource = ( > KafkaSource.builder() > .set_topics(SOURCE_TOPIC) > .set_properties(kafka_properties) > .set_starting_offsets(KafkaOffsetsInitializer.earliest()) > .set_value_only_deserializer(SimpleStringSchema()) > .build() > ) > > kafka_sink: KafkaSink = ( > KafkaSink.builder() > .set_bootstrap_servers(BROKERS) > .set_record_serializer( > KafkaRecordSerializationSchema.builder() > .set_topic(DESTINATION_TOPIC) > .set_value_serialization_schema(SimpleStringSchema()) > .build() > ) > .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) > .build() > ) > > data_stream = env.from_source( > kafka_source, > WatermarkStrategy.no_watermarks(), > "Kafka Source", > ) > transformed_data = data_stream.map(parse_record, > output_type=Types.STRING()) > transformed_data.sink_to(kafka_sink) > > print("START EXECUTING!") > > env.execute() > > > if __name__ == "__main__": > main() > ``` > > > > DOCKERFILE: > > ``` > > ############################################################################## > # Stage 1 - Build uber jar using Maven > > ############################################################################## > > # Set arguments for image versions > ARG JAVA_VERSION=11 > ARG FLINK_VERSION=1.19 > ARG SCALA_VERSION=scala_2.12 > > # Source build image > FROM maven:3.9-amazoncorretto-${JAVA_VERSION} AS build > > # Build jars > WORKDIR /build > COPY pom.xml . 
> RUN mvn dependency:copy-dependencies -DoutputDirectory=/build/jars > > > > ############################################################################## > # Stage 2 - Create the final image > > ############################################################################## > > # Source image > FROM flink:${FLINK_VERSION}-${SCALA_VERSION}-java${JAVA_VERSION} > > # Install Python > RUN apt-get update && apt-get install -y python3.11 python3-pip > > # Copy the built jars from the Maven stage > COPY --from=build /build/jars /opt/flink/lib > > # Create symlink so that "python" points to "python3.11" > RUN ln -s /usr/bin/python3.11 /usr/bin/python > > # Set the working directory > WORKDIR /opt/flink/src > > # Copy the Python project files > COPY pyproject.toml . > COPY README.md . > COPY dataproducts /opt/flink/src/dataproducts > > # Go get a cup of coffee, this will take a while ☕ > RUN python3.11 -m pip install . \ > --no-cache-dir > ``` > > STACK TRACE: > ``` > root@9e14f49ae422:/opt/flink/src/dataproducts# flink run -py main.py > WARNING: Unknown module: jdk.compiler specified to --add-exports > WARNING: Unknown module: jdk.compiler specified to --add-exports > WARNING: Unknown module: jdk.compiler specified to --add-exports > WARNING: Unknown module: jdk.compiler specified to --add-exports > WARNING: Unknown module: jdk.compiler specified to --add-exports > flink_kafka_connector_path: > /opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar > flink_python_path: /opt/flink/opt/flink-python-1.19.2.jar > START EXECUTING! 
> Traceback (most recent call last): > File "/opt/flink/src/dataproducts/main.py", line 97, in <module> > main() > File "/opt/flink/src/dataproducts/main.py", line 93, in main > env.execute() > File > "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", > line 813, in execute > File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", > line 1322, in __call__ > File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", > line 146, in deco > File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", > line 326, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. > : java.net.MalformedURLException: no protocol: > ['file:/opt/flink/opt/flink-python-1.19.2.jar'] > at java.base/java.net.URL.<init>(Unknown Source) > at java.base/java.net.URL.<init>(Unknown Source) > at java.base/java.net.URL.<init>(Unknown Source) > at > org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133) > at > org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77) > at > org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77) > at > org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440) > at > org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188) > at > org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown > Source) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown > Source) > at 
java.base/java.lang.reflect.Method.invoke(Unknown Source) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.base/java.lang.Thread.run(Unknown Source) > > org.apache.flink.client.program.ProgramAbortException: > java.lang.RuntimeException: Python process exits with code: 1 > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown > Source) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown > Source) > at java.base/java.lang.reflect.Method.invoke(Unknown Source) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) > at > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026) > at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247) > at > org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270) > at > org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367) > at > org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) > at > 
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335) > Caused by: java.lang.RuntimeException: Python process exits with code: 1 > at > org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124) > ... 14 more > ``` >