This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8d3d4f9b900d [SPARK-47307][SQL] Add a config to optionally chunk
base64 strings
8d3d4f9b900d is described below
commit 8d3d4f9b900dadede3b8e33af830e5ef66682923
Author: Ted Jenks <[email protected]>
AuthorDate: Fri Jul 12 18:33:20 2024 +0800
[SPARK-47307][SQL] Add a config to optionally chunk base64 strings
Follow-up to #45408.
### What changes were proposed in this pull request?
[[SPARK-47307](https://issues.apache.org/jira/browse/SPARK-47307)] Add a
config to optionally chunk base64 strings
### Why are the changes needed?
In #35110, it was incorrectly asserted that:
> ApacheCommonBase64 obeys http://www.ietf.org/rfc/rfc2045.txt
This is not true as the previous code called:
```java
public static byte[] encodeBase64(byte[] binaryData)
```
Its Javadoc states:
> Encodes binary data using the base64 algorithm but does not chunk the
output.
However, the RFC 2045 (MIME) base64 encoder does chunk by default. This now
means that any Spark-encoded base64 strings cannot be decoded by decoders that
do not implement RFC 2045. The docs state RFC 4648.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing test suite, plus a new `StringExpressionsSuite` test that checks both
the chunked and non-chunked output.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47303 from wForget/SPARK-47307.
Lead-authored-by: Ted Jenks <[email protected]>
Co-authored-by: wforget <[email protected]>
Co-authored-by: Kent Yao <[email protected]>
Co-authored-by: Ted Chester Jenks <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../explain-results/function_base64.explain | 2 +-
.../catalyst/expressions/stringExpressions.scala | 40 +++++++++++++++-------
.../org/apache/spark/sql/internal/SQLConf.scala | 11 ++++++
.../expressions/StringExpressionsSuite.scala | 13 +++++++
4 files changed, 53 insertions(+), 13 deletions(-)
diff --git
a/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain
b/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain
index f80f3522190d..d3a250919ea5 100644
---
a/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain
+++
b/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain
@@ -1,2 +1,2 @@
-Project [base64(cast(g#0 as binary)) AS base64(CAST(g AS BINARY))#0]
+Project [static_invoke(Base64.encode(cast(g#0 as binary), false)) AS
base64(CAST(g AS BINARY))#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index f25f58731c8c..b188b9c2630f 100755
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -2682,24 +2682,40 @@ case class Chr(child: Expression)
""",
since = "1.5.0",
group = "string_funcs")
-case class Base64(child: Expression)
- extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
+case class Base64(child: Expression, chunkBase64: Boolean)
+ extends UnaryExpression with RuntimeReplaceable with ImplicitCastInputTypes {
+
+ def this(expr: Expression) = this(expr, SQLConf.get.chunkBase64StringEnabled)
override def dataType: DataType = SQLConf.get.defaultStringType
override def inputTypes: Seq[DataType] = Seq(BinaryType)
- protected override def nullSafeEval(bytes: Any): Any = {
-
UTF8String.fromBytes(JBase64.getMimeEncoder.encode(bytes.asInstanceOf[Array[Byte]]))
- }
+ override def replacement: Expression = StaticInvoke(
+ classOf[Base64],
+ dataType,
+ "encode",
+ Seq(child, Literal(chunkBase64, BooleanType)),
+ Seq(BinaryType, BooleanType))
- override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- nullSafeCodeGen(ctx, ev, (child) => {
- s"""${ev.value} = UTF8String.fromBytes(
- ${classOf[JBase64].getName}.getMimeEncoder().encode($child));
- """})
- }
+ override def toString: String = s"$prettyName($child)"
- override protected def withNewChildInternal(newChild: Expression): Base64 =
copy(child = newChild)
+ override protected def withNewChildInternal(newChild: Expression):
Expression =
+ copy(child = newChild)
+}
+
+object Base64 {
+ def apply(expr: Expression): Base64 = new Base64(expr)
+
+ private lazy val nonChunkEncoder = JBase64.getMimeEncoder(-1, Array())
+
+ def encode(input: Array[Byte], chunkBase64: Boolean): UTF8String = {
+ val encoder = if (chunkBase64) {
+ JBase64.getMimeEncoder
+ } else {
+ nonChunkEncoder
+ }
+ UTF8String.fromBytes(encoder.encode(input))
+ }
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6ca831f99304..65beb21d59d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3525,6 +3525,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val CHUNK_BASE64_STRING_ENABLED =
buildConf("spark.sql.legacy.chunkBase64String.enabled")
+ .internal()
+ .doc("Whether to truncate string generated by the `Base64` function. When
true, base64" +
+ " strings generated by the base64 function are chunked into lines of at
most 76" +
+ " characters. When false, the base64 strings are not chunked.")
+ .version("3.5.2")
+ .booleanConf
+ .createWithDefault(false)
+
val ENABLE_DEFAULT_COLUMNS =
buildConf("spark.sql.defaultColumn.enabled")
.internal()
@@ -5856,6 +5865,8 @@ class SQLConf extends Serializable with Logging with
SqlApiConf {
def ansiRelationPrecedence: Boolean = ansiEnabled &&
getConf(ANSI_RELATION_PRECEDENCE)
+ def chunkBase64StringEnabled: Boolean = getConf(CHUNK_BASE64_STRING_ENABLED)
+
def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
case "TIMESTAMP_LTZ" =>
// For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL
TIME ZONE
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
index ebd724543481..2ad8652f2b31 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala
@@ -509,6 +509,19 @@ class StringExpressionsSuite extends SparkFunSuite with
ExpressionEvalHelper {
GenerateUnsafeProjection.generate(StringDecode(b,
Literal("\"quote")).replacement :: Nil)
}
+ test("SPARK-47307: base64 encoding without chunking") {
+ val longString = "a" * 58
+ val encoded =
"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ=="
+ withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "false") {
+ checkEvaluation(Base64(Literal(longString.getBytes)), encoded)
+ }
+ val chunkEncoded =
+
s"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\r\nYQ=="
+ withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "true") {
+ checkEvaluation(Base64(Literal(longString.getBytes)), chunkEncoded)
+ }
+ }
+
test("initcap unit test") {
checkEvaluation(InitCap(Literal.create(null, StringType)), null)
checkEvaluation(InitCap(Literal("a b")), "A B")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]