This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 01f1caa6fb [Fix][Connector-clickhouse] Fix SeaTunnelRow tableId set
error (#9585)
01f1caa6fb is described below
commit 01f1caa6fb2cd2465f1eb4cef04f38fc9e1de599
Author: JeremyXin <[email protected]>
AuthorDate: Tue Jul 22 11:17:05 2025 +0800
[Fix][Connector-clickhouse] Fix SeaTunnelRow tableId set error (#9585)
---
.../clickhouse/config/ClickhouseSourceConfig.java | 1 +
.../clickhouse/source/ClickhouseValueReader.java | 8 ++-
.../seatunnel/clickhouse/util/ClickhouseProxy.java | 5 +-
.../source/ClickhouseValueReaderTest.java | 82 +++++++++++++++++-----
4 files changed, 75 insertions(+), 21 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java
index a26d4be952..bbf4a51cb0 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseSourceConfig.java
@@ -69,6 +69,7 @@ public class ClickhouseSourceConfig implements Serializable {
builder.splitSize(config.get(ClickhouseSourceOptions.CLICKHOUSE_SPLIT_SIZE));
builder.sql(config.get(ClickhouseSourceOptions.SQL));
builder.clickhouseConfig(config.get(ClickhouseBaseOptions.CLICKHOUSE_CONFIG));
+
builder.serverTimeZone(config.get(ClickhouseBaseOptions.SERVER_TIME_ZONE));
builder.isSqlStrategyRead(config.getOptional(ClickhouseSourceOptions.SQL).isPresent());
return builder.build();
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
index 336ca005de..77da3a77dc 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
@@ -105,7 +105,9 @@ public class ClickhouseValueReader implements Serializable {
try {
String query = buildPartQuery(currentPart);
- rowBatch = proxy.batchFetchRecords(query, rowTypeInfo);
+ rowBatch =
+ proxy.batchFetchRecords(
+ query, clickhouseSourceTable.getTablePath(),
rowTypeInfo);
log.debug(
"SplitId: {}, partName: {} read rowBatch size: {}",
@@ -139,7 +141,9 @@ public class ClickhouseValueReader implements Serializable {
String query = buildSqlQuery();
try {
- rowBatch = proxy.batchFetchRecords(query, rowTypeInfo);
+ rowBatch =
+ proxy.batchFetchRecords(
+ query, clickhouseSourceTable.getTablePath(),
rowTypeInfo);
clickhouseSourceSplit.setSqlOffset(
clickhouseSourceSplit.getSqlOffset() + rowBatch.size());
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
index 8264dc8b84..54788292db 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
@@ -475,7 +475,8 @@ public class ClickhouseProxy implements AutoCloseable {
}
}
- public List<SeaTunnelRow> batchFetchRecords(String sql, SeaTunnelRowType
seaTunnelRowType) {
+ public List<SeaTunnelRow> batchFetchRecords(
+ String sql, TablePath tablePath, SeaTunnelRowType
seaTunnelRowType) {
List<SeaTunnelRow> seaTunnelRowList = new ArrayList<>();
log.debug("run query data sql: {}", sql);
@@ -485,7 +486,7 @@ public class ClickhouseProxy implements AutoCloseable {
record -> {
SeaTunnelRow seaTunnelRow =
ClickhouseUtil.convertToSeaTunnelRow(
- record, seaTunnelRowType, sql);
+ record, seaTunnelRowType,
tablePath.getFullName());
seaTunnelRowList.add(seaTunnelRow);
});
} catch (ClickHouseException e) {
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
index 697f3e94c3..57571b1dc7 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
@@ -32,13 +32,16 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import com.clickhouse.client.ClickHouseColumn;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRecord;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
+import com.clickhouse.client.ClickHouseValue;
import com.clickhouse.client.data.ClickHouseIntegerValue;
import com.clickhouse.client.data.ClickHouseLongValue;
+import com.clickhouse.client.data.ClickHouseSimpleRecord;
import com.clickhouse.client.data.ClickHouseStringValue;
import lombok.extern.slf4j.Slf4j;
@@ -55,6 +58,7 @@ import static org.mockito.Mockito.when;
public class ClickhouseValueReaderTest {
private ClickhouseProxy mockProxy;
+ private ClickHouseNode node;
private ClickhouseValueReader reader;
private ClickhouseSourceSplit split;
@@ -81,7 +85,7 @@ public class ClickhouseValueReaderTest {
.clickhouseTable(mockClickhouseTable)
.build();
- ClickHouseNode node =
ClickHouseNode.builder().host("localhost").port(8123).build();
+ node = ClickHouseNode.builder().host("localhost").port(8123).build();
Shard shard = new Shard(1, 1, node);
@@ -117,7 +121,8 @@ public class ClickhouseValueReaderTest {
public void testHasNextWithFullBatch() {
List<SeaTunnelRow> mockRows = createMockRows(BATCH_SIZE);
- when(mockProxy.batchFetchRecords(any(),
eq(rowType))).thenReturn(mockRows);
+ when(mockProxy.batchFetchRecords(any(),
eq(sourceTable.getTablePath()), eq(rowType)))
+ .thenReturn(mockRows);
Assertions.assertTrue(reader.hasNext());
@@ -137,7 +142,8 @@ public class ClickhouseValueReaderTest {
int partialSize = BATCH_SIZE - 2;
List<SeaTunnelRow> mockRows = createMockRows(partialSize);
- when(mockProxy.batchFetchRecords(any(),
eq(rowType))).thenReturn(mockRows);
+ when(mockProxy.batchFetchRecords(any(),
eq(sourceTable.getTablePath()), eq(rowType)))
+ .thenReturn(mockRows);
Assertions.assertTrue(reader.hasNext());
@@ -156,7 +162,8 @@ public class ClickhouseValueReaderTest {
// create empty test data
List<SeaTunnelRow> mockRows = new ArrayList<>();
- when(mockProxy.batchFetchRecords(any(),
eq(rowType))).thenReturn(mockRows);
+ when(mockProxy.batchFetchRecords(any(),
eq(sourceTable.getTablePath()), eq(rowType)))
+ .thenReturn(mockRows);
Assertions.assertFalse(reader.hasNext());
@@ -181,7 +188,7 @@ public class ClickhouseValueReaderTest {
List<ClickhousePart> parts = split.getParts();
// Return different data for different parts
- when(mockProxy.batchFetchRecords(any(), eq(rowType)))
+ when(mockProxy.batchFetchRecords(any(),
eq(sourceTable.getTablePath()), eq(rowType)))
.thenAnswer(
invocation -> {
ClickhousePart part =
parts.get(reader.currentPartIndex);
@@ -264,7 +271,7 @@ public class ClickhouseValueReaderTest {
List<SeaTunnelRow> secondBatch = createMockRows(5);
List<SeaTunnelRow> emptyBatch = new ArrayList<>();
- when(mockProxy.batchFetchRecords(any(), eq(rowType)))
+ when(mockProxy.batchFetchRecords(any(),
eq(sourceTable.getTablePath()), eq(rowType)))
.thenAnswer(
x ->
split.getSqlOffset() == 0
@@ -283,22 +290,55 @@ public class ClickhouseValueReaderTest {
Assertions.assertFalse(reader.hasNext());
- Mockito.verify(mockProxy, Mockito.times(3)).batchFetchRecords(any(),
any());
+ Mockito.verify(mockProxy, Mockito.times(3))
+ .batchFetchRecords(any(), eq(sourceTable.getTablePath()),
any());
}
- private void initStreamValueReaderMock() throws ClickHouseException {
- // mock ClickHouseResponse
+ @Test
+ public void testBatchFetchRecordsAndTableId() throws Exception {
+ // mock proxy query response
+ ClickhouseProxy proxy = Mockito.spy(new ClickhouseProxy(node));
+ Field requestField =
ClickhouseProxy.class.getDeclaredField("clickhouseRequest");
+ requestField.setAccessible(true);
ClickHouseRequest mockRequest = Mockito.mock(ClickHouseRequest.class);
+ requestField.set(proxy, mockRequest);
+
+ mockClickhouseQueryAndResponse(proxy, mockRequest,
createMockClickHouseRecords());
+
+ // test values and tableId return by batchFetchRecords
+ TablePath tablePath = sourceTable.getTablePath();
+ List<SeaTunnelRow> rows =
+ proxy.batchFetchRecords("select * from test_db.test_table",
tablePath, rowType);
+ Assertions.assertEquals(BATCH_SIZE, rows.size());
+
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ Assertions.assertEquals((long) i, rows.get(i).getField(0));
+ Assertions.assertEquals("name" + i, rows.get(i).getField(1));
+ Assertions.assertEquals(20 + i, rows.get(i).getField(2));
+ Assertions.assertEquals(tablePath.getFullName(),
rows.get(i).getTableId());
+ }
+ }
+
+ private void initStreamValueReaderMock() throws ClickHouseException {
+ mockClickhouseQueryAndResponse(mockProxy, null,
createMockClickHouseRecords());
+ }
+
+ private void mockClickhouseQueryAndResponse(
+ ClickhouseProxy proxy,
+ ClickHouseRequest mockRequest,
+ List<ClickHouseRecord> mockRecords)
+ throws ClickHouseException {
+ if (mockRequest == null) {
+ mockRequest = Mockito.mock(ClickHouseRequest.class);
+ }
ClickHouseRequest mockQueryRequest =
Mockito.mock(ClickHouseRequest.class);
ClickHouseResponse mockResponse =
Mockito.mock(ClickHouseResponse.class);
- when(mockProxy.getClickhouseConnection()).thenReturn(mockRequest);
+ when(proxy.getClickhouseConnection()).thenReturn(mockRequest);
when(mockRequest.query(any(String.class))).thenReturn(mockQueryRequest);
when(mockQueryRequest.executeAndWait()).thenReturn(mockResponse);
-
- // create multiple batches of mock data
- List<ClickHouseRecord> mockRecords = createMockClickHouseRecords();
when(mockResponse.records()).thenReturn(mockRecords);
+ when(mockResponse.stream()).thenReturn(mockRecords.stream());
}
private List<SeaTunnelRow> createMockRows(int size) {
@@ -316,12 +356,20 @@ public class ClickhouseValueReaderTest {
private List<ClickHouseRecord> createMockClickHouseRecords() {
List<ClickHouseRecord> records = new ArrayList<>();
+ List<ClickHouseColumn> clickHouseColumns = new ArrayList<>();
+ clickHouseColumns.add(ClickHouseColumn.of("id", "Int32"));
+ clickHouseColumns.add(ClickHouseColumn.of("name", "String"));
+ clickHouseColumns.add(ClickHouseColumn.of("age", "Int8"));
+
for (int i = 0; i < BATCH_SIZE; i++) {
- ClickHouseRecord mockRecord = Mockito.mock(ClickHouseRecord.class);
-
when(mockRecord.getValue(0)).thenReturn(ClickHouseLongValue.of((long) i));
-
when(mockRecord.getValue(1)).thenReturn(ClickHouseStringValue.of("name" + i));
-
when(mockRecord.getValue(2)).thenReturn(ClickHouseIntegerValue.of(20 + i));
+ ClickHouseValue[] clickHouseValues = new ClickHouseValue[3];
+ clickHouseValues[0] = ClickHouseLongValue.of((long) i);
+ clickHouseValues[1] = ClickHouseStringValue.of("name" + i);
+ clickHouseValues[2] = ClickHouseIntegerValue.of(20 + i);
+
+ ClickHouseRecord mockRecord =
+ ClickHouseSimpleRecord.of(clickHouseColumns,
clickHouseValues);
records.add(mockRecord);
}
return records;