Thanks - that seems to have been the problem.

However, I now get the error below. It seems that running the trigger_dag
command multiple times (I had initially run it without the --conf param)
violates a unique key constraint in the db. I tried adding a unique run_id
but could not figure out how to resolve the error for June 26th. If I try
the same command for the 27th, it triggers fine.
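
For reference, the run_id attempt was roughly of this form (the run_id
value here is just an example, not a known fix):

airflow trigger_dag ga_data --exec_date 2017-06-26 \
    --run_id manual__2017-06-26_retry1 --conf '{"forcerun": 1}'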

Am I missing something obvious here as to why I don't seem to be able to
run the same trigger_dag command multiple times?
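
In case it's useful, the existing run can be inspected directly in the
metadata db (a minimal sketch, assuming the MySQL backend shown in the
traceback; the db name and connection details are placeholders):

mysql airflow_db -e "SELECT run_id, state, external_trigger FROM dag_run
  WHERE dag_id = 'ga_data' AND execution_date = '2017-06-26 00:00:00';"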

Cheers,
Andy

/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.dev0+apache.incubating-py2.7.egg/airflow/configuration.py:553:
DeprecationWarning: This method will be removed in future versions.
Use 'parser.read_file()' instead.
  self.readfp(StringIO.StringIO(string))
[2017-07-27 19:25:54,308] {__init__.py:56} INFO - Using executor LocalExecutor
[2017-07-27 19:25:54,420] {driver.py:120} INFO - Generating grammar
tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-07-27 19:25:54,449] {driver.py:120} INFO - Generating grammar
tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-07-27 19:25:54,718] {models.py:167} INFO - Filling up the DagBag
from /home/andrew_maguire/pmc-analytical-data-mart/airflow/dags
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 4, in <module>
    
__import__('pkg_resources').run_script('airflow==1.9.0.dev0+apache.incubating',
'airflow')
  File "/usr/lib/python2.7/dist-packages/pkg_resources/__init__.py",
line 719, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/usr/lib/python2.7/dist-packages/pkg_resources/__init__.py",
line 1504, in run_script
    exec(code, namespace, namespace)
  File 
"/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.dev0+apache.incubating-py2.7.egg/EGG-INFO/scripts/airflow",
line 28, in <module>
    args.func(args)
  File 
"/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.dev0+apache.incubating-py2.7.egg/airflow/bin/cli.py",
line 180, in trigger_dag
    execution_date=args.exec_date)
  File 
"/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.dev0+apache.incubating-py2.7.egg/airflow/api/client/local_client.py",
line 24, in trigger_dag
    execution_date=execution_date)
  File 
"/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.dev0+apache.incubating-py2.7.egg/airflow/api/common/experimental/trigger_dag.py",
line 56, in trigger_dag
    external_trigger=True
  File 
"/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.dev0+apache.incubating-py2.7.egg/airflow/utils/db.py",
line 53, in wrapper
    result = func(*args, **kwargs)
  File 
"/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.dev0+apache.incubating-py2.7.egg/airflow/models.py",
line 3396, in create_dagrun
    session.commit()
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/session.py", line
874, in commit
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/session.py", line
461, in commit
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/session.py", line
441, in _prepare_impl
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/session.py", line
2139, in flush
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/session.py", line
2259, in _flush
  File "build/bdist.linux-x86_64/egg/sqlalchemy/util/langhelpers.py",
line 60, in __exit__
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/session.py", line
2223, in _flush
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/unitofwork.py",
line 389, in execute
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/unitofwork.py",
line 548, in execute
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/persistence.py",
line 181, in save_obj
  File "build/bdist.linux-x86_64/egg/sqlalchemy/orm/persistence.py",
line 835, in _emit_insert_statements
  File "build/bdist.linux-x86_64/egg/sqlalchemy/engine/base.py", line
945, in execute
  File "build/bdist.linux-x86_64/egg/sqlalchemy/sql/elements.py", line
263, in _execute_on_connection
  File "build/bdist.linux-x86_64/egg/sqlalchemy/engine/base.py", line
1053, in _execute_clauseelement
  File "build/bdist.linux-x86_64/egg/sqlalchemy/engine/base.py", line
1189, in _execute_context
  File "build/bdist.linux-x86_64/egg/sqlalchemy/engine/base.py", line
1393, in _handle_dbapi_exception
  File "build/bdist.linux-x86_64/egg/sqlalchemy/util/compat.py", line
203, in raise_from_cause
  File "build/bdist.linux-x86_64/egg/sqlalchemy/engine/base.py", line
1182, in _execute_context
  File "build/bdist.linux-x86_64/egg/sqlalchemy/engine/default.py",
line 470, in do_execute
  File "/usr/local/lib/python2.7/dist-packages/MySQLdb/cursors.py",
line 205, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python2.7/dist-packages/MySQLdb/connections.py",
line 36, in defaulterrorhandler
    raise errorclass, errorvalue
sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError)
(1062, "Duplicate entry 'ga_data-2017-06-26 00:00:00.000000' for key
'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id, execution_date,
start_date, end_date, state, run_id, external_trigger, conf) VALUES
(%s, %s, now(), %s, %s, %s, %s, %s)'] [parameters: ('ga_data',
datetime.datetime(2017, 6, 26, 0, 0), None, u'running',
'manual__2017-06-26T00:00:00', 1,
'\x80\x02}q\x01X\x08\x00\x00\x00forcerunK\x01s.')]


On Thu, Jul 27, 2017 at 8:19 PM Daniel Huang <[email protected]> wrote:

> {'forcerun': 1} is not valid JSON because of the single quotes, thus the
> ValueError when decoding. Try --conf '{"forcerun": 1}'.
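>
> For what it's worth, a quick illustrative check of the two forms from the
> shell (hypothetical one-liners, output abbreviated):
>
> python -c "import json; print(json.loads('{\"forcerun\": 1}'))"   # -> {u'forcerun': 1}
> python -c "import json; json.loads(\"{'forcerun': 1}\")"   # -> ValueError: Expecting property name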
>
> -Daniel
>
> On Thu, Jul 27, 2017 at 11:52 AM, Andrew Maguire <[email protected]>
> wrote:
>
> > Hi,
> >
> > Does anyone have an example of how to pass a --conf JSON string to
> > trigger_dag from the CLI?
> >
> > I need to trigger a range of historic DAG runs, but I don't want to do a
> > backfill as the DAG runs each hour and that would create many duplicate
> > runs. Also, as part of the DAG, the first step is to check whether the
> > source data has changed and, if not, do nothing - this is to try to keep
> > the execution data driven.
> >
> > The issue is I've found a bug in my SQL logic, so I want to force it to
> > rerun based on a param I send in via the command line (I'm guessing
> > --conf is the way to do this?).
> >
> > So instead of a backfill I want to do a series of
> >
> > airflow trigger_dag
> >
> > commands to essentially mimic running the DAG once per day for a given
> > date, with a param to tell it to ignore the usual data checks and just
> > run regardless.
> >
> > I tried:
> >
> > airflow trigger_dag ga_data --exec_date 2017-06-26 --conf "{'forcerun':1}"
> >
> >
> > Where 'forcerun' is a param I want to pass to tell the DAG to ignore the
> > usual data-driven checks and just run regardless.
> >
> > I get this error:
> >
> > /usr/local/lib/python2.7/dist-packages/airflow-1.9.0.dev0+
> > apache.incubating-py2.7.egg/airflow/configuration.py:553:
> > DeprecationWarning: This method will be removed in future versions.
> > Use 'parser.read_file()' instead.
> >   self.readfp(StringIO.StringIO(string))
> > [2017-07-27 18:37:03,706] {__init__.py:56} INFO - Using executor
> > LocalExecutor
> > [2017-07-27 18:37:03,812] {driver.py:120} INFO - Generating grammar
> > tables from /usr/lib/python2.7/lib2to3/Grammar.txt
> > [2017-07-27 18:37:03,842] {driver.py:120} INFO - Generating grammar
> > tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
> > [2017-07-27 18:37:04,113] {models.py:167} INFO - Filling up the DagBag
> > from /home/andrew_maguire/pmc-analytical-data-mart/airflow/dags
> > Traceback (most recent call last):
> >   File "/usr/local/bin/airflow", line 4, in <module>
> >     __import__('pkg_resources').run_script('airflow==1.9.0.
> > dev0+apache.incubating',
> > 'airflow')
> >   File "/usr/lib/python2.7/dist-packages/pkg_resources/__init__.py",
> > line 719, in run_script
> >     self.require(requires)[0].run_script(script_name, ns)
> >   File "/usr/lib/python2.7/dist-packages/pkg_resources/__init__.py",
> > line 1504, in run_script
> >     exec(code, namespace, namespace)
> >   File "/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.
> > dev0+apache.incubating-py2.7.egg/EGG-INFO/scripts/airflow",
> > line 28, in <module>
> >     args.func(args)
> >   File "/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.
> > dev0+apache.incubating-py2.7.egg/airflow/bin/cli.py",
> > line 180, in trigger_dag
> >     execution_date=args.exec_date)
> >   File "/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.
> > dev0+apache.incubating-py2.7.egg/airflow/api/client/local_client.py",
> > line 24, in trigger_dag
> >     execution_date=execution_date)
> >   File "/usr/local/lib/python2.7/dist-packages/airflow-1.9.0.
> > dev0+apache.incubating-py2.7.egg/airflow/api/common/
> > experimental/trigger_dag.py",
> > line 49, in trigger_dag
> >     run_conf = json.loads(conf)
> >   File "/usr/lib/python2.7/json/__init__.py", line 339, in loads
> >     return _default_decoder.decode(s)
> >   File "/usr/lib/python2.7/json/decoder.py", line 364, in decode
> >     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
> >   File "/usr/lib/python2.7/json/decoder.py", line 380, in raw_decode
> >     obj, end = self.scan_once(s, idx)
> > ValueError: Expecting property name: line 1 column 2 (char 1)
> >
> >
> > I'm sure I either need to pass more key/values to --conf or maybe
> > structure the JSON differently, but I can't seem to find any examples
> > anywhere of how to go about this.
> >
> > Any pointers very welcome.
> >
> > Cheers,
> > Andy
> >
>
