This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push: new c0fc6d0 Revert "[SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream" c0fc6d0 is described below commit c0fc6d0d8dbd890a817176eb1da6e98252c2e0c0 Author: Shixiong Zhu <zsxw...@gmail.com> AuthorDate: Wed Jan 16 10:03:21 2019 -0800 Revert "[SPARK-26629][SS] Fixed error with multiple file stream in a query + restart on a batch that has no data for one file stream" This reverts commit 5a50ae37f4c41099c174459603966ee25f21ac75. --- .../execution/streaming/FileStreamSourceLog.scala | 4 +- .../sql/execution/streaming/HDFSMetadataLog.scala | 3 +- .../execution/streaming/HDFSMetadataLogSuite.scala | 6 -- .../sql/streaming/FileStreamSourceSuite.scala | 75 ++-------------------- 4 files changed, 8 insertions(+), 80 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala index 7b2ea96..8628471 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala @@ -117,9 +117,7 @@ class FileStreamSourceLog( val batches = (existedBatches ++ retrievedBatches).map(i => i._1 -> i._2.get).toArray.sortBy(_._1) - if (startBatchId <= endBatchId) { - HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId) - } + HDFSMetadataLog.verifyBatchIds(batches.map(_._1), startId, endId) batches } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index d4cfbb3..00bc215 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -457,8 +457,7 @@ object HDFSMetadataLog { } /** - * Verify if batchIds are continuous and between `startId` and `endId` (both inclusive and - * startId assumed to be <= endId). + * Verify if batchIds are continuous and between `startId` and `endId`. * * @param batchIds the sorted ids to verify. * @param startId the start id. If it's set, batchIds should start with this id. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index 57a0343..4677769 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -275,12 +275,6 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext { intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), None, Some(5L))) intercept[IllegalStateException](verifyBatchIds(Seq(2, 3, 4), Some(1L), Some(5L))) intercept[IllegalStateException](verifyBatchIds(Seq(1, 2, 4, 5), Some(1L), Some(5L))) - - // Related to SPARK-26629, this capatures the behavior for verifyBatchIds when startId > endId - intercept[IllegalStateException](verifyBatchIds(Seq(), Some(2L), Some(1L))) - intercept[AssertionError](verifyBatchIds(Seq(2), Some(2L), Some(1L))) - intercept[AssertionError](verifyBatchIds(Seq(1), Some(2L), Some(1L))) - intercept[AssertionError](verifyBatchIds(Seq(0), Some(2L), Some(1L))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index fb0b365..d4bd9c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -48,33 +48,21 @@ abstract class FileStreamSourceTest * `FileStreamSource` actually being used in the execution. */ abstract class AddFileData extends AddData { - private val _qualifiedBasePath = PrivateMethod[Path]('qualifiedBasePath) - - private def isSamePath(fileSource: FileStreamSource, srcPath: File): Boolean = { - val path = (fileSource invokePrivate _qualifiedBasePath()).toString.stripPrefix("file:") - path == srcPath.getCanonicalPath - } - override def addData(query: Option[StreamExecution]): (Source, Offset) = { require( query.nonEmpty, "Cannot add data when there is no query for finding the active file stream source") val sources = getSourcesFromStreamingQuery(query.get) - val source = if (sources.isEmpty) { + if (sources.isEmpty) { throw new Exception( "Could not find file source in the StreamExecution logical plan to add data to") - } else if (sources.size == 1) { - sources.head - } else { - val matchedSources = sources.filter(isSamePath(_, src)) - if (matchedSources.size != 1) { - throw new Exception( - "Could not select the file source in StreamExecution as there are multiple" + - s" file sources and none / more than one matches $src:\n" + sources.mkString("\n")) - } - matchedSources.head + } else if (sources.size > 1) { + throw new Exception( + "Could not select the file source in the StreamExecution logical plan as there" + + "are multiple file sources:\n\t" + sources.mkString("\n\t")) } + val source = sources.head val newOffset = source.withBatchingLocked { addData(source) new FileStreamSourceOffset(source.currentLogOffset + 1) @@ -83,9 +71,6 @@ abstract class FileStreamSourceTest (source, newOffset) } - /** Source directory to add file data to */ - protected def src: File - protected def addData(source: FileStreamSource): Unit } @@ -1509,54 +1494,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest { newSource.getBatch(None, FileStreamSourceOffset(1)) } } - - test("SPARK-26629: multiple file sources work with restarts when a source does not have data") { - withTempDirs { case (dir, tmp) => - val sourceDir1 = new File(dir, "source1") - val sourceDir2 = new File(dir, "source2") - sourceDir1.mkdirs() - sourceDir2.mkdirs() - - val source1 = createFileStream("text", s"${sourceDir1.getCanonicalPath}") - val source2 = createFileStream("text", s"${sourceDir2.getCanonicalPath}") - val unioned = source1.union(source2) - - def addMultiTextFileData( - source1Content: String, - source2Content: String): StreamAction = { - val actions = Seq( - AddTextFileData(source1Content, sourceDir1, tmp), - AddTextFileData(source2Content, sourceDir2, tmp) - ).filter(_.content != null) // don't write to a source dir if no content specified - StreamProgressLockedActions(actions, desc = actions.mkString("[ ", " | ", " ]")) - } - - testStream(unioned)( - StartStream(), - addMultiTextFileData(source1Content = "source1_0", source2Content = "source2_0"), - CheckNewAnswer("source1_0", "source2_0"), - StopStream, - - StartStream(), - addMultiTextFileData(source1Content = "source1_1", source2Content = null), - CheckNewAnswer("source1_1"), - StopStream, - - // Restart after a batch with one file source having no new data. - // This restart is needed to hit the issue in SPARK-26629. - - StartStream(), - addMultiTextFileData(source1Content = null, source2Content = "source2_2"), - CheckNewAnswer("source2_2"), - StopStream, - - StartStream(), - addMultiTextFileData(source1Content = "source1_3", source2Content = "source2_3"), - CheckNewAnswer("source1_3", "source2_3"), - StopStream - ) - } - } } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org