Repository: spark Updated Branches: refs/heads/master 427359f07 -> 6c6950839
[SPARK-22322][CORE] Update FutureAction for compatibility with Scala 2.12 Future ## What changes were proposed in this pull request? Scala 2.12's `Future` defines two new methods to implement, `transform` and `transformWith`. These can be implemented naturally in Spark's `FutureAction` extension and subclasses, but, only in terms of the new methods that don't exist in Scala 2.11. To support both at the same time, reflection is used to implement these. ## How was this patch tested? Existing tests. Author: Sean Owen <[email protected]> Closes #19561 from srowen/SPARK-22322. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c695083 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c695083 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c695083 Branch: refs/heads/master Commit: 6c6950839da991bd41accdb8fb03fbc3b588c1e4 Parents: 427359f Author: Sean Owen <[email protected]> Authored: Wed Oct 25 12:51:20 2017 +0100 Committer: Sean Owen <[email protected]> Committed: Wed Oct 25 12:51:20 2017 +0100 ---------------------------------------------------------------------- .../scala/org/apache/spark/FutureAction.scala | 59 +++++++++++++++++++- pom.xml | 2 +- .../streaming/FlatMapGroupsWithStateSuite.scala | 3 +- .../sql/streaming/StreamingQuerySuite.scala | 2 +- .../streaming/util/FileBasedWriteAheadLog.scala | 2 +- 5 files changed, 62 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/core/src/main/scala/org/apache/spark/FutureAction.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 1034fdc..036c9a6 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -89,7 +89,11 @@ trait FutureAction[T] extends Future[T] { */ override def value: Option[Try[T]] - // These two methods must be implemented in Scala 2.12, but won't be used by Spark + // These two methods must be implemented in Scala 2.12. They're implemented as a no-op here + // and then filled in with a real implementation in the two subclasses below. The no-op exists + // here so that those implementations can declare "override", necessary in 2.12, while working + // in 2.11, where the method doesn't exist in the superclass. + // After 2.11 support goes away, remove these two: def transform[S](f: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] = throw new UnsupportedOperationException() @@ -113,6 +117,42 @@ trait FutureAction[T] extends Future[T] { } +/** + * Scala 2.12 defines the two new transform/transformWith methods mentioned above. Impementing + * these for 2.12 in the Spark class here requires delegating to these same methods in an + * underlying Future object. But that only exists in 2.12. But these methods are only called + * in 2.12. So define helper shims to access these methods on a Future by reflection. + */ +private[spark] object FutureAction { + + private val transformTryMethod = + try { + classOf[Future[_]].getMethod("transform", classOf[(_) => _], classOf[ExecutionContext]) + } catch { + case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11 + } + + private val transformWithTryMethod = + try { + classOf[Future[_]].getMethod("transformWith", classOf[(_) => _], classOf[ExecutionContext]) + } catch { + case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11 + } + + private[spark] def transform[T, S]( + future: Future[T], + f: (Try[T]) => Try[S], + executor: ExecutionContext): Future[S] = + transformTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]] + + private[spark] def transformWith[T, S]( + future: Future[T], + f: (Try[T]) => Future[S], + executor: ExecutionContext): Future[S] = + transformWithTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]] + +} + /** * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include @@ -153,6 +193,18 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: jobWaiter.completionFuture.value.map {res => res.map(_ => resultFunc)} def jobIds: Seq[Int] = Seq(jobWaiter.jobId) + + override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] = + FutureAction.transform( + jobWaiter.completionFuture, + (u: Try[Unit]) => f(u.map(_ => resultFunc)), + e) + + override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] = + FutureAction.transformWith( + jobWaiter.completionFuture, + (u: Try[Unit]) => f(u.map(_ => resultFunc)), + e) } @@ -246,6 +298,11 @@ class ComplexFutureAction[T](run : JobSubmitter => Future[T]) def jobIds: Seq[Int] = subActions.flatMap(_.jobIds) + override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] = + FutureAction.transform(p.future, f, e) + + override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] = + FutureAction.transformWith(p.future, f, e) } http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b9c9728..2d59f06 100644 --- a/pom.xml +++ b/pom.xml @@ -2692,7 +2692,7 @@ <profile> <id>scala-2.12</id> <properties> - <scala.version>2.12.3</scala.version> + <scala.version>2.12.4</scala.version> <scala.binary.version>2.12</scala.binary.version> </properties> <build> http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index af08186..b906393 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.RDDScanExec import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, GroupStateImpl, MemoryStream} import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreId, StateStoreMetrics, UnsafeRowPair} -import org.apache.spark.sql.streaming.FlatMapGroupsWithStateSuite.MemoryStateStore import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.types.{DataType, IntegerType} @@ -1201,7 +1200,7 @@ object FlatMapGroupsWithStateSuite { } catch { case u: UnsupportedOperationException => return - case _ => + case _: Throwable => throw new TestFailedException("Unexpected exception when trying to get watermark", 20) } throw new TestFailedException("Could get watermark when not expected", 20) http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index c53889b..cc69390 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -744,7 +744,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi assert(returnedValue === expectedReturnValue, "Returned value does not match expected") } } - AwaitTerminationTester.test(expectedBehavior, awaitTermFunc) + AwaitTerminationTester.test(expectedBehavior, () => awaitTermFunc()) true // If the control reached here, then everything worked as expected } } http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index d6e15cf..ab7c855 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -139,7 +139,7 @@ private[streaming] class FileBasedWriteAheadLog( def readFile(file: String): Iterator[ByteBuffer] = { logDebug(s"Creating log reader with $file") val reader = new FileBasedWriteAheadLogReader(file, hadoopConf) - CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _) + CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, () => reader.close()) } if (!closeFileAfterWrite) { logFilesToRead.iterator.map(readFile).flatten.asJava --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
