This is an automated email from the ASF dual-hosted git repository.

caolu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 19435d61aa KYLIN-6054 Filter hadoop make files in file pruner
19435d61aa is described below

commit 19435d61aaa75cdaa7018a629b617066cda28834
Author: jlf <[email protected]>
AuthorDate: Wed Jul 10 16:58:05 2024 +0800

    KYLIN-6054 Filter hadoop make files in file pruner
---
 .../sql/execution/datasource/FilePruner.scala      | 16 +++++++++++++---
 .../sql/execution/datasource/FilePrunerSuite.scala | 22 ++++++++++++++++++++--
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 72d727b65c..0be4d129b2 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -18,10 +18,8 @@
 
 package org.apache.spark.sql.execution.datasource
 
-import java.sql.{Date, Timestamp}
-import java.util
-
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.FileOutputCommitter
 import org.apache.kylin.common.exception.TargetSegmentNotFoundException
 import org.apache.kylin.common.util.{DateFormat, HadoopUtil}
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
@@ -42,6 +40,8 @@ import org.apache.spark.sql.types.{BooleanType, StructType}
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.util.collection.BitSet
 
+import java.sql.{Date, Timestamp}
+import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -331,6 +331,7 @@ case class FilePruner(val session: SparkSession,
       maybeStatuses.get
     } else {
       val statuses = 
path.getFileSystem(session.sparkContext.hadoopConfiguration).listStatus(path)
+        .filter(fileStatus => !FilePruner.isHadoopMakeFile(fileStatus))
       fsc.putLeafFiles(path, statuses)
       ShardFileStatusCache.refreshSegmentBuildTimeCache(segmentId, 
lastBuildTime)
       statuses
@@ -621,6 +622,15 @@ object FilePruner {
       throw new TargetSegmentNotFoundException(missSegId.toString)
     }
   }
+
+  def isHadoopMakeFile(fileStatus: FileStatus): Boolean = {
+    try {
+      
fileStatus.getPath.getName.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME) ||
+        fileStatus.getPath.getName.equals(FileOutputCommitter.TEMP_DIR_NAME)
+    } catch {
+      case _: Exception => false
+    }
+  }
 }
 
 case class SegFilters(start: Long, end: Long, pattern: String) extends Logging 
{
diff --git 
a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
 
b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
index e9dd578710..6fc0f38b18 100644
--- 
a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
+++ 
b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
@@ -18,18 +18,21 @@
 
 package org.apache.spark.sql.execution.datasource
 
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.FileOutputCommitter
 import org.apache.kylin.common.exception.TargetSegmentNotFoundException
 import org.apache.kylin.guava30.shaded.common.collect.Sets
 import org.apache.kylin.metadata.cube.model.{NDataSegment, NDataflow}
 import org.apache.kylin.metadata.model.{SegmentStatusEnum, Segments}
-import org.apache.spark.sql.common.SparderBaseFunSuite
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
+import org.scalatest.funsuite.AnyFunSuite
 
 import java.util
 import scala.collection.JavaConverters._
 
-class FilePrunerSuite extends SparderBaseFunSuite {
+class FilePrunerSuite extends AnyFunSuite {
 
   test("KE-37730: test check segment status") {
     val segment = new NDataSegment
@@ -51,4 +54,19 @@ class FilePrunerSuite extends SparderBaseFunSuite {
     assert(catchEx.getMessage.equals("Cannot find target segment, and missing 
segment id: 2;"))
   }
 
+  test("KE-43799: test check hadoop make file") {
+    
assert(FilePruner.isHadoopMakeFile(setFileStatus(FileOutputCommitter.SUCCEEDED_FILE_NAME)))
+    
assert(FilePruner.isHadoopMakeFile(setFileStatus(FileOutputCommitter.TEMP_DIR_NAME)))
+    assert(!FilePruner.isHadoopMakeFile(setFileStatus("test")))
+    assert(!FilePruner.isHadoopMakeFile(null))
+    assert(!FilePruner.isHadoopMakeFile(new FileStatus))
+
+    def setFileStatus(pathName: String): FileStatus = {
+      val path = Mockito.mock(classOf[Path])
+      Mockito.when(path.getName).thenReturn(pathName)
+      val file = new FileStatus
+      file.setPath(path)
+      file
+    }
+  }
 }

Reply via email to