Hey, I've been grappling with this issue for the past five days and, despite continuous effort, I haven't found a resolution. I also haven't been able to locate a Slack channel for Beam where I might ask for help.
*The issue:*

```
RuntimeError: Pipeline construction environment and pipeline runtime environment are not compatible. If you use a custom container image, check that the Python interpreter minor version and the Apache Beam version in your image match the versions used at pipeline construction time. Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0. Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.
```

Here is what I am trying to do: I run the job from a Kubernetes container that hits the job server, and from there the job goes to the job manager and task manager (the job manager and task manager are one container).

Here is my custom Dockerfile (image name: `custom-flink`):

```
# Starting with the base Flink image
FROM apache/flink:1.16-java11

ARG FLINK_VERSION=1.16
ARG KAFKA_VERSION=2.8.0

# Install python3.8 and its associated dependencies, followed by pyflink
RUN set -ex; \
    apt-get update && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
    wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
    tar -xvf Python-3.8.0.tgz && \
    cd Python-3.8.0 && \
    ./configure --without-tests --enable-shared && \
    make -j4 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
    ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
    ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    python -m pip install --upgrade pip; \
    pip install apache-flink==${FLINK_VERSION}; \
    pip install kafka-python

RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0

# Copy files from official SDK image, including script/dependencies.
COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/

# java SDK
COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/

RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*

# Give permissions to the /opt/apache/beam-venv directory
RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv
```

Here is my deployment file for the job manager, the task manager (plus worker pool), and the job server:

```
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
          resources:
            requests:
              memory: "1Gi"
              cpu: "1"
            limits:
              memory: "1Gi"
              cpu: "1"
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager-beam-worker
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args:
            - /bin/bash
            - -c
            - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            - containerPort: 50000
              name: pool
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
          resources:
            requests:
              memory: "4Gi"
              cpu: "4"
            limits:
              memory: "4Gi"
              cpu: "4"
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: beam-jobserver
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: beam
      component: jobserver
  template:
    metadata:
      labels:
        app: beam
        component: jobserver
    spec:
      containers:
        - name: beam-jobserver
          image: apache/beam_flink1.16_job_server:2.48.0
          args: ["--flink-master=flink-jobmanager:8081"]
          ports:
            - containerPort: 8097
            - containerPort: 8098
            - containerPort: 8099
          volumeMounts:
            - name: beam-staging
              mountPath: /tmp/beam-artifact-staging
          resources:
            requests:
              memory: "2Gi"
              cpu: "2"
            limits:
              memory: "2Gi"
              cpu: "2"
      volumes:
        - name: beam-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: v1
kind: Service
metadata:
  name: beam-jobserver
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: grpc-port
      port: 8097
      targetPort: 8097
    - name: expansion-port
      port: 8098
      targetPort: 8098
    - name: job-manage-port
      port: 8099
      targetPort: 8099
  selector:
    app: beam
    component: jobserver
```

And the PVC:

```
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-artifacts-claim
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard
```

Then I run a pod with `apache/beam_python3.8_sdk:2.48.0` and install Java in it, because the expansion service needs it to run the code. Here is the code that runs from that container:

```
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service


def run_beam_pipeline():
    logging.getLogger().setLevel(logging.INFO)

    consumer_config = {
        'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
        'group.id': 'beamgrouptest',
        'auto.offset.reset': 'earliest',
        'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
        'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
    }
    topic = 'locations'

    flink_options = PipelineOptions([
        "--runner=PortableRunner",
        "--artifact_endpoint=beam-jobserver:8098",
        "--job_endpoint=beam-jobserver:8099",
        "--environment_type=EXTERNAL",
        "--environment_config=beam-worker-pool:50000",
        # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
    ])

    with beam.Pipeline(options=flink_options) as pipeline:
        messages = (
            pipeline
            | "Read from Kafka" >> ReadFromKafka(
                consumer_config=consumer_config,
                topics=[topic],
                with_metadata=False,
                expansion_service=default_io_expansion_service(
                    append_args=[
                        '--defaultEnvironmentType=PROCESS',
                        "--defaultEnvironmentConfig={\"command\": \"/opt/apache/beam/boot\"}",
                    ]
                )
            )
            | "Print messages" >> beam.Map(print)
        )

    logging.info("Pipeline execution completed.")


if __name__ == '__main__':
    run_beam_pipeline()
```

When I start the job, here are the logs; it is downloading
the Java expansion service.

```
python3 testing.py
<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '35371' '--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar' '--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:35371
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1]
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered transforms:
INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0
INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7
INFO:apache_beam.utils.subprocess_server:  beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125
INFO:apache_beam.utils.subprocess_server:  beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
INFO:apache_beam.utils.subprocess_server:  beam:schematransform:org.apache.beam:kafka_read:v1
INFO:apache_beam.utils.subprocess_server:  beam:schematransform:org.apache.beam:kafka_write:v1
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
WARNING:root:Waiting for grpc channel to be ready at localhost:35371.
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from Kafka' with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'
INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig
INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class 'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no schema registered. Attempting to construct with setter approach.
INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x402e7a95e0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x402e7a9670> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x402e7a9dc0> ====================
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
```

*Error* (from the task manager logs):

```
2023-08-09 10:25:37,146 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
2023-08-09 10:25:37,260 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.
2023-08-09 10:25:37,724 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
2023-08-09 10:25:37,731 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
2023-08-09 10:25:37,771 INFO org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - getProcessBundleDescriptor request with id 1-4
2023-08-09 10:25:37,876 INFO /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889 [] - Creating insecure state channel for localhost:45429.
2023-08-09 10:25:37,877 INFO /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896 [] - State channel established.
2023-08-09 10:25:37,918 INFO /usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376 [] - Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0
2023-08-09 10:25:37,928 ERROR /usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299 [] - Error processing instruction 1. Original traceback is
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
    processor = bundle_processor.BundleProcessor(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 876, in __init__
    _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 839, in _verify_descriptor_created_in_a_compatible_env
    raise RuntimeError(
RuntimeError: Pipeline construction environment and pipeline runtime environment are not compatible. If you use a custom container image, check that the Python interpreter minor version and the Apache Beam version in your image match the versions used at pipeline construction time. Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0. Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.
2023-08-09 10:25:37,931 INFO org.apache.flink.runtime.taskmanager.Task [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka Metadata} -> [1]Print messages (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0) switched from INITIALIZING to RUNNING.
2023-08-09 10:28:37,784 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: Failed to start remote bundle
```

Thanks,
Kapil Dev
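*Edit:* for what it's worth, my reading of the check that raises (`_verify_descriptor_created_in_a_compatible_env` in `bundle_processor.py`) is that it compares the environment recorded at pipeline construction time against the one the SDK harness is actually running in. A minimal sketch of that comparison, with my own names and a deliberate simplification of Beam's actual logic:

```
# Minimal sketch (my simplification, NOT Beam's actual code): the harness
# compares the sdk_base environment string recorded at construction time
# with its own, and fails the bundle on any mismatch.
SUBMISSION_ENV = "beam:version:sdk_base:apache/beam_java8_sdk:2.48.0"
RUNTIME_ENV = "beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0"

def environments_compatible(submission: str, runtime: str) -> bool:
    # A Java 8 submission environment vs a Python 3.8 runtime environment
    # differ, so this returns False for the strings from my error message.
    return submission == runtime

if not environments_compatible(SUBMISSION_ENV, RUNTIME_ENV):
    print("incompatible:", SUBMISSION_ENV, "vs", RUNTIME_ENV)
```

So the real question is why the submission side records a Java 8 environment at all when I construct the pipeline from a Python 3.8 pod; I suspect the transform expanded by the Java expansion service is what carries the `beam_java8_sdk` environment into the descriptor.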