This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ddca95f32c [Improve][JDBC] Optimized code style for getting jdbc field
types (#6583)
ddca95f32c is described below
commit ddca95f32c8cfec8ef22d087ce4ae5ccf78fcdd7
Author: ZhilinLi <[email protected]>
AuthorDate: Wed Mar 27 09:59:51 2024 +0800
[Improve][JDBC] Optimized code style for getting jdbc field types (#6583)
---
.../converter/AbstractJdbcRowConverter.java | 28 +++++-----
.../dialect/kingbase/KingbaseJdbcRowConverter.java | 28 +++++-----
.../dialect/psql/PostgresJdbcRowConverter.java | 28 +++++-----
.../sqlserver/SqlserverJdbcRowConverter.java | 4 +-
.../{JdbcUtils.java => JdbcFieldTypeUtils.java} | 64 ++++++++++------------
5 files changed, 72 insertions(+), 80 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 5a4a6b60d8..8ff8ac47d7 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
import lombok.extern.slf4j.Slf4j;
@@ -56,34 +56,34 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
int resultSetIndex = fieldIndex + 1;
switch (seaTunnelDataType.getSqlType()) {
case STRING:
- fields[fieldIndex] = JdbcUtils.getString(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs,
resultSetIndex);
break;
case BOOLEAN:
- fields[fieldIndex] = JdbcUtils.getBoolean(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs,
resultSetIndex);
break;
case TINYINT:
- fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs,
resultSetIndex);
break;
case SMALLINT:
- fields[fieldIndex] = JdbcUtils.getShort(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs,
resultSetIndex);
break;
case INT:
- fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs,
resultSetIndex);
break;
case BIGINT:
- fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs,
resultSetIndex);
break;
case FLOAT:
- fields[fieldIndex] = JdbcUtils.getFloat(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs,
resultSetIndex);
break;
case DOUBLE:
- fields[fieldIndex] = JdbcUtils.getDouble(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs,
resultSetIndex);
break;
case DECIMAL:
- fields[fieldIndex] = JdbcUtils.getBigDecimal(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs,
resultSetIndex);
break;
case DATE:
- Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex);
+ Date sqlDate = JdbcFieldTypeUtils.getDate(rs,
resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(e ->
e.toLocalDate()).orElse(null);
break;
@@ -91,14 +91,14 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
fields[fieldIndex] = readTime(rs, resultSetIndex);
break;
case TIMESTAMP:
- Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs,
resultSetIndex);
+ Timestamp sqlTimestamp =
JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
case BYTES:
- fields[fieldIndex] = JdbcUtils.getBytes(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs,
resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
@@ -116,7 +116,7 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
}
protected LocalTime readTime(ResultSet rs, int resultSetIndex) throws
SQLException {
- Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex);
+ Time sqlTime = JdbcFieldTypeUtils.getTime(rs, resultSetIndex);
return Optional.ofNullable(sqlTime).map(e ->
e.toLocalTime()).orElse(null);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
index 4aa41c0f4c..4a9411b99b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
@@ -25,7 +25,7 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
import java.math.BigDecimal;
import java.sql.Date;
@@ -56,51 +56,51 @@ public class KingbaseJdbcRowConverter extends
AbstractJdbcRowConverter {
int resultSetIndex = fieldIndex + 1;
switch (seaTunnelDataType.getSqlType()) {
case STRING:
- fields[fieldIndex] = JdbcUtils.getString(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs,
resultSetIndex);
break;
case BOOLEAN:
- fields[fieldIndex] = JdbcUtils.getBoolean(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs,
resultSetIndex);
break;
case TINYINT:
- fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs,
resultSetIndex);
break;
case SMALLINT:
- fields[fieldIndex] = JdbcUtils.getShort(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs,
resultSetIndex);
break;
case INT:
- fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs,
resultSetIndex);
break;
case BIGINT:
- fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs,
resultSetIndex);
break;
case FLOAT:
- fields[fieldIndex] = JdbcUtils.getFloat(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs,
resultSetIndex);
break;
case DOUBLE:
- fields[fieldIndex] = JdbcUtils.getDouble(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs,
resultSetIndex);
break;
case DECIMAL:
- fields[fieldIndex] = JdbcUtils.getBigDecimal(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs,
resultSetIndex);
break;
case DATE:
- Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex);
+ Date sqlDate = JdbcFieldTypeUtils.getDate(rs,
resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(Date::toLocalDate).orElse(null);
break;
case TIME:
- Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex);
+ Time sqlTime = JdbcFieldTypeUtils.getTime(rs,
resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(Time::toLocalTime).orElse(null);
break;
case TIMESTAMP:
- Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs,
resultSetIndex);
+ Timestamp sqlTimestamp =
JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(Timestamp::toLocalDateTime)
.orElse(null);
break;
case BYTES:
- fields[fieldIndex] = JdbcUtils.getBytes(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs,
resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index 171ab406f5..f1cd4f8ec9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -26,7 +26,7 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
import java.sql.Array;
import java.sql.Date;
@@ -65,52 +65,52 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
? null
:
rs.getObject(resultSetIndex).toString();
} else {
- fields[fieldIndex] = JdbcUtils.getString(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs,
resultSetIndex);
}
break;
case BOOLEAN:
- fields[fieldIndex] = JdbcUtils.getBoolean(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBoolean(rs,
resultSetIndex);
break;
case TINYINT:
- fields[fieldIndex] = JdbcUtils.getByte(rs, resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getByte(rs,
resultSetIndex);
break;
case SMALLINT:
- fields[fieldIndex] = JdbcUtils.getShort(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getShort(rs,
resultSetIndex);
break;
case INT:
- fields[fieldIndex] = JdbcUtils.getInt(rs, resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getInt(rs,
resultSetIndex);
break;
case BIGINT:
- fields[fieldIndex] = JdbcUtils.getLong(rs, resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getLong(rs,
resultSetIndex);
break;
case FLOAT:
- fields[fieldIndex] = JdbcUtils.getFloat(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getFloat(rs,
resultSetIndex);
break;
case DOUBLE:
- fields[fieldIndex] = JdbcUtils.getDouble(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getDouble(rs,
resultSetIndex);
break;
case DECIMAL:
- fields[fieldIndex] = JdbcUtils.getBigDecimal(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBigDecimal(rs,
resultSetIndex);
break;
case DATE:
- Date sqlDate = JdbcUtils.getDate(rs, resultSetIndex);
+ Date sqlDate = JdbcFieldTypeUtils.getDate(rs,
resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(e ->
e.toLocalDate()).orElse(null);
break;
case TIME:
- Time sqlTime = JdbcUtils.getTime(rs, resultSetIndex);
+ Time sqlTime = JdbcFieldTypeUtils.getTime(rs,
resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(e ->
e.toLocalTime()).orElse(null);
break;
case TIMESTAMP:
- Timestamp sqlTimestamp = JdbcUtils.getTimestamp(rs,
resultSetIndex);
+ Timestamp sqlTimestamp =
JdbcFieldTypeUtils.getTimestamp(rs, resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
case BYTES:
- fields[fieldIndex] = JdbcUtils.getBytes(rs,
resultSetIndex);
+ fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs,
resultSetIndex);
break;
case NULL:
fields[fieldIndex] = null;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java
index efb1729244..5ae0dec1af 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java
@@ -25,7 +25,7 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcUtils;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
import java.math.BigDecimal;
import java.sql.PreparedStatement;
@@ -46,7 +46,7 @@ public class SqlserverJdbcRowConverter extends
AbstractJdbcRowConverter {
@Override
protected LocalTime readTime(ResultSet rs, int resultSetIndex) throws
SQLException {
- Timestamp sqlTime = JdbcUtils.getTimestamp(rs, resultSetIndex);
+ Timestamp sqlTime = JdbcFieldTypeUtils.getTimestamp(rs,
resultSetIndex);
return Optional.ofNullable(sqlTime)
.map(e -> e.toLocalDateTime().toLocalTime())
.orElse(null);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
similarity index 71%
rename from
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java
rename to
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
index b9f7f1eac3..ca8edb6576 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
@@ -23,61 +23,40 @@ import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
-public final class JdbcUtils {
+public final class JdbcFieldTypeUtils {
- private JdbcUtils() {}
-
- public static String getString(ResultSet resultSet, int columnIndex)
throws SQLException {
- return resultSet.getString(columnIndex);
- }
+ private JdbcFieldTypeUtils() {}
public static Boolean getBoolean(ResultSet resultSet, int columnIndex)
throws SQLException {
- if (null == resultSet.getObject(columnIndex)) {
- return null;
- }
- return resultSet.getBoolean(columnIndex);
+ return getNullableValue(resultSet, columnIndex, ResultSet::getBoolean);
}
public static Byte getByte(ResultSet resultSet, int columnIndex) throws
SQLException {
- if (null == resultSet.getObject(columnIndex)) {
- return null;
- }
- return resultSet.getByte(columnIndex);
+ return getNullableValue(resultSet, columnIndex, ResultSet::getByte);
}
public static Short getShort(ResultSet resultSet, int columnIndex) throws
SQLException {
- if (null == resultSet.getObject(columnIndex)) {
- return null;
- }
- return resultSet.getShort(columnIndex);
+ return getNullableValue(resultSet, columnIndex, ResultSet::getShort);
}
public static Integer getInt(ResultSet resultSet, int columnIndex) throws
SQLException {
- if (null == resultSet.getObject(columnIndex)) {
- return null;
- }
- return resultSet.getInt(columnIndex);
+ return getNullableValue(resultSet, columnIndex, ResultSet::getInt);
}
public static Long getLong(ResultSet resultSet, int columnIndex) throws
SQLException {
- if (null == resultSet.getObject(columnIndex)) {
- return null;
- }
- return resultSet.getLong(columnIndex);
+ return getNullableValue(resultSet, columnIndex, ResultSet::getLong);
}
public static Float getFloat(ResultSet resultSet, int columnIndex) throws
SQLException {
- if (null == resultSet.getObject(columnIndex)) {
- return null;
- }
- return resultSet.getFloat(columnIndex);
+ return getNullableValue(resultSet, columnIndex, ResultSet::getFloat);
}
public static Double getDouble(ResultSet resultSet, int columnIndex)
throws SQLException {
- if (null == resultSet.getObject(columnIndex)) {
- return null;
- }
- return resultSet.getDouble(columnIndex);
+ return getNullableValue(resultSet, columnIndex, ResultSet::getDouble);
+ }
+
+ public static String getString(ResultSet resultSet, int columnIndex)
throws SQLException {
+ return resultSet.getString(columnIndex);
}
public static BigDecimal getBigDecimal(ResultSet resultSet, int
columnIndex)
@@ -98,9 +77,22 @@ public final class JdbcUtils {
}
public static byte[] getBytes(ResultSet resultSet, int columnIndex) throws
SQLException {
- if (null == resultSet.getObject(columnIndex)) {
+ return resultSet.getBytes(columnIndex);
+ }
+
+ private static <T> T getNullableValue(
+ ResultSet resultSet,
+ int columnIndex,
+ ThrowingFunction<ResultSet, T, SQLException> getter)
+ throws SQLException {
+ if (resultSet.getObject(columnIndex) == null) {
return null;
}
- return resultSet.getBytes(columnIndex);
+ return getter.apply(resultSet, columnIndex);
+ }
+
+ @FunctionalInterface
+ private interface ThrowingFunction<T, R, E extends Exception> {
+ R apply(T t, int columnIndex) throws E;
}
}