This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new da0949926 [feature] Add postgres support to jdbcSource (#2066)
da0949926 is described below
commit da0949926c0111f6acecaa06ac3d9015d76a5779
Author: ic4y <[email protected]>
AuthorDate: Tue Jun 28 09:49:13 2022 +0800
[feature] Add postgres support to jdbcSource (#2066)
---
.../converter/AbstractJdbcRowConverter.java | 5 +-
.../internal/dialect/psql/PostgresDialect.java | 39 +++++
.../dialect/psql/PostgresDialectFactory.java | 36 +++++
.../dialect/psql/PostgresJdbcRowConverter.java | 38 +++++
.../internal/dialect/psql/PostgresTypeMapper.java | 159 +++++++++++++++++++++
5 files changed, 276 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index 92a1eae2c..c1091293c 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -53,7 +53,10 @@ public abstract class AbstractJdbcRowConverter implements JdbcRowConverter {
for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
Object seatunnelField;
SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
- if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+ if (null == rs.getObject(i)) {
+ seatunnelField = null;
+ }
+ else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getBoolean(i);
} else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getByte(i);
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
new file mode 100644
index 000000000..f44ce8111
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+public class PostgresDialect implements JdbcDialect {
+ @Override
+ public String dialectName() {
+ return "PostgreSQL";
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new PostgresJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new PostgresTypeMapper();
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
new file mode 100644
index 000000000..963f7385e
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(JdbcDialectFactory.class)
+public class PostgresDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:postgresql:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new PostgresDialect();
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
new file mode 100644
index 000000000..4a06b8c82
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
+ @Override
+ public String converterName() {
+ return null;
+ }
+
+ @Override
+ public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
+ return super.toInternal(rs, metaData, typeInfo);
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
new file mode 100644
index 000000000..43159f932
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class PostgresTypeMapper implements JdbcDialectTypeMapper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JdbcDialectTypeMapper.class);
+
+ // Postgres jdbc driver maps several alias to real type, we use real type rather than alias:
+ // serial2 <=> int2
+ // smallserial <=> int2
+ // serial4 <=> serial
+ // serial8 <=> bigserial
+ // smallint <=> int2
+ // integer <=> int4
+ // int <=> int4
+ // bigint <=> int8
+ // float <=> float8
+ // boolean <=> bool
+ // decimal <=> numeric
+ private static final String PG_SMALLSERIAL = "smallserial";
+ private static final String PG_SERIAL = "serial";
+ private static final String PG_BIGSERIAL = "bigserial";
+ private static final String PG_BYTEA = "bytea";
+ private static final String PG_BYTEA_ARRAY = "_bytea";
+ private static final String PG_SMALLINT = "int2";
+ private static final String PG_SMALLINT_ARRAY = "_int2";
+ private static final String PG_INTEGER = "int4";
+ private static final String PG_INTEGER_ARRAY = "_int4";
+ private static final String PG_BIGINT = "int8";
+ private static final String PG_BIGINT_ARRAY = "_int8";
+ private static final String PG_REAL = "float4";
+ private static final String PG_REAL_ARRAY = "_float4";
+ private static final String PG_DOUBLE_PRECISION = "float8";
+ private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+ private static final String PG_NUMERIC = "numeric";
+ private static final String PG_NUMERIC_ARRAY = "_numeric";
+ private static final String PG_BOOLEAN = "bool";
+ private static final String PG_BOOLEAN_ARRAY = "_bool";
+ private static final String PG_TIMESTAMP = "timestamp";
+ private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+ private static final String PG_TIMESTAMPTZ = "timestamptz";
+ private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+ private static final String PG_DATE = "date";
+ private static final String PG_DATE_ARRAY = "_date";
+ private static final String PG_TIME = "time";
+ private static final String PG_TIME_ARRAY = "_time";
+ private static final String PG_TEXT = "text";
+ private static final String PG_TEXT_ARRAY = "_text";
+ private static final String PG_CHAR = "bpchar";
+ private static final String PG_CHAR_ARRAY = "_bpchar";
+ private static final String PG_CHARACTER = "character";
+ private static final String PG_CHARACTER_ARRAY = "_character";
+ private static final String PG_CHARACTER_VARYING = "varchar";
+ private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Override
+ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
+
+ String pgType = metadata.getColumnTypeName(colIndex);
+
+ int precision = metadata.getPrecision(colIndex);
+ int scale = metadata.getScale(colIndex);
+
+ switch (pgType) {
+ case PG_BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case PG_BOOLEAN_ARRAY:
+ return ArrayType.BOOLEAN_ARRAY_TYPE;
+ case PG_BYTEA:
+ return PrimitiveByteArrayType.INSTANCE;
+ case PG_BYTEA_ARRAY:
+ return ArrayType.BYTE_ARRAY_TYPE;
+ case PG_SMALLINT:
+ case PG_SMALLSERIAL:
+ case PG_INTEGER:
+ case PG_SERIAL:
+ return BasicType.INT_TYPE;
+ case PG_SMALLINT_ARRAY:
+ case PG_INTEGER_ARRAY:
+ return ArrayType.INT_ARRAY_TYPE;
+ case PG_BIGINT:
+ case PG_BIGSERIAL:
+ return BasicType.LONG_TYPE;
+ case PG_BIGINT_ARRAY:
+ return ArrayType.LONG_ARRAY_TYPE;
+ case PG_REAL:
+ return BasicType.FLOAT_TYPE;
+ case PG_REAL_ARRAY:
+ return ArrayType.FLOAT_ARRAY_TYPE;
+ case PG_DOUBLE_PRECISION:
+ return BasicType.DOUBLE_TYPE;
+ case PG_DOUBLE_PRECISION_ARRAY:
+ return ArrayType.DOUBLE_ARRAY_TYPE;
+ case PG_NUMERIC:
+ // see SPARK-26538: handle numeric without explicit precision and scale.
+ if (precision > 0) {
+ return new DecimalType(precision, metadata.getScale(colIndex));
+ }
+ return new DecimalType(38, 18);
+ case PG_CHAR:
+ case PG_CHARACTER:
+ case PG_CHARACTER_VARYING:
+ case PG_TEXT:
+ return BasicType.STRING_TYPE;
+ case PG_CHAR_ARRAY:
+ case PG_CHARACTER_ARRAY:
+ case PG_CHARACTER_VARYING_ARRAY:
+ case PG_TEXT_ARRAY:
+ return ArrayType.STRING_ARRAY_TYPE;
+ case PG_TIMESTAMP:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case PG_TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case PG_DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+
+ case PG_TIMESTAMP_ARRAY:
+ case PG_NUMERIC_ARRAY:
+ case PG_TIMESTAMPTZ:
+ case PG_TIMESTAMPTZ_ARRAY:
+ case PG_TIME_ARRAY:
+ case PG_DATE_ARRAY:
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Doesn't support Postgres type '%s' yet", pgType));
+ }
+ }
+}