[
https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260300#comment-16260300
]
Paul Rogers commented on DRILL-5975:
------------------------------------
Hi [~weijie], the key question is: why is scheduling Drill minor fragments any
different from, say, scheduling tasks in YARN?
The first difference is one I mentioned before: dependencies. We must be
careful to schedule minor fragments in the order that they process data. If
task C needs input from both A and B, then we must run A and B before we run C.
In the worst case, some nodes need input from *all* other workers. Consider a
hash partitioner. In order to build a hash table for a join, we must see all
records from the "build" side. This means all "upstream" senders must have run
to completion. Note that any one reader will send to all joins, and each join
gets data from all readers. So scheduling is a bit tricky: holding back any of
the reader fragments will block the entire downstream query from making progress.
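To make the ordering constraint concrete, here is a minimal, purely
illustrative sketch (the FragmentDag class and string fragment ids are
placeholders, not Drill's actual classes) of releasing a fragment to run only
after all of its upstream senders have completed:
{code:java}
import java.util.*;

// Hypothetical sketch (not Drill code): a fragment becomes runnable only after
// every upstream sender it depends on has run to completion.
public class FragmentDag {
  // fragment id -> ids of the upstream fragments it needs data from
  private final Map<String, Set<String>> upstream = new TreeMap<>();
  private final Set<String> completed = new HashSet<>();

  public void addDependency(String fragment, String dependsOn) {
    upstream.computeIfAbsent(fragment, k -> new TreeSet<>()).add(dependsOn);
    upstream.computeIfAbsent(dependsOn, k -> new TreeSet<>());
  }

  public void markCompleted(String fragment) {
    completed.add(fragment);
  }

  // Fragments that are not yet done but whose upstream senders all are.
  public List<String> runnable() {
    List<String> ready = new ArrayList<>();
    for (Map.Entry<String, Set<String>> e : upstream.entrySet()) {
      if (!completed.contains(e.getKey()) && completed.containsAll(e.getValue())) {
        ready.add(e.getKey());
      }
    }
    return ready;
  }

  public static void main(String[] args) {
    FragmentDag dag = new FragmentDag();
    // Hash join C builds from readers A and B, so C must wait for both.
    dag.addDependency("C", "A");
    dag.addDependency("C", "B");
    System.out.println(dag.runnable());  // [A, B]
    dag.markCompleted("A");
    System.out.println(dag.runnable());  // [B] -- holding back B still blocks C
    dag.markCompleted("B");
    System.out.println(dag.runnable());  // [C]
  }
}
{code}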
The second difference is that Drill is not a streaming system. In a streaming system such as
Flink or Apex, data arrives continuously and must be processed or spilled to
disk. Drill, however, reads from disk. There is little value in, say, reading 1
TB from disk only to spill it again. Might as well leave it on disk until it is
needed. Thus, the dynamics of Drill drive different optimization techniques
than those for streaming systems.
All that said, it is interesting that you see low CPU usage. I would have
expected excessive load, since that is what we see in our stress tests. Can you
identify which resource is the bottleneck in your setup, if not CPU? Are you
seeing either network or disk as the bottleneck?
Also, what is the size of the cluster and of the queries in your shop? How many
nodes? How much memory and how many cores per node? How many concurrent queries
do you run? About how much data (in GB or TB) does each query consume? Perhaps
that will help us understand what is going on so we are all on the same page.
The point is not to discuss whether a scheduler is a good idea. Drill certainly
needs one. Rather, the goal is to identify the constrained resource we wish to
control, and to identify effective ways to control that resource.
> Resource utilization
> --------------------
>
> Key: DRILL-5975
> URL: https://issues.apache.org/jira/browse/DRILL-5975
> Project: Apache Drill
> Issue Type: New Feature
> Affects Versions: 2.0.0
> Reporter: weijie.tong
> Assignee: weijie.tong
>
> h1. Motivation
> Currently the resource utilization ratio of a Drill cluster is not good. Most
> of the cluster's resources are wasted, and we cannot afford many concurrent
> queries. Once the system accepts more queries, even while the CPU load is not
> high, queries that were originally very quick become slower and slower.
> The reason is that Drill does not provide a scheduler. It simply assumes all
> nodes have enough compute resources. When a query arrives, it schedules the
> related fragments to random nodes without regard for each node's load, so some
> nodes suffer more CPU context switches to satisfy the incoming query. The
> deeper cause is that the runtime minor fragments form a runtime tree whose
> nodes are spread across different Drillbits. The runtime tree is an in-memory
> pipeline, that is, all the nodes stay alive for the whole lifecycle of a
> query, successively sending data upward to their parent nodes, even though
> some nodes could run quickly and exit immediately. What's more, the runtime
> tree is constructed before actual execution, so the scheduling target for
> Drill becomes the whole set of runtime tree nodes.
> h1. Design
> It would be hard to schedule the runtime tree nodes as a whole, so I try to
> solve this by breaking up the cascade of runtime nodes. The graph below
> describes the initial design.
> !https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png!
> [graph
> link|https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png]
> Every Drillbit instance will have a RecordBatchManager, which accepts all the
> RecordBatches written by the senders of the local MinorFragments. The
> RecordBatchManager holds the RecordBatches in memory first, then spills them
> to disk storage. Once the first RecordBatch of a MinorFragment sender of a
> query appears, it notifies the FragmentScheduler. The FragmentScheduler is
> instantiated by the Foreman. It holds the whole PlanFragment execution graph
> and will allocate a new corresponding FragmentExecutor to process the
> generated RecordBatches. The allocated FragmentExecutor then notifies the
> corresponding FragmentManager that it is ready to receive data. The
> FragmentManager then sends the RecordBatches one by one to the corresponding
> FragmentExecutor's receiver, throttling the data stream as the current Sender
> does.
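> A rough, illustrative sketch of the intended handshake follows (class and
> method names are placeholders, not a final API):
> {code:java}
> import java.util.*;
> import java.util.function.Consumer;
>
> // Placeholder type standing in for the real RecordBatch.
> class RecordBatch {}
>
> // Buffers RecordBatches written by local MinorFragment senders; notifies the
> // FragmentScheduler on a sender's first batch and streams the buffered
> // batches once the allocated FragmentExecutor is ready.
> class RecordBatchManager {
>   private final Map<String, Queue<RecordBatch>> buffered = new HashMap<>();
>   private final FragmentScheduler scheduler;
>
>   RecordBatchManager(FragmentScheduler scheduler) { this.scheduler = scheduler; }
>
>   void accept(String senderId, RecordBatch batch) {
>     Queue<RecordBatch> q = buffered.computeIfAbsent(senderId, k -> new ArrayDeque<>());
>     boolean first = q.isEmpty();
>     q.add(batch);                     // in practice: spill to disk under memory pressure
>     if (first) scheduler.onFirstBatch(senderId, this);
>   }
>
>   // Called once the allocated FragmentExecutor signals it is ready to receive.
>   void streamTo(String senderId, Consumer<RecordBatch> receiver) {
>     Queue<RecordBatch> q = buffered.getOrDefault(senderId, new ArrayDeque<>());
>     while (!q.isEmpty()) receiver.accept(q.poll());  // throttling omitted for brevity
>   }
> }
>
> // Lives in the Foreman, holds the PlanFragment execution graph, and allocates
> // a FragmentExecutor for the consumer of the sender's data.
> interface FragmentScheduler {
>   void onFirstBatch(String senderId, RecordBatchManager manager);
> }
> {code}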
> What we can gain from this design is:
> a. A computation leaf node does not need to wait on its consumer's speed
> before ending its life and releasing its resources.
> b. The data-sending logic is isolated from the computation nodes and shared
> by different FragmentManagers.
> c. We can schedule the MajorFragments according to each Drillbit's actual
> resource capacity at runtime.
> d. Drill's pipelined data processing characteristic is also retained.
> h1. Plan
> This will be a large PR, so I plan to divide it into smaller ones:
> a. implement the RecordBatchManager;
> b. implement a simple random FragmentScheduler and the whole event flow;
> c. implement a primitive FragmentScheduler, which may reference the Sparrow
> project.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)