TyrantLucifer commented on code in PR #2832: URL: https://github.com/apache/incubator-seatunnel/pull/2832#discussion_r1064305848
########## seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/exception/TDengineConnectorErrorCode.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum TDengineConnectorErrorCode implements SeaTunnelErrorCode { + SQL_OPERATION_FAILED("TDengine-01", "execute sql failed"), Review Comment: SQL_OPERATION_FAILED --> CommonErrorCode.SQL_OPERATION_FAILED; READER_FAILED --> CommonErrorCode.READER_OPERATION_FAILED; WRITER_FAILED --> CommonErrorCode.WRITER_OPERATION_FAILED; TYPE_MAPPER_FAILED --> CommonErrorCode.UNSUPPORTED_DATA_TYPE. CONNECTION_FAILED should be removed; connection errors should instead be reported as CommonErrorCode.READER_OPERATION_FAILED or CommonErrorCode.WRITER_OPERATION_FAILED, depending on whether they occur on the reader side or the writer side. ########## seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceSplitEnumerator.java: ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import org.apache.seatunnel.api.source.SourceEvent; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.source.SourceSplitEnumerator.Context; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; +import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState; + +import com.google.common.collect.Sets; +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public class TDengineSourceSplitEnumerator implements SourceSplitEnumerator<TDengineSourceSplit, 
TDengineSourceState> { + + private final Context<TDengineSourceSplit> context; + private final TDengineSourceConfig config; + private Set<TDengineSourceSplit> pendingSplit = new HashSet<>(); + private Set<TDengineSourceSplit> assignedSplit = new HashSet<>(); + private Connection conn; + private SeaTunnelRowType seaTunnelRowType; + + public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, Context<TDengineSourceSplit> context) { + this(seaTunnelRowType, config, null, context); + } + + public TDengineSourceSplitEnumerator(SeaTunnelRowType seaTunnelRowType, TDengineSourceConfig config, TDengineSourceState sourceState, Context<TDengineSourceSplit> context) { + this.config = config; + this.context = context; + this.seaTunnelRowType = seaTunnelRowType; + if (sourceState != null) { + this.assignedSplit = sourceState.getAssignedSplit(); + } + } + + private static int getSplitOwner(String tp, int numReaders) { + return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; + } + + @SneakyThrows + @Override + public void open() { + String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword()); + conn = DriverManager.getConnection(jdbcUrl); + } + + @Override + public void run() throws SQLException { + pendingSplit = getAllSplits(); + assignSplit(context.registeredReaders()); + } + + /* + * 1. get timestampField + * 2. get all sub tables of configured super table + * 3. each split has one sub table + */ + private Set<TDengineSourceSplit> getAllSplits() throws SQLException { + final String timestampFieldName; + try (Statement statement = conn.createStatement()) { + final ResultSet fieldNameResultSet = statement.executeQuery("desc " + config.getDatabase() + "." 
+ config.getStable()); + fieldNameResultSet.next(); + timestampFieldName = fieldNameResultSet.getString(1); + } + + final Set<TDengineSourceSplit> splits = Sets.newHashSet(); + try (Statement statement = conn.createStatement()) { + String metaSQL = "select table_name from information_schema.ins_tables where db_name = '" + config.getDatabase() + "' and stable_name='" + config.getStable() + "';"; + ResultSet subTableNameResultSet = statement.executeQuery(metaSQL); + while (subTableNameResultSet.next()) { + final String subTableName = subTableNameResultSet.getString(1); + final TDengineSourceSplit splitBySubTable = createSplitBySubTable(subTableName, timestampFieldName); + splits.add(splitBySubTable); + } + } + return splits; + } + + private TDengineSourceSplit createSplitBySubTable(String subTableName, String timestampFieldName) { + String selectFields = Arrays.stream(seaTunnelRowType.getFieldNames()).skip(1).collect(Collectors.joining(",")); + String subTableSQL = "select " + selectFields + " from " + config.getDatabase() + "." 
+ subTableName; + String start = config.getLowerBound(); + String end = config.getUpperBound(); + if (start != null || end != null) { + String startCondition = null; + String endCondition = null; + //Left closed right away + if (start != null) { + startCondition = timestampFieldName + " >= '" + start + "'"; + } + if (end != null) { + endCondition = timestampFieldName + " < '" + end + "'"; + } + String query = StringUtils.join(new String[]{startCondition, endCondition}, " and "); + subTableSQL = subTableSQL + " where " + query; + } + + return new TDengineSourceSplit(subTableName, subTableSQL); + } + + @Override + public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) { + if (!splits.isEmpty()) { + pendingSplit.addAll(splits); + assignSplit(Collections.singletonList(subtaskId)); + } + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplit.size(); + } + + @Override + public void registerReader(int subtaskId) { + if (!pendingSplit.isEmpty()) { + assignSplit(Collections.singletonList(subtaskId)); + } + } + + private void assignSplit(Collection<Integer> taskIDList) { + assignedSplit = pendingSplit.stream() + .map(split -> { + int splitOwner = getSplitOwner(split.splitId(), context.currentParallelism()); + if (taskIDList.contains(splitOwner)) { + context.assignSplit(splitOwner, split); + return split; + } else { + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + pendingSplit.clear(); + } + + @Override + public TDengineSourceState snapshotState(long checkpointId) { + return new TDengineSourceState(assignedSplit); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + SourceSplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + //nothing to do + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + 
SourceSplitEnumerator.super.notifyCheckpointAborted(checkpointId); + } + + @Override + public void close() { + try { + if (!Objects.isNull(conn)) { + conn.close(); + } + } catch (SQLException e) { + throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "TDengine split_enumerator connection close failed", e); Review Comment: As below, use `CommonErrorCode.READER_OPERATION_FAILED` instead of `CONNECTION_FAILED`. ########## seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; + +import com.google.common.collect.Sets; +import com.taosdata.jdbc.TSDBDriver; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + +@Slf4j +public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> { + + private static final long THREAD_WAIT_TIME = 500L; + + private final TDengineSourceConfig config; + + private final Set<TDengineSourceSplit> sourceSplits; + + private final Context context; + + private Connection conn; + + public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) { + this.config = config; + this.sourceSplits = Sets.newHashSet(); + this.context = readerContext; + } + + @Override + public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException { + if (sourceSplits.isEmpty()) { + Thread.sleep(THREAD_WAIT_TIME); + return; + } + synchronized (collector.getCheckpointLock()) { + sourceSplits.forEach(split -> { + try { + read(split, collector); + } catch (Exception e) { + throw new 
TDengineConnectorException(TDengineConnectorErrorCode.READER_FAILED, "TDengine split read error", e); + } + }); + } + + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + // signal to the source that we have reached the end of the data. + log.info("Closed the bounded TDengine source"); + context.signalNoMoreElement(); + } + } + + @Override + public void open(){ + String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword()); + Properties connProps = new Properties(); + //todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true", + // there is a exception : Caused by: java.sql.SQLException: can't create connection with server + // under docker network env + // @bobo (tdengine) + connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false"); + try { + conn = DriverManager.getConnection(jdbcUrl, connProps); + } catch (SQLException e) { + throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "get TDengine connection failed:" + jdbcUrl); + } + } + + @Override + public void close() { + try { + if (!Objects.isNull(conn)) { + conn.close(); + } + } catch (SQLException e) { + throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "TDengine reader connection close failed", e); Review Comment: Use `CommonErrorCode.READER_OPERATION_FAILED` ########## seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/typemapper/TDengineTypeMapper.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TDengineTypeMapper { + + + // ============================data types===================== + + private static final String TDENGINE_UNKNOWN = "UNKNOWN"; + private static final String TDENGINE_BIT = "BIT"; + + // -------------------------number---------------------------- + private static final String TDENGINE_TINYINT = "TINYINT"; + private static final String TDENGINE_TINYINT_UNSIGNED = "TINYINT UNSIGNED"; + private static final String TDENGINE_SMALLINT = "SMALLINT"; + private static final String TDENGINE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED"; + private static final String TDENGINE_MEDIUMINT = "MEDIUMINT"; + private static final String TDENGINE_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED"; + private static final String TDENGINE_INT = "INT"; + private static final String TDENGINE_INT_UNSIGNED = "INT UNSIGNED"; + private static final String TDENGINE_INTEGER = "INTEGER"; + private static final String TDENGINE_INTEGER_UNSIGNED = 
"INTEGER UNSIGNED"; + private static final String TDENGINE_BIGINT = "BIGINT"; + private static final String TDENGINE_BIGINT_UNSIGNED = "BIGINT UNSIGNED"; + private static final String TDENGINE_DECIMAL = "DECIMAL"; + private static final String TDENGINE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED"; + private static final String TDENGINE_FLOAT = "FLOAT"; + private static final String TDENGINE_FLOAT_UNSIGNED = "FLOAT UNSIGNED"; + private static final String TDENGINE_DOUBLE = "DOUBLE"; + private static final String TDENGINE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED"; + + // -------------------------string---------------------------- + private static final String TDENGINE_CHAR = "CHAR"; + private static final String TDENGINE_VARCHAR = "VARCHAR"; + private static final String TDENGINE_TINYTEXT = "TINYTEXT"; + private static final String TDENGINE_MEDIUMTEXT = "MEDIUMTEXT"; + private static final String TDENGINE_TEXT = "TEXT"; + private static final String TDENGINE_LONGTEXT = "LONGTEXT"; + private static final String TDENGINE_JSON = "JSON"; + + // ------------------------------time------------------------- + private static final String TDENGINE_DATE = "DATE"; + private static final String TDENGINE_DATETIME = "DATETIME"; + private static final String TDENGINE_TIME = "TIME"; + private static final String TDENGINE_TIMESTAMP = "TIMESTAMP"; + private static final String TDENGINE_YEAR = "YEAR"; + + // ------------------------------blob------------------------- + private static final String TDENGINE_TINYBLOB = "TINYBLOB"; + private static final String TDENGINE_MEDIUMBLOB = "MEDIUMBLOB"; + private static final String TDENGINE_BLOB = "BLOB"; + private static final String TDENGINE_LONGBLOB = "LONGBLOB"; + private static final String TDENGINE_BINARY = "BINARY"; + private static final String TDENGINE_VARBINARY = "VARBINARY"; + private static final String TDENGINE_GEOMETRY = "GEOMETRY"; + + @SuppressWarnings("checkstyle:MagicNumber") + public static SeaTunnelDataType<?> mapping(String tdengineType) 
{ + switch (tdengineType) { + case TDENGINE_BIT: + return BasicType.BOOLEAN_TYPE; + case TDENGINE_TINYINT: + case TDENGINE_TINYINT_UNSIGNED: + case TDENGINE_SMALLINT: + case TDENGINE_SMALLINT_UNSIGNED: + case TDENGINE_MEDIUMINT: + case TDENGINE_MEDIUMINT_UNSIGNED: + case TDENGINE_INT: + case TDENGINE_INTEGER: + case TDENGINE_YEAR: + return BasicType.INT_TYPE; + case TDENGINE_INT_UNSIGNED: + case TDENGINE_INTEGER_UNSIGNED: + case TDENGINE_BIGINT: + return BasicType.LONG_TYPE; + case TDENGINE_BIGINT_UNSIGNED: + return new DecimalType(20, 0); + case TDENGINE_DECIMAL: + log.warn("{} will probably cause value overflow.", TDENGINE_DECIMAL); + return new DecimalType(38, 18); + case TDENGINE_DECIMAL_UNSIGNED: + return new DecimalType(38, 18); + case TDENGINE_FLOAT: + return BasicType.FLOAT_TYPE; + case TDENGINE_FLOAT_UNSIGNED: + log.warn("{} will probably cause value overflow.", TDENGINE_FLOAT_UNSIGNED); + return BasicType.FLOAT_TYPE; + case TDENGINE_DOUBLE: + return BasicType.DOUBLE_TYPE; + case TDENGINE_DOUBLE_UNSIGNED: + log.warn("{} will probably cause value overflow.", TDENGINE_DOUBLE_UNSIGNED); + return BasicType.DOUBLE_TYPE; + case TDENGINE_CHAR: + case TDENGINE_TINYTEXT: + case TDENGINE_MEDIUMTEXT: + case TDENGINE_TEXT: + case TDENGINE_VARCHAR: + case TDENGINE_JSON: + case TDENGINE_LONGTEXT: + return BasicType.STRING_TYPE; + case TDENGINE_DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case TDENGINE_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case TDENGINE_DATETIME: + case TDENGINE_TIMESTAMP: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + + case TDENGINE_TINYBLOB: + case TDENGINE_MEDIUMBLOB: + case TDENGINE_BLOB: + case TDENGINE_LONGBLOB: + case TDENGINE_VARBINARY: + case TDENGINE_BINARY: + return PrimitiveByteArrayType.INSTANCE; + + //Doesn't support yet + case TDENGINE_GEOMETRY: + case TDENGINE_UNKNOWN: + default: + throw new TDengineConnectorException(TDengineConnectorErrorCode.TYPE_MAPPER_FAILED, String.format( Review Comment: As below, use 
`CommonErrorCode.UNSUPPORTED_DATA_TYPE` ########## seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; + +import com.google.common.collect.Sets; +import com.taosdata.jdbc.TSDBDriver; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + +@Slf4j +public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> { + + private static final long THREAD_WAIT_TIME = 500L; + + private final TDengineSourceConfig config; + + private final Set<TDengineSourceSplit> sourceSplits; + + private final Context context; + + private Connection conn; + + public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) { + this.config = config; + this.sourceSplits = Sets.newHashSet(); + this.context = readerContext; + } + + @Override + public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException { + if (sourceSplits.isEmpty()) { + Thread.sleep(THREAD_WAIT_TIME); + return; + } + synchronized (collector.getCheckpointLock()) { + sourceSplits.forEach(split -> { + try { + read(split, collector); + } catch (Exception e) { + throw new 
TDengineConnectorException(TDengineConnectorErrorCode.READER_FAILED, "TDengine split read error", e); Review Comment: As below, use `CommonErrorCode.READER_OPERATION_FAILED` instead of `READER_FAILED`. ########## seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSource.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.STABLE; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.URL; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.ConfigNames.USERNAME; +import static org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig.buildSourceConfig; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceReader.Context; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState; +import org.apache.seatunnel.connectors.seatunnel.tdengine.typemapper.TDengineTypeMapper; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; +import com.google.common.collect.Lists; +import lombok.SneakyThrows; +import 
org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.List; + +/** + * TDengine source each split corresponds one subtable + * <p> + * TODO: wait for optimization + * 1. batch -> batch + stream + * 2. one item of data writing -> a batch of data writing + */ +@AutoService(SeaTunnelSource.class) +public class TDengineSource implements SeaTunnelSource<SeaTunnelRow, TDengineSourceSplit, TDengineSourceState> { + + private SeaTunnelRowType seaTunnelRowType; + private TDengineSourceConfig tdengineSourceConfig; + + @Override + public String getPluginName() { + return "TDengine"; + } + + @SneakyThrows + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, URL, DATABASE, STABLE, USERNAME, PASSWORD); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "TDengine connection require url/database/stable/username/password. All of these must not be empty."); Review Comment: Use `SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED` ########## seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; + +@Slf4j +public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> { + + private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); + private final Connection conn; + private final TDengineSourceConfig config; + private int tagsNum; + + @SneakyThrows + public TDengineSinkWriter(Config pluginConfig, 
SeaTunnelRowType seaTunnelRowType) { + config = TDengineSourceConfig.buildSourceConfig(pluginConfig); + String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword()); + conn = DriverManager.getConnection(jdbcUrl); + try (Statement statement = conn.createStatement()) { + final ResultSet metaResultSet = statement.executeQuery("desc " + config.getDatabase() + "." + config.getStable()); + while (metaResultSet.next()) { + if (StringUtils.equals("TAG", metaResultSet.getString("note"))) { + tagsNum++; + } + } + } + } + + @SneakyThrows + @Override + @SuppressWarnings("checkstyle:RegexpSingleline") + public void write(SeaTunnelRow element) { + final ArrayList<Object> tags = Lists.newArrayList(); + for (int i = element.getArity() - tagsNum; i < element.getArity(); i++) { + tags.add(element.getField(i)); + } + final String tagValues = StringUtils.join(convertDataType(tags.toArray()), ","); + + final Object[] metrics = ArrayUtils.subarray(element.getFields(), 1, element.getArity() - tagsNum); + + try (Statement statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) { + String sql = String.format("INSERT INTO %s using %s tags ( %s ) VALUES ( %s );", + element.getField(0), + config.getStable(), + tagValues, + StringUtils.join(convertDataType(metrics), ",")); + final int rowCount = statement.executeUpdate(sql); + if (rowCount == 0) { + Throwables.propagateIfPossible(new TDengineConnectorException(TDengineConnectorErrorCode.SQL_OPERATION_FAILED, "insert error:" + element)); + } + } + } + + @Override + public void close() { + if (Objects.nonNull(conn)) { + try { + conn.close(); + } catch (SQLException e) { + throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "TDengine writer connection close failed", e); Review Comment: Use `CommonErrorCode.WRITER_OPERATION_FAILED` ########## 
seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/source/TDengineSourceReader.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.tdengine.source; + +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException; + +import com.google.common.collect.Sets; +import com.taosdata.jdbc.TSDBDriver; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import 
java.util.Objects; +import java.util.Properties; +import java.util.Set; + +@Slf4j +public class TDengineSourceReader implements SourceReader<SeaTunnelRow, TDengineSourceSplit> { + + private static final long THREAD_WAIT_TIME = 500L; + + private final TDengineSourceConfig config; + + private final Set<TDengineSourceSplit> sourceSplits; + + private final Context context; + + private Connection conn; + + public TDengineSourceReader(TDengineSourceConfig config, SourceReader.Context readerContext) { + this.config = config; + this.sourceSplits = Sets.newHashSet(); + this.context = readerContext; + } + + @Override + public void pollNext(Collector<SeaTunnelRow> collector) throws InterruptedException { + if (sourceSplits.isEmpty()) { + Thread.sleep(THREAD_WAIT_TIME); + return; + } + synchronized (collector.getCheckpointLock()) { + sourceSplits.forEach(split -> { + try { + read(split, collector); + } catch (Exception e) { + throw new TDengineConnectorException(TDengineConnectorErrorCode.READER_FAILED, "TDengine split read error", e); + } + }); + } + + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + // signal to the source that we have reached the end of the data. 
+ log.info("Closed the bounded TDengine source"); + context.signalNoMoreElement(); + } + } + + @Override + public void open(){ + String jdbcUrl = StringUtils.join(config.getUrl(), config.getDatabase(), "?user=", config.getUsername(), "&password=", config.getPassword()); + Properties connProps = new Properties(); + //todo: when TSDBDriver.PROPERTY_KEY_BATCH_LOAD set to "true", + // there is a exception : Caused by: java.sql.SQLException: can't create connection with server + // under docker network env + // @bobo (tdengine) + connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "false"); + try { + conn = DriverManager.getConnection(jdbcUrl, connProps); + } catch (SQLException e) { + throw new TDengineConnectorException(TDengineConnectorErrorCode.CONNECTION_FAILED, "get TDengine connection failed:" + jdbcUrl); Review Comment: Use `CommonErrorCode.READER_OPERATION_FAILED` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
