Dear all,
Below is the code I execute:
import java.io._
import java.net.{URL, URLClassLoader}
import java.nio.charset.Charset
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean
import com.netease.atom.common.util.logging.Logging
import com.netease.atom.interpreter.Code.Code
import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult,
InterpreterUtils}
import io.netty.buffer._
import org.apache.flink.api.scala.FlinkILoop
import org.apache.flink.client.CliFrontend
import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{QueryableStateOptions, Configuration,
ConfigConstants, GlobalConfiguration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster,
LocalFlinkMiniCluster}
import scala.Console
import scala.beans.BeanProperty
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.runtime.AbstractFunction0
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
/**
 * Interpreter that evaluates Scala snippets through an embedded Flink REPL
 * (`FlinkILoop`) connected to a local Flink mini cluster.
 *
 * `open` starts the cluster and the REPL; `interpret` feeds code to the REPL
 * and returns whatever the code wrote to standard output, which is captured
 * in a Netty buffer.
 */
class FlinkInterpreter extends Interpreter {
  private var bufferedReader: Option[BufferedReader] = None
  private var jprintWriter: JPrintWriter = _
  private val config = new Configuration
  private var cluster: LocalFlinkMiniCluster = _
  @BeanProperty var imain: IMain = _
  @BeanProperty var flinkILoop: FlinkILoop = _
  // `out` and `in` are both backed by `outBuf`; all three are created in `open`.
  private var out: ByteBufOutputStream = _
  private var outBuf: ByteBuf = _
  private var in: ByteBufInputStream = _
  // Flipped to true at the very end of `open`.
  private val isRunning: AtomicBoolean = new AtomicBoolean(false)

  /** @return true once `open` has run to completion */
  override def isOpen: Boolean = isRunning.get()

  /**
   * Starts a `LocalFlinkMiniCluster` with one JobManager, one TaskManager and
   * one ResourceManager, a JobManager port of 0, the queryable-state server
   * enabled and the web server enabled.
   *
   * NOTE(review): the log attached to this report shows the JobManager
   * discarding `SubmitJob` because the client sent the all-zero leader session
   * ID. Presumably the client created by `FlinkILoop` does not learn the
   * session ID elected inside `LocalFlinkMiniCluster`; Flink's own Scala shell
   * appears to start a `StandaloneMiniCluster` (already imported in this file)
   * for local mode -- TODO confirm before switching the cluster type.
   *
   * @return (host name, leader RPC port, the started cluster)
   */
  def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
    // Dump the configuration as it is before the overrides below are applied.
    config.toMap.toMap.foreach(println)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
    config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)

    val localCluster = new LocalFlinkMiniCluster(config, false)
    localCluster.start(true)

    val rpcPort = localCluster.getLeaderRPCPort
    println(s"Starting local Flink cluster (host: localhost,port: $rpcPort).\n")
    ("localhost", rpcPort, localCluster)
  }

  /**
   * Start flink cluster and create interpreter.
   *
   * Allocates the output capture buffer, starts the local mini cluster,
   * creates the `FlinkILoop` REPL against it, disables sysout logging on both
   * execution environments and pre-imports a few packages into the REPL.
   */
  override def open: Unit = {
    outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
    out = new ByteBufOutputStream(outBuf)
    in = new ByteBufInputStream(outBuf)

    // val (host, port, yarnCluster) =
    //   deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
    val (host, port, localCluster) = startLocalMiniCluster()
    this.cluster = localCluster
    val conf = cluster.configuration

    println(s"Connecting to Flink cluster (host:$host,port:$port)...")
    flinkILoop = new FlinkILoop(host, port, conf, None)
    val settings = new Settings()
    settings.usejavacp.value = true
    settings.Yreplsync.value = true
    flinkILoop.settings_$eq(settings)
    flinkILoop.createInterpreter()
    imain = flinkILoop.intp
    FlinkInterpreter.ourClassloader = imain.classLoader

    val benv = flinkILoop.scalaBenv
    val senv = flinkILoop.scalaSenv
    benv.getConfig.disableSysoutLogging()
    senv.getConfig.disableSysoutLogging()

    // import libraries
    imain.interpret("import scala.tools.nsc.io._")
    // imain.interpret("import Properties.userHome")
    imain.interpret("import scala.compat.Platform.EOL")
    imain.interpret("import org.apache.flink.api.scala._")
    imain.interpret("import org.apache.flink.api.common.functions._")
    isRunning.set(true)
  }

  /**
   * Interprets a snippet given as a single string.
   *
   * @param line source text; may contain several lines separated by '\n'
   * @return SUCCESS without evaluating anything when `line` is null or blank,
   *         otherwise the result of interpreting the individual lines
   */
  override def interpret(line: String): InterpreterResult =
    if (line == null || line.trim.isEmpty) new InterpreterResult(Code.SUCCESS)
    else interpret(line.split("\n"))

  /**
   * Interprets code that has already been split into lines.
   *
   * Lines are accumulated into one command while the following line looks
   * like a continuation (blank, `//` comment, leading `}`, leading `object`,
   * part of a block comment, or a leading-dot method chain). Each completed
   * command is handed to the REPL.
   *
   * @param lines source lines, without line terminators
   * @return ERROR with the captured output (or the exception message) as soon
   *         as a command fails, INCOMPLETE when the last command was
   *         incomplete, otherwise the last result code with the captured output
   */
  def interpret(lines: Array[String]): InterpreterResult = {
    val imain: IMain = getImain
    // A trailing no-op statement so that the last user line always has a
    // successor to look at and is therefore flushed by the loop below.
    val linesToRun: Array[String] = lines :+ "print(\"\")"

    // NOTE(review): System.out stays redirected after this call returns.
    System.setOut(new PrintStream(out))
    out.buffer().clear()

    var r: Code = null
    var incomplete: String = ""
    var inComment: Boolean = false

    // A while loop (rather than `for`) keeps the early `return`s below local.
    var l = 0
    while (l < linesToRun.length) {
      val s: String = linesToRun(l)
      var continuation: Boolean = false

      if (l + 1 < linesToRun.length) {
        val nextLine: String = linesToRun(l + 1).trim
        if (nextLine.isEmpty ||
          nextLine.startsWith("//") ||
          nextLine.startsWith("}") ||
          nextLine.startsWith("object")) {
          continuation = true
        } else if (!inComment && nextLine.startsWith("/*")) {
          inComment = true
          continuation = true
        } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
          // Was guarded by `!inComment`, so an opened block comment was never
          // closed and every later line got glued into one command.
          inComment = false
          continuation = true
        } else if (nextLine.length > 1 &&
          nextLine.charAt(0) == '.' &&
          nextLine.charAt(1) != '.' && // not ".."
          nextLine.charAt(1) != '/') { // not "./"
          continuation = true
        } else if (inComment) {
          continuation = true
        }
        if (continuation) {
          incomplete += s + "\n"
        }
      }

      if (!continuation) {
        val currentCommand: String = incomplete
        val res: Results.Result =
          try {
            Console.withOut(System.out) {
              imain.interpret(currentCommand + s)
            }
          } catch {
            case e: Exception =>
              logError("Interpreter Exception ", e)
              return new InterpreterResult(Code.ERROR,
                InterpreterUtils.getMostRelevantMessage(e))
          }

        r = getResultCode(res)
        if (r == Code.ERROR) {
          return new InterpreterResult(r, capturedOutput)
        } else if (r eq Code.INCOMPLETE) {
          incomplete += s + "\n"
        } else {
          incomplete = ""
        }
      }
      l += 1
    }

    if (r eq Code.INCOMPLETE) new InterpreterResult(r, "Incomplete expression")
    else new InterpreterResult(r, capturedOutput)
  }

  /**
   * Decodes everything currently held in the capture buffer as UTF-8.
   *
   * Used for both the success and the error result; the error path previously
   * called `out.toString`, which presumably yields only the stream object's
   * identity string rather than the captured text -- TODO confirm.
   */
  private def capturedOutput: String =
    out.buffer().toString(Charset.forName("utf-8"))

  /** Maps a REPL result onto this project's result code; anything that is
   * neither Success nor Incomplete is reported as ERROR. */
  private def getResultCode(r: Results.Result): Code = r match {
    case Results.Success => Code.SUCCESS
    case Results.Incomplete => Code.INCOMPLETE
    case _ => Code.ERROR
  }
}
/**
 * Companion of the interpreter class: holds the REPL class loader and a small
 * manual entry point that runs one streaming snippet.
 */
object FlinkInterpreter extends Logging {
  /** Class loader of the embedded REPL; assigned when an interpreter is opened. */
  var ourClassloader: ClassLoader = _

  /**
   * Opens a fresh interpreter and runs a three-line streaming program in it.
   * The interpretation result is not inspected.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    val repl = new FlinkInterpreter()
    // The closing margin bar keeps the literal ending in a bare newline after
    // stripMargin, exactly as when the closing quotes sit at column 0.
    val snippet =
      """
        |val dataStream = senv.fromElements(1,2,3,4,5)
        |dataStream.countWindowAll(2).sum(0).print()
        |senv.execute("My streaming program")
        |""".stripMargin
    repl.open
    val outcome = repl.interpret(snippet)
  }
}
The error messages I got are:
…
…
...
[WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager]
Discard message
LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because
the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not
equal the received leader session ID 00000000-0000-0000-0000-000000000000.
[INFO] [17/09/13 12:05:52]
[org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate
JobClientActor.
[INFO] [17/09/13 12:05:52]
[org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from
JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
[INFO] [17/09/13 12:05:52]
[akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote
daemon.
[INFO] [17/09/13 12:05:52]
[akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut
down; proceeding with flushing remote transports.
[INFO] [17/09/13 12:05:52]
[akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
at
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
at
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
at
org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
at
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
... 34 elided
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't
retrieve the JobExecutionResult from the JobManager.
at
org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
at
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
... 41 more
Caused by:
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job
submission to the JobManager timed out. You may increase 'akka.client.timeout'
in case the JobManager needs more time to configure and confirm the job
submission.
at
org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
at
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
at
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)