Hello,
I've been running into a wall for the past few days trying to get my PyFlink
cluster running in a Docker container on my local machine.
However, whenever I get to the point in my code where I need to run
env.execute(), I see the following error (full stack trace at bottom of email):
```
java.net.MalformedURLException: no protocol:
['file:/opt/flink/opt/flink-python-1.19.2.jar']
```
It appears that the `file://` prefix is being shortened to `file:`, but I'm not
setting this anywhere in my code, leading me to believe there's underlying
behavior that's creating this inaccurate file path.
If this error is familiar, I would appreciate any advice as to how to remediate
the issue. I've attached code snippets to this file of my current
implementation. I've tried using both versions 1.19.2 and 1.20.1.
Thank you for any assistance you may be able to provide.
CODE SNIPPET:
```
def parse_record(record: str) -> dict:
return json.loads(record)
def initialize_environment() -> StreamExecutionEnvironment:
"""Initialize the Flink environment"""
env = StreamExecutionEnvironment.get_execution_environment()
flink_kafka_connector_path: str = str(
(JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar").resolve()
)
flink_python_path: str = str(
(JAR_DIR.parent / "opt" / "flink-python-1.19.2.jar").resolve()
)
print(f"flink_kafka_connector_path: {flink_kafka_connector_path}")
print(f"flink_python_path: {flink_python_path}")
env.add_jars(f"file://{flink_kafka_connector_path}",
f"file://{flink_python_path}")
return env
def main() -> None:
env: StreamExecutionEnvironment = initialize_environment()
kafka_properties = {
"bootstrap.servers": BROKERS,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.jaas.config":
f"org.apache.kafka.common.security.plain.PlainLoginModule required
username='{USERNAME}' password='{PASSWORD}';",
}
kafka_source: KafkaSource = (
KafkaSource.builder()
.set_topics(SOURCE_TOPIC)
.set_properties(kafka_properties)
.set_starting_offsets(KafkaOffsetsInitializer.earliest())
.set_value_only_deserializer(SimpleStringSchema())
.build()
)
kafka_sink: KafkaSink = (
KafkaSink.builder()
.set_bootstrap_servers(BROKERS)
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic(DESTINATION_TOPIC)
.set_value_serialization_schema(SimpleStringSchema())
.build()
)
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
)
data_stream = env.from_source(
kafka_source,
WatermarkStrategy.no_watermarks(),
"Kafka Source",
)
transformed_data = data_stream.map(parse_record, output_type=Types.STRING())
transformed_data.sink_to(kafka_sink)
print("START EXECUTING!")
env.execute()
if __name__ == "__main__":
main()
```
DOCKERFILE:
```
##############################################################################
# Stage 1 - Build uber jar using Maven
##############################################################################
# Set arguments for image versions
ARG JAVA_VERSION=11
ARG FLINK_VERSION=1.19
ARG SCALA_VERSION=scala_2.12
# Source build image
FROM maven:3.9-amazoncorretto-${JAVA_VERSION} AS build
# Build jars
WORKDIR /build
COPY pom.xml .
RUN mvn dependency:copy-dependencies -DoutputDirectory=/build/jars
##############################################################################
# Stage 2 - Create the final image
##############################################################################
# Source image
FROM flink:${FLINK_VERSION}-${SCALA_VERSION}-java${JAVA_VERSION}
# Install Python
RUN apt-get update && apt-get install -y python3.11 python3-pip
# Copy the built jars from the Maven stage
COPY --from=build /build/jars /opt/flink/lib
# Create symlink so that "python" points to "python3.11"
RUN ln -s /usr/bin/python3.11 /usr/bin/python
# Set the working directory
WORKDIR /opt/flink/src
# Copy the Python project files
COPY pyproject.toml .
COPY README.md .
COPY dataproducts /opt/flink/src/dataproducts
# Go get a cup of coffee, this will take a while ☕
RUN python3.11 -m pip install . \
--no-cache-dir
```
STACK TRACE:
```
root@9e14f49ae422:/opt/flink/src/dataproducts# flink run -py main.py
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
WARNING: Unknown module: jdk.compiler specified to --add-exports
flink_kafka_connector_path: /opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar
flink_python_path: /opt/flink/opt/flink-python-1.19.2.jar
START EXECUTING!
Traceback (most recent call last):
File "/opt/flink/src/dataproducts/main.py", line 97, in <module>
main()
File "/opt/flink/src/dataproducts/main.py", line 93, in main
env.execute()
File
"/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
line 813, in execute
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line
1322, in __call__
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line
146, in deco
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line
326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: java.net.MalformedURLException: no protocol:
['file:/opt/flink/opt/flink-python-1.19.2.jar']
at java.base/java.net.URL.<init>(Unknown Source)
at java.base/java.net.URL.<init>(Unknown Source)
at java.base/java.net.URL.<init>(Unknown Source)
at
org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
at
org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
at
org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
at
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
org.apache.flink.client.program.ProgramAbortException:
java.lang.RuntimeException: Python process exits with code: 1
at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at
org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 14 more
```