EricJoy2048 commented on code in PR #2937:
URL:
https://github.com/apache/incubator-seatunnel/pull/2937#discussion_r982345129
##########
pom.xml:
##########
@@ -198,6 +198,12 @@
<snappy-java.version>1.1.8.3</snappy-java.version>
<checker.qual.version>3.10.0</checker.qual.version>
<awaitility.version>4.2.0</awaitility.version>
+ <httpclient.version>4.5.13</httpclient.version>
+ <fastjson.version>1.2.83</fastjson.version>
Review Comment:
Please manage the jars used by this connector in the connector's pom.xml, like
the other connectors do.
##########
pom.xml:
##########
@@ -350,6 +356,43 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>${httpclient.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
Review Comment:
Suggest using Jackson to replace fastjson.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+ protected static final String COLUMNS_DEFAULT = "*";
+ protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE
1=1";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DruidInputFormat.class);
+ protected transient Connection connection;
+ protected transient PreparedStatement statement;
+ protected transient ResultSet resultSet;
+ protected SeaTunnelRowType rowTypeInfo;
+ protected DruidSourceOptions druidSourceOptions;
+ protected String quarySQL;
+ protected boolean hasNext;
+
+ public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+ this.druidSourceOptions = druidSourceOptions;
+ this.rowTypeInfo = initTableField();
+ }
+
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ try {
+ quarySQL = getSQL();
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ return statement.getMetaData();
+ } catch (SQLException se) {
+ throw new SQLException("ResultSetMetaData() failed." +
se.getMessage(), se);
+ }
+ }
+
+ public void openInputFormat() {
+ try {
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ resultSet = statement.executeQuery();
+ hasNext = resultSet.next();
+ } catch (SQLException se) {
+ throw new IllegalArgumentException("openInputFormat() failed." +
se.getMessage(), se);
Review Comment:
Same as above.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+ protected static final String COLUMNS_DEFAULT = "*";
+ protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE
1=1";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DruidInputFormat.class);
+ protected transient Connection connection;
+ protected transient PreparedStatement statement;
+ protected transient ResultSet resultSet;
+ protected SeaTunnelRowType rowTypeInfo;
+ protected DruidSourceOptions druidSourceOptions;
+ protected String quarySQL;
+ protected boolean hasNext;
+
+ public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+ this.druidSourceOptions = druidSourceOptions;
+ this.rowTypeInfo = initTableField();
+ }
+
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ try {
+ quarySQL = getSQL();
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ return statement.getMetaData();
+ } catch (SQLException se) {
+ throw new SQLException("ResultSetMetaData() failed." +
se.getMessage(), se);
+ }
+ }
+
+ public void openInputFormat() {
+ try {
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ resultSet = statement.executeQuery();
+ hasNext = resultSet.next();
+ } catch (SQLException se) {
+ throw new IllegalArgumentException("openInputFormat() failed." +
se.getMessage(), se);
+ }
+ }
+
+ private String getSQL() throws SQLException {
+ String columns = COLUMNS_DEFAULT;
+ String startTimestamp = druidSourceOptions.getStartTimestamp();
+ String endTimestamp = druidSourceOptions.getEndTimestamp();
+ String dataSource = druidSourceOptions.getDatasource();
+ if (druidSourceOptions.getColumns() != null &&
druidSourceOptions.getColumns().size() > 0) {
+ columns = String.join(",", druidSourceOptions.getColumns());
+ }
+ String sql = String.format(QUERY_TEMPLATE, columns, dataSource);
+ if (startTimestamp != null) {
+ sql += " AND __time >= '" + startTimestamp + "'";
+ }
+ if (endTimestamp != null) {
+ sql += " AND __time < '" + endTimestamp + "'";
+ }
+ return sql;
+ }
+
+ public void closeInputFormat() {
+ try {
+ if (resultSet != null) {
+ resultSet.close();
+ }
+ if (statement != null) {
+ statement.close();
+ }
+ } catch (SQLException se) {
+ LOGGER.error("DruidInputFormat Statement couldn't be closed", se);
+ } finally {
+ statement = null;
+ resultSet = null;
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException se) {
+ LOGGER.error("DruidInputFormat Connection couldn't be closed",
se);
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ public boolean reachedEnd() throws IOException {
+ return !hasNext;
+ }
+
+ public SeaTunnelRow nextRecord() throws IOException {
+ try {
+ if (!hasNext) {
+ return null;
+ }
+ SeaTunnelRow seaTunnelRow = toInternal(resultSet, rowTypeInfo);
+ // update hasNext after we've read the record
+ hasNext = resultSet.next();
+ return seaTunnelRow;
+ } catch (SQLException se) {
+ throw new IOException("Couldn't read data - " + se.getMessage(),
se);
+ } catch (NullPointerException npe) {
+ throw new IOException("Couldn't access resultSet", npe);
+ }
+ }
+
+ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType rowTypeInfo)
throws SQLException {
+ List<Object> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] seaTunnelDataTypes =
rowTypeInfo.getFieldTypes();
+
+ for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
+ Object seatunnelField;
+ SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
+ if (null == rs.getObject(i)) {
+ seatunnelField = null;
+ } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getBoolean(i);
+ } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getByte(i);
+ } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getShort(i);
+ } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getInt(i);
+ } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getLong(i);
+ } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getFloat(i);
+ } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDouble(i);
+ } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getString(i);
+ } else if
(LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
+ Timestamp ts = rs.getTimestamp(i,
Calendar.getInstance(TimeZone.getTimeZone("UTC")));
+ LocalDateTime localDateTime =
LocalDateTime.ofInstant(ts.toInstant(), ZoneId.of("UTC")); // good
+ seatunnelField = localDateTime;
+ } else if
(LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDate(i);
+ } else if
(LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDate(i);
+ } else {
+ throw new IllegalStateException("Unexpected value: " +
seaTunnelDataType);
+ }
+
+ fields.add(seatunnelField);
+ }
+
+ return new SeaTunnelRow(fields.toArray());
Review Comment:
This method is called for every row, so I suggest we minimize unnecessary
memory overhead. I think `Object[] fields = new
Object[seaTunnelDataTypes.length]` is better than building an `ArrayList`.
##########
seatunnel-connectors-v2/connector-druid/pom.xml:
##########
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-druid</artifactId>
+
+ <properties>
+ <jackson-datatype-joda.version>2.6.7</jackson-datatype-joda.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-joda</artifactId>
+ <version>${jackson-datatype-joda.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
Review Comment:
same as above.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+ protected static final String COLUMNS_DEFAULT = "*";
+ protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE
1=1";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DruidInputFormat.class);
+ protected transient Connection connection;
+ protected transient PreparedStatement statement;
+ protected transient ResultSet resultSet;
+ protected SeaTunnelRowType rowTypeInfo;
+ protected DruidSourceOptions druidSourceOptions;
+ protected String quarySQL;
+ protected boolean hasNext;
+
+ public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+ this.druidSourceOptions = druidSourceOptions;
+ this.rowTypeInfo = initTableField();
+ }
+
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ try {
+ quarySQL = getSQL();
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ return statement.getMetaData();
+ } catch (SQLException se) {
+ throw new SQLException("ResultSetMetaData() failed." +
se.getMessage(), se);
+ }
+ }
+
+ public void openInputFormat() {
+ try {
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ resultSet = statement.executeQuery();
+ hasNext = resultSet.next();
+ } catch (SQLException se) {
+ throw new IllegalArgumentException("openInputFormat() failed." +
se.getMessage(), se);
+ }
+ }
+
+ private String getSQL() throws SQLException {
+ String columns = COLUMNS_DEFAULT;
+ String startTimestamp = druidSourceOptions.getStartTimestamp();
+ String endTimestamp = druidSourceOptions.getEndTimestamp();
+ String dataSource = druidSourceOptions.getDatasource();
+ if (druidSourceOptions.getColumns() != null &&
druidSourceOptions.getColumns().size() > 0) {
+ columns = String.join(",", druidSourceOptions.getColumns());
+ }
+ String sql = String.format(QUERY_TEMPLATE, columns, dataSource);
+ if (startTimestamp != null) {
+ sql += " AND __time >= '" + startTimestamp + "'";
+ }
+ if (endTimestamp != null) {
+ sql += " AND __time < '" + endTimestamp + "'";
+ }
+ return sql;
+ }
+
+ public void closeInputFormat() {
+ try {
+ if (resultSet != null) {
+ resultSet.close();
+ }
+ if (statement != null) {
+ statement.close();
+ }
+ } catch (SQLException se) {
+ LOGGER.error("DruidInputFormat Statement couldn't be closed", se);
+ } finally {
+ statement = null;
+ resultSet = null;
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException se) {
+ LOGGER.error("DruidInputFormat Connection couldn't be closed",
se);
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ public boolean reachedEnd() throws IOException {
+ return !hasNext;
+ }
+
+ public SeaTunnelRow nextRecord() throws IOException {
+ try {
+ if (!hasNext) {
+ return null;
+ }
+ SeaTunnelRow seaTunnelRow = toInternal(resultSet, rowTypeInfo);
+ // update hasNext after we've read the record
+ hasNext = resultSet.next();
+ return seaTunnelRow;
+ } catch (SQLException se) {
+ throw new IOException("Couldn't read data - " + se.getMessage(),
se);
+ } catch (NullPointerException npe) {
+ throw new IOException("Couldn't access resultSet", npe);
+ }
+ }
+
+ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType rowTypeInfo)
throws SQLException {
+ List<Object> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] seaTunnelDataTypes =
rowTypeInfo.getFieldTypes();
+
+ for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
+ Object seatunnelField;
+ SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
+ if (null == rs.getObject(i)) {
+ seatunnelField = null;
+ } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
Review Comment:
Using `seaTunnelDataType.getSqlType()` would be better here.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+ protected static final String COLUMNS_DEFAULT = "*";
+ protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE
1=1";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DruidInputFormat.class);
+ protected transient Connection connection;
+ protected transient PreparedStatement statement;
+ protected transient ResultSet resultSet;
+ protected SeaTunnelRowType rowTypeInfo;
+ protected DruidSourceOptions druidSourceOptions;
+ protected String quarySQL;
+ protected boolean hasNext;
+
+ public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+ this.druidSourceOptions = druidSourceOptions;
+ this.rowTypeInfo = initTableField();
+ }
+
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ try {
+ quarySQL = getSQL();
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ return statement.getMetaData();
+ } catch (SQLException se) {
+ throw new SQLException("ResultSetMetaData() failed." +
se.getMessage(), se);
Review Comment:
Please use `ExceptionUtils.getMessage(se)` instead of `se.getMessage()`.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+ protected static final String COLUMNS_DEFAULT = "*";
+ protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE
1=1";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DruidInputFormat.class);
+ protected transient Connection connection;
+ protected transient PreparedStatement statement;
+ protected transient ResultSet resultSet;
+ protected SeaTunnelRowType rowTypeInfo;
+ protected DruidSourceOptions druidSourceOptions;
+ protected String quarySQL;
+ protected boolean hasNext;
+
+ public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
+ this.druidSourceOptions = druidSourceOptions;
+ this.rowTypeInfo = initTableField();
+ }
+
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ try {
+ quarySQL = getSQL();
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ return statement.getMetaData();
+ } catch (SQLException se) {
+ throw new SQLException("ResultSetMetaData() failed." +
se.getMessage(), se);
+ }
+ }
+
+ public void openInputFormat() {
+ try {
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ resultSet = statement.executeQuery();
+ hasNext = resultSet.next();
+ } catch (SQLException se) {
+ throw new IllegalArgumentException("openInputFormat() failed." +
se.getMessage(), se);
+ }
+ }
+
+ private String getSQL() throws SQLException {
+ String columns = COLUMNS_DEFAULT;
+ String startTimestamp = druidSourceOptions.getStartTimestamp();
+ String endTimestamp = druidSourceOptions.getEndTimestamp();
+ String dataSource = druidSourceOptions.getDatasource();
+ if (druidSourceOptions.getColumns() != null &&
druidSourceOptions.getColumns().size() > 0) {
+ columns = String.join(",", druidSourceOptions.getColumns());
+ }
+ String sql = String.format(QUERY_TEMPLATE, columns, dataSource);
+ if (startTimestamp != null) {
+ sql += " AND __time >= '" + startTimestamp + "'";
+ }
+ if (endTimestamp != null) {
+ sql += " AND __time < '" + endTimestamp + "'";
+ }
+ return sql;
+ }
+
+ public void closeInputFormat() {
+ try {
+ if (resultSet != null) {
+ resultSet.close();
+ }
+ if (statement != null) {
+ statement.close();
+ }
+ } catch (SQLException se) {
+ LOGGER.error("DruidInputFormat Statement couldn't be closed", se);
+ } finally {
+ statement = null;
+ resultSet = null;
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException se) {
+ LOGGER.error("DruidInputFormat Connection couldn't be closed",
se);
+ } finally {
+ connection = null;
+ }
+ }
+ }
+
+ public boolean reachedEnd() throws IOException {
+ return !hasNext;
+ }
+
+ public SeaTunnelRow nextRecord() throws IOException {
+ try {
+ if (!hasNext) {
+ return null;
+ }
+ SeaTunnelRow seaTunnelRow = toInternal(resultSet, rowTypeInfo);
+ // update hasNext after we've read the record
+ hasNext = resultSet.next();
+ return seaTunnelRow;
+ } catch (SQLException se) {
+ throw new IOException("Couldn't read data - " + se.getMessage(),
se);
+ } catch (NullPointerException npe) {
+ throw new IOException("Couldn't access resultSet", npe);
+ }
+ }
+
+ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType rowTypeInfo)
throws SQLException {
+ List<Object> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] seaTunnelDataTypes =
rowTypeInfo.getFieldTypes();
+
+ for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
+ Object seatunnelField;
+ SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
+ if (null == rs.getObject(i)) {
+ seatunnelField = null;
+ } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getBoolean(i);
+ } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getByte(i);
+ } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getShort(i);
+ } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getInt(i);
+ } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getLong(i);
+ } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getFloat(i);
+ } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDouble(i);
+ } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getString(i);
+ } else if
(LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
+ Timestamp ts = rs.getTimestamp(i,
Calendar.getInstance(TimeZone.getTimeZone("UTC")));
+ LocalDateTime localDateTime =
LocalDateTime.ofInstant(ts.toInstant(), ZoneId.of("UTC")); // good
+ seatunnelField = localDateTime;
+ } else if
(LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDate(i);
+ } else if
(LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDate(i);
+ } else {
Review Comment:
I see that Druid supports `DECIMAL`, per
https://druid.apache.org/docs/latest/querying/sql-data-types.html.
Can you support `DECIMAL` here as well?
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+ protected static final String COLUMNS_DEFAULT = "*";
+ protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE
1=1";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DruidInputFormat.class);
+ protected transient Connection connection;
+ protected transient PreparedStatement statement;
+ protected transient ResultSet resultSet;
+ protected SeaTunnelRowType rowTypeInfo;
+ protected DruidSourceOptions druidSourceOptions;
+ protected String quarySQL;
+ protected boolean hasNext;
+
/**
 * Creates the input format from the connector options and eagerly resolves
 * the table schema into a SeaTunnel row type.
 *
 * @param druidSourceOptions Druid JDBC URL, datasource and column options
 */
public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
    this.druidSourceOptions = druidSourceOptions;
    // Resolving the schema in the constructor lets callers read
    // getRowTypeInfo() before openInputFormat() is invoked.
    this.rowTypeInfo = initTableField();
}
+
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ try {
+ quarySQL = getSQL();
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ return statement.getMetaData();
+ } catch (SQLException se) {
+ throw new SQLException("ResultSetMetaData() failed." +
se.getMessage(), se);
+ }
+ }
+
+ public void openInputFormat() {
+ try {
+ connection =
DriverManager.getConnection(druidSourceOptions.getUrl());
+ statement = connection.prepareStatement(quarySQL,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ resultSet = statement.executeQuery();
+ hasNext = resultSet.next();
+ } catch (SQLException se) {
+ throw new IllegalArgumentException("openInputFormat() failed." +
se.getMessage(), se);
+ }
+ }
+
+ private String getSQL() throws SQLException {
+ String columns = COLUMNS_DEFAULT;
+ String startTimestamp = druidSourceOptions.getStartTimestamp();
+ String endTimestamp = druidSourceOptions.getEndTimestamp();
+ String dataSource = druidSourceOptions.getDatasource();
+ if (druidSourceOptions.getColumns() != null &&
druidSourceOptions.getColumns().size() > 0) {
+ columns = String.join(",", druidSourceOptions.getColumns());
+ }
+ String sql = String.format(QUERY_TEMPLATE, columns, dataSource);
+ if (startTimestamp != null) {
+ sql += " AND __time >= '" + startTimestamp + "'";
+ }
+ if (endTimestamp != null) {
+ sql += " AND __time < '" + endTimestamp + "'";
+ }
+ return sql;
+ }
+
/**
 * Releases the ResultSet, Statement and Connection in that order.
 *
 * <p>Close failures are logged rather than rethrown so cleanup always
 * continues; the fields are nulled in {@code finally} blocks so the format
 * ends in a consistent "closed" state even when a close() call throws.
 */
public void closeInputFormat() {
    try {
        if (resultSet != null) {
            resultSet.close();
        }
        if (statement != null) {
            statement.close();
        }
    } catch (SQLException se) {
        // A failure closing either the result set or the statement lands here.
        LOGGER.error("DruidInputFormat Statement couldn't be closed", se);
    } finally {
        statement = null;
        resultSet = null;
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException se) {
            LOGGER.error("DruidInputFormat Connection couldn't be closed", se);
        } finally {
            connection = null;
        }
    }
}
+
+ public boolean reachedEnd() throws IOException {
+ return !hasNext;
+ }
+
+ public SeaTunnelRow nextRecord() throws IOException {
+ try {
+ if (!hasNext) {
+ return null;
+ }
+ SeaTunnelRow seaTunnelRow = toInternal(resultSet, rowTypeInfo);
+ // update hasNext after we've read the record
+ hasNext = resultSet.next();
+ return seaTunnelRow;
+ } catch (SQLException se) {
+ throw new IOException("Couldn't read data - " + se.getMessage(),
se);
+ } catch (NullPointerException npe) {
+ throw new IOException("Couldn't access resultSet", npe);
+ }
+ }
+
+ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType rowTypeInfo)
throws SQLException {
+ List<Object> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] seaTunnelDataTypes =
rowTypeInfo.getFieldTypes();
+
+ for (int i = 1; i <= seaTunnelDataTypes.length; i++) {
+ Object seatunnelField;
+ SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i - 1];
+ if (null == rs.getObject(i)) {
+ seatunnelField = null;
+ } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getBoolean(i);
+ } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getByte(i);
+ } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getShort(i);
+ } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getInt(i);
+ } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getLong(i);
+ } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getFloat(i);
+ } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDouble(i);
+ } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getString(i);
+ } else if
(LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) {
+ Timestamp ts = rs.getTimestamp(i,
Calendar.getInstance(TimeZone.getTimeZone("UTC")));
+ LocalDateTime localDateTime =
LocalDateTime.ofInstant(ts.toInstant(), ZoneId.of("UTC")); // good
+ seatunnelField = localDateTime;
+ } else if
(LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDate(i);
+ } else if
(LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDate(i);
+ } else {
+ throw new IllegalStateException("Unexpected value: " +
seaTunnelDataType);
+ }
+
+ fields.add(seatunnelField);
+ }
+
+ return new SeaTunnelRow(fields.toArray());
+ }
+
+ private SeaTunnelRowType initTableField() {
+ ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+ ArrayList<String> fieldNames = new ArrayList<>();
Review Comment:
Same as above.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper;
+
+import lombok.Data;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+@Data
+public class DruidInputFormat implements Serializable {
+ protected static final String COLUMNS_DEFAULT = "*";
+ protected static final String QUERY_TEMPLATE = "SELECT %s FROM %s WHERE
1=1";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DruidInputFormat.class);
+ protected transient Connection connection;
+ protected transient PreparedStatement statement;
+ protected transient ResultSet resultSet;
+ protected SeaTunnelRowType rowTypeInfo;
+ protected DruidSourceOptions druidSourceOptions;
+ protected String quarySQL;
+ protected boolean hasNext;
+
/**
 * Creates the input format from the connector options and eagerly resolves
 * the table schema into a SeaTunnel row type.
 *
 * @param druidSourceOptions Druid JDBC URL, datasource and column options
 */
public DruidInputFormat(DruidSourceOptions druidSourceOptions) {
    this.druidSourceOptions = druidSourceOptions;
    // Schema is resolved up front so getRowTypeInfo() works before open().
    this.rowTypeInfo = initTableField();
}
+
/**
 * Builds the query, opens a connection and returns the prepared statement's
 * column metadata.
 *
 * <p>NOTE(review): the connection/statement opened here are not closed when
 * prepareStatement fails — possible resource leak; confirm every caller
 * eventually invokes closeInputFormat().
 *
 * @return metadata describing the columns of the generated query
 * @throws SQLException if the connection or statement cannot be created
 */
public ResultSetMetaData getResultSetMetaData() throws SQLException {
    try {
        quarySQL = getSQL();
        connection = DriverManager.getConnection(druidSourceOptions.getUrl());
        statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        return statement.getMetaData();
    } catch (SQLException se) {
        throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se);
    }
}
+
/**
 * Opens the JDBC connection, executes the prepared query and positions the
 * cursor on the first row (recorded in {@code hasNext}).
 *
 * <p>Assumes getResultSetMetaData() ran first so {@code quarySQL} is set.
 * NOTE(review): resources are not closed if a later step in the try block
 * throws — possible leak; confirm closeInputFormat() is always called.
 *
 * @throws IllegalArgumentException wrapping any SQLException
 */
public void openInputFormat() {
    try {
        connection = DriverManager.getConnection(druidSourceOptions.getUrl());
        statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        resultSet = statement.executeQuery();
        hasNext = resultSet.next();
    } catch (SQLException se) {
        throw new IllegalArgumentException("openInputFormat() failed." + se.getMessage(), se);
    }
}
+
+ private String getSQL() throws SQLException {
+ String columns = COLUMNS_DEFAULT;
+ String startTimestamp = druidSourceOptions.getStartTimestamp();
+ String endTimestamp = druidSourceOptions.getEndTimestamp();
+ String dataSource = druidSourceOptions.getDatasource();
+ if (druidSourceOptions.getColumns() != null &&
druidSourceOptions.getColumns().size() > 0) {
Review Comment:
Suggest using `CollectionUtils.isNotEmpty(druidSourceOptions.getColumns())` here — note the condition guards the non-empty case, so plain `isEmpty` would invert the check.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceOptions.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class DruidSourceOptions implements Serializable {
+ private String url;
+ private String datasource;
+ private String startTimestamp;
+ private String endTimestamp;
+ private List<String> columns;
+
+ private String partitionColumn;
+ private Long partitionUpperBound;
+ private Long partitionLowerBound;
+ private Integer parallelism;
+
+ public DruidSourceOptions(Config pluginConfig) {
+ this.url = pluginConfig.getString(DruidSourceConfig.URL);
Review Comment:
Please check whether each parameter exists (e.g. via `pluginConfig.hasPath(...)`) before reading it; otherwise a missing key will throw at config-load time.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.druid.client.DruidInputFormat;
+import
org.apache.seatunnel.connectors.seatunnel.druid.config.DruidSourceOptions;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@AutoService(SeaTunnelSource.class)
+public class DruidSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DruidSource.class);
+
+ private SeaTunnelRowType rowTypeInfo;
+ private DruidInputFormat druidInputFormat;
+ private DruidSourceOptions druidSourceOptions;
+
@Override
public String getPluginName() {
    // Identifier used in SeaTunnel job configs to select this source connector.
    return "Druid";
}
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ LOGGER.info("Druid source prepare");
+ try {
+ druidSourceOptions = new DruidSourceOptions(pluginConfig);
+ druidInputFormat = new DruidInputFormat(druidSourceOptions);
+ this.rowTypeInfo = druidInputFormat.getRowTypeInfo();
+ } catch (Exception e) {
+ throw new PrepareFailException("Druid", PluginType.SOURCE,
e.toString());
 Review Comment:
Please replace `e.toString()` with `ExceptionUtils.getMessage(e)` so the failure message is extracted consistently with the rest of the project.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.client;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Data
+public class DruidOutputFormat implements Serializable {
+ public static final String DEFAULT_LINE_DELIMITER = "\n";
+ public static final String DEFAULT_FIELD_DELIMITER = ",";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DruidOutputFormat.class);
+ private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+ private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+ private static final String DEFAULT_COLUMN = "__time";
+ private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = new
DateTime();
+ private final transient StringBuilder data;
+ private final String coordinatorURL;
+ private final String datasource;
+ private final String timestampColumn;
+ private final String timestampFormat;
+ private final DateTime timestampMissingValue;
+ private List<String> columns;
+
/**
 * Creates the output format that buffers rows and submits them to the Druid
 * coordinator as a parallel indexing task.
 *
 * @param coordinatorURL        base URL of the Druid coordinator
 * @param datasource            target datasource name
 * @param timestampColumn       timestamp column; defaults to "timestamp"
 * @param timestampFormat       timestamp format; defaults to "auto"
 * @param timestampMissingValue fallback timestamp; defaults to "now"
 * @param columns               column names, in row order
 */
public DruidOutputFormat(String coordinatorURL,
                         String datasource,
                         String timestampColumn,
                         String timestampFormat,
                         String timestampMissingValue,
                         List<String> columns
) {
    this.data = new StringBuilder();
    this.coordinatorURL = coordinatorURL;
    this.datasource = datasource;
    // Each nullable option falls back to its documented default.
    this.timestampColumn = (timestampColumn != null) ? timestampColumn : DEFAULT_TIMESTAMP_COLUMN;
    this.timestampFormat = (timestampFormat != null) ? timestampFormat : DEFAULT_TIMESTAMP_FORMAT;
    this.timestampMissingValue =
            (timestampMissingValue != null) ? DateTimes.of(timestampMissingValue) : DEFAULT_TIMESTAMP_MISSING_VALUE;
    this.columns = columns;
}
+
/**
 * Appends one row to the in-memory buffer as a comma-separated line; null
 * fields become empty strings.
 *
 * <p>NOTE(review): field values are appended verbatim with no CSV quoting or
 * escaping — a value containing "," or "\n" would corrupt the buffered
 * payload; confirm upstream guarantees values are delimiter-free.
 * NOTE(review): {@code data} is a transient final field, so after
 * deserialization it is null and this method would NPE — verify the sink
 * never writes after a serialize/deserialize round trip.
 *
 * @param element the row to buffer
 */
public void write(SeaTunnelRow element) {
    int fieldIndex = element.getArity();
    for (int i = 0; i < fieldIndex; i++) {
        Object v = element.getField(i);
        if (i != 0) {
            this.data.append(DEFAULT_FIELD_DELIMITER);
        }
        if (v != null) {
            this.data.append(v);
        }
    }
    this.data.append(DEFAULT_LINE_DELIMITER);
}
+
+ public void closeOutputFormat() {
+ try {
+ ParallelIndexIOConfig ioConfig = parallelIndexIOConfig();
+ ParallelIndexTuningConfig tuningConfig = tuningConfig();
+ ParallelIndexSupervisorTask indexTask =
parallelIndexSupervisorTask(ioConfig, tuningConfig);
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.registerModule(new JodaModule());
+
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_GETTERS, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_FIELDS, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
+ mapper.configure(MapperFeature.AUTO_DETECT_SETTERS, false);
+ mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
+ mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+ String taskJSON = mapper.writeValueAsString(indexTask);
+ JSONObject jsonObject = JSON.parseObject(taskJSON);
+ jsonObject.remove("id");
+ jsonObject.remove("groupId");
+ jsonObject.remove("resource");
+ JSONObject spec = jsonObject.getJSONObject("spec");
+ spec.remove("tuningConfig");
+ jsonObject.put("spec", spec);
+ taskJSON = jsonObject.toJSONString();
+
+ URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task");
+ HttpURLConnection con = (HttpURLConnection) url.openConnection();
+ con.setRequestMethod("POST");
+ con.setRequestProperty("Content-Type", "application/json");
+ con.setRequestProperty("Accept", "application/json, text/plain,
*/*");
+ con.setDoOutput(true);
+ try (OutputStream os = con.getOutputStream()) {
+ byte[] input = taskJSON.getBytes(StandardCharsets.UTF_8);
+ os.write(input, 0, input.length);
+ }
+ try (BufferedReader br = new BufferedReader(new
InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
+ StringBuilder response = new StringBuilder();
+ String responseLine;
+ while ((responseLine = br.readLine()) != null) {
+ response.append(responseLine.trim());
+ }
+ LOGGER.info("Druid write task has been sent, and the response
is {}", response.toString());
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
Review Comment:
Please don't use `e.printStackTrace()`; log the exception through the class's logger instead.
##########
seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.druid.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.joda.time.DateTime;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class DruidSinkOptions implements Serializable {
+ private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
+ private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
+ private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = null;
+ private static final int DEFAULT_PARALLELISM = 1;
+ private String coordinatorURL;
+ private String datasource;
+ private String timestampColumn;
+ private String timestampFormat;
+ private String timestampMissingValue;
+ private List<String> columns;
+ private int parallelism;
+
+ public DruidSinkOptions(Config pluginConfig) {
+ this.coordinatorURL =
pluginConfig.getString(DruidSinkConfig.COORDINATOR_URL);
Review Comment:
Please check whether each parameter exists (e.g. via `pluginConfig.hasPath(...)`) before reading it; otherwise a missing key will throw at config-load time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]