This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 22e3433 [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory 22e3433 is described below commit 22e34336da50220073d83768903726e619489942 Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> AuthorDate: Tue Jun 30 08:09:18 2020 +0000 [SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory ### What changes were proposed in this pull request? This patch fixes a missed spot: the test initialized FileStreamSinkLog with its "output" directory instead of the "metadata" directory, hence the verification against the sink log was a no-op. ### Why are the changes needed? Without the fix, the verification against the sink log is a no-op. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Checked with a debugger in the test, and verified that `allFiles()` returns non-zero entries. (Previously it returned zero entries, as there was no metadata.) Closes #28930 from HeartSaVioR/SPARK-29999-FOLLOWUP-fix-test. 
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 5472170a2b35864c617bdb846ff7123533765a16) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/execution/streaming/FileStreamSink.scala | 19 +++++++++++-------- .../spark/sql/streaming/FileStreamSinkSuite.scala | 10 ++++++---- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index b679f16..86a3194 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -45,8 +45,7 @@ object FileStreamSink extends Logging { val hdfsPath = new Path(singlePath) val fs = hdfsPath.getFileSystem(hadoopConf) if (fs.isDirectory(hdfsPath)) { - val metadataPath = new Path(hdfsPath, metadataDir) - checkEscapedMetadataPath(fs, metadataPath, sqlConf) + val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf) fs.exists(metadataPath) } else { false @@ -55,6 +54,12 @@ object FileStreamSink extends Logging { } } + def getMetadataLogPath(fs: FileSystem, path: Path, sqlConf: SQLConf): Path = { + val metadataDir = new Path(path, FileStreamSink.metadataDir) + FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sqlConf) + metadataDir + } + def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = { if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED) && StreamExecution.containsSpecialCharsInPath(metadataPath)) { @@ -125,14 +130,12 @@ class FileStreamSink( partitionColumnNames: Seq[String], options: Map[String, String]) extends Sink with Logging { + import FileStreamSink._ + private val hadoopConf = sparkSession.sessionState.newHadoopConf() private val basePath 
= new Path(path) - private val logPath = { - val metadataDir = new Path(basePath, FileStreamSink.metadataDir) - val fs = metadataDir.getFileSystem(hadoopConf) - FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf) - metadataDir - } + private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath, + sparkSession.sessionState.conf) private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 8779651..aa2664c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -555,10 +555,12 @@ abstract class FileStreamSinkSuite extends StreamTest { } } - val fs = new Path(outputDir.getCanonicalPath).getFileSystem( - spark.sessionState.newHadoopConf()) - val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, - outputDir.getCanonicalPath) + val outputDirPath = new Path(outputDir.getCanonicalPath) + val hadoopConf = spark.sessionState.newHadoopConf() + val fs = outputDirPath.getFileSystem(hadoopConf) + val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf) + + val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString) val allFiles = sinkLog.allFiles() // only files from non-empty partition should be logged --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org