Sailesh Mukil has posted comments on this change.

Change subject: PREVIEW: IMPALA-2550 Introduce query-wide execution context.
......................................................................


Patch Set 5:

(30 comments)

http://gerrit.cloudera.org:8080/#/c/3817/4/be/src/runtime/coordinator.cc
File be/src/runtime/coordinator.cc:

PS4, Line 418: // TODO: Should we create a QueryExecState for the coordinator 
fragment?
> yes, otherwise you wind up with special-case logic to handle the paths wher
I thought the coordinator does not have a QEM (or, previously, a FragmentMgr), 
because we never create a FragmentExecState for the coordinator fragment and 
just start off with a PlanFragmentExecutor directly. The coordinator code 
ends up doing what a FragmentExecState would otherwise have done for a fragment 
(Prepare(), Exec(), Close(), etc.).


http://gerrit.cloudera.org:8080/#/c/3817/4/be/src/runtime/query-exec-state.cc
File be/src/runtime/query-exec-state.cc:

PS4, Line 71:   lock_guard<SpinLock> l(fragment_exec_state_map_lock_);
> This is confusing: presumably this exists to ensure that *this has a lifeti
That makes sense. I previously wanted only a single interface to 
increase/decrease the ref counter, but I see the problem with it now.
I've made FindOrInsertQES() increase the ref count.


http://gerrit.cloudera.org:8080/#/c/3817/4/be/src/runtime/query-exec-state.h
File be/src/runtime/query-exec-state.h:

Line 1: // Licensed to the Apache Software Foundation (ASF) under one
> Fix license headers here and elsewhere.
Done


Line 36: /// Backend execution state of a query, which is shared between all 
fragment instances
> No need to talk about what owners of this class might do with it.
Done


PS4, Line 41: or f
> nit: use nullptr now.
Done


PS4, Line 84: 
> Why shared?
That was a mistake. Changed it to a raw pointer.


PS4, Line 101: rence
> std
As we discussed, I'll first make sure that the decided-on patch compiles with 
'boost' and change it to 'std' at the end (changing to 'std' now causes some 
compilation issues I don't really understand yet).


http://gerrit.cloudera.org:8080/#/c/3817/4/be/src/runtime/runtime-state.h
File be/src/runtime/runtime-state.h:

Line 274:   /// A pointer to the query wide execution state which holds all the 
shared state between
> comment what this is.
Done


http://gerrit.cloudera.org:8080/#/c/3817/4/be/src/service/client-request-exec-state.cc
File be/src/service/client-request-exec-state.cc:

Line 14: 
> add new line before this one
Done


Line 570:   wait_thread_.reset(new Thread("query-exec-state", "wait-thread",
> long line
Done


Line 677: 
> long line
Done


Line 829: }
> long line
Done. Just FYI, there's quite a bit more to fix in this file, but I haven't 
done it yet as it's not relevant to the core patch. I'll do it once the header 
is decided on.


http://gerrit.cloudera.org:8080/#/c/3817/4/be/src/service/query-exec-mgr.cc
File be/src/service/query-exec-mgr.cc:

PS4, Line 37: 
> avoid bind, prefer lambdas
I've made a note of this. I will change it once I can get past all the 
compilation issues after the header has been decided on.
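
For reference, a minimal sketch of the bind-to-lambda change being asked for. 
The Destroyer type and both factory functions are hypothetical and are not the 
code in query-exec-mgr.cc; they only show that the two forms produce equivalent 
callbacks.

```cpp
#include <functional>

// Hypothetical callback target; counts how many items it was asked to destroy.
struct Destroyer {
  int destroyed = 0;
  void Destroy(int /*thread_id*/, int n) { destroyed += n; }
};

// Preferred form: the lambda names its arguments and captures explicitly.
inline std::function<void(int, int)> MakeCallbackWithLambda(Destroyer* d) {
  return [d](int thread_id, int n) { d->Destroy(thread_id, n); };
}

// Older form: same behaviour, but the placeholders hide what is being passed.
inline std::function<void(int, int)> MakeCallbackWithBind(Destroyer* d) {
  using namespace std::placeholders;
  return std::bind(&Destroyer::Destroy, d, _1, _2);
}
```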


PS4, Line 48:             << " fragment instance#="
            :              << 
exec_params.fragment_instance_ctx.instance_state_idx;
            : 
            :   // Remote fragments must always have a sink. Remove when 
IMPALA-2905 is resolved.
            :   DCHECK(exec_params.fragment_ctx.fragment.__isset.output_sink);
            : 
            :   // Create or retrieve the QueryExecState for this query id.
            :   QueryExecState* query_exec_state = 
FindOrInsertQueryExecState(exec_params.query_ctx);
            :   DCHECK(query_exec_state != NULL);
            : 
            :   // This is a no-op if another fragment of the same query 
already called Init().
            :   query_exec_state->Init();
            : 
> this logic looks like it should live inside RegisterFragmentInstance
Done


PS4, Line 65: if (!register_status.ok()) {
            :     ReleaseQueryExecState(query_exec_state);
            :     return register_status;
            :   }
            :   return Status::OK();
            : }
> Why not change the interface to the following:
I thought of that, but GetQES() is used wherever we want to get the QES for a 
query.

So this could result in a case where the QES is destroyed on a backend (after 
that backend completes its execution for that query), and then a late 
CancelPlanFragment() arrives and calls GetQES(), which would end up setting up 
the QES again for a query that has already completed on that backend.

I've made a change so that FindOrInsertQES() also increases the ref count, and 
calls ReleaseQES() on a failure.
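
A minimal sketch of that scheme, under stated assumptions: QueryStateRegistry, 
QueryState and the method names are hypothetical stand-ins (not the patch 
code), std::mutex replaces SpinLock, a string stands in for the query id, and 
the entry is erased inline instead of being handed to a destroy thread. It 
shows creation and lookup both taking a reference, lookup never re-creating 
state, and the failure path releasing the reference it took.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for QueryExecState; only the ref count matters here.
struct QueryState {
  int refs = 0;
};

// Hypothetical stand-in for the manager's query_id -> state map.
class QueryStateRegistry {
 public:
  // Creates the entry if needed and always takes a reference.
  QueryState* FindOrInsert(const std::string& query_id) {
    std::lock_guard<std::mutex> l(lock_);
    auto& slot = map_[query_id];
    if (!slot) slot = std::make_unique<QueryState>();
    ++slot->refs;
    return slot.get();
  }

  // Takes a reference only if the entry exists. It never creates one, so a
  // late cancel for a finished query cannot set the state up again.
  QueryState* Get(const std::string& query_id) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = map_.find(query_id);
    if (it == map_.end()) return nullptr;
    ++it->second->refs;
    return it->second.get();
  }

  // Drops one reference; the entry is erased when the last one goes away.
  void Release(const std::string& query_id) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = map_.find(query_id);
    if (it == map_.end()) return;
    if (--it->second->refs == 0) map_.erase(it);
  }

  // 'ok' simulates whether registering the fragment instance succeeds.
  bool RegisterFragmentInstance(const std::string& query_id, bool ok) {
    FindOrInsert(query_id);
    if (!ok) {
      Release(query_id);  // Undo the reference taken just above.
      return false;
    }
    return true;
  }

  std::size_t size() {
    std::lock_guard<std::mutex> l(lock_);
    return map_.size();
  }

 private:
  std::mutex lock_;
  std::unordered_map<std::string, std::unique_ptr<QueryState>> map_;
};
```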


PS4, Line 103: lock_guard<SpinLock> l(query_exec_sta
> ++
Done


PS4, Line 107:     unique_ptr<QueryExecState> query_exec_state(new 
QueryExecState(this, query_ctx));
             :     i = query_exec_state_map_.insert(
             :         make_pair(query_id, std::move(query_exec_state)));
             :   }
             :   ++(i->second->num_current_references_);
             :   return i->second.get();
             : }
             : 
> A QES will never be destroyed unless at least one reference is ever taken t
I've made a change such that QEM->FindOrInsertQES() and QEM->GetQES() both 
increase the ref count, and I also make sure to call ReleaseQES() on a failure 
in QEM->RegisterFragmentInstanceWithQuery().

So there is no case where a QES can be alive and dangling without impending 
destruction.


PS4, Line 133:    
> remove std::
Done


PS4, Line 137: scoped_qes.PublishFilter(dst_instance
> nit: you're doing the lookup twice. insert returns a pair <iterator, bool> 
Done
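
For the record, the single-lookup pattern looks roughly like this. The helper 
name and the int-valued map are made up for illustration and are not the QES 
map from the patch.

```cpp
#include <map>
#include <string>
#include <utility>

// Returns a reference to the value stored for 'key', inserting 'fallback' if
// the key is absent. '*inserted' reports whether this call created the entry.
// Only one map lookup is performed: insert() both searches and inserts.
inline int& FindOrInsertValue(std::map<std::string, int>& m,
                              const std::string& key, int fallback,
                              bool* inserted) {
  auto result = m.insert(std::make_pair(key, fallback));
  *inserted = result.second;     // true only if a new entry was added
  return result.first->second;   // iterator points at the new or existing entry
}
```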


PS4, Line 142: 
> The right thing to do is to augment the rpc params with the query ID, so yo
Done


http://gerrit.cloudera.org:8080/#/c/3817/4/be/src/service/query-exec-mgr.h
File be/src/service/query-exec-mgr.h:

Line 1: // Licensed to the Apache Software Foundation (ASF) under one
> Replace header with ASF one.
Done


PS4, Line 29: space imp
> fragment
Done


PS4, Line 40: /// offer itself up for destruction when it is no longer being 
used and the
            : /// destroy_thread_ will destroy it.
            : ///
> Can you do that in this patch? (Already noticed one instance below). Gettin
Done. I changed CancelPlanFragment() and PublishFilter().


PS4, Line 61: 
            :   /// Return a pointer to the QueryExecState if
> This shouldn't have RPC-style signature (since this isn't a proxy class): r
Done


PS4, Line 80: eId& fragment_instance_id);
> When do we know the fragment instance id, but not the query id?
As we discussed, everything coming in to a backend specifies only the 
fragment_id, e.g. TCancelPlanFragmentParams, TPublishFilterParams, etc.

I've changed their thrift structures to include the query_id and gotten rid of 
this function and the <query_id, fragment_id> map.

So now every lookup is from query_id->QES as you suggested.


Line 91:   /// The caller should always check if obj->is_valid_ref() is 'true' 
before using it.
> Comment?
Done


PS4, Line 94:  only through the APIs provi
> This should take a query ID. You should try to get rid of the fragment->que
Done. (As mentioned in the comment above)


PS4, Line 98:       function signatures for every function the QES wants to 
expose.
> What prevents this from returning NULL? You need some mechanism to ensure t
I've added a comment above explaining the guarantees of this class.

Currently, if NULL is returned, there's nothing that can be done. It's the 
caller's responsibility to check (ScopedQueryExecStateRef.query_exec_state() != 
NULL) before using it.
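
Roughly, the caller-side contract is the following. This is a single-threaded 
sketch with hypothetical names (State, ScopedRef, and this PublishFilter 
helper), not the actual ScopedQueryExecStateRef from the patch.

```cpp
// Hypothetical ref-counted state; 'refs' is managed only by ScopedRef below.
struct State {
  int refs = 0;
  int filters_published = 0;
};

// RAII guard: takes a reference on construction (if the pointer is non-null)
// and drops it on destruction. Callers must check get() before using it.
class ScopedRef {
 public:
  explicit ScopedRef(State* s) : state_(s) {
    if (state_ != nullptr) ++state_->refs;
  }
  ~ScopedRef() {
    if (state_ != nullptr) --state_->refs;
  }
  ScopedRef(const ScopedRef&) = delete;
  ScopedRef& operator=(const ScopedRef&) = delete;

  State* get() const { return state_; }

 private:
  State* state_;
};

// Example caller: bails out instead of dereferencing a missing state.
inline bool PublishFilter(State* maybe_state) {
  ScopedRef ref(maybe_state);
  if (ref.get() == nullptr) return false;  // Query state is already gone.
  ++ref.get()->filters_published;
  return true;
}
```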


PS4, Line 117: y_exe
> std
As we discussed, I'll first make sure that the decided-on patch compiles with 
'boost' and change it to 'std' at the end (changing to 'std' now causes some 
compilation issues I don't really understand yet).


PS4, Line 130: e_map_
> not just a single thread then.
This ThreadPool is initialized with one thread, so it's still just one thread.
I followed how it's done in data-stream-sender:
https://github.com/apache/incubator-impala/blob/master/be/src/runtime/data-stream-sender.cc#L79-L80

This way it's easy to use the Offer() API.

Please let me know if you think there's a better way to do it.
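
To illustrate what I mean by a one-thread pool with an Offer() entry point, 
here is a standalone sketch. SingleThreadWorker is a hypothetical class built 
on the standard library; it is not Impala's ThreadPool and its interface is an 
assumption made for this example.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical one-thread work queue: Offer() enqueues an item and the single
// worker thread processes items in FIFO order.
class SingleThreadWorker {
 public:
  explicit SingleThreadWorker(std::function<void(int)> fn)
      : fn_(std::move(fn)), thread_([this] { Run(); }) {}

  // Drains any queued items, then stops and joins the worker thread.
  ~SingleThreadWorker() {
    {
      std::lock_guard<std::mutex> l(lock_);
      shutdown_ = true;
    }
    cv_.notify_one();
    thread_.join();
  }

  void Offer(int item) {
    {
      std::lock_guard<std::mutex> l(lock_);
      queue_.push(item);
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> l(lock_);
    while (true) {
      cv_.wait(l, [this] { return shutdown_ || !queue_.empty(); });
      if (queue_.empty()) return;  // Shutdown requested and queue drained.
      int item = queue_.front();
      queue_.pop();
      l.unlock();  // Run the callback without holding the lock.
      fn_(item);
      l.lock();
    }
  }

  std::function<void(int)> fn_;
  std::mutex lock_;
  std::condition_variable cv_;
  std::queue<int> queue_;
  bool shutdown_ = false;
  std::thread thread_;  // Declared last so the other members exist first.
};
```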


-- 
To view, visit http://gerrit.cloudera.org:8080/3817
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I892091d6401acb2ea91ccb1623af54c6f9635e6c
Gerrit-PatchSet: 5
Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-Owner: Sailesh Mukil <[email protected]>
Gerrit-Reviewer: Henry Robinson <[email protected]>
Gerrit-Reviewer: Lars Volker <[email protected]>
Gerrit-Reviewer: Marcel Kornacker <[email protected]>
Gerrit-Reviewer: Sailesh Mukil <[email protected]>
Gerrit-HasComments: Yes
