DanielCarter-stack commented on PR #10485:
URL: https://github.com/apache/seatunnel/pull/10485#issuecomment-3885580909
### Issue 1: Missing Exactly-Once Semantics Support
**Location**:
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySink.java:33`
```java
public class BigQuerySink
implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>,
SupportMultiTableSink {
```
**Related Context**:
- Interface definition:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java:47`
- Implementation: BigQuerySinkWriter lines 126-129 return `Optional.empty()`
**Issue Description**:
BigQuerySink uses the generic parameters `<SeaTunnelRow, Void, Void, Void>`,
explicitly signaling that it has no state management or two-phase-commit
support. This leads to:
1. `prepareCommit()` returns empty, so two-phase commit cannot be implemented
2. `snapshotState()` falls back to the default implementation and returns an empty list
3. The job cannot be restored to a previous checkpoint after a failure
4. Neither exactly-once nor at-least-once semantics are supported

Since the BigQuery Storage Write API is a streaming API whose writes cannot be
rolled back, the current implementation cannot guarantee data consistency.
**Potential Risks**:
- Data loss risk: if the job fails mid-write, data already sent cannot be rolled back
- Data duplication risk: if the job restarts, data may be resent
- Unrecoverable: after a checkpoint failure, the job cannot resume from its last position
**Impact Scope**:
- Direct impact: BigQuerySink, BigQuerySinkWriter
- Indirect impact: All user jobs using BigQuery Sink
- Affected area: Single Connector
**Severity**: CRITICAL
**Suggestions for Improvement**:
This is an architectural limitation. Suggested options:
1. Clearly document that exactly-once is not supported and that data
duplication or loss is possible
2. Or implement the Storage Write API's stream-offset mechanism to support
exactly-once (requires in-depth research of the API's capabilities); a hedged
sketch follows
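A rough sketch of option 2, as a fragment only: it assumes the connector creates its own write stream via a `BigQueryWriteClient` (`writeClient`, `tablePath`, `rows`, and `nextOffset` are illustrative names, not code from this PR):
```java
// Sketch: exactly-once-style appends using the Storage Write API's offsets.
// With a COMMITTED stream, passing the expected offset lets the server reject
// duplicate or out-of-order appends after a retry.
WriteStream stream =
        writeClient.createWriteStream(
                CreateWriteStreamRequest.newBuilder()
                        .setParent(tablePath) // "projects/{p}/datasets/{d}/tables/{t}"
                        .setWriteStream(
                                WriteStream.newBuilder()
                                        .setType(WriteStream.Type.COMMITTED)
                                        .build())
                        .build());
JsonStreamWriter writer =
        JsonStreamWriter.newBuilder(stream.getName(), stream.getTableSchema()).build();
// nextOffset would be restored from checkpoint state on recovery
ApiFuture<AppendRowsResponse> future = writer.append(rows, nextOffset);
```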
### Issue 2: Incomplete Async Write Error Propagation Mechanism
**Location**:
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySinkWriter.java:81-117`
```java
private void flush() throws IOException {
    // ... omitting the first part
    ApiFutures.addCallback(
            future,
            new ApiFutureCallback<AppendRowsResponse>() {
                @Override
                public void onSuccess(AppendRowsResponse result) {
                    inflightRequests.arriveAndDeregister();
                    log.info("Successfully appended {} rows.", dataToSend.length());
                }

                @Override
                public void onFailure(Throwable t) {
                    fatalError.compareAndSet(
                            null,
                            new BigQueryConnectorException(
                                    BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, t));
                    inflightRequests.arriveAndDeregister();
                    log.warn("Failed to append rows.", t);
                }
            },
            MoreExecutors.directExecutor());
}
```
**Related Context**:
- write() method: lines 70-79
- checkFatalError() method: lines 119-124
- close() method: lines 135-141
**Issue Description**:
The flush() method handles results through async callbacks, storing errors in
the `fatalError` AtomicReference. The following issues exist:
1. **Race condition**: `write()` calls `checkFatalError()` at line 71, but if
the callback's `onFailure` runs after that check, the error is only detected
on the next `write()` or on `close()`.
2. **Error delay**: if `flush()` sends the request successfully but the
callback later fails, data the user writes afterwards is also discarded
(because `checkFatalError()` will then throw).
3. **Partial success**: if a batch fails, the buffer has already been cleared
(line 85), so that data is lost.
**Potential Risks**:
- Data loss: a failed batch has already been cleared from the buffer and cannot be retried
- Delayed error detection: errors may only surface after several more `write()` calls
- State inconsistency: the user may believe the data was sent successfully when it actually failed
**Impact Scope**:
- Direct impact: All callers of BigQuerySinkWriter
- Indirect impact: Data integrity of all jobs using BigQuery Sink
- Affected area: Single Connector
**Severity**: CRITICAL
**Suggestions for Improvement**:
```java
// Sketch: wait synchronously for the append result so failures surface
// immediately and the failed batch can be restored for retry.
private void flush() throws IOException {
    if (buffer.length() == 0) {
        return;
    }
    JSONArray dataToSend = buffer;
    buffer = new JSONArray();
    ApiFuture<AppendRowsResponse> future;
    try {
        future = streamWriter.append(dataToSend);
    } catch (Exception e) {
        buffer = dataToSend; // restore the batch so it is not lost
        throw new BigQueryConnectorException(
                BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e);
    }
    try {
        // Block until the server acknowledges the batch
        future.get();
        log.info("Successfully appended {} rows.", dataToSend.length());
    } catch (Exception e) {
        buffer = dataToSend; // restore the batch for retry
        throw new BigQueryConnectorException(
                BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e);
    }
}
```
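If synchronous waiting costs too much throughput, an alternative sketch is to stay asynchronous but retain each batch until its callback confirms success, so a failed batch is never silently dropped (`inflightBatches` and `batchId` are illustrative names; the rest mirrors the code quoted above):
```java
// Sketch: track in-flight batches so failures can be retried or reported
// with their data intact.
private final Map<Long, JSONArray> inflightBatches = new ConcurrentHashMap<>();
private final AtomicLong batchId = new AtomicLong();

private void flushAsync() {
    long id = batchId.incrementAndGet();
    JSONArray dataToSend = buffer;
    buffer = new JSONArray();
    inflightBatches.put(id, dataToSend);
    ApiFutures.addCallback(
            streamWriter.append(dataToSend),
            new ApiFutureCallback<AppendRowsResponse>() {
                @Override
                public void onSuccess(AppendRowsResponse result) {
                    inflightBatches.remove(id); // safe to forget the batch now
                    inflightRequests.arriveAndDeregister();
                }

                @Override
                public void onFailure(Throwable t) {
                    // The batch stays in inflightBatches for retry/error reporting
                    fatalError.compareAndSet(
                            null,
                            new BigQueryConnectorException(
                                    BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, t));
                    inflightRequests.arriveAndDeregister();
                }
            },
            MoreExecutors.directExecutor());
}
```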
### Issue 3: close() Method May Cause Deadlock
**Location**:
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySinkWriter.java:135-141`
```java
@Override
public void close() throws IOException {
flush(); // Line 136
inflightRequests.arriveAndAwaitAdvance(); // Line 137
checkFatalError();
streamWriter.close();
}
```
**Related Context**:
- flush() method: lines 81-117
- inflightRequests initialization: line 54 `new Phaser(1)`
**Issue Description**:
If `flush()` throws an exception at line 91
(`streamWriter.append(dataToSend)`):
1. The exception is caught at lines 92-94
2. Line 93 calls `inflightRequests.arriveAndDeregister()`
3. The exception is rethrown
4. `close()` is aborted by the exception at line 136
5. Line 137 `inflightRequests.arriveAndAwaitAdvance()` never executes

**Problem**: the Phaser is created with 1 initial party; if
`arriveAndAwaitAdvance()` is never called, other waiting threads would block
forever. In the `close()` scenario this should not deadlock, since no other
threads are waiting.

**More serious issue**: if `flush()` sends the request successfully but the
callback's `arriveAndDeregister()` has not yet run when `close()` calls
`arriveAndAwaitAdvance()`:
- The main thread arrives (unarrived parties drop from 1 to 0)
- It waits for phase 0 to complete
- If the still-executing callback then also arrives, the unarrived-party
count would go negative, which the Phaser treats as an error
**Potential Risks**:
- Phaser state inconsistency
- Potential deadlock or race condition
- Resource leak
**Impact Scope**:
- Direct impact: BigQuerySinkWriter
- Indirect impact: Job shutdown process
- Affected area: Single Connector
**Severity**: MAJOR
**Suggestions for Improvement**:
```java
@Override
public void close() throws IOException {
try {
flush();
} finally {
// Wait for inflight requests regardless of whether flush succeeds
inflightRequests.arriveAndAwaitAdvance();
}
checkFatalError();
streamWriter.close();
}
```
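In addition, keeping the Phaser's register/deregister calls strictly paired makes the closing logic robust. A sketch, assuming `flush()` registers a party before each append (as the catch block at line 93 implies):
```java
// One register() per in-flight append; exactly one arriveAndDeregister()
// per outcome, whether synchronous failure or async callback.
inflightRequests.register(); // before sending the batch
ApiFuture<AppendRowsResponse> future;
try {
    future = streamWriter.append(dataToSend);
} catch (Exception e) {
    inflightRequests.arriveAndDeregister(); // undo the registration on synchronous failure
    throw new BigQueryConnectorException(
            BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e);
}
// onSuccess/onFailure must then each call arriveAndDeregister() exactly once
```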
### Issue 4: Incorrect Map Conversion Logic in RowToJsonConverters
**Location**:
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/convert/RowToJsonConverters.java:278-301`
```java
protected RowToJsonConverter createMapConverter(
        SeaTunnelDataType<?> keyType, SeaTunnelDataType<?> valueType) {
    // ... omitting the first part
    return new RowToJsonConverter() {
        @Override
        public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
            ObjectNode node = mapper.createObjectNode();
            Map<?, ?> mapData = (Map) value;
            for (Map.Entry<?, ?> entry : mapData.entrySet()) {
                JsonNode keyNode = keyConverter.convert(mapper, null, entry.getKey());
                String fieldName = keyNode.isTextual() ? keyNode.asText() : keyNode.toString();
                node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
            }
            return mapper.getNodeFactory().textNode(node.toString()); // Line 298: Error!
        }
    };
}
```
**Related Context**:
- Original code:
`seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java:278-301`
(same issue)
- Caller: BigQuerySerializer lines 39-40
**Issue Description**:
Line 298 returns `textNode(node.toString())`, which serializes the entire
JSON object into a string literal.
For example, given the map `{"key1": "value1", "key2": 123}`:
- Expected output: `{"key1": "value1", "key2": 123}` (an ObjectNode)
- Actual output: `"{\"key1\": \"value1\", \"key2\": 123}"` (a TextNode whose
value is the escaped JSON string)
**Potential Risks**:
- Incorrect data format: BigQuery receives a string instead of a JSON object
- Type mismatch: if the table schema uses a MAP/STRUCT type, the write will fail
- Query issues: even if the write succeeds, the data is a string rather than
queryable JSON
**Impact Scope**:
- Direct impact: Serialization of all MAP type fields
- Indirect impact: Data containing MAP types cannot be correctly written to
BigQuery
- Affected area: Single Connector, but same issue exists in format-json
module
**Severity**: CRITICAL
**Suggestions for Improvement**:
```java
@Override
public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
    ObjectNode node = mapper.createObjectNode();
    Map<?, ?> mapData = (Map) value;
    for (Map.Entry<?, ?> entry : mapData.entrySet()) {
        JsonNode keyNode = keyConverter.convert(mapper, null, entry.getKey());
        String fieldName = keyNode.isTextual() ? keyNode.asText() : keyNode.toString();
        node.set(fieldName, valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
    }
    return node; // return the ObjectNode itself instead of a textNode
}
```
### Issue 5: Code Duplication in RowToJsonConverters
**Location**:
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/convert/RowToJsonConverters.java`
**Related Context**:
- Original implementation:
`seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java`
- New interface:
`seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverter.java`
**Issue Description**:
The PR extracts the `RowToJsonConverters.RowToJsonConverter` inner interface
into an independent top-level interface, but the BigQuery connector copies the
entire `RowToJsonConverters` class (302 lines) into its own package instead of
reusing the format-json module.
**Problems**:
1. **Code duplication**: a large amount of identical code, violating the DRY principle
2. **Bug duplication**: the Map conversion bug in the original code is copied along with it
3. **Maintenance cost**: every future change must be made in two places
4. **Inconsistency risk**: the two implementations may gradually diverge
**Potential Risks**:
- Maintenance difficulty: a fix in one copy must be repeated in the other
- Inconsistency: the two implementations may produce different results
- Bug amplification: the original bug's blast radius is doubled
**Impact Scope**:
- Direct impact: connector-bigquery and format-json modules
- Indirect impact: Any future modifications to RowToJsonConverters
- Affected area: Multiple Connectors (format-json is shared module)
**Severity**: MAJOR
**Suggestions for Improvement**:
1. Delete the copied RowToJsonConverters class from connector-bigquery
2. Use the format-json module's implementation directly:
```java
// BigQuerySerializer.java
public BigQuerySerializer(CatalogTable catalogTable) {
    SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
    this.jsonSerializationSchema =
            new JsonSerializationSchema(
                    rowType,
                    new org.apache.seatunnel.format.json.RowToJsonConverters()
                            .createConverter(checkNotNull(rowType)));
}
```
### Issue 6: New Public Method in JsonSerializationSchema
**Location**:
`seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java:94-103`
```java
public JsonNode convert(SeaTunnelRow row) {
if (node == null) {
node = mapper.createObjectNode();
}
try {
return runtimeConverter.convert(mapper, node, row);
} catch (Exception e) {
throw CommonError.jsonOperationError(FORMAT, row.toString(), e);
}
}
```
**Related Context**:
- Original method: serialize() returns byte[]
- New method: convert() returns JsonNode
- Caller: BigQuerySerializer
**Issue Description**:
JsonSerializationSchema is a shared format class; adding a new public method
may:
1. Affect other connectors that use this class
2. Change the class's public API
3. Cause compatibility issues if other code comes to depend on the new behavior
Although the method exists to support BigQuery's requirement (a JsonNode
rather than byte[]), its impact on other modules should be evaluated.
**Potential Risks**:
- API stability: changes to public methods may ripple into other code
- Compatibility: a same-named method elsewhere could conflict
- Maintenance burden: one more public method to support indefinitely
**Impact Scope**:
- Direct impact: All users of JsonSerializationSchema
- Indirect impact: All Connectors depending on format-json
- Affected area: Multiple Connectors
**Severity**: MAJOR
**Suggestions for Improvement**:
1. Check whether other connectors use JsonSerializationSchema
2. If the method must be kept, document its purpose and applicable scenarios in JavaDoc
3. Consider a subclass or composition instead of modifying the shared class; see the sketch below
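A minimal sketch of option 3 (composition). The class name is illustrative; the converter signatures are taken from the code quoted elsewhere in this review:
```java
// Connector-local helper: produces a JsonNode without touching the shared
// JsonSerializationSchema public API.
public class BigQueryRowJsonConverter {
    private final ObjectMapper mapper = new ObjectMapper();
    private final RowToJsonConverter converter;

    public BigQueryRowJsonConverter(SeaTunnelRowType rowType) {
        // Reuse the shared format-json converter chain as-is
        this.converter = new RowToJsonConverters().createConverter(rowType);
    }

    public JsonNode convert(SeaTunnelRow row) {
        // Only the return type differs from serialize(); no shared-class change needed
        return converter.convert(mapper, mapper.createObjectNode(), row);
    }
}
```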
### Issue 7: Insufficient Sensitive Information Handling
**Location**:
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/option/BigQuerySinkOptions.java:45-55`
```java
public static final Option<String> SERVICE_ACCOUNT_KEY_PATH =
        Options.key("service_account_key_path")
                .stringType()
                .noDefaultValue()
                .withDescription("Path to GCP service account JSON key file");

public static final Option<String> SERVICE_ACCOUNT_KEY_JSON =
        Options.key("service_account_key_json")
                .stringType()
                .noDefaultValue()
                .withDescription("Inline GCP service account JSON key content");
```
**Related Context**:
- User: BigQueryWriteClientFactory lines 66-81
**Issue Description**:
SERVICE_ACCOUNT_KEY_JSON lets users embed the GCP service account's
private-key JSON directly in configuration files.
**Potential Risks**:
1. **Leakage risk**: configuration files may be committed to version control,
exposing the private key
2. **Log leakage**: if the configuration is ever printed to logs, the private
key is exposed
3. **Excessive permissions**: the service account may hold more permissions
than needed, raising the impact of a leak
4. **No validation**: BigQueryWriteClientFactory line 78 uses the string
directly without validating that it is well-formed JSON
**Impact Scope**:
- Direct impact: All users using BigQuery Sink
- Indirect impact: GCP account security
- Affected area: Single Connector
**Severity**: MAJOR
**Suggestions for Improvement**:
1. Clearly warn in the documentation against putting SERVICE_ACCOUNT_KEY_JSON
directly in configuration files
2. Redact sensitive values in logs (see the sketch after the validation example)
3. Add JSON format validation:
```java
try {
    byte[] jsonBytes =
            config.get(BigQuerySinkOptions.SERVICE_ACCOUNT_KEY_JSON)
                    .getBytes(StandardCharsets.UTF_8);
    credentials = ServiceAccountCredentials.fromStream(new ByteArrayInputStream(jsonBytes));
} catch (IOException e) {
    throw new BigQueryConnectorException(
            BigQueryConnectorErrorCode.INVALID_CREDENTIALS,
            "Invalid service account JSON format",
            e);
}
```
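For point 2, a minimal redaction sketch, assuming `credentials` is the `ServiceAccountCredentials` loaded above:
```java
// Log only non-sensitive identifying fields, never the key JSON itself.
log.info(
        "Loaded GCP service account credentials for {} (project: {})",
        credentials.getClientEmail(),
        credentials.getProjectId());
```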
### Issue 8: Tests Disabled and Lacking Unit Tests
**Location**:
`seatunnel-e2e/seatunnel-connector-v2-e2e/connector-bigquery-e2e/src/test/java/org/apache/seatunnel/e2e/connector/bigquery/BigqueryIT.java:32-47`
```java
@TestTemplate
@Disabled("bigquery-emulator does not support bigquery storage write api.")
void testBigQuerySink(TestContainer container) throws IOException, InterruptedException {
    Container.ExecResult execResult = container.executeJob("/fake_to_bigquery_sink.conf");
    Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
@Disabled("bigquery-emulator does not support bigquery storage write api.")
void testBigQuerySinkWithVariousType(TestContainer container)
        throws IOException, InterruptedException {
    Container.ExecResult execResult =
            container.executeJob("/fake_to_bigquery_sink_with_various_type.conf");
    Assertions.assertEquals(0, execResult.getExitCode());
}
```
**Related Context**:
- PR description mentions: "I've added `BigQueryIT` for testing. However,
the bigquery-emulator doesn't appear to support the BigQuery Write Storage API,
so I tested this using my GCP account and verified that it works successfully."
**Issue Description**:
1. **Tests disabled**: all tests are marked `@Disabled` and will never run in CI
2. **Missing unit tests**: the key classes have no unit tests at all
3. **Dependence on a real environment**: the author verified against a real
GCP account, which cannot be automated
**Potential Risks**:
1. **Regression risk**: future code changes may break functionality without
timely detection
2. **Quality assurance**: functional correctness cannot be verified
automatically in CI
3. **Developer friction**: other developers cannot quickly verify their
modifications
**Impact Scope**:
- Direct impact: Code quality and stability
- Indirect impact: All users using this Connector
- Affected area: Single Connector
**Severity**: MAJOR
**Suggestions for Improvement**:
1. Add unit tests that mock the BigQuery API (sketch; `config`, `serializer`,
and `testRow` are assumed test fixtures):
```java
@Test
public void testWrite() throws IOException {
    // Mock the stream writer so no real BigQuery call is made
    BigQueryStreamWriter mockWriter = mock(BigQueryStreamWriter.class);
    AppendRowsResponse mockResponse = AppendRowsResponse.getDefaultInstance();
    when(mockWriter.append(any())).thenReturn(ApiFutures.immediateFuture(mockResponse));

    BigQuerySinkWriter writer = new BigQuerySinkWriter(config, mockWriter, serializer);
    writer.write(testRow);

    verify(mockWriter).append(any());
}
```
2. For integration tests, consider:
- Using Testcontainers to set up a BigQuery test environment
- Or a mock BigQuery server (e.g., using WireMock)
- Or accepting manual testing (but documenting that in the docs)
### Issue 9: Hardcoded Retry Parameters
**Location**:
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/writer/BigQueryStreamWriter.java:74-80`
```java
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
```
**Related Context**:
- BigQuerySinkOptions: Configuration options definition
**Issue Description**:
The retry parameters are entirely hardcoded, so users cannot tune them for
their network environment or business requirements.
**Potential Risks**:
- Inflexibility: high-latency networks may need longer retry delays
- Load concerns: fast retries may increase server load
- Debugging difficulty: issues cannot be triaged by adjusting the parameters
**Impact Scope**:
- Direct impact: All users using BigQuery Sink
- Affected area: Single Connector
**Severity**: MINOR
**Suggestions for Improvement**:
Add configuration options to BigQuerySinkOptions:
```java
public static final Option<Long> INITIAL_RETRY_DELAY_MS =
Options.key("initial_retry_delay_ms")
.longType()
.defaultValue(500L)
.withDescription("Initial retry delay in milliseconds");
public static final Option<Integer> MAX_RETRY_ATTEMPTS =
Options.key("max_retry_attempts")
.intType()
.defaultValue(5)
.withDescription("Maximum number of retry attempts");
```
Then use the configured values in BigQueryStreamWriter; a sketch:
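```java
// Sketch: build RetrySettings from the new options (assumes `config` is the
// connector's ReadonlyConfig and the options above have been added).
RetrySettings retrySettings =
        RetrySettings.newBuilder()
                .setInitialRetryDelay(
                        Duration.ofMillis(config.get(BigQuerySinkOptions.INITIAL_RETRY_DELAY_MS)))
                .setRetryDelayMultiplier(1.1)
                .setMaxAttempts(config.get(BigQuerySinkOptions.MAX_RETRY_ATTEMPTS))
                .setMaxRetryDelay(Duration.ofMinutes(1))
                .build();
```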
### Issue 10: Missing Metrics Collection
**Location**:
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySinkWriter.java`
**Related Context**:
- SinkWriter.Context interface provides `getMetricsContext()`
- Other sinks (e.g., KafkaSink) collect metrics
**Issue Description**:
BigQuerySinkWriter does not collect or report any metrics, including:
- Total rows sent
- Batch count
- Failure count
- Latency
- Current buffer size
**Potential Risks**:
- Poor observability: users cannot monitor the sink's performance
- Troubleshooting difficulty: without metrics, problems are hard to localize
- Tuning: configuration cannot be optimized from measured behavior
**Impact Scope**:
- Direct impact: Production environment observability
- Affected area: Single Connector
**Severity**: MINOR
**Suggestions for Improvement**:
```java
public BigQuerySinkWriter(
ReadonlyConfig readOnlyConfig,
BigQueryStreamWriter streamWriter,
BigQuerySerializer serializer,
SinkWriter.Context context) { // Add Context parameter
this.batchSize = readOnlyConfig.get(BigQuerySinkOptions.BATCH_SIZE);
this.streamWriter = streamWriter;
this.serializer = serializer;
// Initialize Metrics
this.rowsSent = context.getMetricsContext().counter("rowsSent");
this.batchesSent = context.getMetricsContext().counter("batchesSent");
this.sendErrors = context.getMetricsContext().counter("sendErrors");
}
```
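The counters can then be updated in the write path. A sketch (counter names as defined above; `dataToSend` as in the flush() code quoted earlier):
```java
// Bump counters where batches succeed or fail (e.g., in the append callback),
// so the values mirror actual server acknowledgements.
rowsSent.inc(dataToSend.length()); // in onSuccess
batchesSent.inc();                 // in onSuccess
sendErrors.inc();                  // in onFailure
```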