A few related thoughts:

* it sucks to have to parse Hive log files, though sometimes there seems to
be no clean way around it. We're doing something similar in Superset to get
an idea of a query's % progress
* really, the HiveServer2 Thrift client should allow returning structured
stats about the job after its execution, but that's not the case and
probably never will be...
* `HiveCliHook.run_cli` does return the log after it's done (now I'm
noticing it could yield as it goes instead, btw...), which means you don't
need the `on_success_callback`; you can do what you need to do in your
PythonOperator, which will simplify things a little
* knowing that parsing Hive logs is a common, necessary pattern, I wouldn't
be opposed to adding some of the key elements to HiveCliHook. Maybe there's
room for a new arg `return_stats=False` in `HiveCliHook.run_cli` that
returns a dict with info about the query that just ran (rows processed, CPU
time, mappers/reducers, number of phases, ...)
* if you have Presto, you could issue a quick COUNT(*) query right after
your job. At Airbnb we have this common subdag pattern that stages the data
in Hive, runs a set of data quality checks in Presto, and exchanges the
partition in Hive when it passes the tests
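To make the `run_cli`-then-parse idea concrete, here's a minimal sketch of
pulling a few counters out of the returned log text. The regexes and the
`parse_hive_stats` helper are hypothetical — the exact counter names and log
format vary by Hive version and execution engine, so treat the patterns as
placeholders to adapt:

```python
import re

# Hypothetical sketch: parse the log text that HiveCliHook.run_cli returns
# for a few common counters. The regexes below are assumptions -- adjust
# them to whatever your Hive version actually emits.
def parse_hive_stats(log):
    stats = {}
    rows = re.search(r"RECORDS_OUT(?:_\d+)?\D*(\d+)", log)
    if rows:
        stats["rows_out"] = int(rows.group(1))
    cpu = re.search(r"Cumulative CPU:\s*([\d.]+)\s*sec", log)
    if cpu:
        stats["cpu_seconds"] = float(cpu.group(1))
    mappers = re.search(r"number of mappers:\s*(\d+)", log, re.IGNORECASE)
    if mappers:
        stats["mappers"] = int(mappers.group(1))
    return stats

# Inside a PythonOperator callable you might then do something like
# (illustrative wiring, not a definitive recipe):
#   hook = HiveCliHook(hive_cli_conn_id="hive_cli_default")
#   log = hook.run_cli(hql="INSERT OVERWRITE TABLE my_table ...")
#   stats = parse_hive_stats(log)
#   return stats  # lands in XCom for downstream tasks
```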

A total side note: we have a stats collection bot at Airbnb that finds all
the tables/partitions that have changed recently in the metastore (every 5
minutes) and issues a single, dynamically generated Presto scan query to
get tons of stats for each column (% of nulls, min, max, avg, count
distinct, number of characters, ...), storing the results in MySQL. This is
super useful for capacity planning, debugging, data quality checks, anomaly
detection, ... I spoke with Carlos Bueno from Lyft yesterday, who might be
interested in taking this code (and perhaps other Airbnb projects),
generalizing it, cleaning it up, documenting it, and open sourcing it. Most
of these little projects are stack specific and only useful to companies
that happen to be running on the same stack as we are.
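For a rough idea of what such a dynamically generated single-scan query can
look like, here's a sketch. The table and column names are made up, and the
builder is a hypothetical simplification (the real bot does much more), but
the aggregate functions used (`approx_distinct`, `min`, `max`, and a
`SUM(CASE ...)` null count) are standard Presto SQL:

```python
# Hypothetical sketch: build one single-scan Presto query that computes
# several stats per column. Table and column names are illustrative.
def build_stats_query(table, columns):
    exprs = ["COUNT(*) AS row_count"]
    for c in columns:
        # null count via a conditional sum
        exprs.append(
            "SUM(CASE WHEN {c} IS NULL THEN 1 ELSE 0 END) "
            "AS {c}__null_count".format(c=c))
        # approximate count distinct (cheap, one pass)
        exprs.append("APPROX_DISTINCT({c}) AS {c}__approx_distinct".format(c=c))
        exprs.append("MIN({c}) AS {c}__min".format(c=c))
        exprs.append("MAX({c}) AS {c}__max".format(c=c))
    return "SELECT\n  " + ",\n  ".join(exprs) + "\nFROM " + table

print(build_stats_query("events", ["user_id", "ts"]))
```

Because everything is folded into one SELECT, the whole stats collection
costs a single scan of the table, which is what makes running it every few
minutes tolerable.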

Max


On Fri, Feb 10, 2017 at 7:43 AM, Laura Lorenz <[email protected]>
wrote:

> I don't use the HiveCliHook so I'm not sure how it works, but are the log
> files the only place you can retrieve these counts? If you have them at
> the time of the query in your Python callable, you could push them
> anywhere you like inline at the conclusion of the task. Or, you may prefer
> to have your PythonOperator `return` some data structure with those
> counts, which will be stored by default in the Airflow metadata database
> per the XCom system
> <https://airflow.incubator.apache.org/concepts.html#xcoms>. Then,
> depending on what you want to do with that, you could query those out of
> the metadata database with the ad-hoc querying or charting UIs right
> within Airflow, or in a later task altogether.
>
> On Fri, Feb 10, 2017 at 8:58 AM, Boris Tyukin <[email protected]>
> wrote:
>
> > please...?
> >
> > On Thu, Feb 9, 2017 at 8:35 AM, Boris Tyukin <[email protected]>
> > wrote:
> >
> > > Hello,
> > >
> > > I am using HiveCliHook called from PythonOperator to run a series of
> > > queries and want to capture record counts for auditing and validation
> > > purposes.
> > >
> > > I am thinking of using on_success_callback to invoke a Python function
> > > that will read the log file produced by Airflow and then parse it
> > > using a regex.
> > >
> > > I am going to use this method from models to get to the log file:
> > >
> > >     def log_filepath(self):
> > >         iso = self.execution_date.isoformat()
> > >         log = os.path.expanduser(
> > >             configuration.get('core', 'BASE_LOG_FOLDER'))
> > >         return (
> > >             "{log}/{self.dag_id}/{self.task_id}/{iso}.log"
> > >             .format(**locals()))
> > > Is this a good strategy, or is there an easier way? I'm wondering if
> > > someone has done something similar.
> > >
> > > Another challenge is that the same log file contains multiple attempts
> > > and reruns of the same task, so I guess I need to parse the file
> > > backwards.
> > >
> > > thanks,
> > > Boris
> > >
> >
>
