Repository: spark
Updated Branches:
  refs/heads/master 427359f07 -> 6c6950839


[SPARK-22322][CORE] Update FutureAction for compatibility with Scala 2.12 Future

## What changes were proposed in this pull request?

Scala 2.12's `Future` defines two new methods to implement, `transform` and 
`transformWith`. These can be implemented naturally in Spark's `FutureAction` 
extension and subclasses, but, only in terms of the new methods that don't 
exist in Scala 2.11. To support both at the same time, reflection is used to 
implement these.

## How was this patch tested?

Existing tests.

Author: Sean Owen <[email protected]>

Closes #19561 from srowen/SPARK-22322.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c695083
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c695083
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c695083

Branch: refs/heads/master
Commit: 6c6950839da991bd41accdb8fb03fbc3b588c1e4
Parents: 427359f
Author: Sean Owen <[email protected]>
Authored: Wed Oct 25 12:51:20 2017 +0100
Committer: Sean Owen <[email protected]>
Committed: Wed Oct 25 12:51:20 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/FutureAction.scala   | 59 +++++++++++++++++++-
 pom.xml                                         |  2 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala |  3 +-
 .../sql/streaming/StreamingQuerySuite.scala     |  2 +-
 .../streaming/util/FileBasedWriteAheadLog.scala |  2 +-
 5 files changed, 62 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala 
b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 1034fdc..036c9a6 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -89,7 +89,11 @@ trait FutureAction[T] extends Future[T] {
    */
   override def value: Option[Try[T]]
 
-  // These two methods must be implemented in Scala 2.12, but won't be used by 
Spark
+  // These two methods must be implemented in Scala 2.12. They're implemented 
as a no-op here
+  // and then filled in with a real implementation in the two subclasses 
below. The no-op exists
+  // here so that those implementations can declare "override", necessary in 
2.12, while working
+  // in 2.11, where the method doesn't exist in the superclass.
+  // After 2.11 support goes away, remove these two:
 
   def transform[S](f: (Try[T]) => Try[S])(implicit executor: 
ExecutionContext): Future[S] =
     throw new UnsupportedOperationException()
@@ -113,6 +117,42 @@ trait FutureAction[T] extends Future[T] {
 
 }
 
+/**
+ * Scala 2.12 defines the two new transform/transformWith methods mentioned 
above. Implementing
+ * these for 2.12 in the Spark class here requires delegating to these same 
methods in an
underlying Future object. That delegation target only exists in 2.12 — but 
these methods are
in turn only called in 2.12. So define helper shims to access these methods on 
a Future by reflection.
+ */
+private[spark] object FutureAction {
+
+  private val transformTryMethod =
+    try {
+      classOf[Future[_]].getMethod("transform", classOf[(_) => _], 
classOf[ExecutionContext])
+    } catch {
+      case _: NoSuchMethodException => null // Would fail later in 2.11, but 
not called in 2.11
+    }
+
+  private val transformWithTryMethod =
+    try {
+      classOf[Future[_]].getMethod("transformWith", classOf[(_) => _], 
classOf[ExecutionContext])
+    } catch {
+      case _: NoSuchMethodException => null // Would fail later in 2.11, but 
not called in 2.11
+    }
+
+  private[spark] def transform[T, S](
+      future: Future[T],
+      f: (Try[T]) => Try[S],
+      executor: ExecutionContext): Future[S] =
+    transformTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]
+
+  private[spark] def transformWith[T, S](
+      future: Future[T],
+      f: (Try[T]) => Future[S],
+      executor: ExecutionContext): Future[S] =
+    transformWithTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]]
+
+}
+
 
 /**
  * A [[FutureAction]] holding the result of an action that triggers a single 
job. Examples include
@@ -153,6 +193,18 @@ class SimpleFutureAction[T] private[spark](jobWaiter: 
JobWaiter[_], resultFunc:
     jobWaiter.completionFuture.value.map {res => res.map(_ => resultFunc)}
 
   def jobIds: Seq[Int] = Seq(jobWaiter.jobId)
+
+  override def transform[S](f: (Try[T]) => Try[S])(implicit e: 
ExecutionContext): Future[S] =
+    FutureAction.transform(
+      jobWaiter.completionFuture,
+      (u: Try[Unit]) => f(u.map(_ => resultFunc)),
+      e)
+
+  override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: 
ExecutionContext): Future[S] =
+    FutureAction.transformWith(
+      jobWaiter.completionFuture,
+      (u: Try[Unit]) => f(u.map(_ => resultFunc)),
+      e)
 }
 
 
@@ -246,6 +298,11 @@ class ComplexFutureAction[T](run : JobSubmitter => 
Future[T])
 
   def jobIds: Seq[Int] = subActions.flatMap(_.jobIds)
 
+  override def transform[S](f: (Try[T]) => Try[S])(implicit e: 
ExecutionContext): Future[S] =
+    FutureAction.transform(p.future, f, e)
+
+  override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: 
ExecutionContext): Future[S] =
+    FutureAction.transformWith(p.future, f, e)
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b9c9728..2d59f06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2692,7 +2692,7 @@
     <profile>
       <id>scala-2.12</id>
       <properties>
-        <scala.version>2.12.3</scala.version>
+        <scala.version>2.12.4</scala.version>
         <scala.binary.version>2.12</scala.binary.version>
       </properties>
       <build>

http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index af08186..b906393 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -33,7 +33,6 @@ import 
org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, 
GroupStateImpl, MemoryStream}
 import org.apache.spark.sql.execution.streaming.state.{StateStore, 
StateStoreId, StateStoreMetrics, UnsafeRowPair}
-import 
org.apache.spark.sql.streaming.FlatMapGroupsWithStateSuite.MemoryStateStore
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.types.{DataType, IntegerType}
 
@@ -1201,7 +1200,7 @@ object FlatMapGroupsWithStateSuite {
     } catch {
       case u: UnsupportedOperationException =>
         return
-      case _ =>
+      case _: Throwable =>
         throw new TestFailedException("Unexpected exception when trying to get 
watermark", 20)
     }
     throw new TestFailedException("Could get watermark when not expected", 20)

http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index c53889b..cc69390 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -744,7 +744,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
           assert(returnedValue === expectedReturnValue, "Returned value does 
not match expected")
         }
       }
-      AwaitTerminationTester.test(expectedBehavior, awaitTermFunc)
+      AwaitTerminationTester.test(expectedBehavior, () => awaitTermFunc())
       true // If the control reached here, then everything worked as expected
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c695083/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index d6e15cf..ab7c855 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -139,7 +139,7 @@ private[streaming] class FileBasedWriteAheadLog(
     def readFile(file: String): Iterator[ByteBuffer] = {
       logDebug(s"Creating log reader with $file")
       val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
-      CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, 
reader.close _)
+      CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, () => 
reader.close())
     }
     if (!closeFileAfterWrite) {
       logFilesToRead.iterator.map(readFile).flatten.asJava


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to