This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5080264ad3022c2d2c4c2c05df741abd2c826ab0
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Tue Oct 2 11:30:06 2018 +0200

    [FLINK-7811] Fix TestingJobManagerLike for Scala 2.12
---
 .../flink/runtime/testingUtils/TestingJobManagerLike.scala  | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index ebe4639..355994f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -24,9 +24,8 @@ import java.util.function.BiFunction
 import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
-import org.apache.flink.core.fs.FSDataInputStream
+import org.apache.flink.api.common.accumulators.Accumulator
 import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
@@ -39,10 +38,11 @@ import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
-import org.apache.flink.runtime.state.{StateBackend, StateBackendLoader, StreamStateHandle}
+import org.apache.flink.runtime.state.StateBackendLoader
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages._
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+import org.apache.flink.util.OptionalFailure
 
 import scala.collection.mutable
 import scala.concurrent.Future
@@ -75,7 +75,7 @@ trait TestingJobManagerLike extends FlinkActor {
   val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
     new Ordering[(Int, ActorRef)] {
       override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
-    })
+    }).result()
 
   val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
 
@@ -241,7 +241,10 @@ trait TestingJobManagerLike extends FlinkActor {
         case (jobID, (updated, actors)) if updated =>
           currentJobs.get(jobID) match {
             case Some((graph, jobInfo)) =>
-              val userAccumulators = graph.aggregateUserAccumulators
+              val userAccumulators: java.util.Map[String, OptionalFailure[Accumulator[_, _]]] =
+                graph
+                  .aggregateUserAccumulators
+                  .asInstanceOf[java.util.Map[String, OptionalFailure[Accumulator[_, _]]]]
               actors foreach {
                  actor => actor ! UpdatedAccumulators(jobID, userAccumulators)
               }

Reply via email to