Ruiii-w commented on PR #10468:
URL: https://github.com/apache/seatunnel/pull/10468#issuecomment-4002426049

   
   ## Purpose of this pull request
   
   This pull request adds support for the PostgreSQL `COPY` protocol in the 
JDBC Source connector. Compared to standard `SELECT` queries, `COPY (SELECT 
...) TO STDOUT` provides significantly better throughput for bulk data 
extraction. The feature integrates with SeaTunnel's split (sharding) logic to 
preserve parallel reads and supports both CSV and Binary formats.
   
   Note: The feature was developed against SeaTunnel 2.3.8 and has been merged 
into `dev`; functionality remains stable.
   
   ---
   
   ## Improvements Based on PR Review
   
   This implementation has been revised in response to code review feedback. 
Key improvements:
   
   | Issue                          | Resolution |
   | ------------------------------ | ---------- |
   | **Feature Integration**        | COPY functionality now integrated into `JdbcInputFormat` data reading flow |
   | **Thread Safety**              | Static variables in `PgCopyBinaryReader` changed to instance variables |
   | **SQL Injection Prevention**   | Added SQL validation before COPY execution |
   | **Exactly-Once Compatibility** | Added validation to prevent COPY with Exactly-Once semantics |
   | **ChunkSplitter Interface**    | Added configuration validation for COPY with fixed partitioning only |
   | **Memory Safety**              | Implemented bounded buffer expansion with upper limits |
   | **Exception Handling**         | Enhanced resource cleanup with proper logging |
   | **Test Coverage**              | Added unit tests and E2E tests for PostgreSQL COPY |
   | **Documentation**              | Unified default values between code and documentation |
   
   ---
   
   ## Key Features
   
   - **COPY-driven reads**: `COPY (SELECT ...) TO STDOUT` for PostgreSQL
   - **Parallel-friendly**: COPY is applied per split generated by SeaTunnel
   - **CSV and Binary formats**: Both formats supported with dedicated parsing 
(Binary mode uses a fast, robust reader)
   - **New JDBC Source options**: PostgreSQL-only options: `copy.enabled`, 
`copy.binary`, `copy.buffer_size`
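
   As an illustration of what CSV-mode parsing has to handle, here is a minimal sketch of splitting one COPY CSV record into fields. It assumes PostgreSQL's default CSV output conventions (comma delimiter, `"` as quote, doubled quotes as escape, unquoted empty field meaning NULL). The class `CopyCsvRecordSketch` and its method are hypothetical and are not the PR's `PgCopyCsvReader`.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical sketch, not the PR's PgCopyCsvReader.
   final class CopyCsvRecordSketch {

       /** Parses one complete CSV record; unquoted empty fields become null. */
       static List<String> parseRecord(String record) {
           List<String> fields = new ArrayList<>();
           StringBuilder current = new StringBuilder();
           boolean inQuotes = false;
           boolean wasQuoted = false;
           for (int i = 0; i < record.length(); i++) {
               char c = record.charAt(i);
               if (inQuotes) {
                   if (c == '"') {
                       if (i + 1 < record.length() && record.charAt(i + 1) == '"') {
                           current.append('"'); // doubled quote is an escaped quote
                           i++;
                       } else {
                           inQuotes = false;    // closing quote
                       }
                   } else {
                       current.append(c);       // delimiters and newlines are literal here
                   }
               } else if (c == '"') {
                   inQuotes = true;
                   wasQuoted = true;
               } else if (c == ',') {
                   fields.add(finish(current, wasQuoted));
                   current.setLength(0);
                   wasQuoted = false;
               } else {
                   current.append(c);
               }
           }
           if (inQuotes) {
               throw new IllegalArgumentException("Unterminated quoted field");
           }
           fields.add(finish(current, wasQuoted));
           return fields;
       }

       // An empty field is NULL only when it was never quoted.
       private static String finish(StringBuilder sb, boolean wasQuoted) {
           return (sb.length() == 0 && !wasQuoted) ? null : sb.toString();
       }
   }
   ```

   The sketch expects the caller to pass a complete record; a streaming reader would additionally have to detect record boundaries when a quoted field contains a newline.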
   
   ---
   
   ## User-Facing Changes
   
   Yes. Optional JDBC Source options are added for PostgreSQL. Defaults remain 
unchanged; existing jobs are not affected.
   
   ### New Options
   
   | Name               | Type    | Required | Default   | Description |
   | ------------------ | ------- | -------- | --------- | ----------- |
   | `copy.enabled`     | Boolean | No       | `false`   | Enable `COPY (SELECT ...) TO STDOUT` for PostgreSQL. Only supported for PostgreSQL. Not compatible with Exactly-Once. |
   | `copy.binary`      | Boolean | No       | `false`   | When `copy.enabled = true`, use `COPY ... WITH BINARY`. SeaTunnel parses PostgreSQL COPY binary protocol directly. |
   | `copy.buffer_size` | Int     | No       | `1048576` | Buffer size (bytes) for COPY reading. Allowed range: `65536–10485760`. May be rounded up to a power of 2 at runtime. |
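
   To make the `copy.buffer_size` rule concrete, the following is a rough sketch of range validation followed by rounding up to a power of two. The class and method names are hypothetical, and whether the actual implementation caps the rounded value at the upper bound is an assumption left open here.

   ```java
   // Hypothetical sketch of the copy.buffer_size rule; not the PR's code.
   final class CopyBufferSizeSketch {
       static final int MIN_BUFFER_SIZE = 65_536;     // lower bound from the options table
       static final int MAX_BUFFER_SIZE = 10_485_760; // upper bound from the options table

       /** Validates the requested size, then rounds it up to the next power of two. */
       static int normalize(int requested) {
           if (requested < MIN_BUFFER_SIZE || requested > MAX_BUFFER_SIZE) {
               throw new IllegalArgumentException(
                       "copy.buffer_size must be within [" + MIN_BUFFER_SIZE + ", "
                               + MAX_BUFFER_SIZE + "], got " + requested);
           }
           int highest = Integer.highestOneBit(requested);
           // Already a power of two: keep it. Otherwise take the next one up.
           // Assumption: no cap is applied after rounding, so the result may exceed the bound.
           return highest == requested ? requested : highest << 1;
       }
   }
   ```

   With this rule the default `1048576` is left unchanged, while a value such as `65537` becomes `131072`.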
   
   ---
   
   ## Documentation Details
   
   1. **Data Source Title**: JDBC Source (PostgreSQL COPY Mode)
   2. **Connector Support Version**: SeaTunnel 2.3.8+
   3. **Data Source Description**: High-throughput PostgreSQL bulk reads via 
`COPY (SELECT ...) TO STDOUT`
   4. **Supported Engines**: Spark, Flink, SeaTunnel Zeta
   5. **Supported Data Source List**: PostgreSQL
   6. **Dependencies**: `org.postgresql:postgresql` (standard JDBC driver)
   7. **Data Type Mapping**: Reuses existing JDBC PostgreSQL mapping; in binary 
mode, SeaTunnel decodes COPY fields directly
   8. **Options**: See the "New Options" table above
   
   ---
   
   ## Example Configuration
   
   ```hocon
   source {
     Jdbc {
       url = "jdbc:postgresql://localhost:5432/test"
       driver = "org.postgresql.Driver"
       username = "postgres"
       password = "password"
   
       # Either table_path or query can be used
       table_path = "public.my_table"
       # query = "select * from public.my_table"
   
       # Enable COPY mode
       copy.enabled = true
       # Optional: use binary mode
       copy.binary = true
       # Optional: tune buffer size (64KB ~ 10MB)
       copy.buffer_size = 1048576
     }
   }
   ```
   
   ---
   
   ## Compatibility and Constraints
   
   - **PostgreSQL only**: COPY is activated exclusively for the PostgreSQL 
dialect; `copy.*` is ignored for other dialects.
   - **Not compatible with Exactly-Once**: A validation prevents enabling COPY 
with Exactly-Once semantics.
   - **SQL Safety**: COPY SQL is generated from your `query`/`table_path` and 
split conditions; unsafe patterns (e.g., `;`, comments, DDL/DML) are rejected 
by validation.
   - **Backward compatibility**: Feature is disabled by default; non-PostgreSQL 
use cases remain unchanged.
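
   The SQL safety rule can be pictured with a small sketch like the one below. It is a deliberately conservative illustration, assuming a simple deny-list of statement separators, comment markers, and DDL/DML keywords; the class name and the exact keyword list are hypothetical and may differ from the validation in this PR.

   ```java
   import java.util.regex.Pattern;

   // Hypothetical sketch of a deny-list check; not the PR's validator.
   final class CopySqlValidatorSketch {
       // Whole-word match, so identifiers such as update_time or created_at pass.
       private static final Pattern FORBIDDEN_KEYWORD = Pattern.compile(
               "\\b(insert|update|delete|drop|alter|create|truncate|grant|revoke)\\b",
               Pattern.CASE_INSENSITIVE);

       /** Returns true when the SELECT text contains none of the rejected patterns. */
       static boolean isSafe(String sql) {
           if (sql == null || sql.trim().isEmpty()) {
               return false;
           }
           // Reject statement separators and both comment styles.
           if (sql.contains(";") || sql.contains("--") || sql.contains("/*")) {
               return false;
           }
           return !FORBIDDEN_KEYWORD.matcher(sql).find();
       }
   }
   ```

   A check of this kind does not parse SQL, so it can also reject harmless queries, for example one with a `;` inside a string literal.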
   
   ---
   
   ## Key Implementation
   
   - **Dialect-aware path selection in the reader**
     **File**: `JdbcInputFormat.java`
     ```java
     if (useCopyStatement) {
         pgCopyInput = new PgCopyInput(
                 config, jdbcDialect, chunkSplitter, splitTableSchema, splitTableId);
         pgCopyInput.open(inputSplit);
         hasNext = pgCopyInput.hasNext();
     } else {
         statement = chunkSplitter.generateSplitStatement(inputSplit, splitTableSchema);
         resultSet = statement.executeQuery();
         hasNext = resultSet.next();
     }
     ```
   
   - **COPY execution and reader creation**
     **File**: `PgCopyInput.java`
     ```java
     String selectSql = chunkSplitter.generateSplitQuerySQL(split, tableSchema);
     if (useBinary) {
         selectSql = addCastsForBinaryMode(selectSql, tableSchema);
     }
     String copySql = String.format(
             "COPY (%s) TO STDOUT WITH %s", selectSql, useBinary ? "BINARY" : "CSV");
     
     copyStream = copyManagerProxy.copyOutAsStream(copySql);
     reader = createReader(copyStream);
     ```
   
   - **Binary decoding with robust EOF/truncation checks**
     **File**: `PgCopyBinaryReader.java`
     ```java
     // Header completeness checks
     if (buffer.remaining() < SIGNATURE.length + 8) {
         if (eof) throw new JdbcConnectorException(..., "Truncated COPY header");
         return;
     }
     
     // Field length payload checks
     if (buffer.remaining() < 4) {
         if (eof) throw new JdbcConnectorException(..., "Unexpected EOF while reading field length");
         return;
     }
     ```
   
   - **Options and validation**
     **Files**: `JdbcSourceOptions.java`, `JdbcSourceConfig.java`
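
   For context on the header and field-length checks above, here is a minimal sketch of decoding a complete COPY binary payload held in memory. It relies on the layout in the PostgreSQL documentation: an 11-byte signature, a 32-bit flags field, a 32-bit header extension length, then tuples made of a 16-bit field count and per-field 32-bit lengths (`-1` for NULL), ending with a 16-bit `-1` trailer. The class `PgCopyBinarySketch` is hypothetical; unlike `PgCopyBinaryReader` it does not stream or grow a buffer, and it returns raw field bytes without type decoding.

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical sketch; parses a whole payload at once instead of streaming.
   final class PgCopyBinarySketch {
       // "PGCOPY\n\377\r\n\0" as defined by the COPY binary format.
       static final byte[] SIGNATURE =
               {'P', 'G', 'C', 'O', 'P', 'Y', '\n', (byte) 0xFF, '\r', '\n', 0};

       /** Returns one byte[][] per row; a null element is a SQL NULL field. */
       static List<byte[][]> parse(byte[] payload) {
           ByteBuffer buf = ByteBuffer.wrap(payload); // big-endian, matching network order
           if (buf.remaining() < SIGNATURE.length + 8) {
               throw new IllegalStateException("Truncated COPY header");
           }
           byte[] signature = new byte[SIGNATURE.length];
           buf.get(signature);
           if (!Arrays.equals(signature, SIGNATURE)) {
               throw new IllegalStateException("Invalid COPY signature");
           }
           buf.getInt();                      // flags field, ignored in this sketch
           int extensionLength = buf.getInt();
           if (extensionLength < 0 || buf.remaining() < extensionLength) {
               throw new IllegalStateException("Truncated COPY header extension");
           }
           buf.position(buf.position() + extensionLength);

           List<byte[][]> rows = new ArrayList<>();
           while (true) {
               if (buf.remaining() < 2) {
                   throw new IllegalStateException("Unexpected EOF while reading field count");
               }
               short fieldCount = buf.getShort();
               if (fieldCount == -1) {
                   return rows;               // file trailer reached
               }
               if (fieldCount < 0) {
                   throw new IllegalStateException("Invalid field count: " + fieldCount);
               }
               byte[][] row = new byte[fieldCount][];
               for (int i = 0; i < fieldCount; i++) {
                   if (buf.remaining() < 4) {
                       throw new IllegalStateException("Unexpected EOF while reading field length");
                   }
                   int length = buf.getInt();
                   if (length == -1) {
                       continue;              // NULL field, element stays null
                   }
                   if (length < 0 || buf.remaining() < length) {
                       throw new IllegalStateException("Truncated field data");
                   }
                   row[i] = new byte[length];
                   buf.get(row[i]);
               }
               rows.add(row);
           }
       }
   }
   ```

   A streaming reader cannot treat a short buffer as an error until the stream has ended, which is why the snippet from `PgCopyBinaryReader.java` returns early when `eof` is false.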
   
   ---
   
   ## Testing
   
   - **Unit Tests**
     **Files**: `PgCopyBinaryReaderTest.java` and `PgCopyCsvReaderTest.java`
     Coverage highlights:
     - `testBinaryHeaderParsing`: Header parsing
     - `testRowParsing` / `testNullValues` / `testMoreDataTypes` / 
`testDateAndTimestamp` / `testByteAAndSmallIntFloat`: Data types and nulls
     - `testInvalidSignature` / `testPrematureEof`: Negative paths
     - `testBufferExpansion`: Large field triggers controlled buffer growth
   
   - **E2E Tests**
     An E2E test is added to `JdbcPostgresIT` by appending 
`/jdbc_postgres_source_copy_binary.conf` to `PG_CONFIG_FILE_LIST`; it is executed 
by `testAutoGenerateSQL` via `container.executeJob(CONFIG_FILE)`.
   
   ---
   
   ## Backward Compatibility
   
   - All new options are optional and disabled by default.
   - Non-PostgreSQL sources ignore COPY options.
   - Explicit incompatibility with Exactly-Once prevents ambiguous behavior.
   
   ---
   
   ## Performance Benchmarks
   
   ### Test Environment 1: Jdbc Source → Doris Sink
   
   **Environment**: VMware virtualized x86_64 virtual machine
   - **CPU**: Intel Xeon Silver 4210 @ 2.20GHz (16 cores)
   - **Memory**: 32GB
   - **Storage**: ~128GB virtual disk
   - **Architecture**: 2 NUMA nodes, hyperthreading disabled
   
   **Dataset**: 4,513,000 rows, 393 columns
   
   > *All results are from single-threaded performance tests*
   
   #### Native JDBC (SeaTunnel)
   
   - **Total Time**: 3145.9 s (~52 min 26 sec)
   - **Average Rate**: 1434.6 rec/s
   - **Peak Rate**: 1763.0 rec/s
   - **Minimum Rate**: 125.7 rec/s
   
   #### COPY WITH BINARY
   
   - **Total Time**: 2011.5 s (~33 min 32 sec)
   - **Average Rate**: 2243.6 rec/s
   - **Peak Rate**: 2701.2 rec/s
   - **Minimum Rate**: 708.3 rec/s
   
   | Metric                   | COPY WITH BINARY | Native JDBC | Improvement |
   | ------------------------ | ---------------- | ----------- | ----------- |
   | Total Time (seconds)     | 2011.5           | 3145.9      | **-36%**    |
   | Average Rate (records/s) | 2243.6           | 1434.6      | **+56%**    |
   | Peak Rate (records/s)    | 2701.2           | 1763.0      | **+53%**    |
   | Throughput (MB/s)        | ~14.7            | ~9.35       | **+57%**    |
   
   ---
   
   ### Test Environment 2: Jdbc Source → Console Sink
   
   **Environment**: High-memory multi-core compute server
   - **CPU**: Hygon (64 cores, 128 threads)
   - **Memory**: 256GB
   - **Storage**: 1024GB SSD
   
   **Dataset**: 4,513,000 rows, 393 columns
   
   > *All results are from single-threaded performance tests*
   
   #### Native JDBC (SeaTunnel)
   
   - **Total Time**: 962.9 s (~16 min 2.9 sec)
   - **Average Rate**: 4687 rec/s
   - **Peak Rate**: 5262.4 rec/s
   - **Minimum Rate**: 1963.7 rec/s
   
   #### COPY WITH BINARY
   
   - **Total Time**: 381.5 s (~6 min 21.5 sec)
   - **Average Rate**: 11830.1 rec/s
   - **Peak Rate**: 12796.3 rec/s
   - **Minimum Rate**: 10081.4 rec/s
   
   | Metric                   | COPY WITH BINARY | Native JDBC | Improvement |
   | ------------------------ | ---------------- | ----------- | ----------- |
   | Total Time (seconds)     | 381.5            | 962.9       | **-60.4%**  |
   | Average Rate (records/s) | 11830.1          | 4687        | **+152.4%** |
   | Peak Rate (records/s)    | 12796.3          | 5262.4      | **+143.2%** |
   | Throughput (MB/s)        | ~77.1            | ~30.5       | **+152.8%** |
   
   ---
   
   ### Key Findings
   
   - **Binary COPY mode outperformed native JDBC in both environments**, cutting 
total execution time by **36% to 60%**
   - **Throughput gains were larger on the more powerful server** (about +152% 
in Environment 2 versus about +56% in Environment 1)
   - **Performance gains are substantial for wide tables** (393 columns in test 
dataset)
   - **Peak throughput reached about 12,800 records/second** in Environment 2
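
   The improvement percentages in both tables are relative changes against the native JDBC baseline. The small sketch below recomputes them from the reported totals and average rates; the class name is hypothetical and only the figures from the tables are used.

   ```java
   // Hypothetical helper for recomputing the "Improvement" column.
   final class BenchmarkDeltaSketch {

       /** Relative change of candidate versus baseline, in percent. */
       static double percentChange(double baseline, double candidate) {
           return (candidate - baseline) / baseline * 100.0;
       }

       public static void main(String[] args) {
           // Environment 1: Jdbc Source to Doris Sink
           System.out.printf("Env 1 total time:   %.1f%%%n", percentChange(3145.9, 2011.5));
           System.out.printf("Env 1 average rate: %.1f%%%n", percentChange(1434.6, 2243.6));
           // Environment 2: Jdbc Source to Console Sink
           System.out.printf("Env 2 total time:   %.1f%%%n", percentChange(962.9, 381.5));
           System.out.printf("Env 2 average rate: %.1f%%%n", percentChange(4687.0, 11830.1));
       }
   }
   ```

   The results agree with the tables to within rounding, for example about -36% total time in Environment 1 and about +152.4% average rate in Environment 2.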
   
   ---
   
   ## Checklist
   
   - [x] Documentation updated (en/zh) for new options and behavior
   - [x] Unit tests added/updated, including negative and boundary cases
   - [x] E2E tests added/updated for PostgreSQL COPY (including Binary)
   - [x] No new external dependencies introduced
   - [x] Changes scoped to JDBC Source (PostgreSQL) without breaking public SPI
   - [x] Code formatted and basic build passes (`spotless`/`verify`/`tests`)

