EricJoy2048 commented on code in PR #6161: URL: https://github.com/apache/seatunnel/pull/6161#discussion_r1447175010
########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSource.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.source; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.source.reader.DorisSourceReader; +import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplit; +import org.apache.seatunnel.connectors.doris.source.split.DorisSourceSplitEnumerator; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.List; + +@Slf4j +@AutoService(SeaTunnelSource.class) 
+public class DorisSource + implements SeaTunnelSource<SeaTunnelRow, DorisSourceSplit, DorisSourceState> { + + private static final long serialVersionUID = 6139826339248788618L; + private final DorisConfig config; + private final SeaTunnelRowType seaTunnelRowType; + private final CatalogTable catalogTable; + + public DorisSource( + ReadonlyConfig config, CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) { Review Comment: We can get seaTunnelRowType from catalogTable, so can we remove seaTunnelRowType from DorisSource Constructor? ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/RowBatch.java: ########## @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.source.serialization; + +import org.apache.seatunnel.shade.org.apache.arrow.memory.RootAllocator; +import org.apache.seatunnel.shade.org.apache.arrow.vector.BigIntVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.BitVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.DecimalVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.FieldVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.Float4Vector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.Float8Vector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.IntVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.SmallIntVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.TinyIntVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.VarCharVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.ListVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.MapVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.StructVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.complex.impl.UnionMapReader; +import org.apache.seatunnel.shade.org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types.MinorType; +import org.apache.seatunnel.shade.org.apache.arrow.vector.util.Text; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; +import 
org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; + +import org.apache.doris.sdk.thrift.TScanBatchResult; + +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.function.IntFunction; + +@Slf4j +public class RowBatch { + // offset for iterate the rowBatch + private int offsetInRowBatch = 0; + private int rowCountInOneBatch = 0; + private int readRowCount = 0; + SeaTunnelDataType<?>[] fieldTypes; + private List<SeaTunnelRow> seatunnelRowBatch = new ArrayList<>(); + private final ArrowStreamReader arrowStreamReader; + private VectorSchemaRoot root; + private List<FieldVector> fieldVectors; + private RootAllocator rootAllocator; + private final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; + private final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + private final DateTimeFormatter dateTimeV2Formatter = + DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); + private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + public RowBatch(TScanBatchResult nextResult, SeaTunnelRowType seaTunnelRowType) { + this.rootAllocator = new RootAllocator(Integer.MAX_VALUE); + this.arrowStreamReader = + new ArrowStreamReader( + new ByteArrayInputStream(nextResult.getRows()), rootAllocator); + this.offsetInRowBatch = 0; + this.fieldTypes = seaTunnelRowType.getFieldTypes(); + } + + public RowBatch readArrow() { + try { + this.root = arrowStreamReader.getVectorSchemaRoot(); + while (arrowStreamReader.loadNextBatch()) { + fieldVectors = root.getFieldVectors(); + // 适配 unique 
模型隐藏列 Review Comment: Please change this to an English comment. ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/backend/BackendClient.java: ########## @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.backend; + +import org.apache.seatunnel.shade.org.apache.thrift.TConfiguration; +import org.apache.seatunnel.shade.org.apache.thrift.TException; +import org.apache.seatunnel.shade.org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.seatunnel.shade.org.apache.thrift.protocol.TProtocol; +import org.apache.seatunnel.shade.org.apache.thrift.transport.TSocket; +import org.apache.seatunnel.shade.org.apache.thrift.transport.TTransport; +import org.apache.seatunnel.shade.org.apache.thrift.transport.TTransportException; + +import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.config.DorisOptions; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import org.apache.seatunnel.connectors.doris.source.serialization.Routing; +import org.apache.seatunnel.connectors.doris.util.ErrorMessages; + +import org.apache.doris.sdk.thrift.TDorisExternalService; +import org.apache.doris.sdk.thrift.TScanBatchResult; +import org.apache.doris.sdk.thrift.TScanCloseParams; +import org.apache.doris.sdk.thrift.TScanCloseResult; +import org.apache.doris.sdk.thrift.TScanNextBatchParams; +import org.apache.doris.sdk.thrift.TScanOpenParams; +import org.apache.doris.sdk.thrift.TScanOpenResult; +import org.apache.doris.sdk.thrift.TStatusCode; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class BackendClient { + + private Routing routing; + + private TDorisExternalService.Client client; + private TTransport transport; + + private boolean isConnected = false; + private final int retries; + private final int socketTimeout; + private final int connectTimeout; + + public BackendClient(Routing routing, DorisConfig readOptions) { + this.routing = routing; + this.connectTimeout = + readOptions.getRequestConnectTimeoutMs() == null + ? 
DorisOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT + : readOptions.getRequestConnectTimeoutMs(); + this.socketTimeout = + readOptions.getRequestReadTimeoutMs() == null + ? DorisOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT + : readOptions.getRequestReadTimeoutMs(); + this.retries = + readOptions.getRequestRetries() == null + ? DorisOptions.DORIS_REQUEST_RETRIES_DEFAULT + : readOptions.getRequestRetries(); + log.trace( + "connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", + this.connectTimeout, + this.socketTimeout, + this.retries); + open(); + } + + private void open() { + log.debug("Open client to Doris BE '{}'.", routing); + TException ex = null; + for (int attempt = 0; attempt < retries; ++attempt) { + log.debug("Attempt {} to connect {}.", attempt, routing); + try { + TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); + transport = + new TSocket( + new TConfiguration(), + routing.getHost(), + routing.getPort(), + socketTimeout, + connectTimeout); + TProtocol protocol = factory.getProtocol(transport); + client = new TDorisExternalService.Client(protocol); + log.trace( + "Connect status before open transport to {} is '{}'.", + routing, + isConnected); + if (!transport.isOpen()) { + transport.open(); + isConnected = true; + log.info("Success connect to {}.", routing); + break; + } + } catch (TTransportException e) { + log.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, routing, e); + ex = e; + } + } + if (!isConnected) { + log.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); + // throw new ConnectedFailedException(routing.toString(), ex); + throw new DorisConnectorException( + DorisConnectorErrorCode.BACKEND_CLIENT_FAILED, routing.toString(), ex); + } + } + + private void close() { + log.trace("Connect status before close with '{}' is '{}'.", routing, isConnected); + isConnected = false; + if ((transport != null) && transport.isOpen()) { + transport.close(); + log.info("Closed a connection to {}.", routing); + } + if 
(null != client) { + client = null; + } + } + + /** + * Open a scanner for reading Doris data. + * + * @param openParams thrift struct to required by request + * @return scan open result + * @throws DorisConnectorException throw if cannot connect to Doris BE + */ + public TScanOpenResult openScanner(TScanOpenParams openParams) { + log.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams); + if (!isConnected) { + open(); + } + TException ex = null; + for (int attempt = 0; attempt < retries; ++attempt) { + log.debug("Attempt {} to openScanner {}.", attempt, routing); + try { + TScanOpenResult result = client.openScanner(openParams); + if (result == null) { + log.warn("Open scanner result from {} is null.", routing); + continue; + } + if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) { + log.warn( + "The status of open scanner result from {} is '{}', error message is: {}.", + routing, + result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + continue; + } + return result; + } catch (TException e) { + log.warn("Open scanner from {} failed.", routing, e); + ex = e; + } + } + log.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); + // throw new ConnectedFailedException(routing.toString(), ex); + throw new DorisConnectorException( + DorisConnectorErrorCode.SCAN_BATCH_FAILED, routing.toString(), ex); + } + + /** + * get next row batch from Doris BE + * + * @param nextBatchParams thrift struct to required by request + * @return scan batch result + * @throws DorisConnectorException throw if cannot connect to Doris BE + */ + public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) { + log.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams); + if (!isConnected) { + open(); + } + TException ex = null; + TScanBatchResult result = null; + for (int attempt = 0; attempt < retries; ++attempt) { + log.debug("Attempt {} to getNext {}.", attempt, routing); + try { + result = 
client.getNext(nextBatchParams); + if (result == null) { + log.warn("GetNext result from {} is null.", routing); + continue; + } + if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) { + log.warn( + "The status of get next result from {} is '{}', error message is: {}.", + routing, + result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + continue; + } + return result; + } catch (TException e) { + log.warn("Get next from {} failed.", routing, e); + ex = e; + } + } + if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) { + log.error( + ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, + routing, + result.getStatus().getStatusCode(), + result.getStatus().getErrorMsgs()); + // throw new DorisInternalException(routing.toString(), + // result.getStatus().getStatusCode(), + // result.getStatus().getErrorMsgs()); + String errMsg = + "Doris server " + + routing.toString() + + " internal failed, status code [" + + result.getStatus().getStatusCode() + + "] error message is " + + result.getStatus().getErrorMsgs(); + throw new DorisConnectorException(DorisConnectorErrorCode.SCAN_BATCH_FAILED, errMsg); + } + log.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing); Review Comment: Got it. ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplit.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.source.split; + +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import java.util.Objects; + +@AllArgsConstructor +@Getter +@Setter +public class DorisSourceSplit implements SourceSplit { + Review Comment: Please add serialVersionUID. ########## seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink.conf: ########## @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env{ + execution.parallelism = 1 + job.mode = "BATCH" +} + +source{ + Doris { + fenodes = "doris_e2e:8030" + username = root + password = "" + database = "e2e_source" + table = "doris_e2e_table" + doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT,F_DECIMAL,F_LARGEINT,F_BOOLEAN,F_DOUBLE,F_FLOAT,F_CHAR,F_VARCHAR_11,F_STRING,F_DATETIME_P,F_DATETIME,F_DATE" Review Comment: Can we add a test case for `doris.filter.query`? ########## seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.doris; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; + +@Slf4j +public class DorisIT extends AbstractDorisIT { + private static final String TABLE = "doris_e2e_table"; + private static final String DRIVER_JAR = + "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar"; + + private static final String sourceDB = "e2e_source"; + private static final String sinkDB = "e2e_sink"; + private Connection conn; + + private static final String INIT_DATA_SQL = + "insert into " + + sourceDB + + "." 
+ + TABLE + + " (\n" + + " F_ID,\n" + + " F_INT,\n" + + " F_BIGINT,\n" + + " F_TINYINT,\n" + + " F_SMALLINT,\n" + + " F_DECIMAL,\n" + + " F_LARGEINT,\n" + + " F_BOOLEAN,\n" + + " F_DOUBLE,\n" + + " F_FLOAT,\n" + + " F_CHAR,\n" + + " F_VARCHAR_11,\n" + + " F_STRING,\n" + + " F_DATETIME_P,\n" + + " F_DATETIME,\n" + + " F_DATE\n" + + ")values(\n" + + "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + + ")"; + + private final String COLUMN_STRING = + "F_ID, F_INT, F_BIGINT, F_TINYINT, F_SMALLINT, F_DECIMAL, F_LARGEINT, F_BOOLEAN, F_DOUBLE, F_FLOAT, " + + "F_CHAR, F_VARCHAR_11, F_STRING, F_DATETIME_P, F_DATETIME, F_DATE"; + + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/jdbc/lib && cd /tmp/seatunnel/plugins/jdbc/lib && wget " + + DRIVER_JAR); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + @TestTemplate + public void testDoris(TestContainer container) throws IOException, InterruptedException { + initializeJdbcTable(); + batchInsertData(); + Container.ExecResult execResult = container.executeJob("/doris_source_and_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + try { + assertHasData(sourceDB, TABLE); + + String sourceSql = String.format("select * from %s.%s order by F_ID", sourceDB, TABLE); + String sinkSql = String.format("select * from %s.%s order by F_ID", sinkDB, TABLE); + List<String> columnList = + Arrays.stream(COLUMN_STRING.split(",")) + .map(x -> x.trim()) + .collect(Collectors.toList()); + Statement sourceStatement = + conn.createStatement( + ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY); + Statement sinkStatement = + conn.createStatement( + ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = 
sinkStatement.executeQuery(sinkSql); + Assertions.assertEquals( + sourceResultSet.getMetaData().getColumnCount(), + sinkResultSet.getMetaData().getColumnCount()); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : columnList) { + Object source = sourceResultSet.getObject(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); + InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); + String sourceValue = + IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); + String sinkValue = + IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + } + } + } + // Check the row numbers is equal + sourceResultSet.last(); + sinkResultSet.last(); + Assertions.assertEquals(sourceResultSet.getRow(), sinkResultSet.getRow()); + clearSinkTable(); + } catch (Exception e) { + throw new RuntimeException("Doris connection error", e); + } + } + + private void assertHasData(String db, String table) { + try (Statement statement = conn.createStatement()) { + String sql = String.format("select * from %s.%s limit 1", db, table); + ResultSet source = statement.executeQuery(sql); + Assertions.assertTrue(source.next()); + } catch (Exception e) { + throw new RuntimeException("test doris server image error", e); + } + } + + private void clearSinkTable() { + try (Statement statement = conn.createStatement()) { + statement.execute(String.format("TRUNCATE TABLE %s.%s", sourceDB, TABLE)); + statement.execute(String.format("TRUNCATE TABLE %s.%s", sinkDB, TABLE)); + } catch (SQLException e) { + throw new RuntimeException("test doris server image error", e); + } + } + + private void initializeJdbcTable() { + try { + URLClassLoader urlClassLoader = + new URLClassLoader( + new URL[] {new URL(DRIVER_JAR)}, DorisIT.class.getClassLoader()); + 
Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); + Properties props = new Properties(); + props.put("user", USERNAME); + props.put("password", PASSWORD); + conn = driver.connect(String.format(URL, container.getHost()), props); + try (Statement statement = conn.createStatement()) { + // create test databases + statement.execute(createDatabase(sourceDB)); + statement.execute(createDatabase(sinkDB)); + log.info("create source and sink database succeed"); + // create source and sink table + statement.execute(createTableForTest(sourceDB)); + statement.execute(createTableForTest(sinkDB)); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } catch (Exception e) { + throw new RuntimeException("Initializing jdbc failed!", e); + } + } + + private String createDatabase(String db) { + return String.format("CREATE DATABASE IF NOT EXISTS %s ;", db); + } + + private String createTableForTest(String db) { + String createTableSql = Review Comment: Can we test scenarios with multiple splits? ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.rest; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +/** Doris partition info. */ +public class PartitionDefinition implements Serializable, Comparable<PartitionDefinition> { + private final String database; Review Comment: Please add serialVersionUID. All classes that implement Serializable need to declare a `serialVersionUID`. ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.source.split; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; +import org.apache.seatunnel.connectors.doris.rest.RestService; +import org.apache.seatunnel.connectors.doris.source.DorisSourceState; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Slf4j +public class DorisSourceSplitEnumerator + implements SourceSplitEnumerator<DorisSourceSplit, DorisSourceState> { + + private Context<DorisSourceSplit> context; + private DorisConfig dorisConfig; + + private volatile boolean shouldEnumerate; + + private final Map<Integer, List<DorisSourceSplit>> pendingSplit; + + private SeaTunnelRowType seaTunnelRowType; + private final Object stateLock = new Object(); + + public DorisSourceSplitEnumerator( + Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType seaTunnelRowType) { + this(context, dorisConfig, seaTunnelRowType, null); + } + + public DorisSourceSplitEnumerator( + Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType rowType, + DorisSourceState dorisSourceState) { + this.context = context; + this.dorisConfig = dorisConfig; + this.seaTunnelRowType = rowType; + this.pendingSplit = new ConcurrentHashMap<>(); + this.shouldEnumerate = (dorisSourceState == null); + if (dorisSourceState != null) { + this.shouldEnumerate = 
dorisSourceState.isShouldEnumerate(); + this.pendingSplit.putAll(dorisSourceState.getPendingSplit()); + } + } + + @Override + public void open() {} + + @Override + public void close() throws IOException {} + + @Override + public void run() { + Set<Integer> readers = context.registeredReaders(); + if (shouldEnumerate) { + List<DorisSourceSplit> dorisSourceSplits = getDorisSourceSplit(); + synchronized (stateLock) { + addPendingSplit(dorisSourceSplits); + shouldEnumerate = false; + assignSplit(readers); + } + } + + log.debug( + "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); + } + + @Override + public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) { + log.debug("Add back splits {} to DorisSourceSplitEnumerator.", splits); + if (!splits.isEmpty()) { + synchronized (stateLock) { + addPendingSplit(splits); + assignSplit(Collections.singletonList(subtaskId)); + } + } + } + + @Override + public int currentUnassignedSplitSize() { + return this.pendingSplit.size(); + } + + @Override + public void handleSplitRequest(int subtaskId) { + throw new DorisConnectorException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + String.format("Unsupported handleSplitRequest: %d", subtaskId)); + } + + @Override + public void registerReader(int subtaskId) { + log.debug("Register reader {} to DorisSourceSplitEnumerator.", subtaskId); + if (!pendingSplit.isEmpty()) { + synchronized (stateLock) { + assignSplit(Collections.singletonList(subtaskId)); + } + } + } + + @Override + public DorisSourceState snapshotState(long checkpointId) { + synchronized (stateLock) { + return new DorisSourceState(shouldEnumerate, pendingSplit); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) {} + + private List<DorisSourceSplit> getDorisSourceSplit() { + List<DorisSourceSplit> splits = new ArrayList<>(); + List<PartitionDefinition> partitions = + 
RestService.findPartitions(seaTunnelRowType, dorisConfig, log); + for (PartitionDefinition partition : partitions) { + splits.add(new DorisSourceSplit(partition, String.valueOf(partition.hashCode()))); + } + return splits; + } + + private void addPendingSplit(Collection<DorisSourceSplit> splits) { + int readerCount = context.currentParallelism(); + for (DorisSourceSplit split : splits) { + int ownerReader = getSplitOwner(split.splitId(), readerCount); + log.info("Assigning {} to {} reader.", split.splitId(), ownerReader); + pendingSplit.computeIfAbsent(ownerReader, f -> new ArrayList<>()).add(split); + } + } + + private static int getSplitOwner(String tp, int numReaders) { + return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; + } + + private void assignSplit(Collection<Integer> readers) { + for (Integer reader : readers) { + final List<DorisSourceSplit> assignmentForReader = pendingSplit.remove(reader); + + if (assignmentForReader != null && !assignmentForReader.isEmpty()) { + + String splitsInfo = + assignmentForReader.stream() + .map(DorisSourceSplit::getSplitId) + .collect(Collectors.joining(",")); + log.info("Assign splits {} to reader {}", splitsInfo, reader); + try { + context.assignSplit(reader, assignmentForReader); + } catch (Exception e) { + log.error( + "Failed to assign splits {} to reader {}", + assignmentForReader, + reader, + e); + pendingSplit.put(reader, assignmentForReader); Review Comment: This can cause splits to be lost. `readers.forEach(context::signalNoMoreSplits);` is called after the method `assignSplit` returns, but by then some splits may have been put back into `pendingSplit` because `context.assignSplit(reader, assignmentForReader);` threw an exception, so the readers are told there are no more splits while those splits are still pending.
``` public void run() { Set<Integer> readers = context.registeredReaders(); if (shouldEnumerate) { List<DorisSourceSplit> dorisSourceSplits = getDorisSourceSplit(); synchronized (stateLock) { addPendingSplit(dorisSourceSplits); shouldEnumerate = false; assignSplit(readers); } } log.debug( "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); readers.forEach(context::signalNoMoreSplits); } ``` ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java: ########## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.source.split; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; +import org.apache.seatunnel.connectors.doris.rest.RestService; +import org.apache.seatunnel.connectors.doris.source.DorisSourceState; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@Slf4j +public class DorisSourceSplitEnumerator + implements SourceSplitEnumerator<DorisSourceSplit, DorisSourceState> { + + private Context<DorisSourceSplit> context; + private DorisConfig dorisConfig; + + private volatile boolean shouldEnumerate; + + private final Map<Integer, List<DorisSourceSplit>> pendingSplit; + + private SeaTunnelRowType seaTunnelRowType; + private final Object stateLock = new Object(); + + public DorisSourceSplitEnumerator( + Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType seaTunnelRowType) { + this(context, dorisConfig, seaTunnelRowType, null); + } + + public DorisSourceSplitEnumerator( + Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType rowType, + DorisSourceState dorisSourceState) { + this.context = context; + this.dorisConfig = dorisConfig; + this.seaTunnelRowType = rowType; + this.pendingSplit = new ConcurrentHashMap<>(); + this.shouldEnumerate = (dorisSourceState == null); + if (dorisSourceState != null) { + this.shouldEnumerate = 
dorisSourceState.isShouldEnumerate(); + this.pendingSplit.putAll(dorisSourceState.getPendingSplit()); + } + } + + @Override + public void open() {} + + @Override + public void close() throws IOException {} + + @Override + public void run() { + Set<Integer> readers = context.registeredReaders(); + if (shouldEnumerate) { + List<DorisSourceSplit> dorisSourceSplits = getDorisSourceSplit(); + synchronized (stateLock) { + addPendingSplit(dorisSourceSplits); + shouldEnumerate = false; + assignSplit(readers); + } + } + + log.debug( + "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); + } + + @Override + public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) { + log.debug("Add back splits {} to DorisSourceSplitEnumerator.", splits); + if (!splits.isEmpty()) { + synchronized (stateLock) { + addPendingSplit(splits); + assignSplit(Collections.singletonList(subtaskId)); Review Comment: ```suggestion if (context.registeredReaders().contains(subtaskId)) { assignSplit(Collections.singletonList(subtaskId)); } else { LOG.warn( "Reader {} is not registered. Pending splits {} are not assigned.", subtaskId, splits); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
