This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 01f1caa6fb [Fix][Connector-clickhouse] Fix SeaTunnelRow tableId set 
error (#9585)
01f1caa6fb is described below

commit 01f1caa6fb2cd2465f1eb4cef04f38fc9e1de599
Author: JeremyXin <[email protected]>
AuthorDate: Tue Jul 22 11:17:05 2025 +0800

    [Fix][Connector-clickhouse] Fix SeaTunnelRow tableId set error (#9585)
---
 .../clickhouse/config/ClickhouseSourceConfig.java  |  1 +
 .../clickhouse/source/ClickhouseValueReader.java   |  8 ++-
 .../seatunnel/clickhouse/util/ClickhouseProxy.java |  5 +-
 .../source/ClickhouseValueReaderTest.java          | 82 +++++++++++++++++-----
 4 files changed, 75 insertions(+), 21 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java
index a26d4be952..bbf4a51cb0 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java
@@ -69,6 +69,7 @@ public class ClickhouseSourceConfig implements Serializable {
         
builder.splitSize(config.get(ClickhouseSourceOptions.CLICKHOUSE_SPLIT_SIZE));
         builder.sql(config.get(ClickhouseSourceOptions.SQL));
         
builder.clickhouseConfig(config.get(ClickhouseBaseOptions.CLICKHOUSE_CONFIG));
+        
builder.serverTimeZone(config.get(ClickhouseBaseOptions.SERVER_TIME_ZONE));
         
builder.isSqlStrategyRead(config.getOptional(ClickhouseSourceOptions.SQL).isPresent());
 
         return builder.build();
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
index 336ca005de..77da3a77dc 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
@@ -105,7 +105,9 @@ public class ClickhouseValueReader implements Serializable {
 
         try {
             String query = buildPartQuery(currentPart);
-            rowBatch = proxy.batchFetchRecords(query, rowTypeInfo);
+            rowBatch =
+                    proxy.batchFetchRecords(
+                            query, clickhouseSourceTable.getTablePath(), 
rowTypeInfo);
 
             log.debug(
                     "SplitId: {}, partName: {} read rowBatch size: {}",
@@ -139,7 +141,9 @@ public class ClickhouseValueReader implements Serializable {
         String query = buildSqlQuery();
 
         try {
-            rowBatch = proxy.batchFetchRecords(query, rowTypeInfo);
+            rowBatch =
+                    proxy.batchFetchRecords(
+                            query, clickhouseSourceTable.getTablePath(), 
rowTypeInfo);
 
             clickhouseSourceSplit.setSqlOffset(
                     clickhouseSourceSplit.getSqlOffset() + rowBatch.size());
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
index 8264dc8b84..54788292db 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
@@ -475,7 +475,8 @@ public class ClickhouseProxy implements AutoCloseable {
         }
     }
 
-    public List<SeaTunnelRow> batchFetchRecords(String sql, SeaTunnelRowType 
seaTunnelRowType) {
+    public List<SeaTunnelRow> batchFetchRecords(
+            String sql, TablePath tablePath, SeaTunnelRowType 
seaTunnelRowType) {
         List<SeaTunnelRow> seaTunnelRowList = new ArrayList<>();
         log.debug("run query data sql: {}", sql);
 
@@ -485,7 +486,7 @@ public class ClickhouseProxy implements AutoCloseable {
                             record -> {
                                 SeaTunnelRow seaTunnelRow =
                                         ClickhouseUtil.convertToSeaTunnelRow(
-                                                record, seaTunnelRowType, sql);
+                                                record, seaTunnelRowType, 
tablePath.getFullName());
                                 seaTunnelRowList.add(seaTunnelRow);
                             });
         } catch (ClickHouseException e) {
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
index 697f3e94c3..57571b1dc7 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
@@ -32,13 +32,16 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import com.clickhouse.client.ClickHouseColumn;
 import com.clickhouse.client.ClickHouseException;
 import com.clickhouse.client.ClickHouseNode;
 import com.clickhouse.client.ClickHouseRecord;
 import com.clickhouse.client.ClickHouseRequest;
 import com.clickhouse.client.ClickHouseResponse;
+import com.clickhouse.client.ClickHouseValue;
 import com.clickhouse.client.data.ClickHouseIntegerValue;
 import com.clickhouse.client.data.ClickHouseLongValue;
+import com.clickhouse.client.data.ClickHouseSimpleRecord;
 import com.clickhouse.client.data.ClickHouseStringValue;
 import lombok.extern.slf4j.Slf4j;
 
@@ -55,6 +58,7 @@ import static org.mockito.Mockito.when;
 public class ClickhouseValueReaderTest {
 
     private ClickhouseProxy mockProxy;
+    private ClickHouseNode node;
 
     private ClickhouseValueReader reader;
     private ClickhouseSourceSplit split;
@@ -81,7 +85,7 @@ public class ClickhouseValueReaderTest {
                         .clickhouseTable(mockClickhouseTable)
                         .build();
 
-        ClickHouseNode node = 
ClickHouseNode.builder().host("localhost").port(8123).build();
+        node = ClickHouseNode.builder().host("localhost").port(8123).build();
 
         Shard shard = new Shard(1, 1, node);
 
@@ -117,7 +121,8 @@ public class ClickhouseValueReaderTest {
     public void testHasNextWithFullBatch() {
         List<SeaTunnelRow> mockRows = createMockRows(BATCH_SIZE);
 
-        when(mockProxy.batchFetchRecords(any(), 
eq(rowType))).thenReturn(mockRows);
+        when(mockProxy.batchFetchRecords(any(), 
eq(sourceTable.getTablePath()), eq(rowType)))
+                .thenReturn(mockRows);
 
         Assertions.assertTrue(reader.hasNext());
 
@@ -137,7 +142,8 @@ public class ClickhouseValueReaderTest {
         int partialSize = BATCH_SIZE - 2;
         List<SeaTunnelRow> mockRows = createMockRows(partialSize);
 
-        when(mockProxy.batchFetchRecords(any(), 
eq(rowType))).thenReturn(mockRows);
+        when(mockProxy.batchFetchRecords(any(), 
eq(sourceTable.getTablePath()), eq(rowType)))
+                .thenReturn(mockRows);
 
         Assertions.assertTrue(reader.hasNext());
 
@@ -156,7 +162,8 @@ public class ClickhouseValueReaderTest {
         // create empty test data
         List<SeaTunnelRow> mockRows = new ArrayList<>();
 
-        when(mockProxy.batchFetchRecords(any(), 
eq(rowType))).thenReturn(mockRows);
+        when(mockProxy.batchFetchRecords(any(), 
eq(sourceTable.getTablePath()), eq(rowType)))
+                .thenReturn(mockRows);
 
         Assertions.assertFalse(reader.hasNext());
 
@@ -181,7 +188,7 @@ public class ClickhouseValueReaderTest {
         List<ClickhousePart> parts = split.getParts();
 
         // Return different data for different parts
-        when(mockProxy.batchFetchRecords(any(), eq(rowType)))
+        when(mockProxy.batchFetchRecords(any(), 
eq(sourceTable.getTablePath()), eq(rowType)))
                 .thenAnswer(
                         invocation -> {
                             ClickhousePart part = 
parts.get(reader.currentPartIndex);
@@ -264,7 +271,7 @@ public class ClickhouseValueReaderTest {
         List<SeaTunnelRow> secondBatch = createMockRows(5);
         List<SeaTunnelRow> emptyBatch = new ArrayList<>();
 
-        when(mockProxy.batchFetchRecords(any(), eq(rowType)))
+        when(mockProxy.batchFetchRecords(any(), 
eq(sourceTable.getTablePath()), eq(rowType)))
                 .thenAnswer(
                         x ->
                                 split.getSqlOffset() == 0
@@ -283,22 +290,55 @@ public class ClickhouseValueReaderTest {
 
         Assertions.assertFalse(reader.hasNext());
 
-        Mockito.verify(mockProxy, Mockito.times(3)).batchFetchRecords(any(), 
any());
+        Mockito.verify(mockProxy, Mockito.times(3))
+                .batchFetchRecords(any(), eq(sourceTable.getTablePath()), 
any());
     }
 
-    private void initStreamValueReaderMock() throws ClickHouseException {
-        // mock ClickHouseResponse
+    @Test
+    public void testBatchFetchRecordsAndTableId() throws Exception {
+        // mock proxy query response
+        ClickhouseProxy proxy = Mockito.spy(new ClickhouseProxy(node));
+        Field requestField = 
ClickhouseProxy.class.getDeclaredField("clickhouseRequest");
+        requestField.setAccessible(true);
         ClickHouseRequest mockRequest = Mockito.mock(ClickHouseRequest.class);
+        requestField.set(proxy, mockRequest);
+
+        mockClickhouseQueryAndResponse(proxy, mockRequest, 
createMockClickHouseRecords());
+
+        // test values and tableId return by batchFetchRecords
+        TablePath tablePath = sourceTable.getTablePath();
+        List<SeaTunnelRow> rows =
+                proxy.batchFetchRecords("select * from test_db.test_table", 
tablePath, rowType);
+        Assertions.assertEquals(BATCH_SIZE, rows.size());
+
+        for (int i = 0; i < BATCH_SIZE; i++) {
+            Assertions.assertEquals((long) i, rows.get(i).getField(0));
+            Assertions.assertEquals("name" + i, rows.get(i).getField(1));
+            Assertions.assertEquals(20 + i, rows.get(i).getField(2));
+            Assertions.assertEquals(tablePath.getFullName(), 
rows.get(i).getTableId());
+        }
+    }
+
+    private void initStreamValueReaderMock() throws ClickHouseException {
+        mockClickhouseQueryAndResponse(mockProxy, null, 
createMockClickHouseRecords());
+    }
+
+    private void mockClickhouseQueryAndResponse(
+            ClickhouseProxy proxy,
+            ClickHouseRequest mockRequest,
+            List<ClickHouseRecord> mockRecords)
+            throws ClickHouseException {
+        if (mockRequest == null) {
+            mockRequest = Mockito.mock(ClickHouseRequest.class);
+        }
         ClickHouseRequest mockQueryRequest = 
Mockito.mock(ClickHouseRequest.class);
         ClickHouseResponse mockResponse = 
Mockito.mock(ClickHouseResponse.class);
 
-        when(mockProxy.getClickhouseConnection()).thenReturn(mockRequest);
+        when(proxy.getClickhouseConnection()).thenReturn(mockRequest);
         
when(mockRequest.query(any(String.class))).thenReturn(mockQueryRequest);
         when(mockQueryRequest.executeAndWait()).thenReturn(mockResponse);
-
-        // create multiple batches of mock data
-        List<ClickHouseRecord> mockRecords = createMockClickHouseRecords();
         when(mockResponse.records()).thenReturn(mockRecords);
+        when(mockResponse.stream()).thenReturn(mockRecords.stream());
     }
 
     private List<SeaTunnelRow> createMockRows(int size) {
@@ -316,12 +356,20 @@ public class ClickhouseValueReaderTest {
     private List<ClickHouseRecord> createMockClickHouseRecords() {
         List<ClickHouseRecord> records = new ArrayList<>();
 
+        List<ClickHouseColumn> clickHouseColumns = new ArrayList<>();
+        clickHouseColumns.add(ClickHouseColumn.of("id", "Int32"));
+        clickHouseColumns.add(ClickHouseColumn.of("name", "String"));
+        clickHouseColumns.add(ClickHouseColumn.of("age", "Int8"));
+
         for (int i = 0; i < BATCH_SIZE; i++) {
-            ClickHouseRecord mockRecord = Mockito.mock(ClickHouseRecord.class);
 
-            
when(mockRecord.getValue(0)).thenReturn(ClickHouseLongValue.of((long) i));
-            
when(mockRecord.getValue(1)).thenReturn(ClickHouseStringValue.of("name" + i));
-            
when(mockRecord.getValue(2)).thenReturn(ClickHouseIntegerValue.of(20 + i));
+            ClickHouseValue[] clickHouseValues = new ClickHouseValue[3];
+            clickHouseValues[0] = ClickHouseLongValue.of((long) i);
+            clickHouseValues[1] = ClickHouseStringValue.of("name" + i);
+            clickHouseValues[2] = ClickHouseIntegerValue.of(20 + i);
+
+            ClickHouseRecord mockRecord =
+                    ClickHouseSimpleRecord.of(clickHouseColumns, 
clickHouseValues);
             records.add(mockRecord);
         }
         return records;

Reply via email to