watertree opened a new issue, #28513:
URL: https://github.com/apache/airflow/issues/28513
### Apache Airflow Provider(s)
google
### Versions of Apache Airflow Providers
```bash
airflow@airflow-worker-XXXXXX-XXXXXX:~$ pip freeze | grep google-cloud
google-cloud-aiplatform @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_aiplatform-1.16.1-py2.py3-none-any.whl
google-cloud-appengine-logging @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_appengine_logging-1.1.3-py2.py3-none-any.whl
google-cloud-audit-log @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_audit_log-0.2.4-py2.py3-none-any.whl
google-cloud-automl @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_automl-2.8.0-py2.py3-none-any.whl
google-cloud-bigquery @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigquery-2.34.4-py2.py3-none-any.whl
google-cloud-bigquery-datatransfer @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigquery_datatransfer-3.7.0-py2.py3-none-any.whl
google-cloud-bigquery-storage @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigquery_storage-2.14.1-py2.py3-none-any.whl
google-cloud-bigtable @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_bigtable-1.7.3-py2.py3-none-any.whl
google-cloud-build @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_build-3.9.0-py2.py3-none-any.whl
google-cloud-common @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_common-1.0.3-py2.py3-none-any.whl
google-cloud-compute @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_compute-0.7.0-py2.py3-none-any.whl
google-cloud-container @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_container-2.11.1-py2.py3-none-any.whl
google-cloud-core @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_core-2.3.2-py2.py3-none-any.whl
google-cloud-datacatalog @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datacatalog-3.9.0-py2.py3-none-any.whl
google-cloud-datacatalog-lineage @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datacatalog_lineage-0.1.6-py3-none-any.whl
google-cloud-datacatalog-lineage-producer-client @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datacatalog_lineage_producer_client-0.0.9-py3-none-any.whl
google-cloud-dataform @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataform-0.2.0-py2.py3-none-any.whl
google-cloud-dataplex @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataplex-1.1.0-py2.py3-none-any.whl
google-cloud-dataproc @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataproc-5.0.0-py2.py3-none-any.whl
google-cloud-dataproc-metastore @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dataproc_metastore-1.6.0-py2.py3-none-any.whl
google-cloud-datastore @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_datastore-2.8.0-py2.py3-none-any.whl
google-cloud-dlp @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_dlp-1.0.2-py2.py3-none-any.whl
google-cloud-filestore @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_filestore-1.2.0-py2.py3-none-any.whl
google-cloud-firestore @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_firestore-2.5.0-py2.py3-none-any.whl
google-cloud-kms @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_kms-2.12.0-py2.py3-none-any.whl
google-cloud-language @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_language-1.3.2-py2.py3-none-any.whl
google-cloud-logging @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_logging-3.2.1-py2.py3-none-any.whl
google-cloud-memcache @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_memcache-1.4.1-py2.py3-none-any.whl
google-cloud-monitoring @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_monitoring-2.11.0-py2.py3-none-any.whl
google-cloud-orchestration-airflow @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_orchestration_airflow-1.4.1-py2.py3-none-any.whl
google-cloud-os-login @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_os_login-2.7.1-py2.py3-none-any.whl
google-cloud-pubsub @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_pubsub-2.13.4-py2.py3-none-any.whl
google-cloud-pubsublite @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_pubsublite-0.6.1-py2.py3-none-any.whl
google-cloud-redis @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_redis-2.9.0-py2.py3-none-any.whl
google-cloud-resource-manager @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_resource_manager-1.6.0-py2.py3-none-any.whl
google-cloud-secret-manager @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_secret_manager-1.0.2-py2.py3-none-any.whl
google-cloud-spanner @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_spanner-1.19.3-py2.py3-none-any.whl
google-cloud-speech @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_speech-1.3.4-py2.py3-none-any.whl
google-cloud-storage @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_storage-2.6.0-py2.py3-none-any.whl
google-cloud-tasks @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_tasks-2.10.1-py2.py3-none-any.whl
google-cloud-texttospeech @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_texttospeech-1.0.3-py2.py3-none-any.whl
google-cloud-translate @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_translate-1.7.2-py2.py3-none-any.whl
google-cloud-videointelligence @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_videointelligence-1.16.3-py2.py3-none-any.whl
google-cloud-vision @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_vision-1.0.2-py2.py3-none-any.whl
google-cloud-workflows @
file:///usr/local/lib/airflow-pypi-dependencies-2.3.4/python3.8/google_cloud_workflows-1.7.1-py2.py3-none-any.whl
```
### Apache Airflow version
2.3.4
### Operating System
Linux
### Deployment
Composer
### Deployment details
Cloud Composer: 1.20.2
Airflow: 2.3.4
### What happened
GCSToBigQueryOperator was working properly on previous versions of
Airflow/Composer, but started failing with encoding errors caused by an
erroneous attempt to read the schema from the source object. The error from
the test case is shown below:
```
*** Reading remote log from
gs://us-central1-composer-bdcca446-bucket/logs/dag_id=load_datastore_backup_from_gcs_to_bq_bug/run_id=scheduled__2022-12-20T00:00:00+00:00/task_id=load_ds_backup_from_bq/attempt=1.log.
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1172} INFO - Dependencies all
met for <TaskInstance:
load_datastore_backup_from_gcs_to_bq_bug.load_ds_backup_from_bq
scheduled__2022-12-20T00:00:00+00:00 [queued]>
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1172} INFO - Dependencies all
met for <TaskInstance:
load_datastore_backup_from_gcs_to_bq_bug.load_ds_backup_from_bq
scheduled__2022-12-20T00:00:00+00:00 [queued]>
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1369} INFO -
--------------------------------------------------------------------------------
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1370} INFO - Starting attempt 1
of 3
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1371} INFO -
--------------------------------------------------------------------------------
[2022-12-21, 00:10:03 UTC] {taskinstance.py:1390} INFO - Executing
<Task(GCSToBigQueryOperator): load_ds_backup_from_bq> on 2022-12-20
00:00:00+00:00
[2022-12-21, 00:10:03 UTC] {standard_task_runner.py:52} INFO - Started
process 4324 to run task
[2022-12-21, 00:10:03 UTC] {standard_task_runner.py:79} INFO - Running:
['airflow', 'tasks', 'run', 'load_datastore_backup_from_gcs_to_bq_bug',
'load_ds_backup_from_bq', 'scheduled__2022-12-20T00:00:00+00:00', '--job-id',
'55473', '--raw', '--subdir', 'DAGS_FOLDER/gcs_datastore_bq_bug_dag.py',
'--cfg-path', '/tmp/tmpum3kky0y', '--error-file', '/tmp/tmpl4da9d_3']
[2022-12-21, 00:10:03 UTC] {standard_task_runner.py:80} INFO - Job 55473:
Subtask load_ds_backup_from_bq
[2022-12-21, 00:10:04 UTC] {task_command.py:375} INFO - Running
<TaskInstance: load_datastore_backup_from_gcs_to_bq_bug.load_ds_backup_from_bq
scheduled__2022-12-20T00:00:00+00:00 [running]> on host
airflow-worker-594959469f-9dwhs
[2022-12-21, 00:10:04 UTC] {taskinstance.py:1583} INFO - Exporting the
following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=load_datastore_backup_from_gcs_to_bq_bug
AIRFLOW_CTX_TASK_ID=load_ds_backup_from_bq
AIRFLOW_CTX_EXECUTION_DATE=2022-12-20T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-20T00:00:00+00:00
[2022-12-21, 00:10:04 UTC] {base.py:68} INFO - Using connection ID
'google_cloud_default' for task execution.
[2022-12-21, 00:10:04 UTC] {base.py:68} INFO - Using connection ID
'google_cloud_default' for task execution.
[2022-12-21, 00:10:04 UTC] {gcs_to_bigquery.py:367} INFO - Using existing
BigQuery table for storing data...
[2022-12-21, 00:10:04 UTC] {credentials_provider.py:323} INFO - Getting
connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-21, 00:10:04 UTC] {bigquery.py:2252} INFO - Project is not included
in destination_project_dataset_table: ds.boog; using project
"*REDACTED_GCP_PROJECT*"
[2022-12-21, 00:10:05 UTC] {base.py:68} INFO - Using connection ID
'google_cloud_default' for task execution.
[2022-12-21, 00:10:05 UTC] {credentials_provider.py:323} INFO - Getting
connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-21, 00:10:05 UTC] {taskinstance.py:1904} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py",
line 397, in execute
self.configuration = self._check_schema_fields(self.configuration)
File
"/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py",
line 542, in _check_schema_fields
fields, values = [item.split(",") for item in
blob.decode("utf-8").splitlines()][:2]
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb7 in position 20:
invalid start byte
[2022-12-21, 00:10:05 UTC] {taskinstance.py:1408} INFO - Marking task as
UP_FOR_RETRY. dag_id=load_datastore_backup_from_gcs_to_bq_bug,
task_id=load_ds_backup_from_bq, execution_date=20221220T000000,
start_date=20221221T001003, end_date=20221221T001005
[2022-12-21, 00:10:05 UTC] {standard_task_runner.py:92} ERROR - Failed to
execute job 55473 for task load_ds_backup_from_bq ('utf-8' codec can't decode
byte 0xb7 in position 20: invalid start byte; 4324)
[2022-12-21, 00:10:05 UTC] {local_task_job.py:156} INFO - Task exited with
return code 1
[2022-12-21, 00:10:05 UTC] {local_task_job.py:279} INFO - 0 downstream tasks
scheduled from follow-on schedule check
```
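The traceback points at `_check_schema_fields` reading the first lines of the source object and decoding them as UTF-8 CSV. A Datastore `export_metadata` file is a binary blob, not text, so that decode fails. A minimal sketch of the failing step (the byte string below is a made-up stand-in for the metadata blob, not real export data):

```python
# A Datastore export_metadata file is binary, not CSV text. Decoding
# arbitrary binary bytes as UTF-8 raises the same UnicodeDecodeError seen
# in the task log above. The bytes here are a made-up stand-in.
blob = b"export-metadata\x00\xb7\xff"

try:
    # Same shape as the provider's _check_schema_fields parsing step.
    fields, values = [item.split(",") for item in blob.decode("utf-8").splitlines()][:2]
except UnicodeDecodeError as exc:
    print(f"decode failed: {exc}")
```

The decode raises before any line splitting happens, which matches the `'utf-8' codec can't decode byte 0xb7` error in the log.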
### What you think should happen instead
The operator should load the table properly (it did so on previous versions,
and the same load works when run manually via BigQuery table creation).
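One way the provider could avoid this (a hypothetical sketch for illustration, not the actual provider code): only attempt to sniff schema fields from the source object for plain-text formats such as CSV, since binary formats like `DATASTORE_BACKUP` carry their own schema.

```python
# Hypothetical guard, assumed names: schema sniffing that reads the first
# lines of the source object only makes sense for plain-text formats.
# Binary formats such as DATASTORE_BACKUP embed their own schema, so the
# check should be skipped for them.
TEXT_SOURCE_FORMATS = {"CSV"}

def needs_schema_sniffing(source_format: str, schema_fields, autodetect: bool) -> bool:
    """Return True only when reading the schema from the object itself is safe."""
    if schema_fields is not None or autodetect:
        # Schema already supplied, or BigQuery will autodetect it.
        return False
    return source_format.upper() in TEXT_SOURCE_FORMATS

print(needs_schema_sniffing("DATASTORE_BACKUP", None, False))  # prints False
```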
### How to reproduce
The sample below can be unzipped and copied into a Cloud Storage bucket,
assuming everything sits under an `airflow` directory:
[airflow.zip](https://github.com/apache/airflow/files/10273565/airflow.zip)
Sync the backup to a bucket of your choice, for example:
```bash
gsutil rsync -r airflow gs://bucket-name-here/airflow
```
Drop in the following DAG and fill in the variables marked for replacement:
```python
from datetime import timedelta, datetime

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import DAG

yesterday = datetime.combine(
    datetime.today() - timedelta(1),
    datetime.min.time())

# Replace these values
BUCKET = 'bucket-name'
GCP_PROJECT = 'gcp-project'
DATASET_TABLE = 'dataset.table'

default_args = {
    'start_date': yesterday,
    'project_id': GCP_PROJECT
}

schedule_interval = '@once'

dag = DAG('load_datastore_backup_from_gcs_to_bq_bug',
          default_args=default_args,
          schedule_interval=schedule_interval)

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

bq_load = GCSToBigQueryOperator(
    task_id='load_ds_backup_from_bq',
    source_format='DATASTORE_BACKUP',
    bucket=BUCKET,
    source_objects=[
        'airflow/namespace_airflow/kind_BugEntity/namespace_airflow_kind_BugEntity.export_metadata'],
    destination_project_dataset_table=DATASET_TABLE,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    dag=dag
)

start >> bq_load
bq_load >> end
```
### Anything else
This happens every time since the upgrade. I believe I was running 2.25
before a Composer-managed upgrade, but I'm not sure; several DAGs stopped
working as a result of that upgrade.
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)