VladaZakharova commented on code in PR #32487:
URL: https://github.com/apache/airflow/pull/32487#discussion_r1269084842
##########
tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py:
##########
@@ -24,44 +24,85 @@
from datetime import datetime
from airflow import models
+from airflow.providers.google.cloud.operators.bigquery import (
+ BigQueryCreateEmptyDatasetOperator,
+ BigQueryCreateEmptyTableOperator,
+ BigQueryDeleteDatasetOperator,
+)
from airflow.providers.google.cloud.operators.dataflow import
DataflowStartSqlJobOperator
+from airflow.utils.trigger_rule import TriggerRule
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+DAG_ID = "example_gcp_dataflow_sql"
+
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
BQ_SQL_DATASET = os.environ.get("GCP_DATAFLOW_BQ_SQL_DATASET",
"airflow_dataflow_samples")
BQ_SQL_TABLE_INPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_INPUT",
"beam_input")
BQ_SQL_TABLE_OUTPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_OUTPUT",
"beam_output")
DATAFLOW_SQL_JOB_NAME = os.environ.get("GCP_DATAFLOW_SQL_JOB_NAME",
"dataflow-sql")
DATAFLOW_SQL_LOCATION = os.environ.get("GCP_DATAFLOW_SQL_LOCATION", "us-west1")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+BQ_LOCATION = "europe-north1"
with models.DAG(
- dag_id="example_gcp_dataflow_sql",
+ dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
-) as dag_sql:
+) as dag:
+ create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
+ task_id="create_dataset_with_location", dataset_id=DATASET_NAME,
location=BQ_LOCATION
+ )
+
+ create_table_with_location = BigQueryCreateEmptyTableOperator(
+ task_id="create_table_with_location",
+ dataset_id=DATASET_NAME,
+ table_id="test_table",
Review Comment:
Why not use a variable for table_id here? It will be more flexible if the
user wants to change the name
##########
tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py:
##########
@@ -24,44 +24,85 @@
from datetime import datetime
from airflow import models
+from airflow.providers.google.cloud.operators.bigquery import (
+ BigQueryCreateEmptyDatasetOperator,
+ BigQueryCreateEmptyTableOperator,
+ BigQueryDeleteDatasetOperator,
+)
from airflow.providers.google.cloud.operators.dataflow import
DataflowStartSqlJobOperator
+from airflow.utils.trigger_rule import TriggerRule
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+DAG_ID = "example_gcp_dataflow_sql"
+
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
BQ_SQL_DATASET = os.environ.get("GCP_DATAFLOW_BQ_SQL_DATASET",
"airflow_dataflow_samples")
BQ_SQL_TABLE_INPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_INPUT",
"beam_input")
BQ_SQL_TABLE_OUTPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_OUTPUT",
"beam_output")
DATAFLOW_SQL_JOB_NAME = os.environ.get("GCP_DATAFLOW_SQL_JOB_NAME",
"dataflow-sql")
DATAFLOW_SQL_LOCATION = os.environ.get("GCP_DATAFLOW_SQL_LOCATION", "us-west1")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+BQ_LOCATION = "europe-north1"
with models.DAG(
- dag_id="example_gcp_dataflow_sql",
+ dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
-) as dag_sql:
+) as dag:
+ create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
+ task_id="create_dataset_with_location", dataset_id=DATASET_NAME,
location=BQ_LOCATION
+ )
+
+ create_table_with_location = BigQueryCreateEmptyTableOperator(
+ task_id="create_table_with_location",
+ dataset_id=DATASET_NAME,
+ table_id="test_table",
+ schema_fields=[
+ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+ {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
+ ],
+ )
+
# [START howto_operator_start_sql_job]
+
start_sql = DataflowStartSqlJobOperator(
task_id="start_sql_query",
job_name=DATAFLOW_SQL_JOB_NAME,
query=f"""
Review Comment:
```suggestion
query=f"
```
##########
tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py:
##########
@@ -24,44 +24,85 @@
from datetime import datetime
from airflow import models
+from airflow.providers.google.cloud.operators.bigquery import (
+ BigQueryCreateEmptyDatasetOperator,
+ BigQueryCreateEmptyTableOperator,
+ BigQueryDeleteDatasetOperator,
+)
from airflow.providers.google.cloud.operators.dataflow import
DataflowStartSqlJobOperator
+from airflow.utils.trigger_rule import TriggerRule
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+DAG_ID = "example_gcp_dataflow_sql"
+
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
BQ_SQL_DATASET = os.environ.get("GCP_DATAFLOW_BQ_SQL_DATASET",
"airflow_dataflow_samples")
BQ_SQL_TABLE_INPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_INPUT",
"beam_input")
BQ_SQL_TABLE_OUTPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_OUTPUT",
"beam_output")
DATAFLOW_SQL_JOB_NAME = os.environ.get("GCP_DATAFLOW_SQL_JOB_NAME",
"dataflow-sql")
DATAFLOW_SQL_LOCATION = os.environ.get("GCP_DATAFLOW_SQL_LOCATION", "us-west1")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+BQ_LOCATION = "europe-north1"
with models.DAG(
- dag_id="example_gcp_dataflow_sql",
+ dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
-) as dag_sql:
+) as dag:
+ create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
+ task_id="create_dataset_with_location", dataset_id=DATASET_NAME,
location=BQ_LOCATION
+ )
+
+ create_table_with_location = BigQueryCreateEmptyTableOperator(
+ task_id="create_table_with_location",
+ dataset_id=DATASET_NAME,
+ table_id="test_table",
+ schema_fields=[
+ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+ {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
+ ],
+ )
+
# [START howto_operator_start_sql_job]
+
start_sql = DataflowStartSqlJobOperator(
task_id="start_sql_query",
job_name=DATAFLOW_SQL_JOB_NAME,
query=f"""
- SELECT
- sales_region as sales_region,
- count(state_id) as count_state
- FROM
-
bigquery.table.`{GCP_PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
- WHERE state_id >= @state_id_min
- GROUP BY sales_region;
+ SELECT * FROM {DATASET_NAME}.test_table;
""",
Review Comment:
```suggestion
",
```
##########
tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py:
##########
@@ -24,44 +24,85 @@
from datetime import datetime
from airflow import models
+from airflow.providers.google.cloud.operators.bigquery import (
+ BigQueryCreateEmptyDatasetOperator,
+ BigQueryCreateEmptyTableOperator,
+ BigQueryDeleteDatasetOperator,
+)
from airflow.providers.google.cloud.operators.dataflow import
DataflowStartSqlJobOperator
+from airflow.utils.trigger_rule import TriggerRule
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+DAG_ID = "example_gcp_dataflow_sql"
+
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
BQ_SQL_DATASET = os.environ.get("GCP_DATAFLOW_BQ_SQL_DATASET",
"airflow_dataflow_samples")
BQ_SQL_TABLE_INPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_INPUT",
"beam_input")
BQ_SQL_TABLE_OUTPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_OUTPUT",
"beam_output")
DATAFLOW_SQL_JOB_NAME = os.environ.get("GCP_DATAFLOW_SQL_JOB_NAME",
"dataflow-sql")
DATAFLOW_SQL_LOCATION = os.environ.get("GCP_DATAFLOW_SQL_LOCATION", "us-west1")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+BQ_LOCATION = "europe-north1"
with models.DAG(
- dag_id="example_gcp_dataflow_sql",
+ dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
Review Comment:
It would be good to add an additional tag here, so that in the UI it would be
visible that this test is for Dataflow
##########
tests/system/providers/google/cloud/dataflow/example_dataflow_sql.py:
##########
@@ -24,44 +24,85 @@
from datetime import datetime
from airflow import models
+from airflow.providers.google.cloud.operators.bigquery import (
+ BigQueryCreateEmptyDatasetOperator,
+ BigQueryCreateEmptyTableOperator,
+ BigQueryDeleteDatasetOperator,
+)
from airflow.providers.google.cloud.operators.dataflow import
DataflowStartSqlJobOperator
+from airflow.utils.trigger_rule import TriggerRule
-GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
+DAG_ID = "example_gcp_dataflow_sql"
+
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
BQ_SQL_DATASET = os.environ.get("GCP_DATAFLOW_BQ_SQL_DATASET",
"airflow_dataflow_samples")
BQ_SQL_TABLE_INPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_INPUT",
"beam_input")
BQ_SQL_TABLE_OUTPUT = os.environ.get("GCP_DATAFLOW_BQ_SQL_TABLE_OUTPUT",
"beam_output")
DATAFLOW_SQL_JOB_NAME = os.environ.get("GCP_DATAFLOW_SQL_JOB_NAME",
"dataflow-sql")
DATAFLOW_SQL_LOCATION = os.environ.get("GCP_DATAFLOW_SQL_LOCATION", "us-west1")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
+BQ_LOCATION = "europe-north1"
with models.DAG(
- dag_id="example_gcp_dataflow_sql",
+ dag_id=DAG_ID,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
-) as dag_sql:
+) as dag:
+ create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
+ task_id="create_dataset_with_location", dataset_id=DATASET_NAME,
location=BQ_LOCATION
+ )
+
+ create_table_with_location = BigQueryCreateEmptyTableOperator(
+ task_id="create_table_with_location",
+ dataset_id=DATASET_NAME,
+ table_id="test_table",
+ schema_fields=[
+ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+ {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
+ ],
+ )
+
# [START howto_operator_start_sql_job]
+
start_sql = DataflowStartSqlJobOperator(
task_id="start_sql_query",
job_name=DATAFLOW_SQL_JOB_NAME,
query=f"""
- SELECT
- sales_region as sales_region,
- count(state_id) as count_state
- FROM
-
bigquery.table.`{GCP_PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
- WHERE state_id >= @state_id_min
- GROUP BY sales_region;
+ SELECT * FROM {DATASET_NAME}.test_table;
""",
options={
"bigquery-project": GCP_PROJECT_ID,
- "bigquery-dataset": BQ_SQL_DATASET,
- "bigquery-table": BQ_SQL_TABLE_OUTPUT,
- "bigquery-write-disposition": "write-truncate",
- "parameter": "state_id_min:INT64:2",
+ "bigquery-dataset": DATASET_NAME,
+ "bigquery-table": "test_table",
Review Comment:
The same here — what do you think about having this name in a separate variable?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]