[
https://issues.apache.org/jira/browse/TEZ-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14680784#comment-14680784
]
Bikas Saha commented on TEZ-2003:
---------------------------------
Some initial comments on the modified existing code. I have not yet looked at
the newly added code.
One repeated item in the comments is the special casing of uber/yarn mode in
different places. I would expect plugins to come in from the user when the AM
is created, with either the client or the AppMaster creating the default plugins
for uber/yarn. Thereafter dag/vertex/routers should not need special casing
for any plugin (like the uber/yarn special casing that exists in all these
places in the branch). E.g. the vertexmanager only uses
VertexManagerPluginDescriptor, even for the built-in plugins. Similarly, I would
expect the communicator/scheduler/launcher wrappers to work only with plugin
descriptors.
Also creating a ServicePlugin class will help in reducing code duplication and
make maintenance easier instead of having scheduler id, launcherId and commId
everywhere.
ContainerSignatureMatcher -> ExecutorSignatureMatcher ?
{code}
+public interface ContainerSignatureMatcher {
{code}
Why is this here?
ServicePluginLifecycle etc. in tez-runtime-api like
Inputs/Output/InputInitializer etc. ?
Typically we say start() -> stop() instead of shutdown
{code}+public interface ServicePluginLifecycle {
+ void start() throws Exception;
+ void shutdown() throws Exception;
{code}
Why are executeInAm and executeInContainers there?
{code}
+ public static class VertexExecutionContext {
+ final boolean executeInAm;
+ final boolean executeInContainers;
+ final String taskSchedulerName;{code}
Rename to ExecutorEndReason ? Also, how can "An error in the AM" be caused by a
container running a task?
{code}
+public enum ContainerEndReason {
+ NODE_FAILED, // Completed because the node running the container was marked as dead
+ APPLICATION_ERROR, // An error in the AM caused by user code
+ FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+ LAUNCH_FAILED, // Failure to launch the container
+}{code}
Why does this have schedulerName and taskCommName ?
{code}
+public class ContainerLaunchRequest extends ContainerLauncherOperationBase {
+
+ private final ContainerLaunchContext clc;
+ private final Container container;
+} {code}
Why enableContainers and enableUber?
{code}
+public class ServicePluginsDescriptor {
+
+ private final boolean enableContainers;
+ private final boolean enableUber;
+ {code}
Has the internal one been replaced by this?
{code}
+public enum TaskAttemptEndReason {
+ NODE_FAILED, // Completed because the node running the container was marked as dead
+}{code}
Rename to ExecutorBusy ?
{code}
+ COMMUNICATION_ERROR, // Equivalent to a launch failure
+ SERVICE_BUSY, // Service rejected the task
+ INTERRUPTED_BY_SYSTEM, // Interrupted by the system. e.g. Pre-emption
+ INTERRUPTED_BY_USER, // Interrupted by the user
+
}{code}
Why does the isLocal flag need to be passed to the Scheduler/Launcher/Communicator
routers, instead of having a service plugin for local mode?
Is it ensured that the integer for a service plugin will turn out to be the
same after AM restart?
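One way to get that property, as a minimal sketch: derive the integer from the sorted plugin names rather than from the order in which descriptors are listed. This assumes the same set of plugin names is configured for every AM attempt. `StablePluginIndex` is a hypothetical helper, not something in the patch.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical helper, not part of the patch.
final class StablePluginIndex {
  private StablePluginIndex() {}

  // Assigns indices in sorted-name order, so the result does not depend on
  // the order in which the descriptors happen to be listed.
  static Map<String, Integer> assign(Collection<String> pluginNames) {
    Map<String, Integer> indexByName = new LinkedHashMap<>();
    for (String name : new TreeSet<>(pluginNames)) {
      indexByName.put(name, indexByName.size());
    }
    return indexByName;
  }
}
```

Adding or removing a plugin between attempts would still shift the indices, so anything recovered after a restart would be safer keyed by plugin name than by integer.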
Why is the yarn scheduler special cased? Launcher/Communicator don't have the
special casing.
{code}
+ static void processSchedulerDescriptors(List<NamedEntityDescriptor> descriptors, boolean isLocal,
+ UserPayload defaultPayload,
+ BiMap<String, Integer> schedulerPluginMap) {
.....
+ if (!foundYarn) {
+ NamedEntityDescriptor yarnDescriptor =
+ new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+ .setUserPayload(defaultPayload);
+ addDescriptor(descriptors, schedulerPluginMap, yarnDescriptor);
+ }{code}
Why use a different code path for uber/default? They should just work when
instantiated the same way as a custom plugin.
{code}
+ TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
+ int taskCommIndex) {
+ if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
+ return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
+ } else if (taskCommDescriptor.getEntityName()
+ .equals(TezConstants.getTezUberServicePluginName())) {
+ return createUberTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
+ } else {
+ return createCustomTaskCommunicator(taskCommunicatorContexts[taskCommIndex],
+ taskCommDescriptor);
}{code}
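A minimal sketch of the single code path I have in mind, assuming the built-in plugins are described by a name plus an implementation class just like a custom plugin. `Communicator`, `PluginDescriptor` and `CommunicatorFactory` are hypothetical stand-ins, not the Tez types in the patch.

```java
// Hypothetical stand-in for a task communicator plugin; not the Tez interface.
interface Communicator {
  String describe();
}

// Hypothetical built-in implementations with public no-arg constructors.
class YarnCommunicator implements Communicator {
  public YarnCommunicator() {}
  public String describe() { return "yarn"; }
}

class UberCommunicator implements Communicator {
  public UberCommunicator() {}
  public String describe() { return "uber"; }
}

// Hypothetical descriptor: a plugin name plus the class that implements it.
final class PluginDescriptor {
  final String name;
  final String className;

  PluginDescriptor(String name, String className) {
    this.name = name;
    this.className = className;
  }
}

final class CommunicatorFactory {
  private CommunicatorFactory() {}

  // One code path for every plugin: load whatever class the descriptor names.
  // There is no branch on "yarn" or "uber" here.
  static Communicator create(PluginDescriptor descriptor) {
    try {
      Class<? extends Communicator> clazz =
          Class.forName(descriptor.className).asSubclass(Communicator.class);
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot create plugin " + descriptor.name, e);
    }
  }
}
```

With this shape the client or AppMaster would only need to add default descriptors for the built-ins up front, and the routers never look at plugin names.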
Are this and other methods thread-safe with respect to callbacks from multiple plugins?
{code}
+ public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
+ throws IOException, TezException {
{code}
Also in heartbeat(), the following code has been lost during merge.
{code} for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
// for now, set the event time on the AM when it is received.
// this avoids any time disparity between machines.
tezEvent.setEventReceivedTime(currTime);{code}
Why are the contextImpls not directly invoking/handling the plugins instead of
going through the router?
{code}
+ public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) throws
+ Exception {
+ taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
+ }{code}
Why this 3 step process?
{code}
+ // Inform all communicators of the dagCompletion.
+ for (int i = 0 ; i < taskCommunicators.length ; i++) {
+ ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteStart(dag);
+ taskCommunicators[i].dagComplete(dag.getName());
+ ((TaskCommunicatorContextImpl)taskCommunicatorContexts[i]).dagCompleteEnd();
+ }{code}
Why has the synchronization been removed? I remember this being a subtle race
condition.
{code}
@Override
public void registerTaskAttempt(AMContainerTask amContainerTask,
- ContainerId containerId) {
+ ContainerId containerId, int taskCommId) {
ContainerInfo containerInfo = registeredContainers.get(containerId);
- if(containerInfo == null) {
+ if (containerInfo == null) {
throw new TezUncheckedException("Registering task attempt: "
+ amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
}
- synchronized (containerInfo) {
- if(containerInfo.amContainerTask != null) {
- throw new TezUncheckedException("Registering task attempt: "
- + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
- + " with existing assignment to: " + containerInfo.amContainerTask.getTask().getTaskAttemptID());
- }
- containerInfo.amContainerTask = amContainerTask;
- containerInfo.taskPulled = false;
-
- ContainerId containerIdFromMap =
- attemptToInfoMap.put(amContainerTask.getTask().getTaskAttemptID(), containerId);
- if(containerIdFromMap != null) {
- throw new TezUncheckedException("Registering task attempt: "
- + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
- + " when already assigned to: " + containerIdFromMap);
- }
+ if (containerInfo.taskAttemptId != null) {
+ throw new TezUncheckedException("Registering task attempt: "
+ + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+ + " with existing assignment to: " +
+ containerInfo.taskAttemptId);
}
- }{code}
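To illustrate the concern, a minimal sketch, assuming the race is the check-then-set on the per-container assignment when two callers register against the same container. `ContainerSlot` and `AttemptRegistry` are hypothetical names, not classes from the patch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical per-container record.
final class ContainerSlot {
  // Guarded by the monitor of this ContainerSlot instance.
  String taskAttemptId;
}

// Hypothetical registry showing the check and the assignment done atomically.
final class AttemptRegistry {
  private final ConcurrentMap<String, ContainerSlot> registeredContainers =
      new ConcurrentHashMap<>();

  void registerContainer(String containerId) {
    registeredContainers.putIfAbsent(containerId, new ContainerSlot());
  }

  void registerTaskAttempt(String attemptId, String containerId) {
    ContainerSlot slot = registeredContainers.get(containerId);
    if (slot == null) {
      throw new IllegalStateException("Registering task attempt: " + attemptId
          + " to unknown container: " + containerId);
    }
    // Without this lock two threads can both see null and both assign.
    synchronized (slot) {
      if (slot.taskAttemptId != null) {
        throw new IllegalStateException("Registering task attempt: " + attemptId
            + " to container: " + containerId
            + " with existing assignment to: " + slot.taskAttemptId);
      }
      slot.taskAttemptId = attemptId;
    }
  }
}
```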
The dagCompleteStart/End logic is either broken or unnecessary because the
correct dag seems to be always received from appContext.getCurrentDAG().
{code}
+ private DAG getDag() {
+ dagChangedReadLock.lock();
+ try {
+ if (dag != null) {
+ return dag;
+ } else {
+ return context.getCurrentDAG();
+ }
+ } finally {
+ dagChangedReadLock.unlock();
+ }
+ }
+
+ @InterfaceAudience.Private
+ public void dagCompleteStart(DAG dag) {
+ dagChangedWriteLock.lock();
+ try {
+ this.dag = dag;
+ } finally {
+ dagChangedWriteLock.unlock();
+ }
+ }
+
+ public void dagCompleteEnd() {
+ dagChangedWriteLock.lock();
+ try {
+ this.dag = null;
+ } finally {
+ dagChangedWriteLock.unlock();
+ }
+ }
+}{code}
Why not keep a cached copy instead of converting each time?
{code}
+ public org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext() {
+ if (jobPlan.hasDefaultExecutionContext()) {
+ return DagTypeConverters.convertFromProto(jobPlan.getDefaultExecutionContext());
+ } else {
+ return null;
+ }
+ }
+{code}
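A minimal sketch of the caching I mean, assuming the plan does not change once the DAG object has been constructed. `CachedConversion` is a hypothetical helper; the supplier stands in for the convertFromProto call.

```java
import java.util.function.Supplier;

// Hypothetical helper: runs the conversion at most once and remembers the
// result, including a null result (no default execution context).
final class CachedConversion<T> {
  private final Supplier<T> converter;
  private boolean computed;
  private T value;

  CachedConversion(Supplier<T> converter) {
    this.converter = converter;
  }

  synchronized T get() {
    if (!computed) {
      value = converter.get();
      computed = true;
    }
    return value;
  }
}
```

Converting once in the constructor and storing the result in a final field would be simpler still, if the conversion is cheap enough to do eagerly.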
There is a scheduledTime on master that this is duplicating.
{code}
+ private long scheduleTime = 0;
private long finishTime = 0;
private String trackerName;
private int httpPort;{code}
What is the use of this?
{code}
+ @Override
+ public long getFirstAttemptStartTime() {
+ readLock.lock();
+ try {
+ // The first attempt will always have an index of 0.
+ return getAttempt(TezTaskAttemptID.getInstance(getTaskId(), 0)).getScheduleTime();
+ } finally {
+ readLock.unlock();
+ }
+ }
+{code}
Why is the vertex code concerned with checking local mode/uber etc.? It should
simply take the vertex or default
execution context and use it.
{code}
+ boolean isLocal = vertexConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
+ TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+
+ String tezDefaultComponentName =
+ isLocal ? TezConstants.getTezUberServicePluginName() :
+ TezConstants.getTezYarnServicePluginName();
+
+ org.apache.tez.dag.api.Vertex.VertexExecutionContext execContext = dag.getDefaultExecutionContext();
+ if (vertexPlan.hasExecutionContext()) {
+ execContext = DagTypeConverters.convertFromProto(vertexPlan.getExecutionContext());
+ LOG.info("Using ExecutionContext from Vertex for Vertex {}", vertexName);
+ } else if (execContext != null) {
+ LOG.info("Using ExecutionContext from DAG for Vertex {}", vertexName);
+ }
+ if (execContext != null) {
+ if (execContext.shouldExecuteInAm()) {
+ tezDefaultComponentName = TezConstants.getTezUberServicePluginName();
+ }
+ }
+
+ String taskSchedulerName = tezDefaultComponentName;
+ String containerLauncherName = tezDefaultComponentName;
+ String taskCommName = tezDefaultComponentName;
+
+ if (execContext != null) {
+ if (execContext.getTaskSchedulerName() != null) {
+ taskSchedulerName = execContext.getTaskSchedulerName();
+ }
+ if (execContext.getContainerLauncherName() != null) {
+ containerLauncherName = execContext.getContainerLauncherName();
+ }
+ if (execContext.getTaskCommName() != null) {
+ taskCommName = execContext.getTaskCommName();
+ }
+ }
{code}
Create a base class that takes care of the repeated schedulerId code in all
scheduler events?
{code}
public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent {
private final ContainerId containerId;
+ private final int schedulerId;
{code}
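Roughly what I mean, as a sketch only. The class names below are hypothetical and simplified (string ids instead of ContainerId/NodeId); in the patch the field could equally live in the existing AMSchedulerEvent.

```java
// Hypothetical base class: owns the schedulerId once for all scheduler events.
abstract class SchedulerEvent {
  private final String type;
  private final int schedulerId;

  protected SchedulerEvent(String type, int schedulerId) {
    this.type = type;
    this.schedulerId = schedulerId;
  }

  public final String getType() { return type; }

  public final int getSchedulerId() { return schedulerId; }
}

// Subclasses carry only their event-specific payload.
final class DeallocateContainerEvent extends SchedulerEvent {
  private final String containerId;

  DeallocateContainerEvent(String containerId, int schedulerId) {
    super("DEALLOCATE_CONTAINER", schedulerId);
    this.containerId = containerId;
  }

  String getContainerId() { return containerId; }
}

final class NodeBlacklistUpdateEvent extends SchedulerEvent {
  private final String nodeId;
  private final boolean blacklisted;

  NodeBlacklistUpdateEvent(String nodeId, boolean blacklisted, int schedulerId) {
    super("NODE_BLACKLIST_UPDATE", schedulerId);
    this.nodeId = nodeId;
    this.blacklisted = blacklisted;
  }

  String getNodeId() { return nodeId; }

  boolean isBlacklisted() { return blacklisted; }
}
```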
Why is the end reason here? I don't see it used anywhere in the patch.
{code}
@Override
- public boolean deallocateTask(Object task, boolean taskSucceeded) {
+ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
return taskRequestHandler.addDeallocateTaskRequest(task);
}
{code}
Why are uber/container handled differently?
{code}
+ @VisibleForTesting
+ TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
+ AppContext appContext,
+ NamedEntityDescriptor taskSchedulerDescriptor,
+ long customAppIdIdentifier,
+ int schedulerId) {
+ TaskSchedulerContext rawContext =
+ new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
+ customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
+ TaskSchedulerContext wrappedContext = wrapTaskSchedulerContext(rawContext);
+ String schedulerName = taskSchedulerDescriptor.getEntityName();
+ if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
+ return createYarnTaskScheduler(wrappedContext, schedulerId);
+ } else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
+ return createUberTaskScheduler(wrappedContext, schedulerId);
+ } else {
+ return createCustomTaskScheduler(wrappedContext, taskSchedulerDescriptor, schedulerId);
}
- else {
- return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
- host, port, trackingUrl, appContext);
+ }
{code}
Have a base class that prevents code duplication for launcherId etc. for all
AMContainerEvents
{code}
public AMContainerEventLaunchRequest(ContainerId containerId,
- TezVertexID vertexId, ContainerContext containerContext) {
+ TezVertexID vertexId, ContainerContext containerContext,
+ int launcherId, int taskCommId) {
super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
this.vertexId = vertexId;
this.containerContext = containerContext;
+ this.launcherId = launcherId;
+ this.taskCommId = taskCommId;
}{code}
Create an object to represent service plugins instead of having
sched/launcher/comm everywhere. This will make the code cleaner
and easier to maintain. Any changes, such as adding helper methods or new
plugins, will be easier to make.
{code}
@@ -80,6 +84,9 @@ public class AMContainerImpl implements AMContainer {
private final TaskAttemptListener taskAttemptListener;
protected final EventHandler eventHandler;
private final ContainerSignatureMatcher signatureMatcher;
+ private final int schedulerId;
+ private final int launcherId;
+ private final int taskCommId;
{code}
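Something along these lines, as a sketch. `ServicePluginInfo` is a hypothetical name and shape, not an existing Tez class; the point is that callers pass one object around instead of three ints.

```java
import java.util.Objects;

// Hypothetical value object bundling the three plugin ids.
final class ServicePluginInfo {
  private final int schedulerId;
  private final int launcherId;
  private final int taskCommId;

  ServicePluginInfo(int schedulerId, int launcherId, int taskCommId) {
    this.schedulerId = schedulerId;
    this.launcherId = launcherId;
    this.taskCommId = taskCommId;
  }

  int getSchedulerId() { return schedulerId; }

  int getLauncherId() { return launcherId; }

  int getTaskCommId() { return taskCommId; }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof ServicePluginInfo)) {
      return false;
    }
    ServicePluginInfo that = (ServicePluginInfo) other;
    return schedulerId == that.schedulerId
        && launcherId == that.launcherId
        && taskCommId == that.taskCommId;
  }

  @Override
  public int hashCode() {
    return Objects.hash(schedulerId, launcherId, taskCommId);
  }

  @Override
  public String toString() {
    return "ServicePluginInfo{scheduler=" + schedulerId
        + ", launcher=" + launcherId + ", taskComm=" + taskCommId + "}";
  }
}
```

AMContainerImpl and the container events would then hold a single field of this type, and a future fourth plugin type would only touch this class.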
AMContainer end reasons need a detailed look.
Then let's call it schedulerId unless there is a reason for it to be something else
{code}
+ private final int sourceId; // Effectively the schedulerId
- public AMNodeEvent(NodeId nodeId, AMNodeEventType type) {
+ public AMNodeEvent(NodeId nodeId, int sourceId, AMNodeEventType type) {
{code}
The pending changes to AMNode (in general, node handling per scheduler) really
need to be prioritized. They look quite incomplete. Nodes are closely tied to
the execution environment, and the current state of the code, with some bits
handled and some bits not, can be error-prone.
{code}
@@ -247,7 +249,7 @@ public class AMNodeImpl implements AMNode {
/* Blacklist the node with the AMNodeTracker and check if the node should be blacklisted */
protected boolean registerBadNodeAndShouldBlacklist() {
- return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this);
+ return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, sourceId);
}
@@ -257,7 +259,8 @@ public class AMNodeImpl implements AMNode {
- sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true));
+ // TODO TEZ-2124 node tracking per ext source
+ sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, 0));
@@ -363,7 +366,8 @@ public class AMNodeImpl implements AMNode {
- node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false));
+ // TODO TEZ-2124 node tracking per ext source
+ node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, 0));
{code}
What is a source? Can we call it scheduler or execution environment?
{code} static final Logger LOG = LoggerFactory.getLogger(AMNodeTracker.class);
- private final ConcurrentHashMap<NodeId, AMNode> nodeMap;
- private final ConcurrentHashMap<String, Set<NodeId>> blacklistMap;
+ private final ConcurrentMap<Integer, PerSourceNodeTracker> perSourceNodeTrackers;
+{code}
Remove commented-out code
{code}
+ MockDAGAppMaster.this.getContext().getClock().getTime()));
+// TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events,
+// cData.cIdStr, cData.taId, cData.nextFromEventId, 50000);
+ TaskHeartbeatRequest request =
+ new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
+ 50000);{code}
New code misses the ++cData.numUpdates
{code}
- getContext().getClock().getTime()));
- TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events,
- cData.nextPreRoutedFromEventId, cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
+ MockDAGAppMaster.this.getContext().getClock().getTime()));
+ TaskHeartbeatRequest request =
+ new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId,
+ 10000);{code}
Why do we need the new session token related code here?
{code}- public void testPortRange_NotSpecified() {
+ public void testPortRange_NotSpecified() throws IOException {
Configuration conf = new Configuration();
+ JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
+ "fakeIdentifier"));
+ Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+ new JobTokenSecretManager());
+ sessionToken.setService(identifier.getJobId());
+ TokenCache.setSessionToken(sessionToken, credentials);
+ UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
- mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
+ mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists.newArrayList(
+ new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+ .setUserPayload(userPayload)));{code}
Create shared method instead of duplicating code?
{code}@@ -285,8 +287,9 @@ public class TestTaskAttempt {
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
@@ -335,8 +338,9 @@ public class TestTaskAttempt {
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
@@ -436,8 +440,9 @@ public class TestTaskAttempt {
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(taListener).getTaskCommunicator(0);
{code}
This seems to be duplicating the logic in the DAGAppMaster. That would be
fragile and tests would not break if a future change
in the App master actually broke this logic.
{code}
+ private static class ExecutionContextTestInfoHolder {
+
+ static final String TASK_SCHEDULER_NAME_BASE = "TASK_SCHEDULER";
+ static final String CONTAINER_LAUNCHER_NAME_BASE = "CONTAINER_LAUNCHER";
+ static final String TASK_COMM_NAME_BASE = "TASK_COMMUNICATOR";
+
+ static String append(String base, int index) {
+ return base + index;
+ }
+
+ final String vertexName;
+ final VertexExecutionContext defaultExecutionContext;
+ final VertexExecutionContext vertexExecutionContext;
+ final BiMap<String, Integer> taskSchedulers = HashBiMap.create();
+ final BiMap<String, Integer> containerLaunchers = HashBiMap.create();
+ final BiMap<String, Integer> taskComms = HashBiMap.create();
+ final AppContext appContext;
+{code}
Why is this mock return of the RUNNING state missing? The allocated containers
path checks for the running state. Is that handled by the new code in
TestTaskSchedulerHelpers?
{code}
@SuppressWarnings({ "unchecked" })
@Test(timeout=10000)
public void testTaskSchedulerWithReuse() throws Exception {
RackResolver.init(new YarnConfiguration());
- TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
- AppContext mockAppContext = mock(AppContext.class);
- when(mockAppContext.getAMState()).thenReturn(DAGAppMasterState.RUNNING);
{code}
> [Umbrella] Allow Tez to co-ordinate execution to external services
> ------------------------------------------------------------------
>
> Key: TEZ-2003
> URL: https://issues.apache.org/jira/browse/TEZ-2003
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Siddharth Seth
> Attachments: 2003_20150728.1.txt, 2003_20150807.1.txt,
> 2003_20150807.2.txt, Tez With External Services.pdf
>
>
> The Tez engine itself takes care of co-ordinating execution - controlling how
> data gets routed (different connection patterns), fault tolerance, scheduling
> of work, etc.
> This is currently tied to TaskSpecs defined within Tez and on containers
> launched by Tez itself (TezChild).
> The proposal is to allow Tez to work with external services instead of just
> containers launched by Tez. This involves several more pluggable layers to
> work with alternate Task Specifications, custom launch and task allocation
> mechanics, as well as custom scheduling sources.
> A simple example would be a process with the capability to execute
> multiple Tez TaskSpecs as threads. In such a case, a container launch isn't
> really needed and can be mocked. Sourcing / scheduling containers would need to
> be pluggable.
> A more advanced example would be LLAP (HIVE-7926;
> https://issues.apache.org/jira/secure/attachment/12665704/LLAPdesigndocument.pdf).
> This works with custom interfaces - which would need to be supported by Tez,
> along with a custom event model which would need translation hooks.
> Tez should be able to work with a combination of certain vertices running in
> external services and others running in regular Tez containers.