This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 516a202e3ed [SPARK-42721][CONNECT] RPC logging interceptor 516a202e3ed is described below commit 516a202e3ed778487f9d2eaaeb6603193eb0a7b6 Author: Raghu Angadi <raghu.ang...@databricks.com> AuthorDate: Fri Mar 10 19:51:55 2023 -0400 [SPARK-42721][CONNECT] RPC logging interceptor ### What changes were proposed in this pull request? This adds a gRPC interceptor to the spark-connect server. It logs all the incoming RPC requests and responses. - How to enable: Set the interceptor config, e.g. ./sbin/start-connect-server.sh --conf spark.connect.grpc.interceptor.classes=org.apache.spark.sql.connect.service.LoggingInterceptor --jars connector/connect/server/target/spark-connect_*-SNAPSHOT.jar - Sample output: 23/03/08 10:54:37 INFO LoggingInterceptor: Received RPC Request spark.connect.SparkConnectService/ExecutePlan (id 1868663481): { "client_id": "6844bc44-4411-4481-8109-a10e3a836f97", "user_context": { "user_id": "raghu" }, "plan": { "root": { "common": { "plan_id": "37" }, "show_string": { "input": { "common": { "plan_id": "36" }, "read": { "data_source": { "format": "csv", "schema": "", "paths": ["file:///tmp/x-in"] } } }, "num_rows": 20, "truncate": 20 } } }, "client_type": "_SPARK_CONNECT_PYTHON" } ### Why are the changes needed? This is useful in development. It might be useful to debug some problems in production as well. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? - Manually in development - Unit test Closes #40342 from rangadi/logging-interceptor. 
Authored-by: Raghu Angadi <raghu.ang...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> (cherry picked from commit 19cb8d7014e03d828794a637bc67d09fc84650ad) Signed-off-by: Herman van Hovell <her...@databricks.com> --- connector/connect/server/pom.xml | 6 ++ .../sql/connect/service/LoggingInterceptor.scala | 75 ++++++++++++++++++++++ .../connect/service/InterceptorRegistrySuite.scala | 9 +++ 3 files changed, 90 insertions(+) diff --git a/connector/connect/server/pom.xml b/connector/connect/server/pom.xml index a2aff8f9f31..302f6590fd2 100644 --- a/connector/connect/server/pom.xml +++ b/connector/connect/server/pom.xml @@ -155,6 +155,12 @@ <version>${protobuf.version}</version> <scope>compile</scope> </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java-util</artifactId> + <version>${protobuf.version}</version> + <scope>compile</scope> + </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala new file mode 100644 index 00000000000..c91075fd127 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.service + +import scala.util.Random + +import com.google.protobuf.Message +import com.google.protobuf.util.JsonFormat +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall +import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener +import io.grpc.Metadata +import io.grpc.ServerCall +import io.grpc.ServerCallHandler +import io.grpc.ServerInterceptor + +import org.apache.spark.internal.Logging + +/** + * A gRPC interceptor to log RPC requests and responses. It logs the protobufs as JSON. + * Useful for local development. An ID is logged for each RPC so that requests and corresponding + * responses can be exactly matched. + */ +class LoggingInterceptor extends ServerInterceptor with Logging { + + private val jsonPrinter = JsonFormat.printer().preservingProtoFieldNames() + + private def logProto[T](description: String, message: T): Unit = { + message match { + case m: Message => + logInfo(s"$description:\n${jsonPrinter.print(m)}") + case other => + logInfo(s"$description: (Unknown message type) $other") + } + } + + override def interceptCall[ReqT, RespT]( + call: ServerCall[ReqT, RespT], + headers: Metadata, + next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = { + + val id = Random.nextInt(Int.MaxValue) // Assign a random id for this RPC. 
+ val desc = s"${call.getMethodDescriptor.getFullMethodName} (id $id)" + + val respLoggingCall = new SimpleForwardingServerCall[ReqT, RespT](call) { + override def sendMessage(message: RespT): Unit = { + logProto(s"Responding to RPC $desc", message) + super.sendMessage(message) + } + } + + val listener = next.startCall(respLoggingCall, headers) + + new SimpleForwardingServerCallListener[ReqT](listener) { + override def onMessage(message: ReqT): Unit = { + logProto(s"Received RPC request $desc", message) + super.onMessage(message) + } + } + } +} diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala index fb1b3bb9df1..7f85966f0a7 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala @@ -175,4 +175,13 @@ class InterceptorRegistrySuite extends SharedSparkSession { } } + test("LoggingInterceptor initializes when configured in spark conf") { + withSparkConf( + Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES.key -> + "org.apache.spark.sql.connect.service.LoggingInterceptor") { + val interceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors() + assert(interceptors.size == 1) + assert(interceptors.head.isInstanceOf[LoggingInterceptor]) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org