Hey,

I've been grappling with this issue for the past five days and, despite my
continuous efforts, I haven't found a resolution. Additionally, I've been
unable to locate a Slack channel for Beam where I might seek assistance.

*Issue*

*RuntimeError: Pipeline construction environment and pipeline runtime
environment are not compatible. If you use a custom container image, check
that the Python interpreter minor version and the Apache Beam version in
your image match the versions used at pipeline construction time.
Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
Runtime environment:
beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*
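From the message, the submission side appears to have recorded a Java 8 SDK environment while the worker pool runs the Python 3.8 SDK harness. To rule out a plain version mismatch on my side, this is the quick check I run in the submission pod (a minimal sketch; it just prints the versions Beam records at construction time):

```python
import sys

import apache_beam as beam

# The interpreter minor version and the Beam version recorded at pipeline
# construction time must match the SDK harness that later runs the bundles.
print("python:", "%d.%d" % (sys.version_info.major, sys.version_info.minor))
print("beam:", beam.__version__)
```

In my submission pod this prints 3.8 and 2.48.0, which matches the runtime image, so the java8 stamp puzzles me.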


Here is what I am trying to do:

I am submitting the job from a Kubernetes container, which hits the job server and then the job manager and task manager. The task manager and job manager each run in a single container.

Here is my custom Dockerfile (image name: custom-flink):

# Starting with the base Flink image
FROM apache/flink:1.16-java11
ARG FLINK_VERSION=1.16
ARG KAFKA_VERSION=2.8.0

# Install python3.8 and its associated dependencies, followed by pyflink
RUN set -ex; \
    apt-get update && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev lzma liblzma-dev && \
    wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
    tar -xvf Python-3.8.0.tgz && \
    cd Python-3.8.0 && \
    ./configure --without-tests --enable-shared && \
    make -j4 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
    ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
    ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    python -m pip install --upgrade pip; \
    pip install apache-flink==${FLINK_VERSION}; \
    pip install kafka-python

RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0

# Copy files from official SDK image, including script/dependencies.
COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/

# Java SDK
COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/

RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*

# Give permissions to the /opt/apache/beam-venv directory
RUN mkdir -p /opt/apache/beam-venv && chown -R 9999:9999 /opt/apache/beam-venv
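To double-check the image itself, I run a small sanity check (a sketch; it assumes `docker run --rm -it custom-flink python` drops into the Python 3.8 built above):

```python
# Run inside the custom-flink image to confirm the harness-side versions
# match what the error message asks for.
import sys

import apache_beam as beam

assert sys.version_info[:2] == (3, 8), sys.version
assert beam.__version__ == "2.48.0", beam.__version__
print("Python and Beam versions in the image look consistent")
```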

Here are my deployment manifests for the job manager, the task manager plus Beam worker pool, and the job server:


apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: beam-worker-pool
  namespace: flink
spec:
  selector:
    app: flink
    component: taskmanager
  ports:
    - protocol: TCP
      port: 50000
      targetPort: 50000
      name: pool
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
          resources:
            requests:
              memory: "1Gi"
              cpu: "1"
            limits:
              memory: "1Gi"
              cpu: "1"
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager-beam-worker
          image: custom-flink:latest
          imagePullPolicy: IfNotPresent
          args:
            - /bin/bash
            - -c
            - "/opt/flink/bin/taskmanager.sh start-foreground & python -m apache_beam.runners.worker.worker_pool_main --container_executable=/opt/apache/beam/boot --service_port=50000 & tail -f /dev/null"
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            - containerPort: 50000
              name: pool
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            - name: flink-staging
              mountPath: /tmp/beam-artifact-staging
          securityContext:
            runAsUser: 9999
          resources:
            requests:
              memory: "4Gi"
              cpu: "4"
            limits:
              memory: "4Gi"
              cpu: "4"
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: beam-jobserver
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: beam
      component: jobserver
  template:
    metadata:
      labels:
        app: beam
        component: jobserver
    spec:
      containers:
        - name: beam-jobserver
          image: apache/beam_flink1.16_job_server:2.48.0
          args: ["--flink-master=flink-jobmanager:8081"]
          ports:
            - containerPort: 8097
            - containerPort: 8098
            - containerPort: 8099
          volumeMounts:
            - name: beam-staging
              mountPath: /tmp/beam-artifact-staging
          resources:
            requests:
              memory: "2Gi"
              cpu: "2"
            limits:
              memory: "2Gi"
              cpu: "2"
      volumes:
        - name: beam-staging
          persistentVolumeClaim:
            claimName: staging-artifacts-claim
---
apiVersion: v1
kind: Service
metadata:
  name: beam-jobserver
  namespace: flink
spec:
  type: ClusterIP
  ports:
    - name: grpc-port
      port: 8097
      targetPort: 8097
    - name: expansion-port
      port: 8098
      targetPort: 8098
    - name: job-manage-port
      port: 8099
      targetPort: 8099
  selector:
    app: beam
    component: jobserver

And here is the PVC:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: staging-artifacts-claim
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 5Gi
  storageClassName: standard
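Before submitting, I verify from a pod inside the cluster that the job server and worker pool Services above are reachable. A minimal sketch (it uses `grpcio`, which the Beam SDK already depends on; the endpoints are the Service names and ports from the manifests):

```python
import grpc

# Service names and ports taken from the manifests above.
endpoints = [
    "beam-jobserver:8099",     # job management
    "beam-jobserver:8098",     # artifact/expansion
    "beam-worker-pool:50000",  # external SDK worker pool
]

for endpoint in endpoints:
    channel = grpc.insecure_channel(endpoint)
    try:
        # Block until the channel is ready, or time out.
        grpc.channel_ready_future(channel).result(timeout=10)
        print(endpoint, "reachable")
    except grpc.FutureTimeoutError:
        print(endpoint, "NOT reachable")
    finally:
        channel.close()
```

All three come back reachable in my case (and the job does reach the RUNNING state below), so plain connectivity does not seem to be the problem.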



Then I am running a pod with apache/beam_python3.8_sdk:2.48.0 and installing Java in it, because the expansion service needs it to run. Here is the code that runs in that container:
```
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service


def run_beam_pipeline():
    logging.getLogger().setLevel(logging.INFO)

    consumer_config = {
        'bootstrap.servers': 'cluster-0-kafka-bootstrap.strimzi.svc.cluster.local:9092',
        'group.id': 'beamgrouptest',
        'auto.offset.reset': 'earliest',
        'key.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
        'value.deserializer': 'org.apache.kafka.common.serialization.StringDeserializer',
    }

    topic = 'locations'

    flink_options = PipelineOptions([
        "--runner=PortableRunner",
        "--artifact_endpoint=beam-jobserver:8098",
        "--job_endpoint=beam-jobserver:8099",
        "--environment_type=EXTERNAL",
        "--environment_config=beam-worker-pool:50000",
        # "--environment_config={\"command\": \"/opt/apache/beam/boot\"}",
    ])

    with beam.Pipeline(options=flink_options) as pipeline:
        messages = (
            pipeline
            | "Read from Kafka" >> ReadFromKafka(
                consumer_config=consumer_config,
                topics=[topic],
                with_metadata=False,
                expansion_service=default_io_expansion_service(
                    append_args=[
                        '--defaultEnvironmentType=PROCESS',
                        '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
                    ]
                )
            )
            | "Print messages" >> beam.Map(print)
        )

    logging.info("Pipeline execution completed.")


if __name__ == '__main__':
    run_beam_pipeline()

```
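To see where the beam_java8_sdk stamp enters the pipeline, I also dump the environments recorded in the pipeline proto instead of running the job. A sketch (it reuses `consumer_config`, `topic`, and `flink_options` from the script above, and drops the PROCESS override since I only want to see the recorded environments):

```python
# Build the same pipeline but inspect its proto rather than running it.
pipeline = beam.Pipeline(options=flink_options)
_ = (
    pipeline
    | "Read from Kafka" >> ReadFromKafka(
        consumer_config=consumer_config, topics=[topic], with_metadata=False)
    | "Print messages" >> beam.Map(print)
)

proto = pipeline.to_runner_api()
for env_id, env in proto.components.environments.items():
    # Each environment lists a beam:version:sdk_base:... capability; one of
    # them should carry the apache/beam_java8_sdk stamp from the expansion.
    stamps = [c for c in env.capabilities if c.startswith("beam:version:sdk_base:")]
    print(env_id, env.urn, stamps)
```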

When I start the job, here are the logs; it downloads the Java expansion service:


python3 testing.py

<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)

<jemalloc>: (This is the expected behaviour if you are running under QEMU)

INFO:apache_beam.utils.subprocess_server:Using cached job server jar from
https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.48.0/beam-sdks-java-io-expansion-service-2.48.0.jar

INFO:root:Starting a JAR-based expansion service from JAR
/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar


INFO:apache_beam.utils.subprocess_server:Starting service with ['java'
'-jar'
'/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
'35371'
'--filesToStage=/root/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.48.0.jar'
'--defaultEnvironmentType=PROCESS' '--defaultEnvironmentConfig={"command":
"/opt/apache/beam/boot"}']

INFO:apache_beam.utils.subprocess_server:Starting expansion service at
localhost:35371

INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:00 AM
org.apache.beam.sdk.expansion.service.ExpansionService
loadRegisteredTransforms

INFO:apache_beam.utils.subprocess_server:INFO: Registering external
transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1,
beam:transform:org.apache.beam:kafka_read_without_metadata:v1,
beam:transform:org.apache.beam:kafka_write:v1,
beam:external:java:generate_sequence:v1]

INFO:apache_beam.utils.subprocess_server:

INFO:apache_beam.utils.subprocess_server:Registered transforms:

INFO:apache_beam.utils.subprocess_server:
beam:transform:org.apache.beam:kafka_read_with_metadata:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5c6648b0

INFO:apache_beam.utils.subprocess_server:
beam:transform:org.apache.beam:kafka_read_without_metadata:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@6f1de4c7

INFO:apache_beam.utils.subprocess_server:
beam:transform:org.apache.beam:kafka_write:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@459e9125

INFO:apache_beam.utils.subprocess_server:
beam:external:java:generate_sequence:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@128d2484

INFO:apache_beam.utils.subprocess_server:

INFO:apache_beam.utils.subprocess_server:Registered
SchemaTransformProviders:

INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:kafka_read:v1

INFO:apache_beam.utils.subprocess_server:
beam:schematransform:org.apache.beam:kafka_write:v1

WARNING:root:Waiting for grpc channel to be ready at localhost:35371.

WARNING:root:Waiting for grpc channel to be ready at localhost:35371.

WARNING:root:Waiting for grpc channel to be ready at localhost:35371.

WARNING:root:Waiting for grpc channel to be ready at localhost:35371.

INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:06 AM
org.apache.beam.sdk.expansion.service.ExpansionService expand

INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'Read from Kafka'
with URN 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1'

INFO:apache_beam.utils.subprocess_server:Dependencies list: {}

INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:07 AM
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
payloadToConfig

INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
schema registered. Attempting to construct with setter approach.

INFO:apache_beam.utils.subprocess_server:Aug 09, 2023 8:35:08 AM
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
payloadToConfig

INFO:apache_beam.utils.subprocess_server:WARNING: Configuration class
'org.apache.beam.sdk.io.kafka.KafkaIO$Read$External$Configuration' has no
schema registered. Attempting to construct with setter approach.

INFO:root:Default Python SDK image for environment is apache/beam_python3.8_sdk:2.48.0

INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x402e7a95e0> ====================

INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x402e7a9670> ====================

INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x402e7a9dc0> ====================

INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED

INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING

INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

*Error*

2023-08-09 10:25:37,146 INFO
org.apache.flink.configuration.GlobalConfiguration
          [] - Loading configuration property: taskmanager.rpc.port, 6122

2023-08-09 10:25:37,260 INFO  org.apache.flink.runtime.taskmanager.Task
                [] - Source: Impulse -> [3]Read from
Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
KafkaIO.ReadSourceDescriptors} (1/1)#0
(3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
switched from INITIALIZING to RUNNING.

2023-08-09 10:25:37,724 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder
[] - Finished to build heap keyed state-backend.

2023-08-09 10:25:37,731 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] -
Initializing heap keyed state backend with stream factory.

2023-08-09 10:25:37,771 INFO
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService
[] - getProcessBundleDescriptor request with id 1-4

2023-08-09 10:25:37,876 INFO
/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:889
[] - Creating insecure state channel for localhost:45429.

2023-08-09 10:25:37,877 INFO
/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:896
[] - State channel established.

2023-08-09 10:25:37,918 INFO
/usr/local/lib/python3.8/site-packages/apache_beam/transforms/environments.py:376
[] - Default Python SDK image for environment is
apache/beam_python3.8_sdk:2.48.0

2023-08-09 10:25:37,928 ERROR
/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:299
[] - Error processing instruction 1. Original traceback is

Traceback (most recent call last):

  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 660, in process_bundle
    bundle_processor = self.bundle_processor_cache.get(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 491, in get
    processor = bundle_processor.BundleProcessor(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 876, in __init__
    _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 839, in _verify_descriptor_created_in_a_compatible_env
    raise RuntimeError(

*RuntimeError: Pipeline construction environment and pipeline runtime environment are not compatible. If you use a custom container image, check that the Python interpreter minor version and the Apache Beam version in your image match the versions used at pipeline construction time. Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0. Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.*

2023-08-09 10:25:37,931 INFO  org.apache.flink.runtime.taskmanager.Task
                [] - [3]Read from Kafka/{KafkaIO.Read, Remove Kafka
Metadata} -> [1]Print messages (1/1)#0
(3a42a4a4b7edf55899dc956496d8f99b_03f93075562d7d50bb0b07080b2ebe35_0_0)
switched from INITIALIZING to RUNNING.

2023-08-09 10:28:37,784 WARN  org.apache.flink.runtime.taskmanager.Task
                [] - Source: Impulse -> [3]Read from
Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor),
KafkaIO.ReadSourceDescriptors} (1/1)#0
(3a42a4a4b7edf55899dc956496d8f99b_cbc357ccb763df2852fee8c4fc7d55f2_0_0)
switched from RUNNING to FAILED with failure cause:
java.lang.RuntimeException: Failed to start remote bundle

Thanks,
Kapil Dev
