This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 78b6cda9de51 [SPARK-55247][CONNECT] Clean up deprecated API usage
related to `o.a.c.io.input.BoundedInputStream`
78b6cda9de51 is described below
commit 78b6cda9de51517b8a524d311188f110ba6038b1
Author: yangjie01 <[email protected]>
AuthorDate: Wed Jan 28 14:21:18 2026 +0800
[SPARK-55247][CONNECT] Clean up deprecated API usage related to
`o.a.c.io.input.BoundedInputStream`
### What changes were proposed in this pull request?
This PR cleans up deprecated API usage related to
`o.a.c.io.input.BoundedInputStream`:
- Replace the deprecated `BoundedInputStream(InputStream, long)` constructor
with the Builder pattern that its Javadoc recommends, and replace the
`onMaxLength` override with `setOnMaxCount`. See
https://github.com/apache/commons-io/blob/rel/commons-io-2.21.0/src/main/java/org/apache/commons/io/input/BoundedInputStream.java#L345C1-L357
```
/**
 * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
 *
 * @param inputStream The wrapped input stream.
 * @param maxCount The maximum number of bytes to return, negative means unbound.
 * @deprecated Use {@link AbstractBuilder#get()}.
 */
@Deprecated
public BoundedInputStream(final InputStream inputStream, final long maxCount) {
    // Some badly designed methods, for example the Servlet API, overload length
    // such that "-1" means stream finished
    this(inputStream, builder().setMaxCount(maxCount));
}
```
### Why are the changes needed?
Clean up deprecated API usage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54023 from LuciferYang/commons-io-deprecated.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../spark/sql/connect/utils/PlanCompressionUtils.scala | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/PlanCompressionUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/PlanCompressionUtils.scala
index 708ef1ee6558..74d3edd2975a 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/PlanCompressionUtils.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/PlanCompressionUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.connect.utils
-import java.io.IOException
+import java.lang.{Long => JLong}
import scala.util.control.NonFatal
@@ -102,14 +102,17 @@ object PlanCompressionUtils {
// Create a bounded input stream to limit the decompressed output size to avoid decompression
// bomb attacks.
- val boundedStream = new BoundedInputStream(zstdStream, maxOutputSize) {
- @throws[IOException]
- override protected def onMaxLength(maxBytes: Long, count: Long): Unit =
+ val boundedStream = BoundedInputStream
+ .builder()
+ .setInputStream(zstdStream)
+ .setMaxCount(maxOutputSize)
+ .setOnMaxCount((_: JLong, _: JLong) => {
throw new SparkSQLException(
errorClass = "CONNECT_INVALID_PLAN.PLAN_SIZE_LARGER_THAN_MAX",
messageParameters =
Map("planSize" -> "unknown", "maxPlanSize" -> maxOutputSize.toString))
- }
+ })
+ .get()
val cis = CodedInputStream.newInstance(boundedStream)
cis.setSizeLimit(Integer.MAX_VALUE)
cis.setRecursionLimit(SparkEnv.get.conf.get(Connect.CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT))
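
The hunk above bounds the decompressed output so that a small compressed plan cannot expand without limit. As a rough illustration of what such a guard does, here is a JDK-only sketch. It is not Spark's or commons-io's code: `LimitedInputStream`, `decompressBounded`, and `gzip` are hypothetical names, GZIP stands in for Zstandard so that nothing beyond the JDK is needed, and the limit is assumed to be non-negative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class BoundedDecompression {

    /** Hypothetical stand-in for a bounded stream (not the commons-io class). */
    static final class LimitedInputStream extends FilterInputStream {
        private final long maxCount; // assumed non-negative in this sketch
        private long count;          // bytes handed to the caller so far

        LimitedInputStream(InputStream in, long maxCount) {
            super(in);
            this.maxCount = maxCount;
        }

        @Override
        public int read() throws IOException {
            byte[] one = new byte[1];
            return read(one, 0, 1) < 0 ? -1 : one[0] & 0xFF;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            if (len == 0) {
                return 0;
            }
            if (count >= maxCount) {
                // Budget used up: anything other than end-of-stream means the
                // payload is larger than allowed.
                if (in.read() < 0) {
                    return -1;
                }
                throw new IOException("decompressed size exceeds " + maxCount + " bytes");
            }
            // Never hand out more than the remaining budget.
            int allowed = (int) Math.min((long) len, maxCount - count);
            int n = in.read(buf, off, allowed);
            if (n > 0) {
                count += n;
            }
            return n;
        }
    }

    /** Compresses data with GZIP (helper for the example only). */
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    /** Decompresses, failing if the output would exceed maxOutputSize bytes. */
    static byte[] decompressBounded(byte[] compressed, long maxOutputSize) throws IOException {
        InputStream raw = new GZIPInputStream(new ByteArrayInputStream(compressed));
        try (InputStream in = new LimitedInputStream(raw, maxOutputSize)) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf, 0, buf.length)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }
}
```

In this sketch the limit is enforced on the decompressed side, which is the point of the guard: the compressed input can be tiny while its expansion is not. A payload that fits exactly within the limit is accepted here, which may differ from how a callback-based bounded stream reports reaching its limit.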