This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5080264ad3022c2d2c4c2c05df741abd2c826ab0
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Tue Oct 2 11:30:06 2018 +0200

    [FLINK-7811] Fix TestingJobManagerLike for Scala 2.12
---
 .../flink/runtime/testingUtils/TestingJobManagerLike.scala | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index ebe4639..355994f 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -24,9 +24,8 @@ import java.util.function.BiFunction
 import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
-import org.apache.flink.core.fs.FSDataInputStream
+import org.apache.flink.api.common.accumulators.Accumulator
 import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
@@ -39,10 +38,11 @@ import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.state.memory.MemoryStateBackend
-import org.apache.flink.runtime.state.{StateBackend, StateBackendLoader, StreamStateHandle}
+import org.apache.flink.runtime.state.StateBackendLoader
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingMessages._
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+import org.apache.flink.util.OptionalFailure
 
 import scala.collection.mutable
 import scala.concurrent.Future
@@ -75,7 +75,7 @@ trait TestingJobManagerLike extends FlinkActor {
   val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder(
     new Ordering[(Int, ActorRef)] {
       override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1
-    })
+    }).result()
 
   val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
 
@@ -241,7 +241,10 @@ trait TestingJobManagerLike extends FlinkActor {
         case (jobID, (updated, actors)) if updated =>
           currentJobs.get(jobID) match {
             case Some((graph, jobInfo)) =>
-              val userAccumulators = graph.aggregateUserAccumulators
+              val userAccumulators: java.util.Map[String, OptionalFailure[Accumulator[_, _]]] =
+                graph
+                  .aggregateUserAccumulators
+                  .asInstanceOf[java.util.Map[String, OptionalFailure[Accumulator[_, _]]]]
               actors foreach {
                 actor => actor ! UpdatedAccumulators(jobID, userAccumulators)
               }
