Thanks for responding, Laura. I am using XComs, but my problem is that the Hive query would not return the value I need (a MapReduce counter), so I thought to parse the Hive output, extract the MapReduce job ID, and then use the mapred CLI to get that counter for that job_id.
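Something like this is what I have in mind. Just a sketch: the regex and the
counter group/name are assumptions and would need adjusting for the actual
Hive CLI output and whichever counter you care about.

import re
import subprocess

def counter_from_hive_output(hive_output):
    # Hive CLI typically logs a line like
    # "Starting Job = job_1486312321_0001, Tracking URL = ..."
    # (assumption: exact wording varies across Hive versions)
    match = re.search(r'(job_\d+_\d+)', hive_output)
    if not match:
        raise ValueError('no MapReduce job id found in Hive output')
    job_id = match.group(1)

    # "mapred job -counter <job-id> <group-name> <counter-name>" prints the
    # counter value to stdout
    out = subprocess.check_output(
        ['mapred', 'job', '-counter', job_id,
         'org.apache.hadoop.mapreduce.TaskCounter', 'MAP_INPUT_RECORDS'])
    return int(out.strip().decode('utf-8'))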
Another option I found is to gather statistics on my refreshed table and then use DESCRIBE EXTENDED to pull the number of rows. Because of that first statistics step, this will take more time than just grabbing the MapReduce counter from Hadoop. Does that make sense? (Rough sketch of that approach at the end of this message.)

On Fri, Feb 10, 2017 at 10:43 AM, Laura Lorenz <[email protected]> wrote:

> I don't use the HiveCliHook so I'm not sure how it works, but is the only
> place you can retrieve these counts the logfiles? If you have them at time
> of query in your python callable, you could push them anywhere you like
> inline at the conclusion of the task. Or, you may prefer to have your
> PythonOperator `return` some data structure with those counts, which will
> be stored by default in the airflow metadata database per the XCom system
> <https://airflow.incubator.apache.org/concepts.html#xcoms>. Then, depending
> on what you want to do with that, you could query those out of the metadata
> database with the ad-hoc querying or charting UIs right within Airflow, or
> in a later task altogether.
>
> On Fri, Feb 10, 2017 at 8:58 AM, Boris Tyukin <[email protected]> wrote:
>
> > please...?
> >
> > On Thu, Feb 9, 2017 at 8:35 AM, Boris Tyukin <[email protected]> wrote:
> >
> > > Hello,
> > >
> > > I am using HiveCliHook called from PythonOperator to run a series of
> > > queries and want to capture record counts for auditing and validation
> > > purposes.
> > >
> > > I am thinking of using on_success_callback to invoke a python function
> > > that will read the log file produced by airflow and then parse it out
> > > using regex.
> > >
> > > I am going to use this method from models to get to the log file:
> > >
> > > def log_filepath(self):
> > >     iso = self.execution_date.isoformat()
> > >     log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
> > >     return (
> > >         "{log}/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals()))
> > >
> > > Is this a good strategy or is there an easier way? I am wondering if
> > > someone has done something similar.
> > >
> > > Another challenge is that the same log file contains multiple attempts
> > > and reruns of the same task, so I guess I need to parse the file
> > > backwards.
> > >
> > > thanks,
> > > Boris
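And for the archives, here is roughly how I would wire the statistics option
into the PythonOperator so the count lands in XCom, per Laura's suggestion.
Only a sketch: my_db.my_table is a placeholder, and I am assuming
HiveCliHook.run_cli() returns the CLI stdout and that DESCRIBE EXTENDED
exposes numRows once statistics are gathered (both worth verifying on your
versions).

import re

from airflow.hooks.hive_hooks import HiveCliHook

def capture_row_count(**context):
    hook = HiveCliHook()

    # step 1: gather statistics on the refreshed table
    # (this extra pass is what makes this slower than the counter approach)
    hook.run_cli("ANALYZE TABLE my_db.my_table COMPUTE STATISTICS;")

    # step 2: pull numRows out of the table metadata
    output = hook.run_cli("DESCRIBE EXTENDED my_db.my_table;")
    match = re.search(r'numRows=(\d+)', output)
    if not match:
        raise ValueError('numRows not found; statistics may not be populated')

    # returning the value stores it in XCom for downstream audit tasks
    return int(match.group(1))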
