[ https://issues.apache.org/jira/browse/SPARK-52965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18059932#comment-18059932 ]

Giambattista Bloisi commented on SPARK-52965:
---------------------------------------------

I came across the same problem with Spark Connect 4.0.2 deployed on Kubernetes.

It looks like it is the same problem as reported in [INTERNAL: Encountered 
end-of-stream mid-frame #12412|https://github.com/grpc/grpc-java/issues/12412], 
and I can confirm that in my case it was fixed by upgrading to Spark 4.1.1, 
which ships updated grpc and netty libraries.
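
A quick way to confirm which grpc-java build a given Spark distribution bundles is to read the version off the jar file names under {{$SPARK_HOME/jars}}. A minimal sketch (the helper and the jar name below are only illustrative examples, not part of the repro):

```shell
# Sketch: pull the version suffix out of a jar file name, so the bundled
# grpc-java version can be compared against the fix in grpc/grpc-java#12412.
# Real jar names live in $SPARK_HOME/jars (e.g. ls $SPARK_HOME/jars | grep grpc).
jar_version() {
  local name="${1%.jar}"       # drop the .jar extension
  printf '%s\n' "${name##*-}"  # keep everything after the last dash
}

# Placeholder jar name for illustration only
jar_version "grpc-core-1.62.2.jar"   # prints 1.62.2
```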

 

> [SPARK-CONNECT] [SPARK-4.0] Encountered end-of-stream mid-frame on Complex 
> Schema Transformation
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-52965
>                 URL: https://issues.apache.org/jira/browse/SPARK-52965
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, Spark Core
>    Affects Versions: 4.0.0
>                 Environment: * *Spark version*: 4.0.0 (from 
> {{apache/spark:4.0.0-scala2.13-java17-python3-r-ubuntu}} image)
>  * *Java version*: 17
>  * *Scala version*: 2.13.x
>  * *Platform*: Originally observed on AWS EKS 1.30, but fully 
> reproducible locally with Docker.
>  * *Node Architecture*: {{amd64}} / {{x86_64}}
>  * *Node OS/Kernel (original)*: {{Linux 5.10.225-213.878.amzn2.x86_64}}
>  * *Client*: PySpark 4.0.0
>            Reporter: Manish Kumar Jagnani
>            Priority: Major
>
> A gRPC connection failure occurs when executing a PySpark DataFrame action 
> that involves a complex, dynamically generated schema transformation. The 
> Spark Connect server terminates the connection, leading to an {{Encountered 
> end-of-stream mid-frame}} error.
> The issue is reproducible in a minimal Docker environment and persists even 
> after eliminating network factors and increasing gRPC message size limits, 
> indicating a potential core issue rather than a configuration problem.
> *Steps to Reproduce*
> This issue can be fully reproduced locally using the following Docker setup.
> *1. Build the Docker Image*
> Use the following {{Dockerfile}} to create the test environment.
> {code:bash}
> # Dockerfile
> FROM apache/spark:4.0.0-scala2.13-java17-python3-r-ubuntu
> USER root
> # Ensure Python and pip are installed
> RUN apt-get update && \
>     apt-get install -y --no-install-recommends python3-pip && \
>     rm -rf /var/lib/apt/lists/*
> USER ${spark_uid} {code}
> Build the image:
>  
> {code:bash}
> docker build -t spark-connect-repro . {code}
> *2. Run the Spark Connect Server*
> Start the Spark Connect server in a container. Note that we are including the 
> {{maxInboundMessageSize}} configuration to demonstrate that it does not solve 
> the problem.
> {code:bash}
> docker run -d --name spark-connect-server -p 15002:15002 spark-connect-repro \
> /opt/spark/bin/spark-submit \
>   --class org.apache.spark.sql.connect.service.SparkConnectServer \
>   --conf spark.driver.host=0.0.0.0 \
>   --conf spark.driver.bindAddress=0.0.0.0 \
>   --conf spark.connect.grpc.binding.port=15002 \
>   --conf spark.connect.grpc.maxInboundMessageSize=104857600 \
>   spark-internal {code}
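> For reference, the {{104857600}} value used for the message-size limits above is exactly 100 MiB:

```shell
# 104857600 bytes = 100 * 1024 * 1024, i.e. 100 MiB
echo $((100 * 1024 * 1024))   # prints 104857600
```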
> *3. Run the PySpark Client and Execute Failing Code*
> Exec into the running container to start a client session that connects to 
> the server via localhost. This replicates the "same-pod" test scenario.
> {code:bash}
> docker exec -it spark-connect-server /bin/bash{code}
> Inside the container's shell, start a {{pyspark}} shell, again including the 
> client-side {{maxMessageSize}} to show it doesn't help.
> {code:bash}
> pyspark --remote sc://localhost:15002 \
>   --conf spark.connect.grpc.maxMessageSize=104857600 {code}
> Now, paste and run the following Python code in the {{pyspark}} shell.
> {code:python}
> df = spark.read.parquet('s3://prod-data-test/temp/manishjag/income_entropy/synthetic_salary_data.parquet')
> df.show()
> COLUMNS_TO_KEEP = [
>     "user_id",
>     "device_id",
>     "external_id",
>     "sender_address",
>     "account_number",
>     "credit_amount",
>     "description",
>     "transaction_date",
>     "body",
>     "sys_partition_date",
>     "gt_model",
>     "model_version",
> ]
> import pyspark.sql.connect.functions as F
> def transform_schema(df, table_name = None):
>     """
>     Transform DataFrame schema by moving specified columns into a metadata 
> struct column.
>     """
>     # List of columns to keep as is
>     if table_name == 'predicted_transactions':
>         main_columns = COLUMNS_TO_KEEP
>     elif table_name == 'aggregated':
>         main_columns = [col for col in df.columns if col not in 
> ["is_odd_found", "month_count", "months_last_12", "months_last_6", 
> "months_last_3", "total_sal_old", "total_sal_lower", "total_sal_upper", 
> "total_sal_bucket"]]
>     else:
>         # Guard against an unknown table_name, which would otherwise
>         # leave main_columns undefined and raise a NameError below
>         raise ValueError(f"Unexpected table_name: {table_name}")
>     # Get all columns that should go into metadata
>     metadata_columns = [col for col in df.columns if col not in main_columns]
>     # Create a struct with original data types
>     metadata_struct = F.struct(*[
>         F.col(col_name).alias(col_name) 
>         for col_name in metadata_columns
>     ])
>     # Select main columns and add metadata column
>     result_df = df.select(
>         *main_columns,
>         metadata_struct.alias("metadata")
>     )
>     return result_df
> final_agg = transform_schema(df, table_name='aggregated')
> final_agg.show() {code}
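> The column-partitioning step inside {{transform_schema}} can be exercised without a Spark session. A plain-Python sketch of the same logic (the helper name {{split_columns}} is ours, not part of the repro):

```python
# Sketch of the column partitioning performed by transform_schema's
# 'aggregated' branch: excluded columns move into the metadata struct,
# everything else stays as a top-level column.
EXCLUDED = [
    "is_odd_found", "month_count", "months_last_12", "months_last_6",
    "months_last_3", "total_sal_old", "total_sal_lower", "total_sal_upper",
    "total_sal_bucket",
]

def split_columns(all_columns, excluded):
    """Partition columns into (main, metadata), preserving column order."""
    main = [c for c in all_columns if c not in excluded]
    metadata = [c for c in all_columns if c in excluded]
    return main, metadata

main, metadata = split_columns(["user_id", "month_count", "body"], EXCLUDED)
print(main)      # ['user_id', 'body']
print(metadata)  # ['month_count']
```

> The {{metadata}} list here corresponds to the columns that {{F.struct}} folds into the {{metadata}} column in the repro.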
> *Observed Behavior*
> The {{final_agg.show()}} command fails with the following errors.
> *Client-Side Error (in* {{*pyspark*}} *shell):*
> {code:none}
> pyspark.errors.exceptions.connect.SparkConnectGrpcException: 
> <_InactiveRpcError of RPC that terminated with:
>     status = StatusCode.INTERNAL
>     details = "Encountered end-of-stream mid-frame"
>     debug_error_string = "UNKNOWN:Error received from peer  
> {grpc_message:"Encountered end-of-stream mid-frame", grpc_status:13}"
> > {code}
> *Server-Side Log* (viewable via {{docker logs spark-connect-server}}):
> {code:none}
> WARN NettyServerStream: Exception processing message
> org.sparkproject.connect.grpc.StatusRuntimeException: INTERNAL: Encountered 
> end-of-stream mid-frame
>     at 
> org.sparkproject.connect.grpc.Status.asRuntimeException(Status.java:524)
>     at 
> org.sparkproject.connect.grpc.internal.AbstractServerStream$TransportState.deframerClosed(AbstractServerStream.java:247)
>     at 
> org.sparkproject.connect.grpc.netty.NettyServerStream$TransportState.deframerClosed(NettyServerStream.java:139)
>     at 
> org.sparkproject.connect.grpc.internal.MessageDeframer.close(MessageDeframer.java:234)
>     at 
> org.sparkproject.connect.grpc.internal.MessageDeframer.deliver(MessageDeframer.java:301)
>     at 
> org.sparkproject.connect.grpc.internal.MessageDeframer.request(MessageDeframer.java:162)
>  {code}
> *Expected Behavior* 
> The {{final_agg.show()}} command should execute successfully without any gRPC or 
> network-related errors, printing the transformed DataFrame to the console.
> *Troubleshooting Performed:*
> The issue persists despite the following troubleshooting steps, which rule 
> out common configuration errors:
> # *Isolated Environment*: The client ({{pyspark}} shell) was run inside the 
> same container as the Spark Connect server, connecting via {{localhost}}. 
> This confirms the issue is not caused by external network components (Load 
> Balancers, Ingress, etc.).
> # *Increased Message Size Limits*: Both client 
> ({{spark.connect.grpc.maxMessageSize}}) and server 
> ({{spark.connect.grpc.maxInboundMessageSize}}) gRPC message size limits 
> were increased to 100MB, with no change in behavior.
> # *Varied Java Versions*: The server was tested with both Java 17 and 
> Java 21.
> # *Varied grpcio Versions*: Multiple versions of the {{grpcio}} library 
> were tested on the client side.
> # *Adjusted Timeout Configurations*: Several Spark Connect timeout 
> settings were increased with no effect, including 
> {{spark.connect.session.manager.defaultSessionTimeout}} and 
> {{spark.connect.execute.manager.detachedTimeout}}.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
