juarezr commented on PR #57514:
URL: https://github.com/apache/airflow/pull/57514#issuecomment-3507242927

   > @MaksYermak
   > 
   > @juarezr could I ask you to check the scenario with JSON format for `MSSQLToGCSOperator` when in parameters `schema_filename` is present and `schema` is not present and share with us the results that file with schema is generating successfully for JSON format?
   
   @MaksYermak,
   
   The dag/task executed successfully. Please check the source code below.
   
    The only unexpected thing is that the schema file was also uploaded to an object path in the bucket that mirrors the local filename:
   
   - Local: `/tmp/TEST_QUERY_SCHEMA_JSON_FILE.json`
   - Remote: `gs://my-bucket//tmp/TEST_QUERY_SCHEMA_JSON_FILE.json`
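    If it helps the investigation, the doubled slash in the remote path is what you would get if the `schema_filename` value were used verbatim as the GCS object name; a minimal sketch of that assumption (the `gcs_uri` helper below is hypothetical, not the operator's actual code):

    ```python
    def gcs_uri(bucket: str, object_name: str) -> str:
        """Join a bucket and object name into a gs:// URI without normalization."""
        return f"gs://{bucket}/{object_name}"

    # An absolute local path used as the object name keeps its leading slash:
    print(gcs_uri("my-bucket", "/tmp/TEST_QUERY_SCHEMA_JSON_FILE.json"))
    ```

    Under that assumption, passing a relative object name would avoid the double slash.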
   
    I hope that I've interpreted your request correctly.
   
    ### DAG Source Code
   
   ```python
    # Imports assumed for this snippet (the task log suggests a custom
    # operator module; the stock provider import is shown here):
    from airflow.providers.google.cloud.transfers.mssql_to_gcs import MSSQLToGCSOperator
    from airflow.sdk import dag
   
   TEST_QUERY = """
     SELECT TOP 5
           TABLE_CATALOG,
           TABLE_SCHEMA,
           TABLE_NAME,
           TABLE_TYPE,
           CAST(
             CASE WHEN TABLE_TYPE = 'BASE TABLE' THEN 1 ELSE 0 END
             AS BIT) AS IS_BASE_TABLE
       FROM INFORMATION_SCHEMA.TABLES
      ORDER BY 1, 2, 3
   """
   
   TEST_QUERY_SCHEMA_JSON_FILE = "/tmp/TEST_QUERY_SCHEMA_JSON_FILE.json"
   
   TEST_QUERY_SCHEMA_JSON = """
   [
     {
       "name": "TABLE_CATALOG",
       "type": "STRING",
       "mode": "REQUIRED",
       "description": "MSSQL Database Name"
     },
     {
       "name": "TABLE_SCHEMA",
       "type": "STRING",
       "mode": "REQUIRED",
       "description": "MSSQL Schema 
Name"gs://my-bucket//tmp/TEST_QUERY_SCHEMA_JSON_FILE.json
       "type": "STRING",
       "mode": "REQUIRED",
       "description": "MSSQL Table Name"
     },
     {
       "name": "TABLE_TYPE",
       "type": "STRING",
       "mode": "REQUIRED",
       "description": "MSSQL Table Type"
     },
     {
       "name": "IS_BASE_TABLE",
       "type": "BOOL",
       "mode": "NULLABLE",
       "description": "MSSQL Table Type is Base Table"
     }
   ]
   """
   
   @dag(
       schedule=None,
       tags=["test", "connectivity", "storage"],
       description="""
       ### MSSQL Data Transfer Test
   
       This is a simple data pipeline testing data transfer from MSSQL to GCS
       """,
   )
   def test_mssql_gcs():
   
       # Write the schema file to the local filesystem
       with open(TEST_QUERY_SCHEMA_JSON_FILE, "w") as f:
           f.write(TEST_QUERY_SCHEMA_JSON)
   
       test_mssql_gcs_as_json_schema_file = MSSQLToGCSOperator(
           task_id="test_mssql_gcs_as_json_schema_file",
           mssql_conn_id=MSSQL_CONN_ID,
           gcp_conn_id=GCS_CONN_ID,
           sql=TEST_QUERY,
           bucket=GCS_BUCKET,
            export_format="json",
            filename="test/test_mssql_gcs/test_mssql_gcs_as_json_schema_file.json",
           schema_filename=TEST_QUERY_SCHEMA_JSON_FILE,
       )
   
       # ...
   
   ```
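    Not part of the question, but a small sanity check like the following could validate the hand-written schema file before triggering the dag (a sketch using an abbreviated one-field schema):

    ```python
    import json

    # Hypothetical pre-flight check for a hand-written schema file: confirm it
    # parses and that every entry carries the keys BigQuery expects.
    schema_text = '[{"name": "TABLE_CATALOG", "type": "STRING", "mode": "REQUIRED"}]'
    fields = json.loads(schema_text)
    assert all({"name", "type", "mode"} <= f.keys() for f in fields)
    print([f["name"] for f in fields])
    ```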
   
   ### Task log
   
   ```json
    {"timestamp":"2025-11-09T00:02:32.657940Z","level":"info","event":"DAG bundles loaded: dags-folder, example_dags","logger":"airflow.dag_processing.bundles.manager.DagBundlesManager","filename":"manager.py","lineno":179}
    {"timestamp":"2025-11-09T00:02:32.658511Z","level":"info","event":"Filling up the DagBag from /opt/airflow/dags/test_mssql_gcs.py","logger":"airflow.models.dagbag.DagBag","filename":"dagbag.py","lineno":593}
    {"timestamp":"2025-11-09T00:02:34.191557Z","level":"info","event":"Task instance is in running state","logger":"task.stdout"}
    {"timestamp":"2025-11-09T00:02:34.191652Z","level":"info","event":" Previous state of the Task instance: TaskInstanceState.QUEUED","logger":"task.stdout"}
    {"timestamp":"2025-11-09T00:02:34.193007Z","level":"info","event":"Current task name:test_mssql_gcs_as_json_schema_file","logger":"task.stdout"}
    {"timestamp":"2025-11-09T00:02:34.193061Z","level":"info","event":"Dag name:test_mssql_gcs","logger":"task.stdout"}
    {"timestamp":"2025-11-09T00:02:34.193105Z","level":"info","event":"Executing query","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":166}
    {"timestamp":"2025-11-09T00:02:34.207059Z","level":"info","event":"Writing local schema file","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":171}
    {"timestamp":"2025-11-09T00:02:34.207155Z","level":"info","event":"Starts generating schema","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":455}
    {"timestamp":"2025-11-09T00:02:34.207529Z","level":"info","event":"Using schema for /tmp/TEST_QUERY_SCHEMA_JSON_FILE.json","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":465}
    {"timestamp":"2025-11-09T00:02:34.207807Z","level":"info","event":"Uploading schema file to GCS.","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":177}
    {"timestamp":"2025-11-09T00:02:34.212913Z","level":"warning","event":"Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.","logger":"airflow.sdk.definitions.connection","filename":"connection.py","lineno":130}
    {"timestamp":"2025-11-09T00:02:34.215900Z","level":"info","event":"Getting connection using `google.auth.default()` since no explicit credentials are provided.","logger":"airflow.providers.google.cloud.utils.credentials_provider._CredentialProvider","filename":"credentials_provider.py","lineno":410}
    {"timestamp":"2025-11-09T00:02:35.944849Z","level":"info","event":"File /tmp/tmp3ke_phhx uploaded to /tmp/TEST_QUERY_SCHEMA_JSON_FILE.json in my-bucket bucket","logger":"airflow.task.hooks.airflow.providers.google.cloud.hooks.gcs.GCSHook","filename":"gcs.py","lineno":579}
    {"timestamp":"2025-11-09T00:02:35.944998Z","level":"info","event":"Writing local data files","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":185}
    {"timestamp":"2025-11-09T00:02:35.945422Z","level":"info","event":"Uploading chunk file #0 to GCS.","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":190}
    {"timestamp":"2025-11-09T00:02:35.951310Z","level":"warning","event":"Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.","logger":"airflow.sdk.definitions.connection","filename":"connection.py","lineno":130}
    {"timestamp":"2025-11-09T00:02:35.952119Z","level":"info","event":"Getting connection using `google.auth.default()` since no explicit credentials are provided.","logger":"airflow.providers.google.cloud.utils.credentials_provider._CredentialProvider","filename":"credentials_provider.py","lineno":410}
    {"timestamp":"2025-11-09T00:02:37.522008Z","level":"info","event":"File /tmp/tmp7kq_evsy uploaded to test/test_mssql_gcs/test_mssql_gcs_as_json_schema_file.json in my-bucket bucket","logger":"airflow.task.hooks.airflow.providers.google.cloud.hooks.gcs.GCSHook","filename":"gcs.py","lineno":579}
    {"timestamp":"2025-11-09T00:02:37.522115Z","level":"info","event":"Removing local file","logger":"airflow.task.operators.custom.operators.MSSQLToGCSOperator","filename":"sql_to_gcs.py","lineno":193}
    {"timestamp":"2025-11-09T00:02:37.522820Z","level":"info","event":"Pushing xcom","ti":"RuntimeTaskInstance(id=UUID('019a65eb-98e6-785d-abdd-4948329cba31'), task_id='test_mssql_gcs_as_json_schema_file', dag_id='test_mssql_gcs', run_id='manual__2025-11-09T00:02:09+00:00', try_number=1, dag_version_id=UUID('019a2724-b18d-73ac-822c-4fdb0764d0c4'), map_index=-1, hostname='airflow', context_carrier={}, task=<Task(MSSQLToGCSOperator): test_mssql_gcs_as_json_schema_file>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 11, 9, 0, 2, 32, 636275, tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None)","logger":"task","filename":"task_runner.py","lineno":1357}
    {"timestamp":"2025-11-09T00:02:37.550763Z","level":"info","event":"Task instance in success state","logger":"task.stdout"}
    {"timestamp":"2025-11-09T00:02:37.550911Z","level":"info","event":" Previous state of the Task instance: TaskInstanceState.RUNNING","logger":"task.stdout"}
    {"timestamp":"2025-11-09T00:02:37.552842Z","level":"info","event":"Task operator:<Task(MSSQLToGCSOperator): test_mssql_gcs_as_json_schema_file>","logger":"task.stdout"}
   ```
   
   ### gs://my-bucket//tmp/TEST_QUERY_SCHEMA_JSON_FILE.json
   
   ```json
    [{"mode": "NULLABLE", "name": "TABLE_CATALOG", "type": "STRING"},
     {"mode": "NULLABLE", "name": "TABLE_SCHEMA", "type": "STRING"},
     {"mode": "NULLABLE", "name": "TABLE_NAME", "type": "STRING"},
     {"mode": "NULLABLE", "name": "TABLE_TYPE", "type": "STRING"},
     {"mode": "NULLABLE", "name": "IS_BASE_TABLE", "type": "INTEGER"}]
   ```
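    Note that the generated file carries `NULLABLE`/`INTEGER` for `IS_BASE_TABLE` where the hand-written schema had `NULLABLE`/`BOOL`; if a field-by-field comparison is useful, here is a sketch (with abbreviated one-field copies of the two files):

    ```python
    import json

    # Sketch: diff the auto-generated schema against the hand-written one,
    # using abbreviated one-field copies of the files shown in this comment.
    generated = json.loads('[{"mode": "NULLABLE", "name": "IS_BASE_TABLE", "type": "INTEGER"}]')
    expected = json.loads('[{"mode": "NULLABLE", "name": "IS_BASE_TABLE", "type": "BOOL"}]')

    by_name = {f["name"]: f for f in expected}
    diffs = [
        (f["name"], f["type"], by_name[f["name"]]["type"])
        for f in generated
        if f["name"] in by_name and f != by_name[f["name"]]
    ]
    print(diffs)  # each tuple: (field, generated type, hand-written type)
    ```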
   
    ### gs://my-bucket/test/test_mssql_gcs/test_mssql_gcs_as_json_schema_file.json
   
   ```json
    {"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME": "__EFMigrationsHistory", "TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE TABLE"}
    {"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME": "Alert", "TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE TABLE"}
    {"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME": "AlertBlackList", "TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE TABLE"}
    {"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME": "AlertRulesGroup", "TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE TABLE"}
    {"IS_BASE_TABLE": true, "TABLE_CATALOG": "fleet", "TABLE_NAME": "AlertRulesGroupTrackableObject", "TABLE_SCHEMA": "dbo", "TABLE_TYPE": "BASE TABLE"}
   ```
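    For completeness, the data file is newline-delimited JSON, so it can be read line by line (a sketch with two abbreviated rows from the output above):

    ```python
    import json

    # The export is newline-delimited JSON: one row object per line.
    ndjson = (
        '{"IS_BASE_TABLE": true, "TABLE_NAME": "Alert", "TABLE_SCHEMA": "dbo"}\n'
        '{"IS_BASE_TABLE": true, "TABLE_NAME": "AlertBlackList", "TABLE_SCHEMA": "dbo"}\n'
    )
    rows = [json.loads(line) for line in ndjson.splitlines() if line.strip()]
    print(len(rows), rows[0]["TABLE_NAME"])
    ```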
   

