[
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699493#comment-14699493
]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/948#discussion_r37184472
--- Diff:
flink-mesos/src/main/scala/org/apache/flink/mesos/executor/FlinkExecutor.scala
---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.executor
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.log4j.{ConsoleAppender, Level, Logger => ApacheLogger,
PatternLayout}
+import org.apache.mesos.{Executor, ExecutorDriver}
+import org.apache.mesos.Protos._
+
+trait FlinkExecutor extends Executor {
+ // logger to use
+ def LOG: org.slf4j.Logger
+
+ var currentRunningTaskId: Option[TaskID] = None
+ val TASK_MANAGER_LOGGING_LEVEL_KEY = "taskmanager.logging.level"
+ val DEFAULT_TASK_MANAGER_LOGGING_LEVEL = "INFO"
+
+
+ // methods that defines how the task is started when a launchTask is sent
+ def startTask(streamingMode: StreamingMode): Try[Unit]
+
+ var thread: Option[Thread] = None
+ var slaveId: Option[SlaveID] = None
+
+ override def shutdown(driver: ExecutorDriver): Unit = {
+ LOG.info("Killing taskManager thread")
+ // kill task manager thread
+ for (t <- thread) {
+ t.stop()
+ }
+
+ // exit
+ sys.exit(0)
+ }
+
+ override def disconnected(driver: ExecutorDriver): Unit = {}
+
+ override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {
+ for (t <- thread) {
+ LOG.info(s"Killing task : ${taskId.getValue}")
+ thread = None
+ currentRunningTaskId = None
+
+ // stop running thread
+ t.stop()
+
+ // Send the TASK_FINISHED status
+ driver.sendStatusUpdate(TaskStatus.newBuilder()
+ .setTaskId(taskId)
+ .setState(TaskState.TASK_FINISHED)
+ .build())
+ }
+ }
+
+
+ override def error(driver: ExecutorDriver, message: String): Unit = {}
+
+ override def frameworkMessage(driver: ExecutorDriver, data:
Array[Byte]): Unit = {}
+
+ override def registered(driver: ExecutorDriver, executorInfo:
ExecutorInfo,
+ frameworkInfo: FrameworkInfo, slaveInfo:
SlaveInfo): Unit = {
+ LOG.info(s"${executorInfo.getName} was registered on slave:
${slaveInfo.getHostname}")
+ slaveId = Some(slaveInfo.getId)
+ // get the configuration passed to it
+ if (executorInfo.hasData) {
+ val newConfig: Configuration =
Utils.deserialize(executorInfo.getData.toByteArray)
+ GlobalConfiguration.includeConfiguration(newConfig)
+ }
+ LOG.debug("Loaded configuration: {}",
GlobalConfiguration.getConfiguration)
+ }
+
+
+ override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo):
Unit = {
+ slaveId = Some(slaveInfo.getId)
+ }
+
+
+ override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
+ // overlay the new config over this one
+ val taskConf: Configuration =
Utils.deserialize(task.getData.toByteArray)
+ GlobalConfiguration.includeConfiguration(taskConf)
+
+ // reconfigure log4j
+ val logLevel = GlobalConfiguration.getString(
+ TASK_MANAGER_LOGGING_LEVEL_KEY, DEFAULT_TASK_MANAGER_LOGGING_LEVEL)
+
+ initializeLog4j(Level.toLevel(logLevel, Level.DEBUG))
+
+ // get streaming mode
+ val streamingMode = getStreamingMode()
+
+ // create the thread
+ val t = createThread(driver, task.getTaskId, streamingMode)
+ thread = Some(t)
+ t.start()
+
+ // send message
+ driver.sendStatusUpdate(TaskStatus.newBuilder()
+ .setTaskId(task.getTaskId)
+ .setState(TaskState.TASK_RUNNING)
+ .build())
+ }
+
+ def initializeLog4j(level: Level): Unit = {
--- End diff --
this will overwrite all user defined log4j configurations?
I'm not sure if it is still true, but you don't have to use Log4j as a
logging backend for Flink.
We are using SFL4j as the logger API, with pluggable back-ends. Log4j just
happens to be our default logging back end.
I think it would be better to configure log4j using JVM properties. I know
that this probably requires shipping of the configuration files, but that
should be doable.
Are there any services in Mesos to make files available to all executors?
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: New Components
> Reporter: Robert Metzger
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> There also is a pending pull request for adding Mesos support for Flink:
> https://github.com/apache/flink/pull/251
> But the PR is insufficiently tested. I'll add the code of the pull request to
> this JIRA in case somebody wants to pick it up in the future.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)