DanielCarter-stack commented on PR #10485:
URL: https://github.com/apache/seatunnel/pull/10485#issuecomment-3885580909

   ### Issue 1: Missing Exactly-Once Semantics Support
   
   **Location**: 
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySink.java:33`
   
   ```java
   public class BigQuerySink
           implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>, SupportMultiTableSink {
   ```
   
   **Related Context**:
   - Interface definition: 
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SeaTunnelSink.java:47`
   - Implementation: BigQuerySinkWriter lines 126-129 return `Optional.empty()`
   
   **Issue Description**:
   BigQuerySink declares the generic parameters `<SeaTunnelRow, Void, Void, Void>`, so it has no writer state, commit info, or aggregated commit info, and therefore no two-phase commit. As a result:
   1. `prepareCommit()` returns `Optional.empty()`, so there is nothing to commit in a second phase
   2. `snapshotState()` falls back to the default implementation, which returns an empty list
   3. After a job failure, the writer cannot be restored to its state at the last checkpoint
   4. Neither exactly-once nor at-least-once delivery is guaranteed
   
   The connector appends rows through the BigQuery Storage Write API as a stream, and rows that have been appended cannot be rolled back, so the current implementation cannot guarantee data consistency across failures.
   
   **Potential Risks**:
   - Data loss risk: If the job fails during writing, rows that were buffered or still in flight are not recorded in any checkpoint state
   - Data duplication risk: Rows already appended cannot be rolled back, so a restart may resend them
   - Unrecoverable: The writer cannot resume from its last position after a checkpoint failure
   
   **Impact Scope**:
   - Direct impact: BigQuerySink, BigQuerySinkWriter
   - Indirect impact: All user jobs using BigQuery Sink
   - Affected area: Single Connector
   
   **Severity**: CRITICAL
   
   **Suggestions for Improvement**:
   This is an architectural limitation. Suggested options:
   1. Document clearly that exactly-once is not supported and that failures may cause duplicated or lost data
   2. Or use the stream offset mechanism of the BigQuery Storage Write API to support exactly-once (requires in-depth research into the API's capabilities)
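
   To illustrate the idea behind option 2, the sketch below simulates offset-based deduplication in plain Java. `InMemoryStream` and `OffsetReplayDemo` are hypothetical stand-ins, not the BigQuery client. The sketch assumes the target stream rejects an append whose offset does not match the next expected offset. Whether the stream type used by the connector behaves this way has to be confirmed against the Storage Write API documentation.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   /** Hypothetical in-memory stand-in for a stream that accepts appends at explicit offsets. */
   final class InMemoryStream {
       private final List<String> rows = new ArrayList<>();

       /** Appends the batch only if the offset equals the current end of the stream. */
       boolean append(long offset, List<String> batch) {
           if (offset != rows.size()) {
               return false; // duplicate or out-of-order append is rejected
           }
           rows.addAll(batch);
           return true;
       }

       long nextOffset() {
           return rows.size();
       }

       int rowCount() {
           return rows.size();
       }
   }

   final class OffsetReplayDemo {
       /** Resends one batch after a simulated restart and returns the final row count. */
       static int run() {
           InMemoryStream stream = new InMemoryStream();
           // Offset that a stateful writer would store in its checkpoint state
           long checkpointedOffset = stream.nextOffset();
           List<String> batch = Arrays.asList("a", "b", "c");

           // First attempt succeeds, then the job fails before the next checkpoint
           stream.append(checkpointedOffset, batch);

           // After restore, the same batch is resent at the checkpointed offset
           boolean acceptedAgain = stream.append(checkpointedOffset, batch);
           if (acceptedAgain) {
               throw new IllegalStateException("replayed batch should have been rejected");
           }
           return stream.rowCount();
       }

       public static void main(String[] args) {
           System.out.println("rows after replay: " + run());
       }
   }
   ```

   The point of the sketch is that the writer needs state (the offset) in its checkpoint, which the current `Void` state type cannot carry.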
   
   ### Issue 2: Incomplete Async Write Error Propagation Mechanism
   
   **Location**: 
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySinkWriter.java:81-117`
   
   ```java
   private void flush() throws IOException {
       // ... omitting the first part
       ApiFutures.addCallback(
               future,
               new ApiFutureCallback<AppendRowsResponse>() {
                   @Override
                   public void onSuccess(AppendRowsResponse result) {
                       inflightRequests.arriveAndDeregister();
                       log.info("Successfully appended {} rows.", dataToSend.length());
                   }
   
                   @Override
                   public void onFailure(Throwable t) {
                       fatalError.compareAndSet(
                               null,
                               new BigQueryConnectorException(
                                       BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, t));
                       inflightRequests.arriveAndDeregister();
                       log.warn("Failed to append rows.", t);
                   }
               },
               MoreExecutors.directExecutor());
   }
   ```
   
   **Related Context**:
   - write() method: lines 70-79
   - checkFatalError() method: lines 119-124
   - close() method: lines 135-141
   
   **Issue Description**:
   The `flush()` method handles results in async callbacks and stores errors in the `fatalError` AtomicReference. This causes the following problems:
   
   1. **Race condition**: `write()` calls `checkFatalError()` at line 71. If `onFailure` runs after that check, the error is only detected by the next `write()` or by `close()`.
   
   2. **Error delay**: If `flush()` sends a request successfully but its callback later reports a failure, rows written in the meantime are also lost, because the next `checkFatalError()` throws before they are flushed.
   
   3. **Partial success**: The buffer is cleared at line 85 before the result is known, so the rows of a failed batch are lost.
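
   The delayed detection described in points 1 and 2 can be reproduced with JDK classes only. This is a minimal sketch, not the connector code: `CompletableFuture` stands in for the `ApiFuture` returned by the stream writer, and `DeferredErrorDemo` is a hypothetical class that copies only the `fatalError` pattern.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;

   final class DeferredErrorDemo {
       private final AtomicReference<Throwable> fatalError = new AtomicReference<>();

       /** Mirrors checkFatalError(): throws only if a callback has already recorded a failure. */
       void checkFatalError() {
           Throwable t = fatalError.get();
           if (t != null) {
               throw new IllegalStateException("append failed", t);
           }
       }

       /** Registers a failure callback on a pending append, similar to ApiFutures.addCallback. */
       void track(CompletableFuture<Void> pendingAppend) {
           pendingAppend.whenComplete(
                   (ok, err) -> {
                       if (err != null) {
                           fatalError.compareAndSet(null, err);
                       }
                   });
       }

       /** Returns how many write() checks passed before the failure became visible. */
       static int run() {
           DeferredErrorDemo writer = new DeferredErrorDemo();
           CompletableFuture<Void> pending = new CompletableFuture<>();
           writer.track(pending);

           int acceptedWrites = 0;
           writer.checkFatalError(); // write #1 passes: the request is still in flight
           acceptedWrites++;
           writer.checkFatalError(); // write #2 passes as well
           acceptedWrites++;

           // The server rejects the batch only now
           pending.completeExceptionally(new RuntimeException("server rejected batch"));
           try {
               writer.checkFatalError(); // write #3: the failure finally surfaces
               acceptedWrites++;
           } catch (IllegalStateException expected) {
               // Writes #1 and #2 were accepted although the earlier batch had failed
           }
           return acceptedWrites;
       }

       public static void main(String[] args) {
           System.out.println("writes accepted before the error surfaced: " + run());
       }
   }
   ```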
   
   **Potential Risks**:
   - Data loss: The failed batch has already been cleared from the buffer and cannot be retried
   - Delayed error detection: Errors may only surface after several more `write()` calls
   - State inconsistency: Callers may assume data was sent successfully when it actually failed
   
   **Impact Scope**:
   - Direct impact: All callers of BigQuerySinkWriter
   - Indirect impact: Data integrity of all jobs using BigQuery Sink
   - Affected area: Single Connector
   
   **Severity**: CRITICAL
   
   **Suggestions for Improvement**:
   ```java
   // Sketch: wait synchronously for the append result so failures surface in flush()
   private void flush() throws IOException {
       if (buffer.length() == 0) {
           return;
       }

       JSONArray dataToSend = buffer;
       buffer = new JSONArray();

       try {
           // Block until BigQuery acknowledges the batch
           streamWriter.append(dataToSend).get();
           log.info("Successfully appended {} rows.", dataToSend.length());
       } catch (InterruptedException e) {
           // Keep the batch for a retry and preserve the interrupt status
           buffer = dataToSend;
           Thread.currentThread().interrupt();
           throw new BigQueryConnectorException(
                   BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e);
       } catch (Exception e) {
           // Covers synchronous append failures and ExecutionException from get()
           buffer = dataToSend;
           throw new BigQueryConnectorException(
                   BigQueryConnectorErrorCode.APPEND_ROWS_FAILED, e);
       }
   }
   ```
   
   ### Issue 3: close() Method May Cause Deadlock
   
   **Location**: 
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySinkWriter.java:135-141`
   
   ```java
   @Override
   public void close() throws IOException {
       flush();  // Line 136
       inflightRequests.arriveAndAwaitAdvance();  // Line 137
       checkFatalError();
       streamWriter.close();
   }
   ```
   
   **Related Context**:
   - flush() method: lines 81-117
   - inflightRequests initialization: line 54 `new Phaser(1)`
   
   **Issue Description**:
   If `flush()` throws at line 91, `streamWriter.append(dataToSend)`:
   1. The exception is caught at lines 92-94
   2. Line 93 calls `inflightRequests.arriveAndDeregister()`
   3. The exception is rethrown
   4. `close()` aborts at line 136 because of the exception
   5. Line 137, `inflightRequests.arriveAndAwaitAdvance()`, does not execute, and neither do `checkFatalError()` and `streamWriter.close()`
   
   **Problem**: The Phaser starts with 1 registered party. Skipping `arriveAndAwaitAdvance()` does not block any other thread, since nothing else waits on the Phaser during `close()`. However, requests from earlier flushes that are still in flight are never awaited, and `streamWriter` is never closed.
   
   **More serious issue**: `close()` relies on every in-flight request arriving exactly once. When `flush()` has sent a request and its callback has not yet run, `arriveAndAwaitAdvance()` blocks until the callback calls `arriveAndDeregister()`, which is the intended behavior. It breaks down if arrivals are unbalanced:
   - If a callback never fires, `close()` blocks indefinitely
   - If one request arrives twice (for example, once in the catch block and once in a callback), the registered party count can reach 0, which terminates the Phaser
   - On a terminated Phaser, `arriveAndAwaitAdvance()` returns a negative phase immediately instead of waiting for the remaining requests
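
   The Phaser behavior in question can be checked in isolation. The sketch below uses only `java.util.concurrent.Phaser`; `PhaserInflightDemo` is a hypothetical class, and the double arrival in the second method illustrates the failure mode rather than a path confirmed in the PR.

   ```java
   import java.util.concurrent.Phaser;

   final class PhaserInflightDemo {
       /** Balanced case: every request registers once and arrives once. */
       static int balanced(int requests) {
           Phaser inflight = new Phaser(1); // the writer thread is the first party
           Thread[] callbacks = new Thread[requests];
           for (int i = 0; i < requests; i++) {
               inflight.register(); // one party per in-flight request
               callbacks[i] = new Thread(inflight::arriveAndDeregister);
           }
           for (Thread callback : callbacks) {
               callback.start();
           }
           // Like close(): blocks until all registered requests have arrived
           inflight.arriveAndAwaitAdvance();
           return inflight.getRegisteredParties(); // only the writer thread remains
       }

       /** Unbalanced case: the same request signals completion twice. */
       static boolean doubleArrivalTerminates() {
           Phaser inflight = new Phaser(1);
           inflight.register();            // one in-flight request
           inflight.arriveAndDeregister(); // first completion signal
           inflight.arriveAndDeregister(); // second signal removes the writer's own party
           // With zero parties the phaser terminates and no longer waits for anything
           return inflight.isTerminated() && inflight.arriveAndAwaitAdvance() < 0;
       }

       public static void main(String[] args) {
           System.out.println("parties left after balanced run: " + balanced(3));
           System.out.println("double arrival terminates: " + doubleArrivalTerminates());
       }
   }
   ```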
   
   **Potential Risks**:
   - Phaser state inconsistency
   - Potential deadlock or race condition
   - Resource leak
   
   **Impact Scope**:
   - Direct impact: BigQuerySinkWriter
   - Indirect impact: Job shutdown process
   - Affected area: Single Connector
   
   **Severity**: MAJOR
   
   **Suggestions for Improvement**:
   ```java
   @Override
   public void close() throws IOException {
       try {
           try {
               flush();
           } finally {
               // Wait for in-flight requests whether or not flush() succeeded
               inflightRequests.arriveAndAwaitAdvance();
           }
           checkFatalError();
       } finally {
           // Always release the underlying stream writer
           streamWriter.close();
       }
   }
   ```
   
   ### Issue 4: Incorrect Map Conversion Logic in RowToJsonConverters
   
   **Location**: 
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/convert/RowToJsonConverters.java:278-301`
   
   ```java
   protected RowToJsonConverter createMapConverter(
           SeaTunnelDataType<?> keyType, SeaTunnelDataType<?> valueType) {
       // ... omitting the first part
       return new RowToJsonConverter() {
           @Override
               public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
                   ObjectNode node = mapper.createObjectNode();
                   Map<?, ?> mapData = (Map) value;
                   for (Map.Entry<?, ?> entry : mapData.entrySet()) {
                       JsonNode keyNode = keyConverter.convert(mapper, null, entry.getKey());
                       String fieldName = keyNode.isTextual() ? keyNode.asText() : keyNode.toString();
                       node.set(
                               fieldName,
                               valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
                   }
                   // Line 298: Error!
                   return mapper.getNodeFactory().textNode(node.toString());
           }
       };
   }
   ```
   
   **Related Context**:
   - Original code: 
`seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java:278-301`
 (same issue)
   - Caller: BigQuerySerializer lines 39-40
   
   **Issue Description**:
   Line 298 returns `textNode(node.toString())`, which wraps the serialized JSON object in a string literal.
   
   For example, for the Map data `{"key1": "value1", "key2": 123}`:
   - Expected output: `{"key1": "value1", "key2": 123}` (the JsonNode is an ObjectNode)
   - Actual output: `"{\"key1\":\"value1\",\"key2\":123}"` (the JsonNode is a TextNode whose value is the escaped, compact JSON string)
   
   **Potential Risks**:
   - Incorrect data format: BigQuery receives a string instead of a JSON object
   - Type mismatch: If the target column is a STRUCT (RECORD) type, the write will fail
   - Query issues: Even if the write succeeds, the data is stored as a string rather than as queryable JSON
   
   **Impact Scope**:
   - Direct impact: Serialization of all MAP type fields
   - Indirect impact: Data containing MAP types cannot be correctly written to 
BigQuery
   - Affected area: Single Connector, but same issue exists in format-json 
module
   
   **Severity**: CRITICAL
   
   **Suggestions for Improvement**:
   ```java
   @Override
   public JsonNode convert(ObjectMapper mapper, JsonNode reuse, Object value) {
       ObjectNode node = mapper.createObjectNode();
       Map<?, ?> mapData = (Map) value;
       for (Map.Entry<?, ?> entry : mapData.entrySet()) {
           JsonNode keyNode = keyConverter.convert(mapper, null, entry.getKey());
           String fieldName = keyNode.isTextual() ? keyNode.asText() : keyNode.toString();
           node.set(
                   fieldName,
                   valueConverter.convert(mapper, node.get(fieldName), entry.getValue()));
       }
       return node;  // Return ObjectNode instead of textNode
   }
   ```
   
   ### Issue 5: Code Duplication in RowToJsonConverters
   
   **Location**: 
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/convert/RowToJsonConverters.java`
   
   **Related Context**:
   - Original implementation: 
`seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java`
   - New interface: 
`seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverter.java`
   
   **Issue Description**:
   The PR extracts the nested interface `RowToJsonConverters.RowToJsonConverter` into an independent top-level interface, but the BigQuery connector copies the entire `RowToJsonConverters` class (302 lines) into its own package instead of reusing the format-json module.
   
   **Problems**:
   1. **Code duplication**: A large amount of code is identical, violating the DRY principle
   2. **Bug duplication**: The Map conversion bug from the original code is copied as well
   3. **Maintenance cost**: Every future change must be applied to both copies
   4. **Inconsistency risk**: The two implementations may gradually diverge
   
   **Potential Risks**:
   - Maintenance difficulty: Fixing one place requires fixing the other
   - Inconsistency: Two implementations may produce different results
   - Bug amplification: Original Bug's impact scope is expanded
   
   **Impact Scope**:
   - Direct impact: connector-bigquery and format-json modules
   - Indirect impact: Any future modifications to RowToJsonConverters
   - Affected area: Multiple Connectors (format-json is shared module)
   
   **Severity**: MAJOR
   
   **Suggestions for Improvement**:
   1. Delete RowToJsonConverters class from connector-bigquery
   2. Use format-json module implementation directly:
   ```java
   // BigQuerySerializer.java
   public BigQuerySerializer(CatalogTable catalogTable) {
       SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
       this.jsonSerializationSchema =
               new JsonSerializationSchema(
                       rowType,
                       new org.apache.seatunnel.format.json.RowToJsonConverters()
                               .createConverter(checkNotNull(rowType)));
   }
   ```
   
   ### Issue 6: New Public Method in JsonSerializationSchema
   
   **Location**: 
`seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonSerializationSchema.java:94-103`
   
   ```java
   public JsonNode convert(SeaTunnelRow row) {
       if (node == null) {
           node = mapper.createObjectNode();
       }
   
       try {
           return runtimeConverter.convert(mapper, node, row);
       } catch (Exception e) {
           throw CommonError.jsonOperationError(FORMAT, row.toString(), e);
       }
   }
   ```
   
   **Related Context**:
   - Original method: serialize() returns byte[]
   - New method: convert() returns JsonNode
   - Caller: BigQuerySerializer
   
   **Issue Description**:
   JsonSerializationSchema is a shared format class. Adding a new public method:
   1. May affect other Connectors that use this class
   2. Changes the class's public API
   3. May cause compatibility issues if other code comes to depend on this behavior
   
   The method exists to meet BigQuery's requirement (a JsonNode instead of a byte[]), but its effect on other modules still needs to be evaluated.
   
   **Potential Risks**:
   - API stability: Changes to public methods may affect other code
   - Compatibility issues: A subclass that already declares a method with the same name may conflict
   - Maintenance burden: An additional public method has to be maintained
   
   **Impact Scope**:
   - Direct impact: All users of JsonSerializationSchema
   - Indirect impact: All Connectors depending on format-json
   - Affected area: Multiple Connectors
   
   **Severity**: MAJOR
   
   **Suggestions for Improvement**:
   1. Check whether other Connectors use JsonSerializationSchema
   2. If the method is kept, document its purpose and applicable scenarios in JavaDoc
   3. Consider a subclass or composition instead of modifying the shared class
   
   ### Issue 7: Insufficient Sensitive Information Handling
   
   **Location**: 
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/option/BigQuerySinkOptions.java:45-55`
   
   ```java
   public static final Option<String> SERVICE_ACCOUNT_KEY_PATH =
           Options.key("service_account_key_path")
                   .stringType()
                   .noDefaultValue()
                   .withDescription("Path to GCP service account JSON key file");
   
   public static final Option<String> SERVICE_ACCOUNT_KEY_JSON =
           Options.key("service_account_key_json")
                   .stringType()
                   .noDefaultValue()
                   .withDescription("Inline GCP service account JSON key content");
   ```
   
   **Related Context**:
   - User: BigQueryWriteClientFactory lines 66-81
   
   **Issue Description**:
   SERVICE_ACCOUNT_KEY_JSON lets users embed the GCP service account private key JSON directly in configuration files.
   
   **Potential Risks**:
   1. **Leakage risk**: Configuration files may be committed to version control, leaking the private key
   2. **Log leakage**: If the configuration is printed to logs, the private key is exposed
   3. **Excessive permissions**: The service account may hold broader permissions than the sink needs, which increases the impact of a leak
   4. **No validation**: BigQueryWriteClientFactory line 78 uses the string directly without validating the JSON format
   
   **Impact Scope**:
   - Direct impact: All users using BigQuery Sink
   - Indirect impact: GCP account security
   - Affected area: Single Connector
   
   **Severity**: MAJOR
   
   **Suggestions for Improvement**:
   1. Warn clearly in the documentation against putting SERVICE_ACCOUNT_KEY_JSON directly in configuration files
   2. Redact sensitive information in logs
   3. Add JSON format validation:
   ```java
   try {
       byte[] jsonBytes =
               config.get(BigQuerySinkOptions.SERVICE_ACCOUNT_KEY_JSON)
                       .getBytes(StandardCharsets.UTF_8);
       credentials = ServiceAccountCredentials.fromStream(new ByteArrayInputStream(jsonBytes));
   } catch (IOException e) {
       throw new BigQueryConnectorException(
               BigQueryConnectorErrorCode.INVALID_CREDENTIALS,
               "Invalid service account JSON format",
               e);
   }
   ```
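
   For suggestion 2, a redaction step could look roughly like the following. This is a sketch under assumptions: `ConfigRedactor` is a hypothetical helper, the configuration is modeled as a plain `Map<String, String>` rather than the connector's config type, and only `service_account_key_json` is treated as a secret.

   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Set;

   /** Hypothetical helper that masks secret option values before a config is logged. */
   final class ConfigRedactor {
       // Option keys whose values must never reach the logs (assumed list)
       private static final Set<String> SENSITIVE_KEYS =
               Collections.singleton("service_account_key_json");

       private static final String MASK = "******";

       /** Returns a copy of the config in which sensitive values are replaced by a mask. */
       static Map<String, String> redact(Map<String, String> config) {
           Map<String, String> safe = new LinkedHashMap<>();
           for (Map.Entry<String, String> entry : config.entrySet()) {
               boolean secret = SENSITIVE_KEYS.contains(entry.getKey());
               safe.put(entry.getKey(), secret ? MASK : entry.getValue());
           }
           return safe;
       }

       public static void main(String[] args) {
           Map<String, String> config = new LinkedHashMap<>();
           config.put("service_account_key_path", "/path/to/key.json");
           config.put("service_account_key_json", "{\"private_key\": \"...\"}");
           // Log the redacted copy instead of the raw config
           System.out.println(redact(config));
       }
   }
   ```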
   
   ### Issue 8: Tests Disabled and Lacking Unit Tests
   
   **Location**: 
`seatunnel-e2e/seatunnel-connector-v2-e2e/connector-bigquery-e2e/src/test/java/org/apache/seatunnel/e2e/connector/bigquery/BigqueryIT.java:32-47`
   
   ```java
   @TestTemplate
   @Disabled("bigquery-emulator does not support bigquery storage write api.")
   void testBigQuerySink(TestContainer container) throws IOException, InterruptedException {
       Container.ExecResult execResult = container.executeJob("/fake_to_bigquery_sink.conf");
       Assertions.assertEquals(0, execResult.getExitCode());
   }
   
   @TestTemplate
   @Disabled("bigquery-emulator does not support bigquery storage write api.")
   void testBigQuerySinkWithVariousType(TestContainer container)
           throws IOException, InterruptedException {
       Container.ExecResult execResult =
               container.executeJob("/fake_to_bigquery_sink_with_various_type.conf");
       Assertions.assertEquals(0, execResult.getExitCode());
   }
   ```
   
   **Related Context**:
   - The PR description mentions: "I've added `BigQueryIT` for testing. However, the bigquery-emulator doesn't appear to support the BigQuery Write Storage API, so I tested this using my GCP account and verified that it works successfully."
   
   **Issue Description**:
   1. **Tests disabled**: All tests are marked `@Disabled` and will not run in CI
   2. **Missing unit tests**: Key classes have no unit tests
   3. **Dependence on a real environment**: The author tested against a real GCP account, which cannot be automated
   
   **Potential Risks**:
   1. **Regression risk**: Future code changes may break functionality without 
timely detection
   2. **Quality assurance**: Cannot automatically verify functional correctness 
in CI
   3. **Development difficulty**: Other developers cannot quickly verify 
modifications
   
   **Impact Scope**:
   - Direct impact: Code quality and stability
   - Indirect impact: All users using this Connector
   - Affected area: Single Connector
   
   **Severity**: MAJOR
   
   **Suggestions for Improvement**:
   1. Add unit tests that mock the BigQuery API:
   ```java
   @Test
   public void testWrite() throws IOException {
       // Mock the stream writer so no real BigQuery connection is needed
       BigQueryStreamWriter mockWriter = mock(BigQueryStreamWriter.class);
       when(mockWriter.append(any())).thenReturn(ApiFutures.immediateFuture(mockResponse));

       // Assumes config sets the batch size to 1 so that one write() triggers a flush
       BigQuerySinkWriter writer = new BigQuerySinkWriter(config, mockWriter, serializer);
       writer.write(testRow);

       verify(mockWriter).append(any());
   }
   ```
   
   2. For integration tests, consider:
      - Using Testcontainers, if an emulator image that supports the Storage Write API becomes available
      - Or creating a mock BigQuery server (e.g., using WireMock)
      - Or accepting manual testing (and documenting it in the docs)
   
   ### Issue 9: Hardcoded Retry Parameters
   
   **Location**: 
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/writer/BigQueryStreamWriter.java:74-80`
   
   ```java
   RetrySettings retrySettings =
           RetrySettings.newBuilder()
                   .setInitialRetryDelay(Duration.ofMillis(500))
                   .setRetryDelayMultiplier(1.1)
                   .setMaxAttempts(5)
                   .setMaxRetryDelay(Duration.ofMinutes(1))
                   .build();
   ```
   
   **Related Context**:
   - BigQuerySinkOptions: Configuration options definition
   
   **Issue Description**:
   The retry parameters are hardcoded, so users cannot tune them for their network environment or business requirements.
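
   As a worked example of what the hardcoded values imply, the sketch below computes the nominal delay before each retry. It assumes that `maxAttempts` counts the first call, and that each delay is the previous one multiplied by the delay multiplier and capped at the maximum delay. It ignores jitter and RPC timeouts. `BackoffSchedule` is a hypothetical helper, not part of the connector or of the client library.

   ```java
   final class BackoffSchedule {
       /** Returns the nominal delay in milliseconds before each retry. */
       static double[] delaysMillis(
               double initialMs, double multiplier, double maxMs, int maxAttempts) {
           // The first attempt is not a retry, so there are maxAttempts - 1 delays
           int retries = Math.max(0, maxAttempts - 1);
           double[] delays = new double[retries];
           double next = initialMs;
           for (int i = 0; i < retries; i++) {
               delays[i] = Math.min(next, maxMs); // cap at the maximum delay
               next *= multiplier;                // grow for the following retry
           }
           return delays;
       }

       /** Sums the delays to estimate the total wait before retries are exhausted. */
       static double totalMillis(double[] delays) {
           double total = 0;
           for (double delay : delays) {
               total += delay;
           }
           return total;
       }

       public static void main(String[] args) {
           // Values taken from the RetrySettings in the PR
           double[] delays = delaysMillis(500, 1.1, 60_000, 5);
           System.out.println(java.util.Arrays.toString(delays));
           System.out.println("total wait (ms): " + totalMillis(delays));
       }
   }
   ```

   Under these assumptions the four retries wait about 500, 550, 605, and 665.5 ms, roughly 2.3 seconds in total, so an outage longer than that would exhaust the retries.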
   
   **Potential Risks**:
   - Insufficient flexibility: High-latency networks may require longer retry delays
   - Server load: Retries in quick succession may increase server load
   - Debugging difficulty: Users cannot troubleshoot issues by adjusting parameters
   
   **Impact Scope**:
   - Direct impact: All users using BigQuery Sink
   - Affected area: Single Connector
   
   **Severity**: MINOR
   
   **Suggestions for Improvement**:
   Add configuration options in BigQuerySinkOptions:
   ```java
   public static final Option<Long> INITIAL_RETRY_DELAY_MS =
           Options.key("initial_retry_delay_ms")
                   .longType()
                   .defaultValue(500L)
                   .withDescription("Initial retry delay in milliseconds");
   
   public static final Option<Integer> MAX_RETRY_ATTEMPTS =
           Options.key("max_retry_attempts")
                   .intType()
                   .defaultValue(5)
                   .withDescription("Maximum number of retry attempts");
   ```
   
   Then use configured values in BigQueryStreamWriter.
   
   ### Issue 10: Missing Metrics Collection
   
   **Location**: 
`seatunnel-connectors-v2/connector-bigquery/src/main/java/org/apache/seatunnel/connectors/bigquery/sink/BigQuerySinkWriter.java`
   
   **Related Context**:
   - SinkWriter.Context interface provides `getMetricsContext()`
   - Other Sinks (e.g., KafkaSink) all collect Metrics
   
   **Issue Description**:
   BigQuerySinkWriter does not collect or report any metrics, including:
   - Total rows sent
   - Batch count
   - Failure count
   - Latency metrics
   - Current buffer size
   
   **Potential Risks**:
   - Poor observability: Users cannot monitor Sink performance
   - Troubleshooting difficulty: Without metrics, problems are hard to locate when they occur
   - Performance optimization: Users have no data on which to base configuration tuning
   
   **Impact Scope**:
   - Direct impact: Production environment observability
   - Affected area: Single Connector
   
   **Severity**: MINOR
   
   **Suggestions for Improvement**:
   ```java
   public BigQuerySinkWriter(
           ReadonlyConfig readOnlyConfig,
           BigQueryStreamWriter streamWriter,
           BigQuerySerializer serializer,
           SinkWriter.Context context) {  // Add Context parameter
       this.batchSize = readOnlyConfig.get(BigQuerySinkOptions.BATCH_SIZE);
       this.streamWriter = streamWriter;
       this.serializer = serializer;
       
       // Initialize Metrics
       this.rowsSent = context.getMetricsContext().counter("rowsSent");
       this.batchesSent = context.getMetricsContext().counter("batchesSent");
       this.sendErrors = context.getMetricsContext().counter("sendErrors");
   }
   ```
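
   Because the append callbacks may run on a different thread than `write()`, the counters need to be thread-safe. The sketch below shows one way to keep such counts with JDK classes only. `WriterMetrics` is a hypothetical holder, and a real implementation would report through the counters obtained from the sink context rather than `LongAdder`.

   ```java
   import java.util.concurrent.atomic.LongAdder;

   /** Hypothetical thread-safe holder for the counts listed above. */
   final class WriterMetrics {
       private final LongAdder rowsSent = new LongAdder();
       private final LongAdder batchesSent = new LongAdder();
       private final LongAdder sendErrors = new LongAdder();

       /** Intended to be called from the success callback of an append. */
       void onBatchSuccess(int rowCount) {
           rowsSent.add(rowCount);
           batchesSent.increment();
       }

       /** Intended to be called from the failure callback of an append. */
       void onBatchFailure() {
           sendErrors.increment();
       }

       long rowsSent() {
           return rowsSent.sum();
       }

       long batchesSent() {
           return batchesSent.sum();
       }

       long sendErrors() {
           return sendErrors.sum();
       }
   }
   ```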
   