I'm able to use the 2.x line, but I see that the latest "flink-connector-kafka" release is still 3.4.0-1.20, with no tag for the 2.x line yet. Will I need to wait for a 2.x-compatible connector release?
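In the meantime, one possible stopgap — an assumption on my part, not something verified against FLINK-36457 — is to avoid `add_jars()` on the Python side entirely and declare the jars in the legacy `flink-conf.yaml`, whose flat-key parser splits the semicolon-separated list itself:

```yaml
# Hypothetical workaround: legacy conf/flink-conf.yaml entry.
# pipeline.jars is a real Flink option; whether this sidesteps the
# YAML-list round-trip bug has not been verified.
pipeline.jars: file:///opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar;file:///opt/flink/opt/flink-python-1.19.2.jar
```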
________________________________
From: Gabor Somogyi <gabor.g.somo...@gmail.com>
Sent: Wednesday, April 2, 2025 10:13 AM
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Re: Executing PyFlink Cluster

I think I've already fixed this here [1], but only on the 2.x line. Until now I thought that only 2.x uses YAML-based configs...

[1] https://github.com/apache/flink/pull/26327

BR,
G

On Wed, Apr 2, 2025 at 4:08 PM Andreas Bube <ab...@toogoodtogo.com.invalid> wrote:

> There's an open issue that looks very similar:
> https://issues.apache.org/jira/browse/FLINK-36457. No idea how to work
> around this.
>
> On Wed, 2 Apr 2025 at 15:13, Gulan, Jacob <jacob.gu...@duke-energy.com> wrote:
>
> > Hello,
> >
> > I've been running into a wall for the past few days trying to get my
> > PyFlink cluster running in a Docker container on my local machine.
> >
> > Whenever I get to the point in my code where I call env.execute(), I see
> > the following error (full stack trace at the bottom of this email):
> >
> > ```
> > java.net.MalformedURLException: no protocol:
> > ['file:/opt/flink/opt/flink-python-1.19.2.jar']
> > ```
> >
> > It appears that the `file://` prefix is being shortened to `file:`, but
> > I'm not setting this anywhere in my code, which leads me to believe
> > there's underlying behavior creating this malformed file path.
> >
> > If this error is familiar, I would appreciate any advice on how to
> > remediate it. I've attached code snippets of my current implementation
> > to this email. I've tried both versions 1.19.2 and 1.20.1.
> >
> > Thank you for any assistance you may be able to provide.
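The bracketed, quoted path in the error — `['file:/...']` rather than `file:/...` — is the signature of a list of jar URLs being stringified into a single config value instead of being joined entry by entry (the symptom tracked in FLINK-36457). A minimal Python sketch of that failure mode, independent of Flink:

```python
from urllib.parse import urlparse

# A jar URL with an explicit scheme ("protocol") parses cleanly.
good = "file:/opt/flink/opt/flink-python-1.19.2.jar"
print(urlparse(good).scheme)  # -> "file"

# If a list of jar URLs is serialized with str() instead of being
# joined entry by entry, brackets and quotes leak into the value...
jars = [good]
leaked = str(jars)
print(leaked)  # -> "['file:/opt/flink/opt/flink-python-1.19.2.jar']"

# ...and the result no longer starts with a valid scheme, which is
# exactly the condition java.net.URL reports as "no protocol".
print(urlparse(leaked).scheme)  # -> "" (no scheme recognized)
```

This only illustrates the shape of the bug, not where it happens inside Flink's config round trip.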
> >
> >
> > CODE SNIPPET:
> > ```
> > def parse_record(record: str) -> dict:
> >     return json.loads(record)
> >
> >
> > def initialize_environment() -> StreamExecutionEnvironment:
> >     """Initialize the Flink environment"""
> >     env = StreamExecutionEnvironment.get_execution_environment()
> >
> >     flink_kafka_connector_path: str = str(
> >         (JAR_DIR / "flink-connector-kafka-3.3.0-1.19.jar").resolve()
> >     )
> >     flink_python_path: str = str(
> >         (JAR_DIR.parent / "opt" / "flink-python-1.19.2.jar").resolve()
> >     )
> >
> >     print(f"flink_kafka_connector_path: {flink_kafka_connector_path}")
> >     print(f"flink_python_path: {flink_python_path}")
> >
> >     env.add_jars(f"file://{flink_kafka_connector_path}",
> >                  f"file://{flink_python_path}")
> >     return env
> >
> >
> > def main() -> None:
> >     env: StreamExecutionEnvironment = initialize_environment()
> >
> >     kafka_properties = {
> >         "bootstrap.servers": BROKERS,
> >         "security.protocol": "SASL_SSL",
> >         "sasl.mechanism": "PLAIN",
> >         "sasl.jaas.config": f"org.apache.kafka.common.security.plain.PlainLoginModule required username='{USERNAME}' password='{PASSWORD}';",
> >     }
> >
> >     kafka_source: KafkaSource = (
> >         KafkaSource.builder()
> >         .set_topics(SOURCE_TOPIC)
> >         .set_properties(kafka_properties)
> >         .set_starting_offsets(KafkaOffsetsInitializer.earliest())
> >         .set_value_only_deserializer(SimpleStringSchema())
> >         .build()
> >     )
> >
> >     kafka_sink: KafkaSink = (
> >         KafkaSink.builder()
> >         .set_bootstrap_servers(BROKERS)
> >         .set_record_serializer(
> >             KafkaRecordSerializationSchema.builder()
> >             .set_topic(DESTINATION_TOPIC)
> >             .set_value_serialization_schema(SimpleStringSchema())
> >             .build()
> >         )
> >         .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
> >         .build()
> >     )
> >
> >     data_stream = env.from_source(
> >         kafka_source,
> >         WatermarkStrategy.no_watermarks(),
> >         "Kafka Source",
> >     )
> >     transformed_data = data_stream.map(parse_record,
> >                                        output_type=Types.STRING())
> >     transformed_data.sink_to(kafka_sink)
> >
> >     print("START EXECUTING!")
> >
> >     env.execute()
> >
> >
> > if __name__ == "__main__":
> >     main()
> > ```
> >
> >
> > DOCKERFILE:
> > ```
> > ##############################################################################
> > # Stage 1 - Build uber jar using Maven
> > ##############################################################################
> >
> > # Set arguments for image versions
> > ARG JAVA_VERSION=11
> > ARG FLINK_VERSION=1.19
> > ARG SCALA_VERSION=scala_2.12
> >
> > # Source build image
> > FROM maven:3.9-amazoncorretto-${JAVA_VERSION} AS build
> >
> > # Build jars
> > WORKDIR /build
> > COPY pom.xml .
> > RUN mvn dependency:copy-dependencies -DoutputDirectory=/build/jars
> >
> > ##############################################################################
> > # Stage 2 - Create the final image
> > ##############################################################################
> >
> > # Source image
> > FROM flink:${FLINK_VERSION}-${SCALA_VERSION}-java${JAVA_VERSION}
> >
> > # Install Python
> > RUN apt-get update && apt-get install -y python3.11 python3-pip
> >
> > # Copy the built jars from the Maven stage
> > COPY --from=build /build/jars /opt/flink/lib
> >
> > # Create symlink so that "python" points to "python3.11"
> > RUN ln -s /usr/bin/python3.11 /usr/bin/python
> >
> > # Set the working directory
> > WORKDIR /opt/flink/src
> >
> > # Copy the Python project files
> > COPY pyproject.toml .
> > COPY README.md .
> > COPY dataproducts /opt/flink/src/dataproducts
> >
> > # Go get a cup of coffee, this will take a while ☕
> > RUN python3.11 -m pip install . \
> >     --no-cache-dir
> > ```
> >
> > STACK TRACE:
> > ```
> > root@9e14f49ae422:/opt/flink/src/dataproducts# flink run -py main.py
> > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > WARNING: Unknown module: jdk.compiler specified to --add-exports
> > flink_kafka_connector_path: /opt/flink/lib/flink-connector-kafka-3.3.0-1.19.jar
> > flink_python_path: /opt/flink/opt/flink-python-1.19.2.jar
> > START EXECUTING!
> > Traceback (most recent call last):
> >   File "/opt/flink/src/dataproducts/main.py", line 97, in <module>
> >     main()
> >   File "/opt/flink/src/dataproducts/main.py", line 93, in main
> >     env.execute()
> >   File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 813, in execute
> >   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
> >   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
> >   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
> > py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
> > : java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.19.2.jar']
> >     at java.base/java.net.URL.<init>(Unknown Source)
> >     at java.base/java.net.URL.<init>(Unknown Source)
> >     at java.base/java.net.URL.<init>(Unknown Source)
> >     at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:133)
> >     at org.apache.flink.client.cli.ExecutionConfigAccessor.getJars(ExecutionConfigAccessor.java:77)
> >     at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:77)
> >     at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72)
> >     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2440)
> >     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
> >     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:117)
> >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> >     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> >     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
> >     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> >     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> >     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> >     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> >     at java.base/java.lang.Thread.run(Unknown Source)
> >
> > org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
> >     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
> >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> >     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
> >     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
> >     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
> >     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
> >     at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
> >     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> >     at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
> >     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
> > Caused by: java.lang.RuntimeException: Python process exits with code: 1
> >     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
> >     ... 14 more
> > ```
> >
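Since the jar URLs passed to `add_jars()` in the snippet above are built by hand with `f"file://{path}"`, a hedged defensive alternative (my suggestion, not from the thread) is to let `pathlib` construct the `file://` URI, which avoids slash-count mistakes and stray characters from editor or mail-client markup:

```python
from pathlib import PurePosixPath

def jar_uri(path: str) -> str:
    """Return a well-formed file:// URI for an absolute POSIX jar path."""
    # PurePath.as_uri() raises ValueError on relative paths, which
    # surfaces a bad JAR_DIR early instead of at env.execute() time.
    return PurePosixPath(path).as_uri()

print(jar_uri("/opt/flink/opt/flink-python-1.19.2.jar"))
# -> file:///opt/flink/opt/flink-python-1.19.2.jar

# Usage in initialize_environment() would then be:
# env.add_jars(jar_uri(flink_kafka_connector_path), jar_uri(flink_python_path))
```

This does not address the bracketed-list symptom itself, which appears to originate in Flink's config handling rather than in the user code.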