This is an automated email from the ASF dual-hosted git repository.
caolu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 19435d61aa KYLIN-6054 Filter hadoop make files in file pruner
19435d61aa is described below
commit 19435d61aaa75cdaa7018a629b617066cda28834
Author: jlf <[email protected]>
AuthorDate: Wed Jul 10 16:58:05 2024 +0800
KYLIN-6054 Filter hadoop make files in file pruner
---
.../sql/execution/datasource/FilePruner.scala | 16 +++++++++++++---
.../sql/execution/datasource/FilePrunerSuite.scala | 22 ++++++++++++++++++++--
2 files changed, 33 insertions(+), 5 deletions(-)
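
For context, the patch makes the file pruner skip the bookkeeping entries that Hadoop's output committer leaves in a segment directory: the _SUCCESS marker file and the _temporary work directory. A minimal standalone sketch of that predicate follows (the object and method names here are illustrative, not the committed code; the two FileOutputCommitter constants are real Hadoop API, with values "_SUCCESS" and "_temporary"):

    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.hadoop.mapred.FileOutputCommitter

    object MarkerFileFilterSketch {
      // True for the committer's bookkeeping entries, false for data files.
      def isCommitterArtifact(status: FileStatus): Boolean = {
        val name = status.getPath.getName
        name == FileOutputCommitter.SUCCEEDED_FILE_NAME ||
          name == FileOutputCommitter.TEMP_DIR_NAME
      }

      def main(args: Array[String]): Unit = {
        val success = new FileStatus(); success.setPath(new Path("/seg/_SUCCESS"))
        val data = new FileStatus(); data.setPath(new Path("/seg/part-00000.parquet"))
        println(isCommitterArtifact(success)) // true
        println(isCommitterArtifact(data))    // false
      }
    }
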
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 72d727b65c..0be4d129b2 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -18,10 +18,8 @@
package org.apache.spark.sql.execution.datasource
-import java.sql.{Date, Timestamp}
-import java.util
-
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.kylin.common.exception.TargetSegmentNotFoundException
import org.apache.kylin.common.util.{DateFormat, HadoopUtil}
import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
@@ -42,6 +40,8 @@ import org.apache.spark.sql.types.{BooleanType, StructType}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.util.collection.BitSet
+import java.sql.{Date, Timestamp}
+import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -331,6 +331,7 @@ case class FilePruner(val session: SparkSession,
maybeStatuses.get
} else {
val statuses = path.getFileSystem(session.sparkContext.hadoopConfiguration).listStatus(path)
+ .filter(fileStatus => !FilePruner.isHadoopMakeFile(fileStatus))
fsc.putLeafFiles(path, statuses)
ShardFileStatusCache.refreshSegmentBuildTimeCache(segmentId, lastBuildTime)
statuses
@@ -621,6 +622,15 @@ object FilePruner {
throw new TargetSegmentNotFoundException(missSegId.toString)
}
}
+
+ def isHadoopMakeFile(fileStatus: FileStatus): Boolean = {
+ try {
+ fileStatus.getPath.getName.equals(FileOutputCommitter.SUCCEEDED_FILE_NAME) ||
+ fileStatus.getPath.getName.equals(FileOutputCommitter.TEMP_DIR_NAME)
+ } catch {
+ case _: Exception => false
+ }
+ }
}
case class SegFilters(start: Long, end: Long, pattern: String) extends Logging {
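
The added filter composes directly with FileSystem.listStatus, as in the hunk above. A hedged sketch of the same shape outside FilePruner (the directory path and helper name are assumptions for illustration):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.hadoop.mapred.FileOutputCommitter

    object FilteredListingSketch {
      // List a segment directory and drop committer bookkeeping entries,
      // mirroring the patched listing path in FilePruner.
      def listDataFiles(dir: Path, conf: Configuration): Array[FileStatus] =
        dir.getFileSystem(conf)
          .listStatus(dir)
          .filter { st =>
            val n = st.getPath.getName
            n != FileOutputCommitter.SUCCEEDED_FILE_NAME &&
              n != FileOutputCommitter.TEMP_DIR_NAME
          }

      def main(args: Array[String]): Unit = {
        val conf = new Configuration() // defaults to the local filesystem
        listDataFiles(new Path("/tmp/segment"), conf).foreach(s => println(s.getPath))
      }
    }

Note that the try/catch in isHadoopMakeFile also acts as a null guard: a null FileStatus, or one whose path was never set, raises an NPE inside the try block, and the method returns false instead of failing the listing.
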
diff --git a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
index e9dd578710..6fc0f38b18 100644
--- a/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
+++ b/src/spark-project/spark-common/src/test/scala/org/apache/spark/sql/execution/datasource/FilePrunerSuite.scala
@@ -18,18 +18,21 @@
package org.apache.spark.sql.execution.datasource
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.kylin.common.exception.TargetSegmentNotFoundException
import org.apache.kylin.guava30.shaded.common.collect.Sets
import org.apache.kylin.metadata.cube.model.{NDataSegment, NDataflow}
import org.apache.kylin.metadata.model.{SegmentStatusEnum, Segments}
-import org.apache.spark.sql.common.SparderBaseFunSuite
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito
+import org.scalatest.funsuite.AnyFunSuite
import java.util
import scala.collection.JavaConverters._
-class FilePrunerSuite extends SparderBaseFunSuite {
+class FilePrunerSuite extends AnyFunSuite {
test("KE-37730: test check segment status") {
val segment = new NDataSegment
@@ -51,4 +54,19 @@ class FilePrunerSuite extends SparderBaseFunSuite {
assert(catchEx.getMessage.equals("Cannot find target segment, and missing segment id: 2;"))
}
+ test("KE-43799: test check hadoop make file") {
+ assert(FilePruner.isHadoopMakeFile(setFileStatus(FileOutputCommitter.SUCCEEDED_FILE_NAME)))
+ assert(FilePruner.isHadoopMakeFile(setFileStatus(FileOutputCommitter.TEMP_DIR_NAME)))
+ assert(!FilePruner.isHadoopMakeFile(setFileStatus("test")))
+ assert(!FilePruner.isHadoopMakeFile(null))
+ assert(!FilePruner.isHadoopMakeFile(new FileStatus))
+
+ def setFileStatus(pathName: String): FileStatus = {
+ val path = Mockito.mock(classOf[Path])
+ Mockito.when(path.getName).thenReturn(pathName)
+ val file = new FileStatus
+ file.setPath(path)
+ file
+ }
+ }
}
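
The last two assertions in KE-43799 exercise that guard: both a null argument and a bare new FileStatus (no path set) must come back false rather than throw. A self-contained sketch of the guard behavior, with standalone names rather than the committed code:

    import org.apache.hadoop.fs.FileStatus

    object NullGuardSketch {
      // Same guard shape as the patch: any exception raised while reading
      // the path name (including NPEs from null inputs) yields false.
      def safeNameEquals(status: FileStatus, expected: String): Boolean =
        try status.getPath.getName == expected
        catch { case _: Exception => false }

      def main(args: Array[String]): Unit = {
        println(safeNameEquals(null, "_SUCCESS"))           // false: null status
        println(safeNameEquals(new FileStatus, "_SUCCESS")) // false: path not set
      }
    }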