Carl-Zhou-CN commented on code in PR #10048:
URL: https://github.com/apache/seatunnel/pull/10048#discussion_r2551649227
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java:
##########
@@ -316,4 +348,123 @@ public String microsecondsToIntervalFormatVal(String
intervalVal) {
if (seconds > 0) sb.append(seconds).append(" seconds");
return sb.toString().trim();
}
+
+ private OffsetDateTime getPostgresOffsetDateTime(ResultSet rs, int
columnIndex)
+ throws SQLException {
+ // Read the value once to avoid drivers returning null on subsequent
reads
+ final Object obj = rs.getObject(columnIndex);
+
+ if (obj == null) {
+ return null;
+ }
+
+ // Direct types
+ if (obj instanceof OffsetDateTime) {
+ return (OffsetDateTime) obj;
+ }
+ if (obj instanceof Timestamp) {
+ return ((Timestamp) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+ if (obj instanceof java.time.ZonedDateTime) {
+ return ((java.time.ZonedDateTime) obj).toOffsetDateTime();
+ }
+ if (obj instanceof java.util.Date) {
+ return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+
+ // PostgreSQL specific objects (e.g., PGobject) or any
org.postgresql.* class
+ final String className = obj.getClass().getName();
+ if (className.startsWith("org.postgresql.")) {
+ String str = null;
+ try {
+ str = obj.toString();
+ } catch (Exception e) {
+ log.debug("Failed to get PostgreSQL timestamp object string
representation", e);
+ }
+ if (str != null) {
+ OffsetDateTime parsed = parseTimestampIfPresent(str);
Review Comment:
The parseTimestampIfPresent method has the ability to be compatible with
null values. Remove this `if`
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java:
##########
@@ -316,4 +348,123 @@ public String microsecondsToIntervalFormatVal(String
intervalVal) {
if (seconds > 0) sb.append(seconds).append(" seconds");
return sb.toString().trim();
}
+
+ private OffsetDateTime getPostgresOffsetDateTime(ResultSet rs, int
columnIndex)
+ throws SQLException {
+ // Read the value once to avoid drivers returning null on subsequent
reads
+ final Object obj = rs.getObject(columnIndex);
+
+ if (obj == null) {
+ return null;
+ }
+
+ // Direct types
+ if (obj instanceof OffsetDateTime) {
+ return (OffsetDateTime) obj;
+ }
+ if (obj instanceof Timestamp) {
+ return ((Timestamp) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+ if (obj instanceof java.time.ZonedDateTime) {
+ return ((java.time.ZonedDateTime) obj).toOffsetDateTime();
+ }
+ if (obj instanceof java.util.Date) {
+ return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+
+ // PostgreSQL specific objects (e.g., PGobject) or any
org.postgresql.* class
+ final String className = obj.getClass().getName();
+ if (className.startsWith("org.postgresql.")) {
+ String str = null;
+ try {
+ str = obj.toString();
+ } catch (Exception e) {
+ log.debug("Failed to get PostgreSQL timestamp object string
representation", e);
Review Comment:
When will it happen?
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java:
##########
@@ -444,6 +452,19 @@ public BasicTypeDefine reconvert(Column column) {
builder.dataType(PG_TIMESTAMP);
builder.scale(timestampScale);
break;
+ case TIMESTAMP_TZ:
+ Integer timestampTzScale = column.getScale();
+ if (timestampTzScale != null && timestampTzScale >
MAX_TIMESTAMP_SCALE) {
+ timestampTzScale = MAX_TIMESTAMP_SCALE;
+ }
+ if (timestampTzScale != null && timestampTzScale > 0) {
Review Comment:
It would be better to merge the two `if`
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java:
##########
@@ -316,4 +348,123 @@ public String microsecondsToIntervalFormatVal(String
intervalVal) {
if (seconds > 0) sb.append(seconds).append(" seconds");
return sb.toString().trim();
}
+
+ private OffsetDateTime getPostgresOffsetDateTime(ResultSet rs, int
columnIndex)
+ throws SQLException {
+ // Read the value once to avoid drivers returning null on subsequent
reads
+ final Object obj = rs.getObject(columnIndex);
+
+ if (obj == null) {
+ return null;
+ }
+
+ // Direct types
+ if (obj instanceof OffsetDateTime) {
+ return (OffsetDateTime) obj;
+ }
+ if (obj instanceof Timestamp) {
+ return ((Timestamp) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+ if (obj instanceof java.time.ZonedDateTime) {
+ return ((java.time.ZonedDateTime) obj).toOffsetDateTime();
+ }
+ if (obj instanceof java.util.Date) {
+ return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+
+ // PostgreSQL specific objects (e.g., PGobject) or any
org.postgresql.* class
+ final String className = obj.getClass().getName();
+ if (className.startsWith("org.postgresql.")) {
+ String str = null;
+ try {
+ str = obj.toString();
+ } catch (Exception e) {
+ log.debug("Failed to get PostgreSQL timestamp object string
representation", e);
+ }
+ if (str != null) {
+ OffsetDateTime parsed = parseTimestampIfPresent(str);
+ if (parsed != null) {
+ return parsed;
+ }
+ }
+ }
+
+ // String-like values
+ if (obj instanceof CharSequence) {
+ return parseTimestampIfPresent(obj.toString());
+ }
+
+ // Last resort: attempt to parse from toString() representation
+ final String str;
+ try {
+ str = String.valueOf(obj);
+ } catch (Throwable ignore) {
+ return null;
+ }
+ return parseTimestampIfPresent(str);
+ }
+
+ private OffsetDateTime parseTimestampIfPresent(String str) throws
SQLException {
+ if (str == null) {
+ return null;
+ }
+ return parsePostgresTimestampTz(str);
+ }
+
+ private OffsetDateTime parsePostgresTimestampTz(String str) throws
SQLException {
+ String normalized = normalizeIsoTimestamp(str);
+ if (normalized == null) {
+ return null;
+ }
+
+ try {
+ return OffsetDateTime.parse(normalized);
+ } catch (Exception primary) {
+ log.debug("Failed to parse PostgreSQL timestamptz as ISO-8601:
{}", str, primary);
+ try {
+ String withoutOffset =
+
normalized.replaceFirst("([+-]\\d{2}:?\\d{2}|\\s+UTC|[zZ])$", "");
+ String fallback = withoutOffset.replace('T', ' ').trim();
+ Timestamp ts = Timestamp.valueOf(fallback);
+ return ts.toInstant().atOffset(ZoneOffset.UTC);
+ } catch (Exception secondary) {
+ log.debug(
+ "Failed to parse PostgreSQL timestamptz as UTC
timestamp: {}",
+ str,
+ secondary);
+ throw new SQLException(
+ "Failed to parse PostgreSQL timestamptz string: " +
str, secondary);
+ }
+ }
+ }
+
+ private String normalizeIsoTimestamp(String value) {
Review Comment:
Please add some comments indicating which formats of time this method handles
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java:
##########
@@ -316,4 +348,123 @@ public String microsecondsToIntervalFormatVal(String
intervalVal) {
if (seconds > 0) sb.append(seconds).append(" seconds");
return sb.toString().trim();
}
+
+ private OffsetDateTime getPostgresOffsetDateTime(ResultSet rs, int
columnIndex)
+ throws SQLException {
+ // Read the value once to avoid drivers returning null on subsequent
reads
+ final Object obj = rs.getObject(columnIndex);
+
+ if (obj == null) {
+ return null;
+ }
+
+ // Direct types
+ if (obj instanceof OffsetDateTime) {
+ return (OffsetDateTime) obj;
+ }
+ if (obj instanceof Timestamp) {
+ return ((Timestamp) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+ if (obj instanceof java.time.ZonedDateTime) {
+ return ((java.time.ZonedDateTime) obj).toOffsetDateTime();
+ }
+ if (obj instanceof java.util.Date) {
+ return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+
+ // PostgreSQL specific objects (e.g., PGobject) or any
org.postgresql.* class
+ final String className = obj.getClass().getName();
+ if (className.startsWith("org.postgresql.")) {
+ String str = null;
+ try {
+ str = obj.toString();
+ } catch (Exception e) {
+ log.debug("Failed to get PostgreSQL timestamp object string
representation", e);
+ }
+ if (str != null) {
+ OffsetDateTime parsed = parseTimestampIfPresent(str);
+ if (parsed != null) {
+ return parsed;
+ }
+ }
+ }
+
+ // String-like values
+ if (obj instanceof CharSequence) {
+ return parseTimestampIfPresent(obj.toString());
+ }
+
+ // Last resort: attempt to parse from toString() representation
+ final String str;
+ try {
+ str = String.valueOf(obj);
+ } catch (Throwable ignore) {
+ return null;
+ }
+ return parseTimestampIfPresent(str);
+ }
+
+ private OffsetDateTime parseTimestampIfPresent(String str) throws
SQLException {
+ if (str == null) {
+ return null;
+ }
+ return parsePostgresTimestampTz(str);
+ }
+
+ private OffsetDateTime parsePostgresTimestampTz(String str) throws
SQLException {
+ String normalized = normalizeIsoTimestamp(str);
+ if (normalized == null) {
+ return null;
+ }
+
+ try {
+ return OffsetDateTime.parse(normalized);
Review Comment:
Can we avoid this exception because the format has already been corrected
within the normalizeIsoTimestamp method
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java:
##########
@@ -316,4 +348,123 @@ public String microsecondsToIntervalFormatVal(String
intervalVal) {
if (seconds > 0) sb.append(seconds).append(" seconds");
return sb.toString().trim();
}
+
+ private OffsetDateTime getPostgresOffsetDateTime(ResultSet rs, int
columnIndex)
+ throws SQLException {
+ // Read the value once to avoid drivers returning null on subsequent
reads
+ final Object obj = rs.getObject(columnIndex);
+
+ if (obj == null) {
+ return null;
+ }
+
+ // Direct types
+ if (obj instanceof OffsetDateTime) {
+ return (OffsetDateTime) obj;
+ }
+ if (obj instanceof Timestamp) {
+ return ((Timestamp) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+ if (obj instanceof java.time.ZonedDateTime) {
+ return ((java.time.ZonedDateTime) obj).toOffsetDateTime();
+ }
+ if (obj instanceof java.util.Date) {
+ return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+
+ // PostgreSQL specific objects (e.g., PGobject) or any
org.postgresql.* class
+ final String className = obj.getClass().getName();
+ if (className.startsWith("org.postgresql.")) {
+ String str = null;
+ try {
+ str = obj.toString();
+ } catch (Exception e) {
+ log.debug("Failed to get PostgreSQL timestamp object string
representation", e);
+ }
+ if (str != null) {
+ OffsetDateTime parsed = parseTimestampIfPresent(str);
+ if (parsed != null) {
+ return parsed;
+ }
+ }
+ }
+
+ // String-like values
+ if (obj instanceof CharSequence) {
Review Comment:
It seems that they are all for obtaining values on the toString method of
obj. Can they be converged into one method? In my opinion, there seem to be too
many judgments now, which is not conducive to reading
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java:
##########
@@ -95,12 +100,109 @@ public static byte[] getBytes(ResultSet resultSet, int
columnIndex) throws SQLEx
return resultSet.getBytes(columnIndex);
}
+ public static OffsetDateTime getOffsetDateTime(ResultSet resultSet, int
columnIndex)
+ throws SQLException {
+ final Object obj = resultSet.getObject(columnIndex);
+ if (obj == null) {
+ return null;
+ }
+
+ // Handle OffsetDateTime directly
+ if (obj instanceof OffsetDateTime) {
+ return (OffsetDateTime) obj;
+ }
+
+ // Handle ZonedDateTime
+ if (obj instanceof ZonedDateTime) {
+ return ((ZonedDateTime) obj).toOffsetDateTime();
+ }
+
+ // Handle Instant
+ if (obj instanceof Instant) {
+ return ((Instant) obj).atOffset(ZoneOffset.UTC);
+ }
+
+ // Handle java.sql.Timestamp
+ if (obj instanceof Timestamp) {
+ return ((Timestamp)
obj).toLocalDateTime().atOffset(ZoneOffset.UTC);
+ }
+
+ // Handle java.util.Date
+ if (obj instanceof java.util.Date) {
+ return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+
+ // Handle Long (epoch milliseconds)
+ if (obj instanceof Long) {
+ return Instant.ofEpochMilli((Long) obj).atOffset(ZoneOffset.UTC);
+ }
+
+ // Try to parse as string
+ String str = obj.toString();
+ try {
+ return parseOffsetDateTimeFromString(str);
+ } catch (Exception e) {
+ throw new SQLException(
+ "Failed to parse OffsetDateTime value: "
+ + str
+ + " (class: "
+ + obj.getClass().getName()
+ + ")",
+ e);
+ }
+ }
+
+ public static OffsetDateTime parseOffsetDateTimeFromString(String str)
+ throws DateTimeParseException {
+ if (str == null || str.trim().isEmpty()) {
Review Comment:
The null value cannot reach here. Simplify it
##########
seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java:
##########
@@ -52,7 +52,7 @@
@NoArgsConstructor
@Slf4j
public class ConnectorPackageServiceContainer extends AbstractTestContainer {
- private static final String JDK_DOCKER_IMAGE = "openjdk:8";
+ private static final String JDK_DOCKER_IMAGE =
"seatunnelhub/openjdk:8u342";
Review Comment:
Does he need a higher JDK version? If so, by how much?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]