A few related thoughts:

* It sucks to have to parse Hive log files, though it seems like there are no clean ways around it sometimes. We're doing something similar in Superset to estimate a query's progress percentage.
* Really, the HiveServer2 Thrift client should return structured stats about the job after its execution, but that's not the case and probably never will be...
* `HiveCliHook.run_cli` does return the log after it's done (now I'm noticing it could yield as it goes instead, btw...), which means you don't need the `on_success_callback`; you can do what you need to do in your PythonOperator, which will simplify things a little.
* Knowing that parsing Hive logs is a common, necessary pattern, I wouldn't be opposed to adding some of the key elements to HiveCliHook. Maybe a new arg `return_stats=False` in `HiveCliHook.run_cli` that returns a dict with info about the query that just ran (rows processed, CPU time, mappers/reducers, number of phases, ...).
* If you have Presto, you could issue a quick COUNT(*) query right after your job. At Airbnb we have a common subdag pattern that stages the data in Hive, runs a set of data quality checks in Presto, and exchanges the partition in Hive when it passes the tests.
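To make the `return_stats` idea concrete, here's a rough sketch of what the extraction could look like. The regexes below are illustrative only: the exact counter lines vary by Hive and execution-engine version, and the stat names are placeholders, not anything that exists in HiveCliHook today.

```python
import re

# Hypothetical sketch of what a `return_stats=True` option could pull out of
# the raw log text returned by HiveCliHook.run_cli. The patterns match
# typical MapReduce-style counter lines; adjust them to your Hive version.
STAT_PATTERNS = {
    "rows_loaded": re.compile(r"numRows=(\d+)"),
    "cpu_time_ms": re.compile(r"Total MapReduce CPU Time Spent:\D*(\d+)"),
    "mappers": re.compile(r"number of mappers:\s*(\d+)"),
    "reducers": re.compile(r"number of reducers:\s*(\d+)"),
}


def parse_hive_stats(log_text):
    """Scan Hive CLI output and return whichever stats the patterns find."""
    stats = {}
    for name, pattern in STAT_PATTERNS.items():
        matches = pattern.findall(log_text)
        if matches:
            # Keep the last occurrence, so later reruns in the same log win.
            stats[name] = int(matches[-1])
    return stats
```

This also sidesteps the "parse the file backwards" problem from the original question: taking the last match per pattern naturally picks up the most recent attempt.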
A total side note: we have a stats collection bot at Airbnb that finds all the tables/partitions that have changed recently in the metastore (every 5 minutes), issues a single dynamically generated Presto scan query to get tons of stats for each column (% of nulls, min, max, avg, count distinct, number of characters, ...), and stores the results in MySQL. This is super useful for capacity planning, debugging, data quality checks, anomaly detection, ... I was talking with Carlos Bueno from Lyft yesterday, who might be interested in taking this code (and perhaps other Airbnb projects), generalizing it, cleaning it up, documenting it, and open sourcing it. Most of these little projects are stack-specific and only useful to companies that happen to be running on the same stack as we are.

Max

On Fri, Feb 10, 2017 at 7:43 AM, Laura Lorenz <[email protected]> wrote:

> I don't use the HiveCliHook so I'm not sure how it works, but is the only
> place you can retrieve these counts the logfiles? If you have them at time
> of query in your python callable, you could push them anywhere you like
> inline at the conclusion of the task. Or, you may prefer to have your
> PythonOperator `return` some data structure with those counts, which will
> be stored by default in the airflow metadata database per the XCom system
> <https://airflow.incubator.apache.org/concepts.html#xcoms>. Then, depending
> on what you want to do with that, you could query those out of the metadata
> database with the ad-hoc querying or charting UIs right within Airflow, or
> in a later task altogether.
>
> On Fri, Feb 10, 2017 at 8:58 AM, Boris Tyukin <[email protected]> wrote:
>
> > please...?
> >
> > On Thu, Feb 9, 2017 at 8:35 AM, Boris Tyukin <[email protected]> wrote:
> >
> > > Hello,
> > >
> > > I am using HiveCliHook called from PythonOperator to run a series of
> > > queries and want to capture record counts for auditing and validation
> > > purposes.
> > >
> > > I am thinking to use on_success_callback to invoke a python function
> > > that will read the log file produced by airflow and then parse it out
> > > using regex.
> > >
> > > I am going to use this method from models to get to the log file:
> > >
> > > def log_filepath(self):
> > >     iso = self.execution_date.isoformat()
> > >     log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
> > >     return (
> > >         "{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()))
> > >
> > > Is this a good strategy, or is there an easier way? I'm wondering if
> > > someone has done something similar.
> > >
> > > Another challenge is that the same log file contains multiple attempts
> > > and reruns of the same task, so I guess I need to parse the file
> > > backwards.
> > >
> > > thanks,
> > > Boris
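The XCom pattern Laura describes can be sketched roughly like this. The task names and count values are placeholders, and the Hive query/parsing step is stubbed out; in a real DAG you'd call `HiveCliHook.run_cli` there and parse its output.

```python
# Two callables for two PythonOperators: the first returns the counts
# (Airflow stores a callable's return value as the task's XCom), and the
# second pulls them back out and validates them.

def capture_counts(**context):
    # ... run your Hive queries here and collect the real counts ...
    counts = {"staging_table": 1200, "final_table": 1187}
    # Returning the dict is enough: Airflow pushes it as an XCom.
    return counts


def validate_counts(**context):
    # A downstream task pulls the counts back out by task_id.
    counts = context["ti"].xcom_pull(task_ids="capture_counts")
    if counts["final_table"] > counts["staging_table"]:
        raise ValueError("final table has more rows than staging")
    return counts
```

Wired up as two PythonOperators (with `provide_context=True` on older Airflow versions), with `validate_counts` set downstream of `capture_counts`, this keeps the counts in the metadata database where the ad-hoc query and charting UIs can reach them.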
