Taragolis commented on code in PR #35090:
URL: https://github.com/apache/airflow/pull/35090#discussion_r1367888855


##########
airflow/providers/amazon/aws/operators/athena.py:
##########
@@ -163,3 +167,132 @@ def on_kill(self) -> None:
                         "Polling Athena for query with id %s to reach final 
state", self.query_execution_id
                     )
                     self.hook.poll_query_status(self.query_execution_id, 
sleep_time=self.sleep_time)
+
+    def get_openlineage_facets_on_start(self):
+        from openlineage.client.facet import ExtractionError, 
ExtractionErrorRunFacet, SqlJobFacet
+        from openlineage.client.run import Dataset
+
+        from airflow.providers.openlineage.extractors.base import 
OperatorLineage
+        from airflow.providers.openlineage.sqlparser import SQLParser
+
+        sql_parser = SQLParser(dialect="generic")
+
+        job_facets: dict[str, BaseFacet] = {"sql": 
SqlJobFacet(query=sql_parser.normalize_sql(self.query))}
+        parse_result = sql_parser.parse(sql=self.query)
+
+        if not parse_result:
+            return OperatorLineage(job_facets=job_facets)
+
+        run_facets: dict[str, BaseFacet] = {}
+        if parse_result.errors:
+            run_facets["extractionError"] = ExtractionErrorRunFacet(
+                totalTasks=len(self.query) if isinstance(self.query, list) 
else 1,
+                failedTasks=len(parse_result.errors),
+                errors=[
+                    ExtractionError(
+                        errorMessage=error.message,
+                        stackTrace=None,
+                        task=error.origin_statement,
+                        taskNumber=error.index,
+                    )
+                    for error in parse_result.errors
+                ],
+            )
+
+        inputs: list[Dataset] = list(
+            filter(
+                None,
+                [
+                    self.get_openlineage_dataset(table.schema or 
self.database, table.name)
+                    for table in parse_result.in_tables
+                ],
+            )
+        )
+
+        # Athena can output query result to a new table with CTAS query.
+        # cf. https://docs.aws.amazon.com/athena/latest/ug/ctas.html
+        outputs: list[Dataset] = list(
+            filter(
+                None,
+                [
+                    self.get_openlineage_dataset(table.schema or 
self.database, table.name)
+                    for table in parse_result.out_tables
+                ],
+            )
+        )
+
+        # In addition to CTAS query, it's also possible to specify output 
location on S3
+        # with a mandatory parameter, OutputLocation in ResultConfiguration.
+        # cf. 
https://docs.aws.amazon.com/athena/latest/APIReference/API_ResultConfiguration.html#athena-Type-ResultConfiguration-OutputLocation
  # noqa: E501
+        #
+        # Depending on the query type and the external_location property in 
the CTAS query,
+        # its behavior changes as follows:
+        #
+        # * Normal SELECT statement
+        #   -> The result is put into output_location as files rather than a 
table.
+        #
+        # * CTAS statement without external_location (`CREATE TABLE ... AS 
SELECT ...`)
+        #   -> The result is put into output_location as a table,
+        #      that is, both metadata files and data files are in there.
+        #
+        # * CTAS statement with external_location
+        #   (`CREATE TABLE ... WITH (external_location='s3://bucket/key') AS 
SELECT ...`)
+        #   -> The result is output as a table, but metadata and data files are
+        #      separated into output_location and external_location 
respectively.
+        #
+        # For the last case, output_location may be unnecessary as OL's output 
information,
+        # but we keep it as of now since it may be useful for some purpose.

Review Comment:
   > If user doesnt set it right I guess boto3/athena api will raise exception
   
   Yes, that is true. For example, consider two cases.
   
   With these settings on the `primary` workgroup, which is used by default by boto3 / 
AthenaOperator:
   
![image](https://github.com/apache/airflow/assets/3998685/98a5c2bf-424c-4b5c-b233-3fb1c8ea90cf)
   
   This simple code would run without any issues through `boto3`:
   
   ```python
   import boto3
   
   session = boto3.session.Session(...)
   client = session.client(service_name="athena")
   
   response = client.start_query_execution(QueryString="SELECT 1")
   assert response
   ```
   
   However if WorkGroup doesn't assign any output location, like this one:
   
![image](https://github.com/apache/airflow/assets/3998685/8eaedf77-e72d-4b20-a03c-a0cc0d9f0a90)
   
   Then it would raise an error
   ```python
   import boto3
   
   session = boto3.session.Session(...)
   client = session.client(service_name="athena")
   
   response = client.start_query_execution(QueryString="SELECT 1", 
WorkGroup="test_workgroup")
   ```
   
   ```console
   botocore.errorfactory.InvalidRequestException: An error occurred 
(InvalidRequestException) when calling the StartQueryExecution operation: No 
output location provided. An output location is required either through the 
Workgroup result configuration setting or as an API input.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to