phoerious opened a new issue, #29683:
URL: https://github.com/apache/beam/issues/29683
### What happened?
I'm trying to get the SparkRunner working properly with pre-compiled JARs
from my Python job (created with `--output_executable=job.jar`) that can be run
with `spark-submit` on Kubernetes. Unfortunately, there seems to be a problem
with the log routing / plumbing, so most of the job logs just get lost in the
weeds.
I first tried the `PROCESS` environment type in a custom Spark image, but
any output from the `boot` binary in the container is lost entirely. So
the job either fails or succeeds, but I have no idea why unless I attach to
the executor container and `strace` the `boot` process.
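For reference, here is a minimal sketch of the arguments for the `PROCESS` variant. With `PROCESS`, Beam reads a JSON object from `--environment_config` whose `command` key names the `boot` executable. The path `/opt/apache/beam/boot` is an assumption (it is where the SDK container keeps `boot` in the logs below), and `process_env_args` is a hypothetical helper, not part of Beam.

```python
import json


def process_env_args(boot_path="/opt/apache/beam/boot", jar_path="job.jar"):
    """Build pipeline arguments for a PROCESS-environment job JAR (hypothetical helper).

    The PROCESS environment expects a JSON object in --environment_config whose
    "command" key points at the SDK harness boot binary on each executor.
    """
    return [
        "--runner=SparkRunner",
        "--environment_type=PROCESS",
        "--environment_config=" + json.dumps({"command": boot_path}),
        f"--output_executable_path={jar_path}",
    ]


if __name__ == "__main__":
    # These strings would normally be handed to Beam's PipelineOptions.
    for arg in process_env_args():
        print(arg)
```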
I then tried `EXTERNAL`, because then I can at least check the logs of the
sidecar SDK container. The example job runs, but the stdout/stderr of the
job itself isn't fed back to the Spark driver, so even in client mode I have
no idea what's going on.
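For comparison, the `EXTERNAL` configuration can be reconstructed from the values in the logs below (worker pool at `localhost:50000`, `sdk_worker_parallelism` 1, `environment_cache_millis` 0). This is a minimal sketch that assumes these were the only relevant flags. `external_env_args` and `SIDECAR_CMD` are illustrative names, and the sidecar command is copied from the SDK container log.

```python
def external_env_args(pool_address="localhost:50000", jar_path="job.jar"):
    """Hypothetical helper reproducing the EXTERNAL configuration seen in the logs."""
    return [
        "--runner=SparkRunner",
        "--environment_type=EXTERNAL",
        # Address of the Beam worker pool running in the SDK sidecar container.
        f"--environment_config={pool_address}",
        "--sdk_worker_parallelism=1",
        "--environment_cache_millis=0",
        f"--output_executable_path={jar_path}",
    ]


# Command the sidecar runs (taken from the SDK container log). The worker pool
# launches one `boot` process per requested SDK worker.
SIDECAR_CMD = [
    "python", "-m", "apache_beam.runners.worker.worker_pool_main",
    "--service_port=50000",
    "--container_executable=/opt/apache/beam/boot",
]

if __name__ == "__main__":
    print(" ".join(external_env_args()))
    print(" ".join(SIDECAR_CMD))
```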
There seems to be some sort of problem with the gRPC logging server. The Spark
executor container shows a bunch of these stack traces:
```
23/12/08 09:52:19 INFO GrpcLoggingService: Beam Fn Logging client connected.
23/12/08 09:52:31 INFO GrpcLoggingService: Beam Fn Logging client connected.
23/12/08 09:52:31 WARN py:291: Not setting flag with value None: runner
23/12/08 09:52:31 INFO py:111: semi_persistent_directory: /tmp
23/12/08 09:52:31 WARN py:356: No session file found:
/tmp/staged/pickled_main_session. Functions defined in __main__ (interactive
session) may fail.
23/12/08 09:52:31 WARN py:367: Discarding unparseable args:
['--app_name=BeamApp-roce3528-1208092121-c24b0136_accc3bf6-1ac5-4869-93df-9a9553150bef',
'--direct_runner_use_stacked_bundle', '--enable_spark_metric_sinks',
'--log_mdc', '--options_id=2', '--pipeline_type_check']
23/12/08 09:52:31 INFO py:135: Pipeline_options: {'job_name':
'BeamApp-roce3528-1208092121-c24b0136', 'gcp_oauth_scopes':
['https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/devstorage.full_control',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/datastore',
'https://www.googleapis.com/auth/spanner.admin',
'https://www.googleapis.com/auth/spanner.data',
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/devstorage.full_control',
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/datastore',
'https://www.googleapis.com/auth/spanner.admin',
'https://www.googleapis.com/auth/spanner.data'], 'experiments':
['beam_fn_api'], 'sdk_location': 'container', 'environment_type': 'EXTERNAL',
'environment_config': 'localhost:50000', 'sdk_worker_parallelism': '1',
'environment_cache_millis': '0', 'output_executable_path': 'job.jar'}
23/12/08 09:52:31 INFO py:234: Creating state cache with size 104857600
23/12/08 09:52:31 INFO py:187: Creating insecure control channel for
localhost:33659.
23/12/08 09:52:31 INFO py:195: Control channel established.
23/12/08 09:52:31 INFO py:243: Initializing SDKHarness with unbounded number
of workers.
23/12/08 09:52:31 INFO py:211: Python sdk harness starting.
23/12/08 09:52:31 INFO FnApiControlClientPoolService: Beam Fn Control client
connected with id 1-1
23/12/08 09:52:31 INFO FnApiControlClientPoolService:
getProcessBundleDescriptor request with id 1-2
23/12/08 09:52:31 INFO py:885: Creating insecure state channel for
localhost:45221.
23/12/08 09:52:31 INFO py:892: State channel established.
23/12/08 09:52:31 INFO py:770: Creating client data channel for
localhost:37017
23/12/08 09:52:31 INFO GrpcDataService: Beam Fn Data client connected.
23/12/08 09:52:32 INFO DefaultJobBundleFactory: Closing environment urn:
"beam:env:external:v1"
payload: "\n\021\n\017localhost:50000"
capabilities: "beam:coder:bytes:v1"
capabilities: "beam:coder:string_utf8:v1"
capabilities: "beam:coder:kv:v1"
capabilities: "beam:coder:bool:v1"
capabilities: "beam:coder:varint:v1"
capabilities: "beam:coder:double:v1"
capabilities: "beam:coder:iterable:v1"
capabilities: "beam:coder:timer:v1"
capabilities: "beam:coder:interval_window:v1"
capabilities: "beam:coder:length_prefix:v1"
capabilities: "beam:coder:global_window:v1"
capabilities: "beam:coder:windowed_value:v1"
capabilities: "beam:coder:param_windowed_value:v1"
capabilities: "beam:coder:state_backed_iterable:v1"
capabilities: "beam:coder:custom_window:v1"
capabilities: "beam:coder:row:v1"
capabilities: "beam:coder:sharded_key:v1"
capabilities: "beam:coder:nullable:v1"
capabilities: "beam:protocol:progress_reporting:v0"
capabilities: "beam:protocol:harness_monitoring_infos:v1"
capabilities: "beam:protocol:worker_status:v1"
capabilities: "beam:combinefn:packed_python:v1"
capabilities: "beam:version:sdk_base:apache/beam_python3.11_sdk:2.52.0"
capabilities: "beam:transform:sdf_truncate_sized_restrictions:v1"
capabilities: "beam:transform:to_string:v1"
capabilities: "beam:protocol:data_sampling:v1"
23/12/08 09:52:32 INFO GrpcLoggingService: 2 Beam Fn Logging clients still connected during shutdown.
23/12/08 09:52:32 WARN BeamFnDataGrpcMultiplexer: Hanged up for unknown endpoint.
23/12/08 09:52:32 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@721ac343
java.lang.IllegalStateException: call already closed
	at org.apache.beam.vendor.grpc.v1p54p0.com.google.common.base.Preconditions.checkState(Preconditions.java:502)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:219)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:212)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:395)
	at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onCompleted(GrpcStateService.java:150)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onHalfClose(ServerCalls.java:273)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
23/12/08 09:52:33 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 12698 bytes result sent to driver
```
and the SDK container shows
```
2023/12/08 09:51:24 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=50000
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:43205', '--artifact_endpoint=localhost:37223',
'--provision_endpoint=localhost:46595', '--control_endpoint=localhost:33659']
2023/12/08 09:52:19 Provision info:
pipeline_options:{fields:{key:"beam:option:allow_non_deterministic_key_coders:v1"
value:{bool_value:false}} fields:{key:"beam:option:allow_unsafe_triggers:v1"
value:{bool_value:false}} fields:{key:"beam:option:app_name:v1"
value:{string_value:"BeamApp-roce3528-1208092121-c24b0136_accc3bf6-1ac5-4869-93df-9a9553150bef"}}
fields:{key:"beam:option:artifact_port:v1" value:{string_value:"0"}}
fields:{key:"beam:option:auto_unique_labels:v1" value:{bool_value:false}}
fields:{key:"beam:option:beam_services:v1" value:{struct_value:{}}}
fields:{key:"beam:option:cache_disabled:v1" value:{bool_value:false}}
fields:{key:"beam:option:dataflow_endpoint:v1"
value:{string_value:"https://dataflow.googleapis.com"}}
fields:{key:"beam:option:direct_embed_docker_python:v1"
value:{bool_value:false}} fields:{key:"beam:option:direct_num_workers:v1"
value:{string_value:"1"}}
fields:{key:"beam:option:direct_runner_bundle_repeat:v1"
value:{string_value:"0"}}
fields:{key:"beam:option:direct_runner_use_stacked_bundle:v1" value:{bool_value:true}}
fields:{key:"beam:option:direct_running_mode:v1"
value:{string_value:"in_memory"}}
fields:{key:"beam:option:direct_test_splits:v1" value:{struct_value:{}}}
fields:{key:"beam:option:dry_run:v1" value:{bool_value:false}}
fields:{key:"beam:option:enable_artifact_caching:v1" value:{bool_value:false}}
fields:{key:"beam:option:enable_heap_dumps:v1" value:{bool_value:false}}
fields:{key:"beam:option:enable_hot_key_logging:v1" value:{bool_value:false}}
fields:{key:"beam:option:enable_spark_metric_sinks:v1"
value:{bool_value:true}} fields:{key:"beam:option:enable_streaming_engine:v1"
value:{bool_value:false}}
fields:{key:"beam:option:environment_cache_millis:v1"
value:{string_value:"0"}} fields:{key:"beam:option:environment_config:v1"
value:{string_value:"localhost:50000"}}
fields:{key:"beam:option:environment_type:v1" value:{string_value:"EXTERNAL"}}
fields:{key:"beam:option:expansion_port:v1" value:{string_value:"0"}}
fields:{key:"beam:option:experiments:v1"
value:{list_value:{values:{string_value:"beam_fn_api"}}}}
fields:{key:"beam:option:flink_master:v1" value:{string_value:"[auto]"}}
fields:{key:"beam:option:flink_submit_uber_jar:v1" value:{bool_value:false}}
fields:{key:"beam:option:flink_version:v1" value:{string_value:"1.16"}}
fields:{key:"beam:option:gcp_oauth_scopes:v1"
value:{list_value:{values:{string_value:"https://www.googleapis.com/auth/bigquery"}
values:{string_value:"https://www.googleapis.com/auth/cloud-platform"}
values:{string_value:"https://www.googleapis.com/auth/devstorage.full_control"}
values:{string_value:"https://www.googleapis.com/auth/userinfo.email"}
values:{string_value:"https://www.googleapis.com/auth/datastore"}
values:{string_value:"https://www.googleapis.com/auth/spanner.admin"}
values:{string_value:"https://www.googleapis.com/auth/spanner.data"}}}}
fields:{key:"beam:option:gcs_performance_metrics:v1" value:{bool_value:false}}
fields:{key:"beam:option:hdfs_full_urls:v1" value:{bool_value:false}}
fields:{key:"beam:option:job_name:v1"
value:{string_value:"BeamApp-roce3528-1208092121-c24b0136"}}
fields:{key:"beam:option:job_port:v1" value:{string_value:"0"}}
fields:{key:"beam:option:job_server_java_launcher:v1"
value:{string_value:"java"}}
fields:{key:"beam:option:job_server_jvm_properties:v1" value:{list_value:{}}}
fields:{key:"beam:option:job_server_timeout:v1" value:{string_value:"60"}}
fields:{key:"beam:option:load_balance_bundles:v1" value:{bool_value:false}}
fields:{key:"beam:option:log_mdc:v1" value:{bool_value:true}}
fields:{key:"beam:option:max_cache_memory_usage_mb:v1"
value:{string_value:"100"}} fields:{key:"beam:option:max_parallelism:v1"
value:{string_value:"-1"}} fields:{key:"beam:option:no_auth:v1"
value:{bool_value:false}} fields:{key:"beam:option:options_id:v1"
value:{number_value:2}} fields:{key:"beam:option:output_executable_path:v1"
value:{string_value:"job.jar"}}
fields:{key:"beam:option:parallelism:v1" value:{string_value:"-1"}}
fields:{key:"beam:option:performance_runtime_type_check:v1"
value:{bool_value:false}} fields:{key:"beam:option:pickle_library:v1"
value:{string_value:"default"}}
fields:{key:"beam:option:pipeline_type_check:v1" value:{bool_value:true}}
fields:{key:"beam:option:profile_cpu:v1" value:{bool_value:false}}
fields:{key:"beam:option:profile_memory:v1" value:{bool_value:false}}
fields:{key:"beam:option:profile_sample_rate:v1" value:{number_value:1}}
fields:{key:"beam:option:requirements_cache_only_sources:v1"
value:{bool_value:false}} fields:{key:"beam:option:resource_hints:v1"
value:{list_value:{}}} fields:{key:"beam:option:retain_docker_containers:v1"
value:{bool_value:false}} fields:{key:"beam:option:runner:v1"
value:{null_value:NULL_VALUE}} fields:{key:"beam:option:runtime_type_check:v1"
value:{bool_value:false}} fields:{key:"beam:option:s3_disable_ssl:v1"
value:{bool_value:false}}
fields:{key:"beam:option:save_main_session:v1" value:{bool_value:false}}
fields:{key:"beam:option:sdk_location:v1" value:{string_value:"container"}}
fields:{key:"beam:option:sdk_worker_parallelism:v1" value:{string_value:"1"}}
fields:{key:"beam:option:spark_master:v1" value:{string_value:"local[4]"}}
fields:{key:"beam:option:spark_master_url:v1" value:{string_value:"local[4]"}}
fields:{key:"beam:option:spark_submit_uber_jar:v1" value:{bool_value:false}}
fields:{key:"beam:option:spark_version:v1" value:{string_value:"3"}}
fields:{key:"beam:option:streaming:v1" value:{bool_value:false}}
fields:{key:"beam:option:test_mode:v1" value:{bool_value:false}}
fields:{key:"beam:option:type_check_additional:v1" value:{string_value:""}}
fields:{key:"beam:option:type_check_strictness:v1"
value:{string_value:"DEFAULT_TO_ANY"}} fields:{key:"beam:option:update:v1"
value:{bool_value:false}}
fields:{key:"beam:option:use_active_spark_session:v1"
value:{bool_value:false}}
fields:{key:"beam:option:use_transform_service:v1" value:{bool_value:false}}
fields:{key:"beam:option:uses_provided_spark_context:v1"
value:{bool_value:false}}} retrieval_token:"__no_artifacts_staged__"
logging_endpoint:{url:"localhost:43205"}
artifact_endpoint:{url:"localhost:37223"}
control_endpoint:{url:"localhost:33659"}
runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2023/12/08 09:52:32 boot.go: error logging message over FnAPI. endpoint
localhost:43205 error: EOF message follows
2023/12/08 09:52:32 DEBUG Python (worker 1-1) exited.
2023/12/08 09:52:32 boot.go: error logging message over FnAPI. endpoint
localhost:43205 error: EOF message follows
2023/12/08 09:52:32 DEBUG Received signal: terminated
2023/12/08 09:52:32 boot.go: error logging message over FnAPI. endpoint
localhost:43205 error: EOF message follows
2023/12/08 09:52:32 DEBUG Cleaned up temporary venv for worker 1-1.
```
This is repeated several times with different port numbers (I guess for
different stages).
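The executor log above suggests that the SDK harness's own records (logger names of the form `py:<line>`) do reach the executor through `GrpcLoggingService` before shutdown. Here is a rough filter for separating those from the Java-side records. It assumes the `yy/MM/dd HH:mm:ss LEVEL logger: message` layout shown above, and both `split_executor_log` and its regex are assumptions rather than Beam tooling.

```python
import re

# Assumed layout: "23/12/08 09:52:31 INFO py:135: message" or
# "23/12/08 09:52:31 INFO GrpcLoggingService: message".
RECORD = re.compile(
    r"^(?P<date>\d{2}/\d{2}/\d{2}) (?P<time>\d{2}:\d{2}:\d{2}) "
    r"(?P<level>[A-Z]+) (?P<logger>\S+?): (?P<msg>.*)$"
)
SDK_LOGGER = re.compile(r"py:\d+")


def split_executor_log(lines):
    """Return (sdk_records, java_records) as lists of (level, logger, message)."""
    sdk, java = [], []
    for line in lines:
        m = RECORD.match(line)
        if not m:
            continue  # stack frames and wrapped continuation lines are skipped
        rec = (m["level"], m["logger"], m["msg"])
        # Loggers named "py:<lineno>" are treated as forwarded SDK harness records.
        if SDK_LOGGER.fullmatch(m["logger"]):
            sdk.append(rec)
        else:
            java.append(rec)
    return sdk, java
```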
With the fruit sampling example from the website, I finally get
```
2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint
localhost:43597 error: EOF message follows
2023/12/08 09:53:29 DEBUG ['🍓 Strawberry', '🥕 Carrot', '🍅 Tomato']
2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint
localhost:43597 error: EOF message follows
2023/12/08 09:53:29 DEBUG Python (worker 5-1) exited.
2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint
localhost:43597 error: EOF message follows
2023/12/08 09:53:29 DEBUG Received signal: terminated
2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint
localhost:43597 error: EOF message follows
2023/12/08 09:53:29 DEBUG Cleaned up temporary venv for worker 5-1
```
in one of the (two) SDK container outputs, but it's never fed back to the
Spark executor or the Spark driver, probably due to this log endpoint error.
### Issue Priority
Priority: 3 (minor)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [X] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.