juarezr commented on PR #57514:
URL: https://github.com/apache/airflow/pull/57514#issuecomment-3507242927
> @MaksYermak
>
> @juarezr could I ask you to check the scenario with JSON format for `MSSQLToGCSOperator`, when `schema_filename` is present in the parameters and `schema` is not, and share with us the results, confirming that the schema file is generated successfully for JSON format?
@MaksYermak,
The DAG task executed successfully. Please check the source code below.
The only unexpected behavior was that the schema file was also uploaded to the bucket under the same path as the local filename:
- Local: `/tmp/TEST_QUERY_SCHEMA_JSON_FILE.json`
- Remote: `gs://my-bucket//tmp/TEST_QUERY_SCHEMA_JSON_FILE.json`
I hope I've interpreted your request correctly.
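For reference, a minimal sketch of how the doubled slash can arise when an absolute local path is reused verbatim as the GCS object name. The `gcs_uri` join below is a hypothetical illustration, not the operator's actual code:

```python
# Hypothetical illustration: if the object key is taken verbatim from
# schema_filename, an absolute local path produces a doubled slash in the URI.
def gcs_uri(bucket: str, object_name: str) -> str:
    """Naive gs:// URI join, for illustration only."""
    return f"gs://{bucket}/{object_name}"

# Absolute local path reused as the object key keeps its leading slash:
print(gcs_uri("my-bucket", "/tmp/TEST_QUERY_SCHEMA_JSON_FILE.json"))
# → gs://my-bucket//tmp/TEST_QUERY_SCHEMA_JSON_FILE.json

# A bucket-relative key avoids it:
print(gcs_uri("my-bucket", "test/test_mssql_gcs/schema.json"))
# → gs://my-bucket/test/test_mssql_gcs/schema.json
```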
### DAG Source Code
```python
# Connection constants (MSSQL_CONN_ID, GCS_CONN_ID, GCS_BUCKET) are defined
# elsewhere in the module; the relevant imports (Airflow 3) are:
from airflow.providers.google.cloud.transfers.mssql_to_gcs import MSSQLToGCSOperator
from airflow.sdk import dag

TEST_QUERY = """
SELECT TOP 5
    TABLE_CATALOG,
    TABLE_SCHEMA,
    TABLE_NAME,
    TABLE_TYPE,
    CAST(
        CASE WHEN TABLE_TYPE = 'BASE TABLE' THEN 1 ELSE 0 END
    AS BIT) AS IS_BASE_TABLE
FROM INFORMATION_SCHEMA.TABLES
ORDER BY 1, 2, 3
"""
TEST_QUERY_SCHEMA_JSON_FILE = "/tmp/TEST_QUERY_SCHEMA_JSON_FILE.json"
TEST_QUERY_SCHEMA_JSON = """
[
  {
    "name": "TABLE_CATALOG",
    "type": "STRING",
    "mode": "REQUIRED",
    "description": "MSSQL Database Name"
  },
  {
    "name": "TABLE_SCHEMA",
    "type": "STRING",
    "mode": "REQUIRED",
    "description": "MSSQL Schema Name"
  },
  {
    "name": "TABLE_NAME",
    "type": "STRING",
    "mode": "REQUIRED",
    "description": "MSSQL Table Name"
  },
  {
    "name": "TABLE_TYPE",
    "type": "STRING",
    "mode": "REQUIRED",
    "description": "MSSQL Table Type"
  },
  {
    "name": "IS_BASE_TABLE",
    "type": "BOOL",
    "mode": "NULLABLE",
    "description": "MSSQL Table Type is Base Table"
  }
]
"""
@dag(
    schedule=None,
    tags=["test", "connectivity", "storage"],
    description="""
    ### MSSQL Data Transfer Test
    This is a simple data pipeline testing data transfer from MSSQL to GCS
    """,
)
def test_mssql_gcs():
    # Write the schema file to the local filesystem
    with open(TEST_QUERY_SCHEMA_JSON_FILE, "w") as f:
        f.write(TEST_QUERY_SCHEMA_JSON)

    test_mssql_gcs_as_json_schema_file = MSSQLToGCSOperator(
        task_id="test_mssql_gcs_as_json_schema_file",
        mssql_conn_id=MSSQL_CONN_ID,
        gcp_conn_id=GCS_CONN_ID,
        sql=TEST_QUERY,
        bucket=GCS_BUCKET,
        export_format="json",
        filename="test/test_mssql_gcs/test_mssql_gcs_as_json_schema_file.json",
        schema_filename=TEST_QUERY_SCHEMA_JSON_FILE,
    )
    # ...
```
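As a side note, a quick local sanity check (a hypothetical helper, not part of the provider) can catch a malformed schema literal before the DAG even parses:

```python
import json

def validate_schema(text: str) -> list:
    """Parse a BigQuery-style schema literal and verify each field carries
    the keys the schema file is expected to have."""
    fields = json.loads(text)
    for field in fields:
        missing = {"name", "type", "mode"} - field.keys()
        if missing:
            raise ValueError(f"field {field.get('name')!r} is missing {missing}")
    return fields

fields = validate_schema(
    '[{"name": "TABLE_CATALOG", "type": "STRING", "mode": "REQUIRED"}]'
)
print([f["name"] for f in fields])  # → ['TABLE_CATALOG']
```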
### Task log
```json
{"timestamp":"2025-11-09T00:02:32.657940Z","level":"info","event":"DAG
bundles loaded: dags-folder,
example_dags","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager","filename":"manager.py","lineno":179}
{"timestamp":"2025-11-09T00:02:32.658511Z","level":"info","event":"Filling
up the DagBag from
/opt/airflow/dags/test_mssql_gcs.py","logger":"airflow.models.dagbag.DagBag","filename":"dagbag.py","lineno":593}
{"timestamp":"2025-11-09T00:02:34.191557Z","level":"info","event":"Task
instance is in running state","logger":"task.stdout"}
{"timestamp":"2025-11-09T00:02:34.191652Z","level":"info","event":" Previous
state of the Task instance: TaskInstanceState.QUEUED","logger":"task.stdout"}
{"timestamp":"2025-11-09T00:02:34.193007Z","level":"info","event":"Current
task name:test_mssql_gcs_as_json_schema_file","logger":"task.stdout"}
{"timestamp":"2025-11-09T00:02:34.193061Z","level":"info","event":"Dag
name:test_mssql_gcs","logger":"task.stdout"}
{"timestamp":"2025-11-09T00:02:34.193105Z","level":"info","event":"Executing
query","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":166}
{"timestamp":"2025-11-09T00:02:34.207059Z","level":"info","event":"Writing
local schema
file","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":171}
{"timestamp":"2025-11-09T00:02:34.207155Z","level":"info","event":"Starts
generating
schema","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":455}
{"timestamp":"2025-11-09T00:02:34.207529Z","level":"info","event":"Using
schema for
/tmp/TEST_QUERY_SCHEMA_JSON_FILE.json","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":465}
{"timestamp":"2025-11-09T00:02:34.207807Z","level":"info","event":"Uploading
schema file to
GCS.","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":177}
{"timestamp":"2025-11-09T00:02:34.212913Z","level":"warning","event":"Connection
schemes (type: google_cloud_platform) shall not contain '_' according to
RFC3986.","logger":"airflow.sdk.definitions.connection","filename":"connection.py","lineno":130}
{"timestamp":"2025-11-09T00:02:34.215900Z","level":"info","event":"Getting
connection using `google.auth.default()` since no explicit credentials are
provided.","logger":"airflow.providers.google.cloud.utils.credentials_provider._CredentialProvider","filename":"credentials_provider.py","lineno":410}
{"timestamp":"2025-11-09T00:02:35.944849Z","level":"info","event":"File
/tmp/tmp3ke_phhx uploaded to /tmp/TEST_QUERY_SCHEMA_JSON_FILE.json in my-bucket
bucket","logger":"airflow.task.hooks.airflow.providers.google.cloud.hooks.gcs.GCSHook","filename":"gcs.py","lineno":579}
{"timestamp":"2025-11-09T00:02:35.944998Z","level":"info","event":"Writing
local data
files","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":185}
{"timestamp":"2025-11-09T00:02:35.945422Z","level":"info","event":"Uploading
chunk file #0 to
GCS.","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":190}
{"timestamp":"2025-11-09T00:02:35.951310Z","level":"warning","event":"Connection
schemes (type: google_cloud_platform) shall not contain '_' according to
RFC3986.","logger":"airflow.sdk.definitions.connection","filename":"connection.py","lineno":130}
{"timestamp":"2025-11-09T00:02:35.952119Z","level":"info","event":"Getting
connection using `google.auth.default()` since no explicit credentials are
provided.","logger":"airflow.providers.google.cloud.utils.credentials_provider._CredentialProvider","filename":"credentials_provider.py","lineno":410}
{"timestamp":"2025-11-09T00:02:37.522008Z","level":"info","event":"File
/tmp/tmp7kq_evsy uploaded to
test/test_mssql_gcs/test_mssql_gcs_as_json_schema_file.json in my-bucket
bucket","logger":"airflow.task.hooks.airflow.providers.google.cloud.hooks.gcs.GCSHook","filename":"gcs.py","lineno":579}
{"timestamp":"2025-11-09T00:02:37.522115Z","level":"info","event":"Removing
local
file","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":193}
{"timestamp":"2025-11-09T00:02:37.522820Z","level":"info","event":"Pushing
xcom","ti":"RuntimeTaskInstance(id=UUID('019a65eb-98e6-785d-abdd-4948329cba31'),
task_id='test_mssql_gcs_as_json_schema_file', dag_id='test_mssql_gcs',
run_id='manual__2025-11-09T00:02:09+00:00', try_number=1,
dag_version_id=UUID('019a2724-b18d-73ac-822c-4fdb0764d0c4'), map_index=-1,
hostname='airflow', context_carrier={}, task=<Task(MSSQLToGCSOperator):
test_mssql_gcs_as_json_schema_file>,
bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0,
start_date=datetime.datetime(2025, 11, 9, 0, 2, 32, 636275,
tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING:
'running'>, is_mapped=False,
rendered_map_index=None)","logger":"task","filename":"task_runner.py","lineno":1357}
{"timestamp":"2025-11-09T00:02:37.550763Z","level":"info","event":"Task
instance in success state","logger":"task.stdout"}
{"timestamp":"2025-11-09T00:02:37.550911Z","level":"info","event":" Previous
state of the Task instance: TaskInstanceState.RUNNING","logger":"task.stdout"}
{"timestamp":"2025-11-09T00:02:37.552842Z","level":"info","event":"Task
operator:<Task(MSSQLToGCSOperator):
test_mssql_gcs_as_json_schema_file>","logger":"task.stdout"}
```
### gs://my-bucket//tmp/TEST_QUERY_SCHEMA_JSON_FILE.json
```json
[{"mode": "NULLABLE", "name": "TABLE_CATALOG", "type": "STRING"}, {"mode":
"NULLABLE", "name": "TABLE_SCHEMA", "type": "STRING"}, {"mode": "NULLABLE",
"name": "TABLE_NAME", "type": "STRING"}, {"mode": "NULLABLE", "name":
"TABLE_TYPE", "type": "STRING"}, {"mode": "NULLABLE", "name": "IS_BASE_TABLE",
"type": "INTEGER"}]
```
### gs://my-bucket/test/test_mssql_gcs/test_mssql_gcs_as_json_schema_file.json
```json
{"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME":
"__EFMigrationsHistory", "TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE TABLE"}
{"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME": "Alert",
"TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE TABLE"}
{"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME":
"AlertBlackList", "TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE TABLE"}
{"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME":
"AlertRulesGroup", "TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE TABLE"}
{"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME":
"AlertRulesGroupTrackableObject", "TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE
TABLE"}
```
--
This is an automated message from the Apache Git Service.