This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 67ebbc8c53 [GLUTEN-8455][VL] Port encrypted file checks to shim layer
(#8501)
67ebbc8c53 is described below
commit 67ebbc8c533298bc2f8e727a543ecae09fa6a460
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu Jan 16 07:16:08 2025 +0530
[GLUTEN-8455][VL] Port encrypted file checks to shim layer (#8501)
---
.../apache/gluten/utils/ParquetMetadataUtils.scala | 34 +---
.../utils/ParquetEncryptionDetectionSuite.scala | 175 +++++++++++++++++++++
.../org/apache/gluten/sql/shims/SparkShims.scala | 5 +
.../org/apache/gluten/utils/ExceptionUtils.scala | 43 +++++
.../gluten/sql/shims/spark32/Spark32Shims.scala | 21 ++-
.../gluten/sql/shims/spark33/Spark33Shims.scala | 21 ++-
.../gluten/sql/shims/spark34/Spark34Shims.scala | 20 ++-
.../gluten/sql/shims/spark35/Spark35Shims.scala | 10 +-
8 files changed, 293 insertions(+), 36 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index 6a96789d51..9f43575cf9 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.utils
import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat
@@ -24,8 +25,6 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path,
RemoteIterator}
-import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.hadoop.ParquetFileReader
object ParquetMetadataUtils {
@@ -98,38 +97,9 @@ object ParquetMetadataUtils {
while (filesIterator.hasNext && checkedFileCount < fileLimit) {
val fileStatus = filesIterator.next()
checkedFileCount += 1
- try {
- ParquetFileReader.readFooter(conf, fileStatus.getPath).toString
- } catch {
- case e: Exception if hasCause(e,
classOf[ParquetCryptoRuntimeException]) =>
- return true
- case e: Exception =>
- }
- }
- false
- }
-
- /**
- * Utility to check the exception for the specified type. Parquet 1.12 does
not provide direct
- * utility to check for encryption. Newer versions provide utility to check
encryption from read
- * footer which can be used in the future once Spark brings it in.
- *
- * @param throwable
- * Exception to check
- * @param causeType
- * Class of the cause to look for
- * @tparam T
- * Type of the cause
- * @return
- * True if the cause is found; false otherwise
- */
- private def hasCause[T <: Throwable](throwable: Throwable, causeType:
Class[T]): Boolean = {
- var current = throwable
- while (current != null) {
- if (causeType.isInstance(current)) {
+ if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus,
conf)) {
return true
}
- current = current.getCause
}
false
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
new file mode 100644
index 0000000000..db53c329f6
--- /dev/null
+++
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.{GlutenQueryTest, SparkSession}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.{ColumnEncryptionProperties,
FileEncryptionProperties}
+import org.apache.parquet.example.data.simple.SimpleGroup
+import org.apache.parquet.hadoop.example.ExampleParquetWriter
+import org.apache.parquet.hadoop.metadata.ColumnPath
+import org.apache.parquet.schema.{MessageType, PrimitiveType, Type, Types}
+import org.junit.Assert._
+
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+
+import scala.collection.JavaConverters._
+
+/**
+ * This suite tests Parquet encryption detection for fallback of the scan operator. It checks
+ * the following:
+ * 1. Plain Parquet File:
+ * - Writes a Parquet file with no encryption.
+ * - Asserts that parquet is not encrypted
+ *
+ * 2. Encrypted Parquet File (with encrypted footer):
+ * - Writes a Parquet file with column-level encryption and an encrypted
footer.
+ * - Asserts that the file is encrypted.
+ *
+ * 3. Encrypted Parquet File (with plaintext footer):
+ * - Writes a Parquet file with column-level encryption but a plaintext
(unencrypted) footer.
+ * - Ensures the file is still detected as encrypted despite the plaintext
footer.
+ */
+
+class ParquetEncryptionDetectionSuite extends GlutenQueryTest {
+
+  // Base64 text of the two 16-byte test keys; decoded back to raw bytes where they are used.
+  private val masterKey =
+    Base64.getEncoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
+  private val columnKey =
+    Base64.getEncoder.encodeToString("1234567890123456".getBytes(StandardCharsets.UTF_8))
+
+  // Two required columns: `id` (INT32) and `name` (BINARY).
+  private val schema: MessageType = Types
+    .buildMessage()
+    .addField(
+      Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED).named("id"))
+    .addField(
+      Types
+        .primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
+        .named("name"))
+    .named("TestSchema")
+
+  // NOTE(review): never assigned in this suite, so `spark` returns null. The tests below do not
+  // touch it; confirm that the base class tolerates this.
+  private var _spark: SparkSession = _
+
+  override protected def spark: SparkSession = _spark
+
+  /**
+   * Writes `data` to a Parquet file at `path` using the suite schema, overwriting any existing
+   * file.
+   *
+   * @param path
+   *   Destination file path
+   * @param encryptionProperties
+   *   Encryption settings to apply; None writes a plain file
+   * @param data
+   *   Rows to write, keyed by column name; only Int and String values are supported
+   */
+  private def writeParquet(
+      path: String,
+      encryptionProperties: Option[FileEncryptionProperties],
+      data: Seq[Map[String, Any]]
+  ): Unit = {
+    val writerBuilder = ExampleParquetWriter
+      .builder(new Path(path))
+      .withConf(new Configuration())
+      .withType(schema)
+      .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
+
+    encryptionProperties.foreach(writerBuilder.withEncryption)
+
+    val writer = writerBuilder.build()
+    try {
+      data.foreach {
+        row =>
+          val group = new SimpleGroup(schema)
+          row.foreach {
+            case (key, i: Int) => group.add(key, i)
+            case (key, s: String) => group.add(key, s)
+            case (key, other) =>
+              // Fail with a readable message instead of an opaque MatchError.
+              throw new IllegalArgumentException(s"Unsupported value for column '$key': $other")
+          }
+          writer.write(group)
+      }
+    } finally {
+      writer.close()
+    }
+  }
+
+  /** Returns the first entry of a non-recursive listing of `path`. */
+  private def getLocatedFileStatus(path: String): LocatedFileStatus = {
+    val conf = new Configuration()
+    val fs = FileSystem.get(conf)
+    fs.listFiles(new Path(path), false).next()
+  }
+
+  /**
+   * Builds encryption properties that encrypt the `name` column with `columnKey` and use
+   * `masterKey` as the key passed to the file-level builder.
+   *
+   * @param plaintextFooter
+   *   Whether to request a plaintext footer
+   */
+  private def columnEncryptionProps(plaintextFooter: Boolean): FileEncryptionProperties = {
+    val nameColumn = ColumnPath.get("name")
+    val nameColumnProps = ColumnEncryptionProperties
+      .builder(nameColumn)
+      .withKey(Base64.getDecoder.decode(columnKey))
+      .build()
+    val builder = FileEncryptionProperties
+      .builder(Base64.getDecoder.decode(masterKey))
+      .withEncryptedColumns(Map(nameColumn -> nameColumnProps).asJava)
+    if (plaintextFooter) builder.withPlaintextFooter().build() else builder.build()
+  }
+
+  /**
+   * Writes a single row (id = 1, name = `name`) to `filePath` and returns what the active shim
+   * reports for that file.
+   */
+  private def writeAndDetect(
+      filePath: String,
+      encryptionProperties: Option[FileEncryptionProperties],
+      name: String): Boolean = {
+    writeParquet(filePath, encryptionProperties, Seq(Map("id" -> 1, "name" -> name)))
+    val fileStatus = getLocatedFileStatus(filePath)
+    SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration())
+  }
+
+  testWithSpecifiedSparkVersion(
+    "Detect encrypted Parquet with encrypted footer",
+    Array("3.2", "3.3", "3.4")) {
+    withTempDir {
+      tempDir =>
+        val filePath = s"${tempDir.getAbsolutePath}/encrypted_footer.parquet"
+        val encryptionProps = columnEncryptionProps(plaintextFooter = false)
+
+        assertTrue(writeAndDetect(filePath, Some(encryptionProps), "Alice"))
+    }
+  }
+
+  testWithSpecifiedSparkVersion(
+    "Detect encrypted Parquet without encrypted footer (plaintext footer)",
+    Array("3.2", "3.3", "3.4")) {
+    withTempDir {
+      tempDir =>
+        val filePath = s"${tempDir.getAbsolutePath}/plaintext_footer.parquet"
+        val encryptionProps = columnEncryptionProps(plaintextFooter = true)
+
+        assertTrue(writeAndDetect(filePath, Some(encryptionProps), "Bob"))
+    }
+  }
+
+  testWithSpecifiedSparkVersion(
+    "Detect plain (unencrypted) Parquet file",
+    Array("3.2", "3.3", "3.4")) {
+    withTempDir {
+      tempDir =>
+        val filePath = s"${tempDir.getAbsolutePath}/plain.parquet"
+
+        assertFalse(writeAndDetect(filePath, None, "Charlie"))
+    }
+  }
+}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index fba6a4a5a4..681e0f583d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -48,7 +48,9 @@ import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.LocatedFileStatus
import org.apache.parquet.schema.MessageType
import java.util.{Map => JMap, Properties}
@@ -285,4 +287,7 @@ trait SparkShims {
/** Shim method for usages from GlutenExplainUtils.scala. */
def unsetOperatorId(plan: QueryPlan[_]): Unit
+
+ /**
+ * Shim method reporting whether the given Parquet file is encrypted.
+ *
+ * @param fileStatus
+ * Status of the Parquet file to inspect
+ * @param conf
+ * Hadoop configuration provided by the caller
+ * @return
+ * True if the file is considered encrypted; false otherwise
+ */
+ def isParquetFileEncrypted(fileStatus: LocatedFileStatus, conf:
Configuration): Boolean
+
}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala
b/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala
new file mode 100644
index 0000000000..f86071186d
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+object ExceptionUtils {
+
+  /**
+   * Utility to check whether a throwable, or any throwable in its cause chain, is of the
+   * specified type. The walk stops when the chain ends or revisits a throwable, so a cyclic
+   * cause chain cannot loop forever.
+   *
+   * @param throwable
+   *   Exception to check; null yields false
+   * @param causeType
+   *   Class of the cause to look for
+   * @tparam T
+   *   Type of the cause
+   * @return
+   *   True if the cause is found; false otherwise
+   */
+  def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): Boolean = {
+    // Identity-based so that throwables overriding equals/hashCode are still told apart.
+    val visited = java.util.Collections.newSetFromMap(
+      new java.util.IdentityHashMap[Throwable, java.lang.Boolean]())
+
+    @scala.annotation.tailrec
+    def search(current: Throwable): Boolean = {
+      if (current == null || !visited.add(current)) {
+        // End of the chain, or a throwable seen before (cycle): nothing left to inspect.
+        false
+      } else if (causeType.isInstance(current)) {
+        true
+      } else {
+        search(current.getCause)
+      }
+    }
+
+    search(throwable)
+  }
+}
diff --git
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 833d2385b0..123f74770b 100644
---
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.sql.shims.spark32
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.{ExpressionNames, Sig}
import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims}
+import org.apache.gluten.utils.ExceptionUtils
import org.apache.spark.{ShuffleUtils, SparkContext, TaskContext,
TaskContextUtils}
import org.apache.spark.scheduler.TaskInfo
@@ -51,7 +52,10 @@ import org.apache.spark.sql.types.{DecimalType, StructField,
StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
import java.util.{HashMap => JHashMap, Map => JMap, Properties}
@@ -296,4 +300,19 @@ class Spark32Shims extends SparkShims {
override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
}
+
+  /**
+   * Checks whether the given Parquet file is encrypted by attempting to read its footer.
+   *
+   * @param fileStatus
+   *   Status of the Parquet file to inspect
+   * @param conf
+   *   Hadoop configuration used to open the file
+   * @return
+   *   True if reading the footer fails with a [[ParquetCryptoRuntimeException]] in the cause
+   *   chain; false if the footer is readable or the read fails for any other non-fatal reason
+   */
+  override def isParquetFileEncrypted(
+      fileStatus: LocatedFileStatus,
+      conf: Configuration): Boolean = {
+    try {
+      // Read with the caller's configuration so the file system settings it carries apply.
+      ParquetFileReader.readFooter(conf, fileStatus.getPath).toString
+      false
+    } catch {
+      case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
+        true
+      case scala.util.control.NonFatal(e) =>
+        // Best effort: an unreadable footer is not evidence of encryption. Fatal errors
+        // (for example OutOfMemoryError) are deliberately left to propagate.
+        e.printStackTrace()
+        false
+    }
+  }
}
diff --git
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 2135780d05..aba794a161 100644
---
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -20,6 +20,7 @@ import
org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.{ExpressionNames, Sig}
import org.apache.gluten.expression.ExpressionNames.{CEIL, FLOOR,
KNOWN_NULLABLE, TIMESTAMP_ADD}
import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims}
+import org.apache.gluten.utils.ExceptionUtils
import org.apache.spark._
import org.apache.spark.scheduler.TaskInfo
@@ -53,7 +54,10 @@ import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
@@ -377,4 +381,19 @@ class Spark33Shims extends SparkShims {
override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
}
+  /**
+   * Checks whether the given Parquet file is encrypted by attempting to read its footer.
+   *
+   * @param fileStatus
+   *   Status of the Parquet file to inspect
+   * @param conf
+   *   Hadoop configuration used to open the file
+   * @return
+   *   True if reading the footer fails with a [[ParquetCryptoRuntimeException]] in the cause
+   *   chain; false if the footer is readable or the read fails for any other non-fatal reason
+   */
+  override def isParquetFileEncrypted(
+      fileStatus: LocatedFileStatus,
+      conf: Configuration): Boolean = {
+    try {
+      // Read with the caller's configuration so the file system settings it carries apply.
+      ParquetFileReader.readFooter(conf, fileStatus.getPath).toString
+      false
+    } catch {
+      case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
+        true
+      case scala.util.control.NonFatal(e) =>
+        // Best effort: an unreadable footer is not evidence of encryption. Fatal errors
+        // (for example OutOfMemoryError) are deliberately left to propagate.
+        e.printStackTrace()
+        false
+    }
+  }
+
}
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index bedad4c017..62a4afe106 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.sql.shims.spark34
import org.apache.gluten.expression.{ExpressionNames, Sig}
import org.apache.gluten.expression.ExpressionNames.KNOWN_NULLABLE
import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims}
+import org.apache.gluten.utils.ExceptionUtils
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
@@ -55,7 +56,10 @@ import org.apache.spark.sql.types.{IntegerType, LongType,
StructField, StructTyp
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
@@ -512,4 +516,18 @@ class Spark34Shims extends SparkShims {
override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
}
+  /**
+   * Checks whether the given Parquet file is encrypted by attempting to read its footer.
+   *
+   * @param fileStatus
+   *   Status of the Parquet file to inspect
+   * @param conf
+   *   Hadoop configuration used to open the file
+   * @return
+   *   True if reading the footer fails with a [[ParquetCryptoRuntimeException]] in the cause
+   *   chain; false if the footer is readable or the read fails for any other non-fatal reason
+   */
+  override def isParquetFileEncrypted(
+      fileStatus: LocatedFileStatus,
+      conf: Configuration): Boolean = {
+    try {
+      // Read with the caller's configuration so the file system settings it carries apply.
+      ParquetFileReader.readFooter(conf, fileStatus.getPath).toString
+      false
+    } catch {
+      case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
+        true
+      case scala.util.control.NonFatal(e) =>
+        // Best effort: an unreadable footer is not evidence of encryption. Fatal errors
+        // (for example OutOfMemoryError) are deliberately left to propagate.
+        e.printStackTrace()
+        false
+    }
+  }
}
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 43ed51579a..7b272ce993 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -55,7 +55,8 @@ import org.apache.spark.sql.types.{IntegerType, LongType,
StructField, StructTyp
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
@@ -549,4 +550,11 @@ class Spark35Shims extends SparkShims {
override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
QueryPlan.localIdMap.get().remove(plan)
}
+
+  /**
+   * Placeholder for Spark 3.5: encryption detection is not implemented in this shim yet, so
+   * every file is reported as not encrypted and both arguments are currently unused.
+   *
+   * @param fileStatus
+   *   Status of the Parquet file to inspect
+   * @param conf
+   *   Hadoop configuration provided by the caller
+   * @return
+   *   Always false
+   */
+  override def isParquetFileEncrypted(
+      fileStatus: LocatedFileStatus,
+      conf: Configuration): Boolean = {
+    // TODO: Support will be added (https://github.com/apache/incubator-gluten/pull/8501)
+    false
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]