phoerious opened a new issue, #29683:
URL: https://github.com/apache/beam/issues/29683

   ### What happened?
   
   I'm trying to get the SparkRunner working properly with pre-compiled JARs 
from my Python job (created with `--output_executable=job.jar`) that can be run 
with `spark-submit` on Kubernetes. Unfortunately, there seems to be a problem 
with the log routing / plumbing, so most of the job logs just get lost in the 
weeds.
   
   I first tried the `PROCESS` environment type in a custom Spark image, but 
any output from the `boot` binary in the container gets lost entirely. Hence 
the job either fails or succeeds, but I have no idea why, unless I attach to 
the executor container and strace the `boot` process.
   
   I then tried `EXTERNAL`, because there I can at least check the logs of the SDK sidecar container. The example job runs, but the job's own stdout/stderr isn't fed back to the Spark driver, so even in client mode I have no idea what's going on.
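To make the two setups concrete, here is a minimal sketch of the flags involved. It is an illustration under stated assumptions, not the exact command used here: `spark_jar_flags` and `my_pipeline.py` are hypothetical names; the EXTERNAL values (`localhost:50000`, `sdk_worker_parallelism` 1, `output_executable_path` `job.jar`, Spark version 3) are taken from the pipeline options printed in the logs below; and the PROCESS boot path `/opt/apache/beam/boot` inside the custom Spark image is assumed.

```python
import json
import shlex


def spark_jar_flags(environment_type, jar_path="job.jar"):
    """Hypothetical helper: Beam flags for compiling a Python job into a Spark JAR.

    EXTERNAL values mirror the options in this issue's logs; the PROCESS
    boot path is an assumption about where the custom image puts `boot`.
    """
    if environment_type == "EXTERNAL":
        # Worker-pool sidecar started from the SDK container on port 50000.
        config = "localhost:50000"
    elif environment_type == "PROCESS":
        # PROCESS takes a JSON object naming the boot command to launch.
        config = json.dumps({"command": "/opt/apache/beam/boot"})
    else:
        raise ValueError(f"unsupported environment type: {environment_type}")
    return [
        "--runner=SparkRunner",
        "--spark_version=3",
        f"--output_executable_path={jar_path}",
        f"--environment_type={environment_type}",
        f"--environment_config={config}",
        "--sdk_worker_parallelism=1",
    ]


if __name__ == "__main__":
    # my_pipeline.py is a placeholder for the actual job script.
    print(shlex.join(["python", "my_pipeline.py", *spark_jar_flags("EXTERNAL")]))
```

The resulting `job.jar` is then what gets handed to `spark-submit`.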
   
   There seems to be some sort of problem with the gRPC logging server. The Spark executor container shows many stack traces like this one:
   
   ```
   23/12/08 09:52:19 INFO GrpcLoggingService: Beam Fn Logging client connected.
   23/12/08 09:52:31 INFO GrpcLoggingService: Beam Fn Logging client connected.
   23/12/08 09:52:31 WARN py:291: Not setting flag with value None: runner
   23/12/08 09:52:31 INFO py:111: semi_persistent_directory: /tmp
   23/12/08 09:52:31 WARN py:356: No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail.
   23/12/08 09:52:31 WARN py:367: Discarding unparseable args: ['--app_name=BeamApp-roce3528-1208092121-c24b0136_accc3bf6-1ac5-4869-93df-9a9553150bef', '--direct_runner_use_stacked_bundle', '--enable_spark_metric_sinks', '--log_mdc', '--options_id=2', '--pipeline_type_check']
   23/12/08 09:52:31 INFO py:135: Pipeline_options: {'job_name': 
'BeamApp-roce3528-1208092121-c24b0136', 'gcp_oauth_scopes': 
['https://www.googleapis.com/auth/bigquery', 
'https://www.googleapis.com/auth/cloud-platform', 
'https://www.googleapis.com/auth/devstorage.full_control', 
'https://www.googleapis.com/auth/userinfo.email', 
'https://www.googleapis.com/auth/datastore', 
'https://www.googleapis.com/auth/spanner.admin', 
'https://www.googleapis.com/auth/spanner.data', 
'https://www.googleapis.com/auth/bigquery', 
'https://www.googleapis.com/auth/cloud-platform', 
'https://www.googleapis.com/auth/devstorage.full_control', 
'https://www.googleapis.com/auth/userinfo.email', 
'https://www.googleapis.com/auth/datastore', 
'https://www.googleapis.com/auth/spanner.admin', 
'https://www.googleapis.com/auth/spanner.data'], 'experiments': 
['beam_fn_api'], 'sdk_location': 'container', 'environment_type': 'EXTERNAL', 
'environment_config': 'localhost:50000', 'sdk_worker_parallelism': '1', 
'environment_cache_millis': '0', 'output_executable_path': 'job.jar'}
   23/12/08 09:52:31 INFO py:234: Creating state cache with size 104857600
   23/12/08 09:52:31 INFO py:187: Creating insecure control channel for localhost:33659.
   23/12/08 09:52:31 INFO py:195: Control channel established.
   23/12/08 09:52:31 INFO py:243: Initializing SDKHarness with unbounded number of workers.
   23/12/08 09:52:31 INFO py:211: Python sdk harness starting.
   23/12/08 09:52:31 INFO FnApiControlClientPoolService: Beam Fn Control client connected with id 1-1
   23/12/08 09:52:31 INFO FnApiControlClientPoolService: getProcessBundleDescriptor request with id 1-2
   23/12/08 09:52:31 INFO py:885: Creating insecure state channel for localhost:45221.
   23/12/08 09:52:31 INFO py:892: State channel established.
   23/12/08 09:52:31 INFO py:770: Creating client data channel for localhost:37017
   23/12/08 09:52:31 INFO GrpcDataService: Beam Fn Data client connected.
   23/12/08 09:52:32 INFO DefaultJobBundleFactory: Closing environment urn: "beam:env:external:v1"
   payload: "\n\021\n\017localhost:50000"
   capabilities: "beam:coder:bytes:v1"
   capabilities: "beam:coder:string_utf8:v1"
   capabilities: "beam:coder:kv:v1"
   capabilities: "beam:coder:bool:v1"
   capabilities: "beam:coder:varint:v1"
   capabilities: "beam:coder:double:v1"
   capabilities: "beam:coder:iterable:v1"
   capabilities: "beam:coder:timer:v1"
   capabilities: "beam:coder:interval_window:v1"
   capabilities: "beam:coder:length_prefix:v1"
   capabilities: "beam:coder:global_window:v1"
   capabilities: "beam:coder:windowed_value:v1"
   capabilities: "beam:coder:param_windowed_value:v1"
   capabilities: "beam:coder:state_backed_iterable:v1"
   capabilities: "beam:coder:custom_window:v1"
   capabilities: "beam:coder:row:v1"
   capabilities: "beam:coder:sharded_key:v1"
   capabilities: "beam:coder:nullable:v1"
   capabilities: "beam:protocol:progress_reporting:v0"
   capabilities: "beam:protocol:harness_monitoring_infos:v1"
   capabilities: "beam:protocol:worker_status:v1"
   capabilities: "beam:combinefn:packed_python:v1"
   capabilities: "beam:version:sdk_base:apache/beam_python3.11_sdk:2.52.0"
   capabilities: "beam:transform:sdf_truncate_sized_restrictions:v1"
   capabilities: "beam:transform:to_string:v1"
   capabilities: "beam:protocol:data_sampling:v1"
   
   23/12/08 09:52:32 INFO GrpcLoggingService: 2 Beam Fn Logging clients still connected during shutdown.
   23/12/08 09:52:32 WARN BeamFnDataGrpcMultiplexer: Hanged up for unknown endpoint.
   23/12/08 09:52:32 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@721ac343
   java.lang.IllegalStateException: call already closed
           at org.apache.beam.vendor.grpc.v1p54p0.com.google.common.base.Preconditions.checkState(Preconditions.java:502)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:219)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:212)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:395)
           at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onCompleted(GrpcStateService.java:150)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onHalfClose(ServerCalls.java:273)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
           at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
           at java.base/java.lang.Thread.run(Unknown Source)
   23/12/08 09:52:33 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 12698 bytes result sent to driver
   ```
   
   and the SDK container shows
   
   ```
   2023/12/08 09:51:24 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
   Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:43205', '--artifact_endpoint=localhost:37223', '--provision_endpoint=localhost:46595', '--control_endpoint=localhost:33659']
   2023/12/08 09:52:19 Provision info:
   
pipeline_options:{fields:{key:"beam:option:allow_non_deterministic_key_coders:v1"
  value:{bool_value:false}}  fields:{key:"beam:option:allow_unsafe_triggers:v1" 
 value:{bool_value:false}}  fields:{key:"beam:option:app_name:v1"  
value:{string_value:"BeamApp-roce3528-1208092121-c24b0136_accc3bf6-1ac5-4869-93df-9a9553150bef"}}
  fields:{key:"beam:option:artifact_port:v1"  value:{string_value:"0"}}  
fields:{key:"beam:option:auto_unique_labels:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:beam_services:v1"  value:{struct_value:{}}}  
fields:{key:"beam:option:cache_disabled:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:dataflow_endpoint:v1"  
value:{string_value:"https://dataflow.googleapis.com"}}  
fields:{key:"beam:option:direct_embed_docker_python:v1"  
value:{bool_value:false}}  fields:{key:"beam:option:direct_num_workers:v1"  
value:{string_value:"1"}}  
fields:{key:"beam:option:direct_runner_bundle_repeat:v1"  
value:{string_value:"0"}}  fields:{key:"beam:option:direct_runner_use_stacked_bundle:v1"  value:{bool_value:true}}
fields:{key:"beam:option:direct_running_mode:v1"  
value:{string_value:"in_memory"}}  
fields:{key:"beam:option:direct_test_splits:v1"  value:{struct_value:{}}}  
fields:{key:"beam:option:dry_run:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:enable_artifact_caching:v1"  value:{bool_value:false}} 
 fields:{key:"beam:option:enable_heap_dumps:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:enable_hot_key_logging:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:enable_spark_metric_sinks:v1"  
value:{bool_value:true}}  fields:{key:"beam:option:enable_streaming_engine:v1"  
value:{bool_value:false}}  
fields:{key:"beam:option:environment_cache_millis:v1"  
value:{string_value:"0"}}  fields:{key:"beam:option:environment_config:v1"  
value:{string_value:"localhost:50000"}}  
fields:{key:"beam:option:environment_type:v1"  value:{string_value:"EXTERNAL"}} 
 fields:{key:"beam:option:expansion_port:v1"  value:{string_value:"0"}}  fields:{key:"beam:option:experiments:v1"
value:{list_value:{values:{string_value:"beam_fn_api"}}}}  
fields:{key:"beam:option:flink_master:v1"  value:{string_value:"[auto]"}}  
fields:{key:"beam:option:flink_submit_uber_jar:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:flink_version:v1"  value:{string_value:"1.16"}}  
fields:{key:"beam:option:gcp_oauth_scopes:v1"  
value:{list_value:{values:{string_value:"https://www.googleapis.com/auth/bigquery"}
  values:{string_value:"https://www.googleapis.com/auth/cloud-platform"}  
values:{string_value:"https://www.googleapis.com/auth/devstorage.full_control"} 
 values:{string_value:"https://www.googleapis.com/auth/userinfo.email"}  
values:{string_value:"https://www.googleapis.com/auth/datastore"}  
values:{string_value:"https://www.googleapis.com/auth/spanner.admin"}  
values:{string_value:"https://www.googleapis.com/auth/spanner.data"}}}}  
fields:{key:"beam:option:gcs_performance_metrics:v1"  value:{bool_value:false}} 
 fields:{key:"beam:option:hdfs_full_urls:v1"  value:{bool_value:false}}
fields:{key:"beam:option:job_name:v1"  
value:{string_value:"BeamApp-roce3528-1208092121-c24b0136"}}  
fields:{key:"beam:option:job_port:v1"  value:{string_value:"0"}}  
fields:{key:"beam:option:job_server_java_launcher:v1"  
value:{string_value:"java"}}  
fields:{key:"beam:option:job_server_jvm_properties:v1"  value:{list_value:{}}}  
fields:{key:"beam:option:job_server_timeout:v1"  value:{string_value:"60"}}  
fields:{key:"beam:option:load_balance_bundles:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:log_mdc:v1"  value:{bool_value:true}}  
fields:{key:"beam:option:max_cache_memory_usage_mb:v1"  
value:{string_value:"100"}}  fields:{key:"beam:option:max_parallelism:v1"  
value:{string_value:"-1"}}  fields:{key:"beam:option:no_auth:v1"  
value:{bool_value:false}}  fields:{key:"beam:option:options_id:v1"  
value:{number_value:2}}  fields:{key:"beam:option:output_executable_path:v1"  
value:{string_value:"job.jar"}}  fields:{key:"beam:option:parallelism:v1"  value:{string_value:"-1"}}
fields:{key:"beam:option:performance_runtime_type_check:v1"  
value:{bool_value:false}}  fields:{key:"beam:option:pickle_library:v1"  
value:{string_value:"default"}}  
fields:{key:"beam:option:pipeline_type_check:v1"  value:{bool_value:true}}  
fields:{key:"beam:option:profile_cpu:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:profile_memory:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:profile_sample_rate:v1"  value:{number_value:1}}  
fields:{key:"beam:option:requirements_cache_only_sources:v1"  
value:{bool_value:false}}  fields:{key:"beam:option:resource_hints:v1"  
value:{list_value:{}}}  fields:{key:"beam:option:retain_docker_containers:v1"  
value:{bool_value:false}}  fields:{key:"beam:option:runner:v1"  
value:{null_value:NULL_VALUE}}  fields:{key:"beam:option:runtime_type_check:v1" 
 value:{bool_value:false}}  fields:{key:"beam:option:s3_disable_ssl:v1"  
value:{bool_value:false}}  fields:{key:"beam:option:save_main_session:v1"  value:{bool_value:false}}
fields:{key:"beam:option:sdk_location:v1"  value:{string_value:"container"}}  
fields:{key:"beam:option:sdk_worker_parallelism:v1"  value:{string_value:"1"}}  
fields:{key:"beam:option:spark_master:v1"  value:{string_value:"local[4]"}}  
fields:{key:"beam:option:spark_master_url:v1"  value:{string_value:"local[4]"}} 
 fields:{key:"beam:option:spark_submit_uber_jar:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:spark_version:v1"  value:{string_value:"3"}}  
fields:{key:"beam:option:streaming:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:test_mode:v1"  value:{bool_value:false}}  
fields:{key:"beam:option:type_check_additional:v1"  value:{string_value:""}}  
fields:{key:"beam:option:type_check_strictness:v1"  
value:{string_value:"DEFAULT_TO_ANY"}}  fields:{key:"beam:option:update:v1"  
value:{bool_value:false}}  
fields:{key:"beam:option:use_active_spark_session:v1"  
value:{bool_value:false}}  fields:{key:"beam:option:use_transform_service:v1"  value:{bool_value:false}}
fields:{key:"beam:option:uses_provided_spark_context:v1"  
value:{bool_value:false}}}  retrieval_token:"__no_artifacts_staged__"  
logging_endpoint:{url:"localhost:43205"}  
artifact_endpoint:{url:"localhost:37223"}  
control_endpoint:{url:"localhost:33659"}  
runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
   2023/12/08 09:52:32 boot.go: error logging message over FnAPI. endpoint localhost:43205 error: EOF message follows
   2023/12/08 09:52:32 DEBUG Python (worker 1-1) exited.
   2023/12/08 09:52:32 boot.go: error logging message over FnAPI. endpoint localhost:43205 error: EOF message follows
   2023/12/08 09:52:32 DEBUG Received signal: terminated
   2023/12/08 09:52:32 boot.go: error logging message over FnAPI. endpoint localhost:43205 error: EOF message follows
   2023/12/08 09:52:32 DEBUG Cleaned up temporary venv for worker 1-1.
   ```
   
   This is repeated several times with different port numbers (presumably once per stage).

   With the fruit-sampling example from the Beam website, I eventually get
   
   ```
   2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint localhost:43597 error: EOF message follows
   2023/12/08 09:53:29 DEBUG ['🍓 Strawberry', '🥕 Carrot', '🍅 Tomato']
   
   2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint localhost:43597 error: EOF message follows
   2023/12/08 09:53:29 DEBUG Python (worker 5-1) exited.
   2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint localhost:43597 error: EOF message follows
   2023/12/08 09:53:29 DEBUG Received signal: terminated
   2023/12/08 09:53:29 boot.go: error logging message over FnAPI. endpoint localhost:43597 error: EOF message follows
   2023/12/08 09:53:29 DEBUG Cleaned up temporary venv for worker 5-1
   ```
   
   in the output of one of the two SDK containers, but it is never fed back to the Spark executor or the Spark driver, probably because of this logging-endpoint error.
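Because boot.go prints each entry it failed to send on the line right after the `error logging message over FnAPI` warning (visible in the SDK container excerpts in this report), the dropped output can at least be recovered from the sidecar's own log. This is a minimal workaround sketch, not a fix: `undelivered_messages` is a hypothetical helper, and it assumes one log entry per line as the container emits them (the wrapping in this report comes from the email) and that the following line has the form `<SEVERITY> <message>`.

```python
import re

# Go's standard log package prefixes each line with "YYYY/MM/DD HH:MM:SS ".
_TIMESTAMP = re.compile(r"^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} ")
# Warning printed by boot.go when sending a log entry over the Fn API fails.
_SEND_FAILED = re.compile(
    r"boot\.go: error logging message over FnAPI\. endpoint (\S+) error: .* message follows$"
)


def undelivered_messages(lines):
    """Return (endpoint, severity, message) for each entry boot.go failed to send."""
    found = []
    endpoint = None  # set while waiting for the line that follows a failure warning
    for raw in lines:
        line = _TIMESTAMP.sub("", raw.strip())
        if endpoint is not None:
            # The undelivered entry is assumed to be "<SEVERITY> <message>".
            severity, _, message = line.partition(" ")
            found.append((endpoint, severity, message))
            endpoint = None
        else:
            match = _SEND_FAILED.search(line)
            if match:
                endpoint = match.group(1)
    return found
```

For instance, feeding it the lines of `kubectl logs` output for the SDK sidecar would list the worker status lines and the printed fruit sample that never reached the runner.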
   
   ### Issue Priority
   
   Priority: 3 (minor)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [X] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.