This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new b2aead9f98d [SPARK-44872][CONNECT][FOLLOWUP] Deflake
ReattachableExecuteSuite and increase retry buffer
b2aead9f98d is described below
commit b2aead9f98d900d139cff41d53f79a37e1e09e81
Author: Juliusz Sompolski <[email protected]>
AuthorDate: Fri Sep 15 19:06:58 2023 -0700
[SPARK-44872][CONNECT][FOLLOWUP] Deflake ReattachableExecuteSuite and
increase retry buffer
### What changes were proposed in this pull request?
Deflake tests in ReattachableExecuteSuite and increase
CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE.
### Why are the changes needed?
Two tests could be flaky with errors
`INVALID_CURSOR.POSITION_NOT_AVAILABLE`.
This is caused when a server releases the response when it falls more than
CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE behind the latest
response it sent. However, because of HTTP2 flow control, the responses could
still be in transit. In the test suite, we were explicitly disconnecting the
iterators and later reconnecting them. In some cases they could not reconnect,
because the response they had last seen had fallen too far behind.
This not only changes the suite, but also adjusts the default config. This
potentially makes reconnecting more robust. In a normal situation, it should
not lead to increased memory pressure, because the clients also release the
responses using ReleaseExecute as soon as they are received. Normally, buffered
responses should be freed by ReleaseExecute and this retry buffer is only a
fallback mechanism. Therefore, it is safe to increase the default.
In practice, this would only have an effect in cases where there are actual
network errors, and the increased buffer size should make the reconnects more
robust in these cases.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
ReattachableExecuteSuite.
Also ran manual experiments on how far the response received by the client can
lag behind the response sent by the server (because of the HTTP2 flow control window).
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42908 from juliuszsompolski/SPARK-44872-followup.
Authored-by: Juliusz Sompolski <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/connect/config/Connect.scala | 2 +-
.../spark/sql/connect/SparkConnectServerTest.scala | 2 +-
.../execution/ReattachableExecuteSuite.scala | 26 +++++++++++++---------
3 files changed, 18 insertions(+), 12 deletions(-)
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 7b8b05ce11a..253ac38f9cf 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -133,7 +133,7 @@ object Connect {
"With any value greater than 0, the last sent response will always
be buffered.")
.version("3.5.0")
.bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("1m")
+ .createWithDefaultString("10m")
val CONNECT_EXTENSIONS_RELATION_CLASSES =
buildStaticConf("spark.connect.extensions.relation.classes")
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
index 488858d33ea..eddd1c6be72 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.test.SharedSparkSession
* Base class and utilities for a test suite that starts and tests the real
SparkConnectService
* with a real SparkConnectClient, communicating over RPC, but both in-process.
*/
-class SparkConnectServerTest extends SharedSparkSession {
+trait SparkConnectServerTest extends SharedSparkSession {
// Server port
val serverPort: Int =
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 169b15582b6..0e29a07b719 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -22,7 +22,7 @@ import io.grpc.StatusRuntimeException
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.sql.connect.SparkConnectServerTest
import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.sql.connect.service.SparkConnectService
@@ -32,7 +32,7 @@ class ReattachableExecuteSuite extends SparkConnectServerTest
{
// Tests assume that this query will result in at least a couple
ExecutePlanResponses on the
// stream. If this is no longer the case because of changes in how much is
returned in a single
// ExecutePlanResponse, it may need to be adjusted.
- val MEDIUM_RESULTS_QUERY = "select * from range(1000000)"
+ val MEDIUM_RESULTS_QUERY = "select * from range(10000000)"
test("reattach after initial RPC ends") {
withClient { client =>
@@ -138,13 +138,12 @@ class ReattachableExecuteSuite extends
SparkConnectServerTest {
val reattachIter = stub.reattachExecute(
buildReattachExecuteRequest(operationId,
Some(response.getResponseId)))
assert(reattachIter.hasNext)
- reattachIter.next()
-
- // Nevertheless, the original iterator will handle the
INVALID_CURSOR.DISCONNECTED error
- iter.next()
- // iterator changed because it had to reconnect
- assert(reattachableIter.innerIterator ne initialInnerIter)
}
+
+ // Nevertheless, the original iterator will handle the
INVALID_CURSOR.DISCONNECTED error
+ iter.next()
+ // iterator changed because it had to reconnect
+ assert(reattachableIter.innerIterator ne initialInnerIter)
}
}
@@ -246,19 +245,26 @@ class ReattachableExecuteSuite extends
SparkConnectServerTest {
val iter = stub.executePlan(
buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId =
operationId))
var lastSeenResponse: String = null
+ val serverRetryBuffer = SparkEnv.get.conf
+ .get(Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE)
+ .toLong
iter.hasNext // open iterator
val execution = getExecutionHolder
// after consuming enough from the iterator, server should automatically
start releasing
var lastSeenIndex = 0
- while (iter.hasNext && execution.responseObserver.releasedUntilIndex ==
0) {
+ var totalSizeSeen = 0
+ while (iter.hasNext && totalSizeSeen <= 1.1 * serverRetryBuffer) {
val r = iter.next()
lastSeenResponse = r.getResponseId()
+ totalSizeSeen += r.getSerializedSize
lastSeenIndex += 1
}
assert(iter.hasNext)
- assert(execution.responseObserver.releasedUntilIndex > 0)
+ Eventually.eventually(timeout(eventuallyTimeout)) {
+ assert(execution.responseObserver.releasedUntilIndex > 0)
+ }
// Reattach from the beginning is not available.
val reattach =
stub.reattachExecute(buildReattachExecuteRequest(operationId, None))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]