mrn-aglic commented on issue #24388:
URL: https://github.com/apache/airflow/issues/24388#issuecomment-1259108844
Thanks for the response. I tried to do it a couple of ways, including this:
`task.expand_input.value.parameters.batch_start`
But the template reaches Postgres unrendered, and I get the following error:
```
[2022-09-27, 07:36:38 UTC] {crypto.py:84} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-09-27, 07:36:38 UTC] {base.py:71} INFO - Using connection ID 'src' for task execution.
[2022-09-27, 07:36:38 UTC] {sql.py:315} INFO - Running statement: SELECT * FROM match_scores OFFSET {{ task_instance.xcom_pull(task_ids='get_batch_starts', dag_id='task_mapping_classic_operators', key='batch_start') }} LIMIT 80, parameters: {'batch_start': '80', 'batch_end': '160'}
[2022-09-27, 07:36:38 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/postgres/operators/postgres.py", line 92, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 295, in run
    self._run_command(cur, sql_statement, parameters)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 318, in _run_command
    cur.execute(sql_statement, parameters)
psycopg2.errors.SyntaxError: syntax error at or near "{"
LINE 1: SELECT * FROM match_scores OFFSET {{ task_instance.xcom_pull...
                                          ^
[2022-09-27, 07:36:38 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=task_mapping_classic_operators, task_id=batch_select, map_index=1, execution_date=20220927T040000, start_date=20220927T073637, end_date=20220927T073638
[2022-09-27, 07:36:38 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 171 for task batch_select (syntax error at or near "{"
LINE 1: SELECT * FROM match_scores OFFSET {{ task_instance.xcom_pull...
                                          ^
```
Here is the full operator definition:
```
batch_select_op = PostgresOperator.partial(
    task_id="batch_select",
    postgres_conn_id="src",
    sql=f"SELECT * FROM match_scores OFFSET {{{{ task.expand_input.value.parameters.batch_start }}}} LIMIT {BATCH_SIZE}"
).expand(parameters=get_batch_starts.output)
```
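To rule out the brace escaping as the cause, here is a minimal sketch of the string the f-string above produces before Airflow sees it. It is plain Python with no Airflow involved. `BATCH_SIZE = 80` is an assumption taken from the `LIMIT 80` in the log, and this only shows what the string evaluates to, not how Airflow renders it.

```python
# Assumption: BATCH_SIZE is 80, inferred from "LIMIT 80" in the log above.
BATCH_SIZE = 80

# Inside an f-string, "{{" and "}}" are escapes for literal "{" and "}",
# so four braces collapse to the two that a Jinja template expects,
# while {BATCH_SIZE} is interpolated immediately by Python.
sql = (
    "SELECT * FROM match_scores "
    f"OFFSET {{{{ task.expand_input.value.parameters.batch_start }}}} "
    f"LIMIT {BATCH_SIZE}"
)

print(sql)
```

So the value passed as `sql` does contain a normal `{{ ... }}` expression; the escaping itself looks fine, and the question is why that expression is not rendered for the mapped task.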