This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2130e0d5ad [Fix] Fix oracle sample data from column error (#7340)
2130e0d5ad is described below
commit 2130e0d5ad720d2c4af71a886d4fed55d50c0499
Author: Jia Fan <[email protected]>
AuthorDate: Tue Aug 20 10:37:27 2024 +0800
[Fix] Fix oracle sample data from column error (#7340)
---
.../internal/dialect/oracle/OracleDialect.java | 42 ++++++++++++++++++++++
.../connectors/seatunnel/jdbc/JdbcOracleIT.java | 10 ++++++
2 files changed, 52 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index e1aee7f7d8..b6a35dba0c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -35,6 +35,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@@ -284,4 +285,45 @@ public class OracleDialect implements JdbcDialect {
}
}
}
+ /**
+  * Samples the values of a single column and returns them sorted.
+  *
+  * <p>The sampling query selects {@code columnName} either from the table's custom query,
+  * wrapped as a subquery aliased {@code T}, when {@code table.getQuery()} is not blank, or
+  * directly from the table named by {@code table.getTablePath()}. Every row of the result is
+  * read; the value of each row whose 1-based position is a multiple of {@code samplingRate}
+  * is kept, and the kept values are sorted with {@link Arrays#sort(Object[])}.
+  *
+  * <p>NOTE(review): the subquery alias is written without {@code AS}; presumably that is the
+  * Oracle-specific reason for this override -- confirm against the default JdbcDialect
+  * implementation.
+  *
+  * <p>NOTE(review): {@code samplingRate == 0} would fail at the modulo with an
+  * ArithmeticException, and {@code Arrays.sort} throws if a sampled value is {@code null} or
+  * the values are not mutually comparable -- confirm that callers rule these cases out.
+  *
+  * @param connection JDBC connection the sampling statement is created on
+  * @param table source table; supplies either a custom query or a table path
+  * @param columnName column to sample; passed through {@code quoteIdentifier}
+  * @param samplingRate one value is kept for every {@code samplingRate} rows read
+  * @param fetchSize handed to {@code creatPreparedStatement} (presumably the JDBC fetch size
+  *     -- confirm)
+  * @return the sampled column values, sorted by their natural ordering
+  * @throws InterruptedException if the current thread is found interrupted while reading rows
+  * @throws Exception any other failure raised while preparing or executing the query
+  */
+ @Override
+ public Object[] sampleDataFromColumn(
+ Connection connection,
+ JdbcSourceTable table,
+ String columnName,
+ int samplingRate,
+ int fetchSize)
+ throws Exception {
+ String sampleQuery;
+ if (StringUtils.isNotBlank(table.getQuery())) { // custom query: sample from it as a subquery
+ sampleQuery =
+ String.format(
+ "SELECT %s FROM (%s) T",
quoteIdentifier(columnName), table.getQuery());
+ } else { // no custom query: sample straight from the table
+ sampleQuery =
+ String.format(
+ "SELECT %s FROM %s",
+ quoteIdentifier(columnName),
tableIdentifier(table.getTablePath()));
+ }
+ // Both the statement and the result set are closed by try-with-resources.
+ try (PreparedStatement stmt = creatPreparedStatement(connection,
sampleQuery, fetchSize)) {
+ try (ResultSet rs = stmt.executeQuery()) {
+ int count = 0; // number of rows read so far
+ List<Object> results = new ArrayList<>();
+
+ while (rs.next()) {
+ count++;
+ if (count % samplingRate == 0) { // keep every samplingRate-th row
+ results.add(rs.getObject(1)); // column 1 is the only selected column
+ }
+ if (Thread.currentThread().isInterrupted()) { // checked once per row; the interrupt flag stays set
+ throw new InterruptedException("Thread interrupted");
+ }
+ }
+ Object[] resultsArray = results.toArray();
+ Arrays.sort(resultsArray); // natural ordering of the sampled values
+ return resultsArray;
+ }
+ }
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index e4b4de3950..b7c4a54b59 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -136,6 +136,16 @@ public class JdbcOracleIT extends AbstractJdbcIT {
.tablePath(TablePath.of(null, SCHEMA, SOURCE_TABLE))
.build();
dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1,
1024);
+
+ table =
+ JdbcSourceTable.builder()
+ .tablePath(TablePath.of(null, SCHEMA, SOURCE_TABLE))
+ .query(
+ "select * from "
+ + quoteIdentifier(SOURCE_TABLE)
+ + " where INTEGER_COL = 1")
+ .build();
+ dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1,
1024);
}
@Override