EricJoy2048 commented on code in PR #5086: URL: https://github.com/apache/seatunnel/pull/5086#discussion_r1371007928
########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/DorisSourceFactory.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.source; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.connectors.doris.config.DorisConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class DorisSourceFactory implements TableSourceFactory { + @Override + public String factoryIdentifier() { + return "Doris"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() Review Comment: Most of the options defined in `DorisConfig` are missing here. Are those options only used in Doris Sink?
If they are used in Doris Source, they should be added to OptionRule.builder() ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.source.split; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; +import org.apache.seatunnel.connectors.doris.rest.RestService; +import org.apache.seatunnel.connectors.doris.source.DorisSourceState; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@Slf4j +public class DorisSourceSplitEnumerator + implements SourceSplitEnumerator<DorisSourceSplit, DorisSourceState> { + + private Context<DorisSourceSplit> context; + private DorisConfig dorisConfig; + + private volatile boolean shouldEnumerate; + + private final Map<Integer, List<DorisSourceSplit>> pendingSplit; + + private SeaTunnelRowType seaTunnelRowType; + private final Object stateLock = new Object(); + + public DorisSourceSplitEnumerator( + Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType seaTunnelRowType) { + this(context, dorisConfig, seaTunnelRowType, null); + } + + public DorisSourceSplitEnumerator( + Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType rowType, + DorisSourceState dorisSourceState) { + this.context = context; + this.dorisConfig = dorisConfig; + this.seaTunnelRowType = rowType; + this.pendingSplit = new HashMap<>(); + this.shouldEnumerate = (dorisSourceState == null); + if (dorisSourceState != null) { + this.shouldEnumerate = dorisSourceState.isShouldEnumerate(); + 
this.pendingSplit.putAll(dorisSourceState.getPendingSplit()); + } + } + + @Override + public void open() {} + + @Override + public void close() throws IOException {} + + @Override + public void run() { + Set<Integer> readers = context.registeredReaders(); + if (shouldEnumerate) { + List<DorisSourceSplit> dorisSourceSplits = getDorisSourceSplit(); + synchronized (stateLock) { + addPendingSplit(dorisSourceSplits); + shouldEnumerate = false; + } + assignSplit(readers); Review Comment: `pendingSplit` is not thread-safe, so `assignSplit(readers);` needs to be put inside `synchronized (stateLock)`. ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.doris.source.split; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; +import org.apache.seatunnel.connectors.doris.rest.RestService; +import org.apache.seatunnel.connectors.doris.source.DorisSourceState; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@Slf4j +public class DorisSourceSplitEnumerator + implements SourceSplitEnumerator<DorisSourceSplit, DorisSourceState> { + + private Context<DorisSourceSplit> context; + private DorisConfig dorisConfig; + + private volatile boolean shouldEnumerate; + + private final Map<Integer, List<DorisSourceSplit>> pendingSplit; + + private SeaTunnelRowType seaTunnelRowType; + private final Object stateLock = new Object(); + + public DorisSourceSplitEnumerator( + Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType seaTunnelRowType) { + this(context, dorisConfig, seaTunnelRowType, null); + } + + public DorisSourceSplitEnumerator( + Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType rowType, + DorisSourceState dorisSourceState) { + this.context = context; + this.dorisConfig = dorisConfig; + this.seaTunnelRowType = rowType; + this.pendingSplit = new HashMap<>(); + this.shouldEnumerate = (dorisSourceState == null); + if (dorisSourceState != null) { + this.shouldEnumerate = dorisSourceState.isShouldEnumerate(); + 
this.pendingSplit.putAll(dorisSourceState.getPendingSplit()); + } + } + + @Override + public void open() {} + + @Override + public void close() throws IOException {} + + @Override + public void run() { + Set<Integer> readers = context.registeredReaders(); + if (shouldEnumerate) { + List<DorisSourceSplit> dorisSourceSplits = getDorisSourceSplit(); + synchronized (stateLock) { + addPendingSplit(dorisSourceSplits); + shouldEnumerate = false; + } + assignSplit(readers); + } + + log.debug( + "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); + } + + @Override + public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) { + log.debug("Add back splits {} to DorisSourceSplitEnumerator.", splits); + if (!splits.isEmpty()) { + addPendingSplit(splits); + assignSplit(Collections.singletonList(subtaskId)); + } + } + + @Override + public int currentUnassignedSplitSize() { + return this.pendingSplit.size(); + } + + @Override + public void handleSplitRequest(int subtaskId) { + throw new DorisConnectorException( + CommonErrorCode.UNSUPPORTED_OPERATION, + String.format("Unsupported handleSplitRequest: %d", subtaskId)); + } + + @Override + public void registerReader(int subtaskId) { + log.debug("Register reader {} to DorisSourceSplitEnumerator.", subtaskId); + if (!pendingSplit.isEmpty()) { Review Comment: same as above. 
########## docs/en/connector-v2/source/Doris.md: ########## @@ -0,0 +1,122 @@ +# Doris + +> Doris source connector + +## Support Those Engines + +> Spark<br/> +> Flink<br/> +> SeaTunnel Zeta<br/> + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [schema projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [x] [support user-defined split](../../concept/connector-v2-features.md) + +## Description + +Used to read data from Doris. +Doris Source will send a SQL to FE, FE will parse it into an execution plan, send it to BE, and BE will +directly return the data + +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------------------------|--------|-----|-------| +| Doris | Only Doris2.0 or later is supported. | - | - | - | + +## Database Dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' +> working directory<br/> + +## Data Type Mapping + +| Doris Data type | SeaTunnel Data type | +|--------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| INT | INT | +| TINYINT | TINYINT | +| SMALLINT | SMALLINT | +| BIGINT | BIGINT | +| LARGEINT | STRING | +| BOOLEAN | BOOLEAN | +| DECIMAL | DECIMAL((Get the designated column's specified column size)+1,<br/>(Gets the designated column's number of digits to right of the decimal point.))) | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| CHAR<br/>VARCHAR<br/>STRING<br/>TEXT | STRING | +| DATE | DATE | +| DATETIME<br/>DATETIME(p) | TIMESTAMP | +| ARRAY | ARRAY | + +## Source Options + +| Name | Type | Required | Default | Description | 
+|----------------------------------|--------|----------|------------|-----------------------------------------------------------------------------------------------------| +| fenodes | string | yes | - | FE address, the format is `"fe_host:fe_http_port"` | +| username | string | yes | - | User username | +| password | string | yes | - | User password | +| table.identifier | string | yes | - | The name of Doris database and table , the format is `"databases.tablename"` | +| schema | config | yes | - | The schema of the doris that you want to generate | +| doris.filter.query | string | no | - | Data filtering in doris. the format is "field = value". | +| doris.batch.size | int | no | 1024 | The maximum value that can be obtained by reading Doris BE once. | +| doris.request.query.timeout.s | int | no | 3600 | Timeout period of Doris scan data, expressed in seconds. | +| doris.exec.mem.limit | long | no | 2147483648 | Maximum memory that can be used by a single be scan request. The default memory is 2G (2147483648). | +| doris.request.retries | int | no | 3 | Number of retries to send requests to Doris FE. | +| doris.request.read.timeout.ms | int | no | 30000 | | +| doris.request.connect.timeout.ms | int | no | 30000 | | + +### Tips + +> It is not recommended to modify advanced parameters at will + +## Task Example Review Comment: We need more examples here. Can you add an example for `doris.filter.query` ? ########## seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.doris.source.split; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.doris.config.DorisConfig; +import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; +import org.apache.seatunnel.connectors.doris.rest.PartitionDefinition; +import org.apache.seatunnel.connectors.doris.rest.RestService; +import org.apache.seatunnel.connectors.doris.source.DorisSourceState; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@Slf4j +public class DorisSourceSplitEnumerator + implements SourceSplitEnumerator<DorisSourceSplit, DorisSourceState> { + + private Context<DorisSourceSplit> context; + private DorisConfig dorisConfig; + + private volatile boolean shouldEnumerate; + + private final Map<Integer, List<DorisSourceSplit>> pendingSplit; + + private SeaTunnelRowType seaTunnelRowType; + private final Object stateLock = new Object(); + + public DorisSourceSplitEnumerator( + Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType seaTunnelRowType) { + this(context, dorisConfig, seaTunnelRowType, null); + } + + public DorisSourceSplitEnumerator( + 
Context<DorisSourceSplit> context, + DorisConfig dorisConfig, + SeaTunnelRowType rowType, + DorisSourceState dorisSourceState) { + this.context = context; + this.dorisConfig = dorisConfig; + this.seaTunnelRowType = rowType; + this.pendingSplit = new HashMap<>(); + this.shouldEnumerate = (dorisSourceState == null); + if (dorisSourceState != null) { + this.shouldEnumerate = dorisSourceState.isShouldEnumerate(); + this.pendingSplit.putAll(dorisSourceState.getPendingSplit()); + } + } + + @Override + public void open() {} + + @Override + public void close() throws IOException {} + + @Override + public void run() { + Set<Integer> readers = context.registeredReaders(); + if (shouldEnumerate) { + List<DorisSourceSplit> dorisSourceSplits = getDorisSourceSplit(); + synchronized (stateLock) { + addPendingSplit(dorisSourceSplits); + shouldEnumerate = false; + } + assignSplit(readers); + } + + log.debug( + "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); + } + + @Override + public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) { + log.debug("Add back splits {} to DorisSourceSplitEnumerator.", splits); + if (!splits.isEmpty()) { Review Comment: `pendingSplit` is not thread-safe, so both `assignSplit(readers)` and `addPendingSplit(splits)` need to be put inside `synchronized (stateLock)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
