Copilot commented on code in PR #10485: URL: https://github.com/apache/seatunnel/pull/10485#discussion_r2792512457
########## seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/client/BigQueryWriteClientFactory.java: ##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.bigquery.client;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.bigquery.option.BigQuerySinkOptions;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
+import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
+import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings;
+import io.grpc.ManagedChannelBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+@Slf4j
+public class BigQueryWriteClientFactory {
+    private BigQueryWriteClientFactory() {}
+
+    public static BigQueryWriteClient getClient(ReadonlyConfig config) throws IOException {
+
+        if (config.get(BigQuerySinkOptions.EMULATOR_HOST) != null) {
+            log.info(
+                    "Using BigQuery Emulator at {}", config.get(BigQuerySinkOptions.EMULATOR_HOST));
+            String emulatorHost = config.get(BigQuerySinkOptions.EMULATOR_HOST);
+
+            BigQueryWriteSettings settings =
+                    BigQueryWriteSettings.newBuilder()
+                            .setEndpoint(emulatorHost)
+                            .setTransportChannelProvider(
+                                    EnhancedBigQueryReadStubSettings
+                                            .defaultGrpcTransportProviderBuilder()
+                                            .setChannelConfigurator(
+                                                    ManagedChannelBuilder::usePlaintext)
+                                            .build())

Review Comment:
   The emulator transport channel provider is built from `EnhancedBigQueryReadStubSettings.defaultGrpcTransportProviderBuilder()`, which is for the *Read* API and is a confusing/possibly incorrect dependency for a Write client. Use the Write API's transport provider builder (e.g., `BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()` or the appropriate Write stub settings) to avoid relying on unrelated classes.
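   As a rough sketch, the same emulator block built on the Write API's own provider — `BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()` is already used elsewhere in this PR (`BigQueryStreamWriter`), so no new dependency is introduced:

   ```java
   // Sketch: derive the emulator channel from the Write API's transport
   // provider instead of the Read API's stub settings.
   BigQueryWriteSettings settings =
           BigQueryWriteSettings.newBuilder()
                   .setEndpoint(emulatorHost)
                   .setTransportChannelProvider(
                           BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
                                   .setChannelConfigurator(ManagedChannelBuilder::usePlaintext)
                                   .build())
                   .setCredentialsProvider(com.google.api.gax.core.NoCredentialsProvider.create())
                   .build();
   ```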
########## seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySinkWriter.java: ##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.bigquery.sink;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.bigquery.convert.BigQuerySerializer;
+import org.apache.seatunnel.connectors.bigquery.exception.BigQueryConnectorErrorCode;
+import org.apache.seatunnel.connectors.bigquery.exception.BigQueryConnectorException;
+import org.apache.seatunnel.connectors.bigquery.option.BigQuerySinkOptions;
+import org.apache.seatunnel.connectors.bigquery.sink.writer.BigQueryStreamWriter;
+
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.protobuf.Descriptors;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Slf4j
+public class BigQuerySinkWriter
+        implements SinkWriter<SeaTunnelRow, Void, Void>, SupportMultiTableSinkWriter<Void> {
+
+    private final BigQueryStreamWriter streamWriter;
+    private final BigQuerySerializer serializer;
+
+    private final Phaser inflightRequests = new Phaser(1);
+    private final AtomicReference<RuntimeException> fatalError = new AtomicReference<>();
+
+    private final int batchSize;
+    private JSONArray buffer = new JSONArray();
+
+    public BigQuerySinkWriter(
+            ReadonlyConfig readOnlyConfig,
+            BigQueryStreamWriter streamWriter,
+            BigQuerySerializer serializer) {
+        this.batchSize = readOnlyConfig.get(BigQuerySinkOptions.BATCH_SIZE);
+        this.streamWriter = streamWriter;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        checkFatalError();
+
+        JsonNode jsonNode = serializer.convert(element);
+        buffer.put(new JSONObject(jsonNode.toString()));
+
+        if (buffer.length() >= batchSize) {
+            flush();
+        }
+    }
+
+    private void flush() throws IOException {
+        if (buffer.length() == 0) return;
+
+        JSONArray dataToSend = buffer;
+        buffer = new JSONArray();
+
+        inflightRequests.register();
+
+        ApiFuture<AppendRowsResponse> future;
+        try {
+            future = streamWriter.append(dataToSend);
+        } catch (Descriptors.DescriptorValidationException e) {
+            throw new BigQueryConnectorException(BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e);
+        }

Review Comment:
   `flush()` registers a party in `inflightRequests` before calling `append()`, but if `append()` throws (e.g., `DescriptorValidationException`), the party is never deregistered. This can deadlock `close()` at `arriveAndAwaitAdvance()`. Register only after a successful `append()`, or ensure you `arriveAndDeregister()` in the exception path (e.g., in a `finally`).
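   A minimal sketch of the first option, using the names from the quoted diff (illustrative, not the final implementation):

   ```java
   private void flush() throws IOException {
       if (buffer.length() == 0) {
           return;
       }

       JSONArray dataToSend = buffer;
       buffer = new JSONArray();

       ApiFuture<AppendRowsResponse> future;
       try {
           future = streamWriter.append(dataToSend);
       } catch (Descriptors.DescriptorValidationException e) {
           // No party has been registered yet, so a throwing append() leaves
           // the Phaser balanced and close() can still complete.
           throw new BigQueryConnectorException(BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e);
       }

       // Register only once a future exists; the callback below is then
       // guaranteed to be the party that eventually deregisters it.
       inflightRequests.register();
       ApiFutures.addCallback(
               future,
               new ApiFutureCallback<AppendRowsResponse>() {
                   @Override
                   public void onSuccess(AppendRowsResponse result) {
                       inflightRequests.arriveAndDeregister();
                       log.info("Successfully appended {} rows.", dataToSend.length());
                   }

                   @Override
                   public void onFailure(Throwable t) {
                       fatalError.compareAndSet(
                               null,
                               new BigQueryConnectorException(
                                       BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, t));
                       inflightRequests.arriveAndDeregister();
                       log.warn("Failed to append rows.", t);
                   }
               },
               MoreExecutors.directExecutor());
   }
   ```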
########## seatunnel-connectors-v2/connector-bigquery/pom.xml: ##########
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-bigquery</artifactId>
+    <name>SeaTunnel : Connectors V2 : Bigquery</name>
+

Review Comment:
   Even with this POM added, `seatunnel-connectors-v2/pom.xml` does not list `connector-bigquery` in its `<modules>` section, so Maven won't build/install this artifact in a reactor build. This will break builds that reference `connector-bigquery` (e.g., `seatunnel-dist` / e2e). Add `connector-bigquery` to the parent modules list.

########## seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/client/BigQueryWriteClientFactory.java: ##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.bigquery.client; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.connectors.bigquery.option.BigQuerySinkOptions; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStubSettings; +import io.grpc.ManagedChannelBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +@Slf4j +public class BigQueryWriteClientFactory { + private BigQueryWriteClientFactory() {} + + public static BigQueryWriteClient getClient(ReadonlyConfig config) throws IOException { + + if (config.get(BigQuerySinkOptions.EMULATOR_HOST) != null) { + log.info( + "Using BigQuery Emulator at {}", config.get(BigQuerySinkOptions.EMULATOR_HOST)); + String emulatorHost = config.get(BigQuerySinkOptions.EMULATOR_HOST); + + BigQueryWriteSettings settings = + BigQueryWriteSettings.newBuilder() + .setEndpoint(emulatorHost) + .setTransportChannelProvider( + EnhancedBigQueryReadStubSettings + .defaultGrpcTransportProviderBuilder() + .setChannelConfigurator( + ManagedChannelBuilder::usePlaintext) + .build()) + .setCredentialsProvider( + com.google.api.gax.core.NoCredentialsProvider.create()) + .build(); + + BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create(settings); + log.info("Created BigQueryWriteClient for emulator at {}", emulatorHost); + + return bigQueryWriteClient; + } + + GoogleCredentials credentials; + if (config.get(BigQuerySinkOptions.SERVICE_ACCOUNT_KEY_PATH) != null) { + credentials = + ServiceAccountCredentials.fromStream( + Files.newInputStream( + Paths.get( + config.get( + BigQuerySinkOptions + .SERVICE_ACCOUNT_KEY_PATH)))); + } else if (config.get(BigQuerySinkOptions.SERVICE_ACCOUNT_KEY_JSON) != null) { + credentials = + ServiceAccountCredentials.fromStream( + new ByteArrayInputStream( + config.get(BigQuerySinkOptions.SERVICE_ACCOUNT_KEY_JSON) + .getBytes())); Review Comment: `service_account_key_json.getBytes()` uses the platform default charset. Use UTF-8 explicitly to avoid authentication failures on platforms with non-UTF8 defaults. ########## docs/en/connectors/sink/BigQuery.md: ########## @@ -0,0 +1,132 @@ +import ChangeLog from '../changelog/connector-bigquery.md'; + +# BigQuery + +> BigQuery sink connector + +## Support Those Engines + +> Spark<br/> +> Flink<br/> +> Seatunnel Zeta<br/> + +## Key Features + +- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md) +- [ ] [cdc](../../introduction/concepts/connector-v2-features.md) + +## Description + +Sink connector for Google Cloud BigQuery using the Storage Write API for high-performance data ingestion. 
+ +## Supported DataSource Info + +| Datasource | Supported Versions | Maven | +|------------|--------------------|----------------------------------------------------------------------------------------| +| BigQuery | BOM 26.72.0 | [Download](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-bigquery) | + + +## Options + +| Name | Type | Required | Default | Description | +|-----------------------------|---------|----------|---------|-----------------------------------------------------------------------------| +| project_id | string | Yes | - | GCP project ID | +| dataset_id | string | Yes | - | BigQuery dataset ID | +| table_id | string | Yes | - | BigQuery table ID | +| service_account_key_path | string | No | - | Path to GCP service account JSON key file | +| service_account_key_json | string | No | - | Inline GCP service account JSON key content | +| batch_size | int | No | 1000 | Number of rows to batch before sending to BigQuery | +| emulator_host | string | No | - | BigQuery emulator host for testing (e.g., localhost:9050) | + +### Authentication Options + +You must provide **one** of the following authentication methods: + +1. **service_account_key_path**: Path to service account JSON file +2. **service_account_key_json**: Inline JSON key content +3. **Default credentials**: Uses application default credentials (ADC) if neither is specified + +## Task Example + +### Simple Example (Using Service Account File) + +```hocon +env { + parallelism = 2 + job.mode = "BATCH" +} + +source { + FakeSource { + row.num = 1000 + schema = { + fields { + user_id = "bigint" + username = "string" + email = "string" + created_at = "timestamp" + } + } + } +} + +sink { + BigQuery { + project_id = "my-gcp-project" + dataset_id = "analytics" + table_id = "user_events" + service_account_key_path = "/path/to/key.json" + batch_size = 1000 + } +} +``` + +### Example with BigQuery Emulator (Testing) + +```hocon +sink { + BigQuery { + project_id = "test-project" + dataset_id = "test_dataset" + table_id = "test_table" + emulator_host = "localhost:9050" + batch_size = 100 + } +} Review Comment: Docs include an emulator example, but the added E2E tests are disabled with the note that the BigQuery emulator doesn't support the Storage Write API. This is confusing for users. Either document the limitation/version constraints clearly here or remove the emulator example until it is actually supported. ########## seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/writer/BigQueryStreamWriter.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.bigquery.sink.writer; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.connectors.bigquery.client.BigQueryWriteClientFactory; +import org.apache.seatunnel.connectors.bigquery.exception.BigQueryConnectorErrorCode; +import org.apache.seatunnel.connectors.bigquery.exception.BigQueryConnectorException; +import org.apache.seatunnel.connectors.bigquery.option.BigQuerySinkOptions; + +import org.json.JSONArray; +import org.threeten.bp.Duration; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; +import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1.TableName; +import com.google.protobuf.Descriptors; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; + +@Slf4j +public class BigQueryStreamWriter implements BigQueryWriter { + public static final String DEFAULT_PATH = "/streams/_default"; + private final JsonStreamWriter streamWriter; + private final BigQueryWriteClient client; + + public BigQueryStreamWriter(JsonStreamWriter streamWriter, BigQueryWriteClient client) { + this.streamWriter = streamWriter; + this.client = client; + } + + public static BigQueryStreamWriter of(ReadonlyConfig config) { + try { + BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClientFactory.getClient(config); + return new BigQueryStreamWriter( + createStreamWriter(config, bigQueryWriteClient), bigQueryWriteClient); + } catch (Exception e) { + throw new BigQueryConnectorException( + BigQueryConnectorErrorCode.STREAM_CREATE_FAILED, e); + } + } + + private static JsonStreamWriter createStreamWriter( + ReadonlyConfig config, BigQueryWriteClient client) + throws Descriptors.DescriptorValidationException, IOException, InterruptedException { + String projectId = config.get(BigQuerySinkOptions.PROJECT_ID); + String datasetId = config.get(BigQuerySinkOptions.DATASET_ID); + String tableId = config.get(BigQuerySinkOptions.TABLE_ID); + + String streamName = getStreamName(projectId, datasetId, tableId); + + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(500)) + .setRetryDelayMultiplier(1.1) + .setMaxAttempts(5) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .build(); + + return JsonStreamWriter.newBuilder(streamName, client) + .setChannelProvider( + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder() + .setKeepAliveTime(Duration.ofMinutes(1)) + .setKeepAliveTimeout(Duration.ofMinutes(1)) + .setKeepAliveWithoutCalls(true) + .build()) + .setEnableConnectionPool(true) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(100L) + .build()) + .setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setRetrySettings(retrySettings) + .build(); + } + + private static String getStreamName(String projectId, String datasetId, String tableId) { + return TableName.of(projectId, datasetId, tableId).toString() + DEFAULT_PATH; + } + + @Override + public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) + throws Descriptors.DescriptorValidationException, IOException { + return 
streamWriter.append(jsonArr);
+    }
+
+    @Override
+    public void close() {
+        client.close();
+        streamWriter.close();

Review Comment:
   `close()` closes the `BigQueryWriteClient` before closing `JsonStreamWriter`. If the writer needs the client/channel during shutdown (e.g., to flush/finish RPCs), this order can cause errors. Close `streamWriter` first, then close `client`.
```suggestion
        try {
            streamWriter.close();
        } finally {
            client.close();
        }
```

########## seatunnel-connectors-v2/connector-bigquery/pom.xml: ##########
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License. You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-bigquery</artifactId>
+    <name>SeaTunnel : Connectors V2 : Bigquery</name>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.google.cloud</groupId>
+                <artifactId>libraries-bom</artifactId>
+                <version>26.72.0</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>31.1-jre</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-bigquery</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.cloud</groupId>

Review Comment:
   This connector uses Storage Write API classes (`com.google.cloud.bigquery.storage.v1.*`), but the module POM doesn't declare a dependency that provides them (typically `com.google.cloud:google-cloud-bigquery-storage`, managed by `libraries-bom`). As-is, the module is likely to fail compilation unless the dependency is pulled transitively. Add the explicit dependency (and consider whether `google-cloud-storage` is needed for this initial implementation).
```suggestion
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-bigquery-storage</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.cloud</groupId>
```

########## seatunnel-e2e/seatunnel-connector-v2-e2e/connector-bigquery-e2e/src/test/java/org/apache/seatunnel/e2e/connector/bigquery/BigqueryIT.java: ##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.bigquery;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+@Slf4j
+public class BigqueryIT extends AbstractBigqueryIT {
+    @TestTemplate
+    @Disabled("bigquery-emulator does not support bigquery storage write api.")
+    void testBigQuerySink(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/fake_to_bigquery_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    @Disabled("bigquery-emulator does not support bigquery storage write api.")
+    void testBigQuerySinkWithVariousType(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob("/fake_to_bigquery_sink_with_various_type.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }

Review Comment:
   All E2E test methods are disabled, so this module provides no CI coverage for the new connector. Consider making the tests conditionally runnable (e.g., requiring an env var / profile plus real GCP credentials) or adding unit/integration tests around serialization and the writer (e.g., error propagation and batching) so that at least the core logic is validated in CI.
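   For the conditional route, a sketch using JUnit 5's built-in condition support — the environment variable name and reason string here are illustrative assumptions, not an existing project convention:

   ```java
   // Sketch only: gate the e2e run on an opt-in environment variable instead
   // of hard-disabling it; any explicit opt-in flag would do.
   import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;

   @TestTemplate
   @EnabledIfEnvironmentVariable(
           named = "GOOGLE_APPLICATION_CREDENTIALS",
           matches = ".+",
           disabledReason = "Needs real GCP credentials; skipped in default CI runs")
   void testBigQuerySink(TestContainer container) throws IOException, InterruptedException {
       Container.ExecResult execResult = container.executeJob("/fake_to_bigquery_sink.conf");
       Assertions.assertEquals(0, execResult.getExitCode());
   }
   ```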
########## docs/zh/connectors/sink/BigQuery.md: ##########
@@ -0,0 +1,131 @@
+import ChangeLog from '../changelog/connector-bigquery.md';
+
+# BigQuery
+
+> BigQuery 数据接收器连接器
+
+## 支持的引擎
+
+> Spark<br/>
+> Flink<br/>
+> Seatunnel Zeta<br/>
+
+## 主要特性
+
+- [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [ ] [变更数据捕获](../../introduction/concepts/connector-v2-features.md)
+
+## 描述
+
+用于 Google Cloud BigQuery 的数据接收器连接器,使用 Storage Write API 实现高性能数据摄取。
+
+## 支持的数据源信息
+
+| 数据源 | 支持的版本 | Maven |
+|-----------|-------------|----------------------------------------------------------------------------------------|
+| BigQuery | BOM 26.72.0 | [下载](https://mvnrepository.com/artifact/com.google.cloud/google-cloud-bigquery) |
+
+## 配置选项
+
+| 名称 | 类型 | 是否必需 | 默认值 | 描述 |
+|----------------------------|---------|---------|--------|-------------------------------------------------------------|
+| project_id | string | 是 | - | GCP 项目 ID |
+| dataset_id | string | 是 | - | BigQuery 数据集 ID |
+| table_id | string | 是 | - | BigQuery 表 ID |
+| service_account_key_path | string | 否 | - | GCP 服务账号 JSON 密钥文件路径 |
+| service_account_key_json | string | 否 | - | GCP 服务账号 JSON 密钥内容 |
+| batch_size | int | 否 | 1000 | 发送到 BigQuery 之前批处理的行数 |
+| emulator_host | string | 否 | - | 用于测试的 BigQuery 模拟器主机 (例如: localhost:9050) |
+
+### 认证选项
+
+您必须提供以下认证方法之一:
+
+1. **service_account_key_path**: 服务账号 JSON 文件路径
+2. **service_account_key_json**: 内联 JSON 密钥内容
+3. **默认凭据**: 如果未指定上述选项,则使用应用程序默认凭据 (ADC)
+
+## 任务示例
+
+### 简单示例 (使用服务账号文件)
+
+```hocon
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    row.num = 1000
+    schema = {
+      fields {
+        user_id = "bigint"
+        username = "string"
+        email = "string"
+        created_at = "timestamp"
+      }
+    }
+  }
+}
+
+sink {
+  BigQuery {
+    project_id = "my-gcp-project"
+    dataset_id = "analytics"
+    table_id = "user_events"
+    service_account_key_path = "/path/to/key.json"
+    batch_size = 1000
+  }
+}
+```
+
+### BigQuery 模拟器示例 (测试)
+
+```hocon
+sink {
+  BigQuery {
+    project_id = "test-project"
+    dataset_id = "test_dataset"
+    table_id = "test_table"
+    emulator_host = "localhost:9050"
+    batch_size = 100
+  }
+}
+```

Review Comment:
   The docs include a BigQuery emulator example, but the newly added E2E tests explicitly state that the emulator does not support the Storage Write API and are disabled for that reason. To avoid misleading users, either state clearly here that the emulator is currently unsupported (or only supported in specific versions), or remove this example until it actually works.

########## seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySinkWriter.java: ##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.bigquery.sink; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.google.common.util.concurrent.MoreExecutors; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.bigquery.convert.BigQuerySerializer; +import org.apache.seatunnel.connectors.bigquery.exception.BigQueryConnectorErrorCode; +import org.apache.seatunnel.connectors.bigquery.exception.BigQueryConnectorException; +import org.apache.seatunnel.connectors.bigquery.option.BigQuerySinkOptions; +import org.apache.seatunnel.connectors.bigquery.sink.writer.BigQueryStreamWriter; + +import org.json.JSONArray; +import org.json.JSONObject; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.protobuf.Descriptors; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicReference; + +@Slf4j +public class BigQuerySinkWriter + implements SinkWriter<SeaTunnelRow, Void, Void>, SupportMultiTableSinkWriter<Void> { + + private final BigQueryStreamWriter streamWriter; + private final BigQuerySerializer serializer; + + private final Phaser inflightRequests = new Phaser(1); + private final AtomicReference<RuntimeException> fatalError = new AtomicReference<>(); + + private final int batchSize; + private JSONArray buffer = new JSONArray(); + + public BigQuerySinkWriter( + ReadonlyConfig readOnlyConfig, + BigQueryStreamWriter streamWriter, + BigQuerySerializer serializer) { + this.batchSize = readOnlyConfig.get(BigQuerySinkOptions.BATCH_SIZE); + this.streamWriter = streamWriter; + this.serializer = serializer; + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + checkFatalError(); + + JsonNode jsonNode = serializer.convert(element); + buffer.put(new JSONObject(jsonNode.toString())); + + if (buffer.length() >= batchSize) { + flush(); + } + } + + private void flush() throws IOException { + if (buffer.length() == 0) return; + + JSONArray dataToSend = buffer; + buffer = new JSONArray(); + + inflightRequests.register(); + + ApiFuture<AppendRowsResponse> future; + try { + future = streamWriter.append(dataToSend); + } catch (Descriptors.DescriptorValidationException e) { + throw new BigQueryConnectorException(BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e); + } + + ApiFutures.addCallback( + future, + new ApiFutureCallback<AppendRowsResponse>() { + @Override + public void onSuccess(AppendRowsResponse result) { + inflightRequests.arriveAndDeregister(); + log.info("Successfully appended {} rows.", dataToSend.length()); + } + + @Override + public void onFailure(Throwable t) { + fatalError.compareAndSet( + null, + new BigQueryConnectorException( + BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, t)); + inflightRequests.arriveAndDeregister(); + log.warn("Failed to append rows.", t); + } + }, + MoreExecutors.directExecutor()); + } + + private void checkFatalError() { + RuntimeException e = fatalError.get(); + if (e != null) { + throw e; + } + } + + @Override + public Optional<Void> prepareCommit() { + return Optional.empty(); + } + + 
@Override
+    public void abortPrepare() {}
+
+    @Override
+    public void close() throws IOException {
+        flush();
+        inflightRequests.arriveAndAwaitAdvance();

Review Comment:
   If an async append fails, `fatalError` is set in the callback, but `close()` never calls `checkFatalError()` after waiting for in-flight requests. This means the job can finish successfully even though the final batch failed. After `arriveAndAwaitAdvance()`, re-check and throw `fatalError` (and consider checking right after `flush()` as well).
```suggestion
        flush();
        checkFatalError();
        inflightRequests.arriveAndAwaitAdvance();
        checkFatalError();
```

########## seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/convert/RowToJsonConverters.java: ##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.bigquery.convert;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.RowToJsonConverter;
+import org.apache.seatunnel.format.json.TimeFormat;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+
+public class RowToJsonConverters {
+    private String nullValue;
+
+    public RowToJsonConverter createConverter(SeaTunnelDataType<?> type) {
+        return wrapIntoNullableConverter(createNotNullConverter(type));
+    }
+
+    public RowToJsonConverter createConverter(SeaTunnelDataType<?> type, String nullValue) {
+        this.nullValue = nullValue;
+        return createConverter(type);
+    }
+
+    private RowToJsonConverter wrapIntoNullableConverter(RowToJsonConverter converter) {
+
return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + if (value == null) { + if (nullValue != null) { + return mapper.getNodeFactory().textNode(nullValue); + } + return mapper.getNodeFactory().nullNode(); + } + return converter.convert(mapper, reuse, value); + } + }; + } + + private RowToJsonConverter createNotNullConverter(SeaTunnelDataType<?> type) { + SqlType sqlType = type.getSqlType(); + switch (sqlType) { + case ROW: + return createRowConverter((SeaTunnelRowType) type); + case NULL: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return nullValue == null + ? null + : mapper.getNodeFactory().textNode((String) value); + } + }; + case BOOLEAN: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().booleanNode((Boolean) value); + } + }; + case TINYINT: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().numberNode((byte) value); + } + }; + case SMALLINT: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().numberNode((short) value); + } + }; + case INT: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().numberNode((int) value); + } + }; + case BIGINT: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().numberNode((long) value); + } + }; + case FLOAT: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().numberNode((float) value); + } + }; + case DOUBLE: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().numberNode((double) value); + } + }; + case DECIMAL: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().numberNode((BigDecimal) value); + } + }; + case BYTES: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().binaryNode((byte[]) value); + } + }; + case STRING: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory().textNode((String) value); + } + }; + case DATE: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory() + .textNode(ISO_LOCAL_DATE.format((LocalDate) value)); + } + }; + case TIME: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory() + .textNode(TimeFormat.TIME_FORMAT.format((LocalTime) value)); + } + }; + case TIMESTAMP: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return 
mapper.getNodeFactory() + .textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime) value)); + } + }; + case TIMESTAMP_TZ: + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + return mapper.getNodeFactory() + .textNode(ISO_OFFSET_DATE_TIME.format((OffsetDateTime) value)); + } + }; + case ARRAY: + return createArrayConverter((ArrayType) type); + case MAP: + MapType mapType = (MapType) type; + return createMapConverter(mapType.getKeyType(), mapType.getValueType()); + default: + throw new SeaTunnelJsonFormatException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, "unsupported parse type: " + type); + } + } + + private RowToJsonConverter createRowConverter(SeaTunnelRowType rowType) { + final RowToJsonConverter[] fieldConverters = + Arrays.stream(rowType.getFieldTypes()) + .map( + new Function<SeaTunnelDataType<?>, Object>() { + @Override + public Object apply(SeaTunnelDataType<?> seaTunnelDataType) { + return createConverter(seaTunnelDataType); + } + }) + .toArray( + new IntFunction<RowToJsonConverter[]>() { + @Override + public RowToJsonConverter[] apply(int value) { + return new RowToJsonConverter[value]; + } + }); + final String[] fieldNames = rowType.getFieldNames(); + final int arity = fieldNames.length; + + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + ObjectNode node; + + // reuse could be a NullNode if last record is null. + if (reuse == null || reuse.isNull()) { + node = mapper.createObjectNode(); + } else { + node = (ObjectNode) reuse; + } + + for (int i = 0; i < arity; i++) { + String fieldName = fieldNames[i]; + SeaTunnelRow row = (SeaTunnelRow) value; + node.set( + fieldName, + fieldConverters[i].convert( + mapper, node.get(fieldName), row.getField(i))); + } + + return node; + } + }; + } + + private RowToJsonConverter createArrayConverter(ArrayType arrayType) { + final RowToJsonConverter elementConverter = createConverter(arrayType.getElementType()); + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + ArrayNode node; + + // reuse could be a NullNode if last record is null. + if (reuse == null || reuse.isNull()) { + node = mapper.createArrayNode(); + } else { + node = (ArrayNode) reuse; + node.removeAll(); + } + + Object[] arrayData = (Object[]) value; + int numElements = arrayData.length; + for (int i = 0; i < numElements; i++) { + Object element = arrayData[i]; + node.add(elementConverter.convert(mapper, null, element)); + } + + return node; + } + }; + } + + protected RowToJsonConverter createMapConverter( + SeaTunnelDataType<?> keyType, SeaTunnelDataType<?> valueType) { + final RowToJsonConverter keyConverter = createConverter(keyType); + final RowToJsonConverter valueConverter = createConverter(valueType); + + return new RowToJsonConverter() { + @Override + public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) { + ObjectNode node = mapper.createObjectNode(); + + Map<?, ?> mapData = (Map) value; + for (Map.Entry<?, ?> entry : mapData.entrySet()) { + // Convert the key to a string using the key converter + JsonNode keyNode = keyConverter.convert(mapper, null, entry.getKey()); + String fieldName = keyNode.isTextual() ? 
keyNode.asText() : keyNode.toString(); + + node.set( + fieldName, + valueConverter.convert(mapper, node.get(fieldName), entry.getValue())); + } + return mapper.getNodeFactory().textNode(node.toString()); + } Review Comment: `createMapConverter` returns `textNode(node.toString())`, which serializes MAP values as a JSON *string* rather than a JSON object. This differs from `seatunnel-format-json`'s `RowToJsonConverters` (which returns an `ObjectNode`) and will change the on-wire schema for MAP columns. Return the `ObjectNode` itself (and reuse/clear `reuse` like the upstream implementation) unless BigQuery explicitly requires a string for this field type. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
