This is an automated email from the ASF dual-hosted git repository.

yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 67ebbc8c53 [GLUTEN-8455][VL] Port encrypted file checks to shim layer 
(#8501)
67ebbc8c53 is described below

commit 67ebbc8c533298bc2f8e727a543ecae09fa6a460
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu Jan 16 07:16:08 2025 +0530

    [GLUTEN-8455][VL] Port encrypted file checks to shim layer (#8501)
---
 .../apache/gluten/utils/ParquetMetadataUtils.scala |  34 +---
 .../utils/ParquetEncryptionDetectionSuite.scala    | 175 +++++++++++++++++++++
 .../org/apache/gluten/sql/shims/SparkShims.scala   |   5 +
 .../org/apache/gluten/utils/ExceptionUtils.scala   |  43 +++++
 .../gluten/sql/shims/spark32/Spark32Shims.scala    |  21 ++-
 .../gluten/sql/shims/spark33/Spark33Shims.scala    |  21 ++-
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |  20 ++-
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |  10 +-
 8 files changed, 293 insertions(+), 36 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index 6a96789d51..9f43575cf9 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.utils
 
 import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import 
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat
 
@@ -24,8 +25,6 @@ import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, 
RemoteIterator}
-import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.hadoop.ParquetFileReader
 
 object ParquetMetadataUtils {
 
@@ -98,38 +97,9 @@ object ParquetMetadataUtils {
     while (filesIterator.hasNext && checkedFileCount < fileLimit) {
       val fileStatus = filesIterator.next()
       checkedFileCount += 1
-      try {
-        ParquetFileReader.readFooter(conf, fileStatus.getPath).toString
-      } catch {
-        case e: Exception if hasCause(e, 
classOf[ParquetCryptoRuntimeException]) =>
-          return true
-        case e: Exception =>
-      }
-    }
-    false
-  }
-
-  /**
-   * Utility to check the exception for the specified type. Parquet 1.12 does 
not provide direct
-   * utility to check for encryption. Newer versions provide utility to check 
encryption from read
-   * footer which can be used in the future once Spark brings it in.
-   *
-   * @param throwable
-   *   Exception to check
-   * @param causeType
-   *   Class of the cause to look for
-   * @tparam T
-   *   Type of the cause
-   * @return
-   *   True if the cause is found; false otherwise
-   */
-  private def hasCause[T <: Throwable](throwable: Throwable, causeType: 
Class[T]): Boolean = {
-    var current = throwable
-    while (current != null) {
-      if (causeType.isInstance(current)) {
+      if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, 
conf)) {
         return true
       }
-      current = current.getCause
     }
     false
   }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
new file mode 100644
index 0000000000..db53c329f6
--- /dev/null
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.{GlutenQueryTest, SparkSession}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.{ColumnEncryptionProperties, 
FileEncryptionProperties}
+import org.apache.parquet.example.data.simple.SimpleGroup
+import org.apache.parquet.hadoop.example.ExampleParquetWriter
+import org.apache.parquet.hadoop.metadata.ColumnPath
+import org.apache.parquet.schema.{MessageType, PrimitiveType, Type, Types}
+import org.junit.Assert._
+
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+
+import scala.collection.JavaConverters._
+
+/**
+ * This suite attempts to test parquet encryption for fallback of scan 
operator. Will check the
+ * following:
+ *   1. Plain Parquet File:
+ *      - Writes a Parquet file with no encryption.
+ *      - Asserts that the file is not detected as encrypted.
+ *
+ * 2. Encrypted Parquet File (with encrypted footer):
+ *   - Writes a Parquet file with column-level encryption and an encrypted 
footer.
+ *   - Asserts that the file is encrypted.
+ *
+ * 3. Encrypted Parquet File (with plaintext footer):
+ *   - Writes a Parquet file with column-level encryption but a plaintext 
(unencrypted) footer.
+ *   - Ensures the file is still detected as encrypted despite the plaintext 
footer.
+ */
+
+class ParquetEncryptionDetectionSuite extends GlutenQueryTest {
+
+  private val masterKey =
+    
Base64.getEncoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
+  private val columnKey =
+    
Base64.getEncoder.encodeToString("1234567890123456".getBytes(StandardCharsets.UTF_8))
+
+  private val schema: MessageType = Types
+    .buildMessage()
+    .addField(
+      Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, 
Type.Repetition.REQUIRED).named("id"))
+    .addField(
+      Types
+        .primitive(PrimitiveType.PrimitiveTypeName.BINARY, 
Type.Repetition.REQUIRED)
+        .named("name"))
+    .named("TestSchema")
+
+  private var _spark: SparkSession = _
+
+  override protected def spark: SparkSession = _spark
+
+  private def writeParquet(
+      path: String,
+      encryptionProperties: Option[FileEncryptionProperties],
+      data: Seq[Map[String, Any]]
+  ): Unit = {
+    val configuration = new Configuration()
+    val writerBuilder = ExampleParquetWriter
+      .builder(new Path(path))
+      .withConf(configuration)
+      .withType(schema)
+      
.withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
+
+    encryptionProperties.foreach(writerBuilder.withEncryption)
+
+    val writer = writerBuilder.build()
+    try {
+      data.foreach {
+        row =>
+          val group = new SimpleGroup(schema)
+          row.foreach {
+            case (key, value) =>
+              value match {
+                case i: Int => group.add(key, i)
+                case s: String => group.add(key, s)
+              }
+          }
+          writer.write(group)
+      }
+    } finally {
+      writer.close()
+    }
+  }
+
+  private def getLocatedFileStatus(path: String): LocatedFileStatus = {
+    val conf = new Configuration()
+    val fs = FileSystem.get(conf)
+    fs.listFiles(new Path(path), false).next()
+  }
+
+  testWithSpecifiedSparkVersion(
+    "Detect encrypted Parquet with encrypted footer",
+    Array("3.2", "3.3", "3.4")) {
+    withTempDir {
+      tempDir =>
+        val filePath = s"${tempDir.getAbsolutePath}/encrypted_footer.parquet"
+        val encryptionProps = FileEncryptionProperties
+          .builder(Base64.getDecoder.decode(masterKey))
+          .withEncryptedColumns(
+            Map(
+              ColumnPath.get("name") -> ColumnEncryptionProperties
+                .builder(ColumnPath.get("name"))
+                .withKey(Base64.getDecoder.decode(columnKey))
+                .build()).asJava)
+          .build()
+
+        writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, 
"name" -> "Alice")))
+        val fileStatus = getLocatedFileStatus(filePath)
+
+        assertTrue(
+          SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new 
Configuration()))
+    }
+  }
+
+  testWithSpecifiedSparkVersion(
+    "Detect encrypted Parquet without encrypted footer (plaintext footer)",
+    Array("3.2", "3.3", "3.4")) {
+    withTempDir {
+      tempDir =>
+        val filePath = s"${tempDir.getAbsolutePath}/plaintext_footer.parquet"
+        val encryptionProps = FileEncryptionProperties
+          .builder(Base64.getDecoder.decode(masterKey))
+          .withEncryptedColumns(
+            Map(
+              ColumnPath.get("name") -> ColumnEncryptionProperties
+                .builder(ColumnPath.get("name"))
+                .withKey(Base64.getDecoder.decode(columnKey))
+                .build()).asJava)
+          .withPlaintextFooter()
+          .build()
+
+        writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, 
"name" -> "Bob")))
+        val fileStatus = getLocatedFileStatus(filePath)
+        assertTrue(
+          SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new 
Configuration()))
+    }
+  }
+
+  testWithSpecifiedSparkVersion(
+    "Detect plain (unencrypted) Parquet file",
+    Array("3.2", "3.3", "3.4")) {
+    withTempDir {
+      tempDir =>
+        val filePath = s"${tempDir.getAbsolutePath}/plain.parquet"
+
+        writeParquet(filePath, None, Seq(Map("id" -> 1, "name" -> "Charlie")))
+        val fileStatus = getLocatedFileStatus(filePath)
+
+        assertFalse(
+          SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new 
Configuration()))
+    }
+  }
+}
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index fba6a4a5a4..681e0f583d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -48,7 +48,9 @@ import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.LocatedFileStatus
 import org.apache.parquet.schema.MessageType
 
 import java.util.{Map => JMap, Properties}
@@ -285,4 +287,7 @@ trait SparkShims {
 
   /** Shim method for usages from GlutenExplainUtils.scala. */
   def unsetOperatorId(plan: QueryPlan[_]): Unit
+
+  def isParquetFileEncrypted(fileStatus: LocatedFileStatus, conf: 
Configuration): Boolean
+
 }
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala 
b/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala
new file mode 100644
index 0000000000..f86071186d
--- /dev/null
+++ b/shims/common/src/main/scala/org/apache/gluten/utils/ExceptionUtils.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.utils
+
+object ExceptionUtils {
+
+  /**
+   * Checks whether the throwable or any of its causes is an instance of the specified type.
+   *
+   * @param throwable
+   *   Exception to check
+   * @param causeType
+   *   Class of the cause to look for
+   * @tparam T
+   *   Type of the cause
+   * @return
+   *   True if the cause is found; false otherwise
+   */
+  def hasCause[T <: Throwable](throwable: Throwable, causeType: Class[T]): 
Boolean = {
+    var current = throwable
+    while (current != null) {
+      if (causeType.isInstance(current)) {
+        return true
+      }
+      current = current.getCause
+    }
+    false
+  }
+}
diff --git 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 833d2385b0..123f74770b 100644
--- 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.sql.shims.spark32
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.expression.{ExpressionNames, Sig}
 import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims}
+import org.apache.gluten.utils.ExceptionUtils
 
 import org.apache.spark.{ShuffleUtils, SparkContext, TaskContext, 
TaskContextUtils}
 import org.apache.spark.scheduler.TaskInfo
@@ -51,7 +52,10 @@ import org.apache.spark.sql.types.{DecimalType, StructField, 
StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 
 import java.util.{HashMap => JHashMap, Map => JMap, Properties}
@@ -296,4 +300,19 @@ class Spark32Shims extends SparkShims {
   override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
     plan.unsetTagValue(QueryPlan.OP_ID_TAG)
   }
+
+  override def isParquetFileEncrypted(
+      fileStatus: LocatedFileStatus,
+      conf: Configuration): Boolean = {
+    try {
+      ParquetFileReader.readFooter(new Configuration(), 
fileStatus.getPath).toString
+      false
+    } catch {
+      case e: Exception if ExceptionUtils.hasCause(e, 
classOf[ParquetCryptoRuntimeException]) =>
+        true
+      case e: Throwable =>
+        e.printStackTrace()
+        false
+    }
+  }
 }
diff --git 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 2135780d05..aba794a161 100644
--- 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -20,6 +20,7 @@ import 
org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.expression.{ExpressionNames, Sig}
 import org.apache.gluten.expression.ExpressionNames.{CEIL, FLOOR, 
KNOWN_NULLABLE, TIMESTAMP_ADD}
 import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims}
+import org.apache.gluten.utils.ExceptionUtils
 
 import org.apache.spark._
 import org.apache.spark.scheduler.TaskInfo
@@ -53,7 +54,10 @@ import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
@@ -377,4 +381,19 @@ class Spark33Shims extends SparkShims {
   override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
     plan.unsetTagValue(QueryPlan.OP_ID_TAG)
   }
+  override def isParquetFileEncrypted(
+      fileStatus: LocatedFileStatus,
+      conf: Configuration): Boolean = {
+    try {
+      ParquetFileReader.readFooter(new Configuration(), 
fileStatus.getPath).toString
+      false
+    } catch {
+      case e: Exception if ExceptionUtils.hasCause(e, 
classOf[ParquetCryptoRuntimeException]) =>
+        true
+      case e: Throwable =>
+        e.printStackTrace()
+        false
+    }
+  }
+
 }
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index bedad4c017..62a4afe106 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.sql.shims.spark34
 import org.apache.gluten.expression.{ExpressionNames, Sig}
 import org.apache.gluten.expression.ExpressionNames.KNOWN_NULLABLE
 import org.apache.gluten.sql.shims.{ShimDescriptor, SparkShims}
+import org.apache.gluten.utils.ExceptionUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
@@ -55,7 +56,10 @@ import org.apache.spark.sql.types.{IntegerType, LongType, 
StructField, StructTyp
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
@@ -512,4 +516,18 @@ class Spark34Shims extends SparkShims {
   override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
     plan.unsetTagValue(QueryPlan.OP_ID_TAG)
   }
+  override def isParquetFileEncrypted(
+      fileStatus: LocatedFileStatus,
+      conf: Configuration): Boolean = {
+    try {
+      ParquetFileReader.readFooter(new Configuration(), 
fileStatus.getPath).toString
+      false
+    } catch {
+      case e: Exception if ExceptionUtils.hasCause(e, 
classOf[ParquetCryptoRuntimeException]) =>
+        true
+      case e: Throwable =>
+        e.printStackTrace()
+        false
+    }
+  }
 }
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 43ed51579a..7b272ce993 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -55,7 +55,8 @@ import org.apache.spark.sql.types.{IntegerType, LongType, 
StructField, StructTyp
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
@@ -549,4 +550,11 @@ class Spark35Shims extends SparkShims {
   override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
     QueryPlan.localIdMap.get().remove(plan)
   }
+
+  override def isParquetFileEncrypted(
+      fileStatus: LocatedFileStatus,
+      conf: Configuration): Boolean = {
+    // TODO: Encryption detection is not implemented for this shim yet; see 
https://github.com/apache/incubator-gluten/pull/8501
+    return false
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to