TaoZex commented on code in PR #3958:
URL:
https://github.com/apache/incubator-seatunnel/pull/3958#discussion_r1071205410
##########
docs/en/connector-v2/sink/SelectDB-Cloud.md:
##########
@@ -0,0 +1,120 @@
+# SelectDB Cloud
+
+> SelectDB Cloud sink connector
+
+## Description
+Used to send data to SelectDB Cloud. Supports both streaming and batch mode.
+The internal implementation of the SelectDB Cloud sink connector caches data in batches, uploads the batches, and then commits a CopyInto SQL statement to load the data into the table.
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|---------------------|--------|----------|-----------------|
+| load-url | string | yes | - |
+| jdbc-url | string | yes | - |
+| cluster-name | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| table.identifier | string | yes | - |
+| sink.properties.* | string | yes | - |
+| sink.buffer-size | int | no | 1024*1024 (1MB) |
+| sink.buffer-count | int | no | 3 |
+| sink.max-retries | int | no | 1 |
+| sink.check-interval | int | no | 10000 |
+
+### load-url [string]
+
+`SelectDB Cloud` warehouse http address, the format is `warehouse_ip:http_port`
+
+### jdbc-url [string]
+
+`SelectDB Cloud` warehouse jdbc address, the format is
`warehouse_ip:mysql_port`
+
+### cluster-name [string]
+
+`SelectDB Cloud` cluster name
+
+### username [string]
+
+`SelectDB Cloud` user username
+
+### password [string]
+
+`SelectDB Cloud` user password
+
+### table.identifier [string]
+
+The name of `SelectDB Cloud` table, the format is `database.table`
+
+### sink.properties [string]
+
+Write property configuration
+CSV Write:
+ sink.properties.file.type='csv'
+ sink.properties.file.column_separator=','
+ sink.properties.file.line_delimiter='\n'
+JSON Write:
+ sink.properties.file.type='json'
+ sink.properties.file.strip_outer_array='false'
+
+### sink.buffer-size [string]
+
+Write data cache buffer size, unit byte. The default is 1 MB, and it is not
recommended to modify it.
+
+### sink.buffer-count [string]
+
+The number of write data cache buffers, the default is 3, it is not
recommended to modify.
+
+### sink.max-retries [string]
+
+The maximum number of retries in the Commit phase, the default is 1.
+
+### sink.check-interval [string]
+
+Periodic interval for writing files, in milliseconds, default 10 seconds.
+
+## Example
+
+Use JSON format to import data
+
+```
+sink {
+ SelectDBSink {
+ load-url="warehouse_ip:http_port"
+ jdbc-url="warehouse_ip:mysql_port"
+ cluster-name="Cluster"
+ table.identifier="test.test"
+ username="admin"
+ password="******"
+ sink.properties.file.type="json"
+ sink.properties.file.strip_outer_array="false"
+ }
+}
+```
+
+Use CSV format to import data
+
+```
+sink {
+ SelectDBSink {
+ load-url="warehouse_ip:http_port"
+ jdbc-url="warehouse_ip:mysql_port"
+ cluster-name="Cluster"
+ table.identifier="test.test"
+ username="admin"
+ password="******"
+ sink.properties.file.type='csv'
+ sink.properties.file.column_separator=','
+ sink.properties.file.line_delimiter='\n'
+ }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add SelectDB Cloud Sink Connector
Review Comment:
Please add the PR link. You can refer to other documents for examples.
##########
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/RecordBuffer.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connector.selectdb.sink.writer;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+@Slf4j
+public class RecordBuffer {
+ BlockingQueue<ByteBuffer> writeQueue;
+ BlockingQueue<ByteBuffer> readQueue;
+ int bufferCapacity;
+ int queueSize;
+ ByteBuffer currentWriteBuffer;
+ ByteBuffer currentReadBuffer;
+
+ public RecordBuffer(int capacity, int queueSize) {
+ log.info("init RecordBuffer capacity {}, count {}", capacity,
queueSize);
+ checkState(capacity > 0);
+ checkState(queueSize > 1);
+ this.writeQueue = new ArrayBlockingQueue<>(queueSize);
+ for (int index = 0; index < queueSize; index++) {
+ this.writeQueue.add(ByteBuffer.allocate(capacity));
+ }
+ readQueue = new LinkedBlockingDeque<>();
+ this.bufferCapacity = capacity;
+ this.queueSize = queueSize;
+ }
+
+ public void startBufferData() {
+ log.info("start buffer data, read queue size {}, write queue size {}",
readQueue.size(), writeQueue.size());
+ checkState(readQueue.size() == 0);
+ checkState(writeQueue.size() == queueSize);
+ for (ByteBuffer byteBuffer : writeQueue) {
+ checkState(byteBuffer.position() == 0);
+ checkState(byteBuffer.remaining() == bufferCapacity);
+ }
+ }
+
+ public void stopBufferData() throws IOException {
+ try {
+ // add Empty buffer as finish flag.
+ boolean isEmpty = false;
+ if (currentWriteBuffer != null) {
+ currentWriteBuffer.flip();
+ // check if the current write buffer is empty.
+ isEmpty = currentWriteBuffer.limit() == 0;
+ readQueue.put(currentWriteBuffer);
+ currentWriteBuffer = null;
+ }
+ if (!isEmpty) {
+ ByteBuffer byteBuffer = writeQueue.take();
+ byteBuffer.flip();
+ checkState(byteBuffer.limit() == 0);
+ readQueue.put(byteBuffer);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
Review Comment:
It is recommended to throw a unified connector exception type (e.g. SelectDBConnectorException) here instead of a generic one.
##########
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBCopyInto.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connector.selectdb.sink.writer;
+
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connector.selectdb.rest.BaseResponse;
+import org.apache.seatunnel.connector.selectdb.util.HttpPutBuilder;
+import org.apache.seatunnel.connector.selectdb.util.HttpUtil;
+import org.apache.seatunnel.connector.selectdb.util.StringUtil;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class SelectDBCopyInto implements Serializable {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final int HTTP_TEMPORARY_REDIRECT = 200;
+ private static final int HTTP_SUCCESS = 307;
+ private final LabelGenerator labelGenerator;
+ private final byte[] lineDelimiter;
+ private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
+
+ private String uploadUrl;
+ private String hostPort;
+ private final String user;
+ private final String passwd;
+ private final String db;
+ private final String table;
+ private final boolean enable2PC;
+ private final Properties streamLoadProp;
+ private final RecordStream recordStream;
+ private Future<CloseableHttpResponse> pendingLoadFuture;
+ private final CloseableHttpClient httpClient;
+ private final ExecutorService executorService;
+ private boolean loadBatchFirstRecord;
+ private List<String> fileList = new CopyOnWriteArrayList();
+
+ private String fileName;
+
+ public SelectDBCopyInto(SelectDBConfig selectdbConfig,
+ LabelGenerator labelGenerator,
+ CloseableHttpClient httpClient) {
+ this.hostPort = selectdbConfig.getLoadUrl();
+ String[] tableInfo = selectdbConfig.getTableIdentifier().split("\\.");
+ this.db = tableInfo[0];
+ this.table = tableInfo[1];
+ this.user = selectdbConfig.getUsername();
+ this.passwd = selectdbConfig.getPassword();
+ this.labelGenerator = labelGenerator;
+ this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
+ this.enable2PC = selectdbConfig.getEnable2PC();
+ this.streamLoadProp = selectdbConfig.getStreamLoadProps();
+ this.httpClient = httpClient;
+ this.executorService = new ThreadPoolExecutor(1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(), new
ThreadFactoryBuilder().setNameFormat("file-load-upload").build());
+ this.recordStream = new RecordStream(selectdbConfig.getBufferSize(),
selectdbConfig.getBufferCount());
+ lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT).getBytes();
+ loadBatchFirstRecord = true;
+ }
+
+ public String getDb() {
+ return db;
+ }
+
+ public String getHostPort() {
+ return hostPort;
+ }
+
+ public Future<CloseableHttpResponse> getPendingLoadFuture() {
+ return pendingLoadFuture;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public List<String> getFileList() {
+ return fileList;
+ }
+
+ public void clearFileList() {
+ fileList.clear();
+ }
+
+ public void writeRecord(byte[] record) throws IOException {
+ if (loadBatchFirstRecord) {
+ loadBatchFirstRecord = false;
+ } else {
+ recordStream.write(lineDelimiter);
+ }
+ recordStream.write(record);
+ }
+
+ @VisibleForTesting
+ public RecordStream getRecordStream() {
+ return recordStream;
+ }
+
+ public BaseResponse<HashMap<String, String>>
handleResponse(CloseableHttpResponse response) throws IOException {
+ try {
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity()
!= null) {
+ String loadResult = EntityUtils.toString(response.getEntity());
+ if (StringUtil.isNullOrWhitespaceOnly(loadResult)) {
+ return null;
+ }
+ log.info("response result {}", loadResult);
+ BaseResponse<HashMap<String, String>> baseResponse =
OBJECT_MAPPER.readValue(loadResult, new
TypeReference<BaseResponse<HashMap<String, String>>>() {
+ });
+ if (baseResponse.getCode() == 0) {
+ return baseResponse;
+ } else {
+ throw new RuntimeException("upload file error: " +
baseResponse.getMsg());
Review Comment:
It is recommended to throw a unified connector exception type (e.g. SelectDBConnectorException) here instead of RuntimeException.
##########
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBCopyInto.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connector.selectdb.sink.writer;
+
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connector.selectdb.rest.BaseResponse;
+import org.apache.seatunnel.connector.selectdb.util.HttpPutBuilder;
+import org.apache.seatunnel.connector.selectdb.util.HttpUtil;
+import org.apache.seatunnel.connector.selectdb.util.StringUtil;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class SelectDBCopyInto implements Serializable {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final int HTTP_TEMPORARY_REDIRECT = 200;
+ private static final int HTTP_SUCCESS = 307;
+ private final LabelGenerator labelGenerator;
+ private final byte[] lineDelimiter;
+ private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
+
+ private String uploadUrl;
+ private String hostPort;
+ private final String user;
+ private final String passwd;
+ private final String db;
+ private final String table;
+ private final boolean enable2PC;
+ private final Properties streamLoadProp;
+ private final RecordStream recordStream;
+ private Future<CloseableHttpResponse> pendingLoadFuture;
+ private final CloseableHttpClient httpClient;
+ private final ExecutorService executorService;
+ private boolean loadBatchFirstRecord;
+ private List<String> fileList = new CopyOnWriteArrayList();
+
+ private String fileName;
+
+ public SelectDBCopyInto(SelectDBConfig selectdbConfig,
+ LabelGenerator labelGenerator,
+ CloseableHttpClient httpClient) {
+ this.hostPort = selectdbConfig.getLoadUrl();
+ String[] tableInfo = selectdbConfig.getTableIdentifier().split("\\.");
+ this.db = tableInfo[0];
+ this.table = tableInfo[1];
+ this.user = selectdbConfig.getUsername();
+ this.passwd = selectdbConfig.getPassword();
+ this.labelGenerator = labelGenerator;
+ this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
+ this.enable2PC = selectdbConfig.getEnable2PC();
+ this.streamLoadProp = selectdbConfig.getStreamLoadProps();
+ this.httpClient = httpClient;
+ this.executorService = new ThreadPoolExecutor(1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(), new
ThreadFactoryBuilder().setNameFormat("file-load-upload").build());
+ this.recordStream = new RecordStream(selectdbConfig.getBufferSize(),
selectdbConfig.getBufferCount());
+ lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT).getBytes();
+ loadBatchFirstRecord = true;
+ }
+
+ public String getDb() {
+ return db;
+ }
+
+ public String getHostPort() {
+ return hostPort;
+ }
+
+ public Future<CloseableHttpResponse> getPendingLoadFuture() {
+ return pendingLoadFuture;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public List<String> getFileList() {
+ return fileList;
+ }
+
+ public void clearFileList() {
+ fileList.clear();
+ }
+
+ public void writeRecord(byte[] record) throws IOException {
+ if (loadBatchFirstRecord) {
+ loadBatchFirstRecord = false;
+ } else {
+ recordStream.write(lineDelimiter);
+ }
+ recordStream.write(record);
+ }
+
+ @VisibleForTesting
+ public RecordStream getRecordStream() {
+ return recordStream;
+ }
+
+ public BaseResponse<HashMap<String, String>>
handleResponse(CloseableHttpResponse response) throws IOException {
+ try {
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity()
!= null) {
+ String loadResult = EntityUtils.toString(response.getEntity());
+ if (StringUtil.isNullOrWhitespaceOnly(loadResult)) {
+ return null;
+ }
+ log.info("response result {}", loadResult);
+ BaseResponse<HashMap<String, String>> baseResponse =
OBJECT_MAPPER.readValue(loadResult, new
TypeReference<BaseResponse<HashMap<String, String>>>() {
+ });
+ if (baseResponse.getCode() == 0) {
+ return baseResponse;
+ } else {
+ throw new RuntimeException("upload file error: " +
baseResponse.getMsg());
+ }
+ }
+ throw new RuntimeException("upload file error: " +
response.getStatusLine().toString());
+ } finally {
+ if (response != null) {
+ response.close();
+ }
+ }
+ }
+
+ public void stopLoad() throws IOException {
+ recordStream.endInput();
+ log.info("file {} write stopped.", fileName);
+ checkState(pendingLoadFuture != null);
+ try {
+ handleResponse(pendingLoadFuture.get());
+ log.info("upload file {} finished", fileName);
+ fileList.add(fileName);
+ } catch (Exception e) {
+ throw new
SelectDBConnectorException(SelectDBConnectorErrorCode.UPLOAD_FAILED, e);
+ }
+ }
+
+ public void startLoad(String fileName) throws IOException {
+ this.fileName = fileName;
+ loadBatchFirstRecord = true;
+ recordStream.startInput();
+ log.info("file write started for {}", fileName);
+ try {
+ String address = getUploadAddress(fileName);
+ log.info("redirect to s3 address:{}", address);
+ InputStreamEntity entity = new InputStreamEntity(recordStream);
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder.setUrl(address)
+ .addCommonHeader()
+ .setEntity(entity);
+ pendingLoadFuture = executorService.submit(() -> {
+ log.info("start execute load {}", fileName);
+ return new
HttpUtil().getHttpClient().execute(putBuilder.build());
+ });
+ } catch (Exception e) {
+ String err = "failed to write data with fileName: " + fileName;
+ log.warn(err, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Get the redirected s3 address
+ */
+ public String getUploadAddress(String fileName) throws IOException {
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder.setUrl(uploadUrl)
+ .addFileName(fileName)
+ .addCommonHeader()
+ .setEmptyEntity()
+ .baseAuth(user, passwd);
+
+ try (CloseableHttpResponse execute =
httpClient.execute(putBuilder.build())) {
+ int statusCode = execute.getStatusLine().getStatusCode();
+ String reason = execute.getStatusLine().getReasonPhrase();
+ if (statusCode == HTTP_SUCCESS) {
+ Header location = execute.getFirstHeader("location");
+ String uploadAddress = location.getValue();
+ return uploadAddress;
+ } else {
+ HttpEntity entity = execute.getEntity();
+ String result = entity == null ? null :
EntityUtils.toString(entity);
+ log.error("Failed get the redirected address, status {},
reason {}, response {}", statusCode, reason, result);
+ throw new RuntimeException("Could not get the redirected
address.");
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ if (null != httpClient) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ throw new IOException("Closing httpClient failed.", e);
Review Comment:
It is recommended to throw a unified connector exception type (e.g. SelectDBConnectorException) here instead of a plain IOException.
##########
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitInfo.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connector.selectdb.sink.committer;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class SelectDBCommitInfo implements Serializable {
+ private final String hostPort;
+ private final String clusterName;
+ private final String copySQL;
+
+ public SelectDBCommitInfo(String hostPort, String clusterName, String
copySQL) {
+ this.hostPort = hostPort;
+ this.clusterName = clusterName;
+ this.copySQL = copySQL;
+ }
+
+ public String getHostPort() {
+ return hostPort;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public String getCopySQL() {
+ return copySQL;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SelectDBCommitInfo that = (SelectDBCommitInfo) o;
+ return hostPort.equals(that.hostPort) &&
+ clusterName.equals(that.clusterName) &&
+ copySQL.equals(that.copySQL);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(hostPort, clusterName, copySQL);
+ }
+
+ @Override
+ public String toString() {
+ return "SelectDBCommittable{" +
+ "hostPort='" + hostPort + '\'' +
+ ", clusterName='" + clusterName + '\'' +
+ ", copySQL='" + copySQL + '\'' +
+ '}';
Review Comment:
It's better to use annotations (e.g. Lombok's @ToString and @EqualsAndHashCode) to generate these methods.
##########
docs/en/connector-v2/sink/SelectDB-Cloud.md:
##########
@@ -0,0 +1,120 @@
+# SelectDB Cloud
+
+> SelectDB Cloud sink connector
+
+## Description
+Used to send data to SelectDB Cloud. Supports both streaming and batch mode.
+The internal implementation of the SelectDB Cloud sink connector caches data in batches, uploads the batches, and then commits a CopyInto SQL statement to load the data into the table.
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|---------------------|--------|----------|-----------------|
+| load-url | string | yes | - |
+| jdbc-url | string | yes | - |
+| cluster-name | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| table.identifier | string | yes | - |
+| sink.properties.* | string | yes | - |
+| sink.buffer-size | int | no | 1024*1024 (1MB) |
+| sink.buffer-count | int | no | 3 |
+| sink.max-retries | int | no | 1 |
+| sink.check-interval | int | no | 10000 |
+
+### load-url [string]
+
+`SelectDB Cloud` warehouse http address, the format is `warehouse_ip:http_port`
+
+### jdbc-url [string]
+
+`SelectDB Cloud` warehouse jdbc address, the format is
`warehouse_ip:mysql_port`
+
+### cluster-name [string]
+
+`SelectDB Cloud` cluster name
+
+### username [string]
+
+`SelectDB Cloud` user username
+
+### password [string]
+
+`SelectDB Cloud` user password
+
+### table.identifier [string]
+
+The name of `SelectDB Cloud` table, the format is `database.table`
+
+### sink.properties [string]
+
+Write property configuration
+CSV Write:
+ sink.properties.file.type='csv'
+ sink.properties.file.column_separator=','
+ sink.properties.file.line_delimiter='\n'
+JSON Write:
+ sink.properties.file.type='json'
+ sink.properties.file.strip_outer_array='false'
Review Comment:
It is recommended to support a config prefix, which provides a unified configuration
format. See
https://github.com/apache/incubator-seatunnel/pull/3856/files for reference.
##########
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/committer/SelectDBCommitter.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connector.selectdb.sink.committer;
+
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadStatus.FAIL;
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadStatus.SUCCESS;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connector.selectdb.rest.BaseResponse;
+import org.apache.seatunnel.connector.selectdb.rest.CopyIntoResp;
+import org.apache.seatunnel.connector.selectdb.util.HttpPostBuilder;
+import org.apache.seatunnel.connector.selectdb.util.HttpUtil;
+import org.apache.seatunnel.connector.selectdb.util.ResponseUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class SelectDBCommitter implements SinkCommitter<SelectDBCommitInfo> {
+ private static final String COMMIT_PATTERN = "http://%s/copy/query";
+ private static final int HTTP_TEMPORARY_REDIRECT = 200;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private final CloseableHttpClient httpClient;
+ private final SelectDBConfig selectdbConfig;
+ int maxRetry;
+
+ public SelectDBCommitter(Config pluginConfig) {
+ this(SelectDBConfig.loadConfig(pluginConfig),
SelectDBConfig.loadConfig(pluginConfig).getMaxRetries(), new
HttpUtil().getHttpClient());
+ }
+
+ public SelectDBCommitter(SelectDBConfig selectdbConfig, int maxRetry,
CloseableHttpClient client) {
+ this.selectdbConfig = selectdbConfig;
+ this.maxRetry = maxRetry;
+ this.httpClient = client;
+ }
+
+ @Override
+ public List<SelectDBCommitInfo> commit(List<SelectDBCommitInfo>
commitInfos) throws IOException {
+ for (SelectDBCommitInfo committable : commitInfos) {
+ commitTransaction(committable);
+ }
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void abort(List<SelectDBCommitInfo> commitInfos) throws IOException
{
+ }
+
+ private void commitTransaction(SelectDBCommitInfo committable) throws
IOException {
+ long start = System.currentTimeMillis();
+ String hostPort = committable.getHostPort();
+ String clusterName = committable.getClusterName();
+ String copySQL = committable.getCopySQL();
+ log.info("commit to cluster {} with copy sql: {}", clusterName,
copySQL);
+
+ int statusCode = -1;
+ String reasonPhrase = null;
+ int retry = 0;
+ Map<String, String> params = new HashMap<>();
+ params.put("cluster", clusterName);
+ params.put("sql", copySQL);
+ boolean success = false;
+ CloseableHttpResponse response;
+ String loadResult = "";
+ while (retry++ <= maxRetry) {
+ HttpPostBuilder postBuilder = new HttpPostBuilder();
+ postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort))
+ .baseAuth(selectdbConfig.getUsername(),
selectdbConfig.getPassword())
+ .setEntity(new
StringEntity(objectMapper.writeValueAsString(params)));
+ try {
+ response = httpClient.execute(postBuilder.build());
+ } catch (IOException e) {
+ log.error("commit error : ", e);
+ continue;
+ }
+ statusCode = response.getStatusLine().getStatusCode();
+ reasonPhrase = response.getStatusLine().getReasonPhrase();
+ if (statusCode != HTTP_TEMPORARY_REDIRECT) {
+ log.warn("commit failed with status {} {}, reason {}",
statusCode, hostPort, reasonPhrase);
+ } else if (response.getEntity() != null) {
+ loadResult = EntityUtils.toString(response.getEntity());
+ success = handleCommitResponse(loadResult);
+ if (success) {
+ log.info("commit success cost {}ms, response is {}",
System.currentTimeMillis() - start, loadResult);
+ break;
+ } else {
+ log.warn("commit failed, retry again");
+ }
+ }
+ }
+
+ if (!success) {
+ log.error("commit error with status {}, reason {}, response {}",
statusCode, reasonPhrase, loadResult);
+ throw new
SelectDBConnectorException(SelectDBConnectorErrorCode.COMMIT_FAILED,
committable.getCopySQL());
Review Comment:
It is not recommended to use both log.error and throw; instead, it is recommended to include
the contents of the log.error message in the thrown exception.
##########
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBCopyInto.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connector.selectdb.sink.writer;
+
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connector.selectdb.rest.BaseResponse;
+import org.apache.seatunnel.connector.selectdb.util.HttpPutBuilder;
+import org.apache.seatunnel.connector.selectdb.util.HttpUtil;
+import org.apache.seatunnel.connector.selectdb.util.StringUtil;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class SelectDBCopyInto implements Serializable {
// Shared JSON mapper for decoding REST responses.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
// NOTE(review): these two constant names appear swapped -- HTTP 200 is "OK" and
// HTTP 307 is "Temporary Redirect", yet 200 is bound to HTTP_TEMPORARY_REDIRECT
// and 307 to HTTP_SUCCESS. The *values* are used consistently elsewhere in this
// class (200 = upload success, 307 = redirect to S3), so behavior is correct,
// but the names mislead and should be fixed.
private static final int HTTP_TEMPORARY_REDIRECT = 200;
private static final int HTTP_SUCCESS = 307;
// Generates unique per-checkpoint/per-file labels.
private final LabelGenerator labelGenerator;
// Delimiter inserted between records of one uploaded file (from stream-load properties).
private final byte[] lineDelimiter;
private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";

// Resolved "http://<hostPort>/copy/upload" endpoint.
private String uploadUrl;
// Warehouse HTTP address ("host:port") from the connector config.
private String hostPort;
private final String user;
private final String passwd;
private final String db;
private final String table;
// Whether two-phase commit is enabled (assigned from config; not read in the visible code).
private final boolean enable2PC;
private final Properties streamLoadProp;
// In-memory pipe between writeRecord() and the asynchronous HTTP PUT upload.
private final RecordStream recordStream;
// Future of the in-flight upload request, set by startLoad().
private Future<CloseableHttpResponse> pendingLoadFuture;
private final CloseableHttpClient httpClient;
// Single-threaded executor that performs the blocking upload PUT.
private final ExecutorService executorService;
// True until the first record of the current batch is written (controls delimiter insertion).
private boolean loadBatchFirstRecord;
// Files uploaded since the last checkpoint; consumed by the committer's COPY INTO.
// NOTE(review): raw type -- should be new CopyOnWriteArrayList<>().
private List<String> fileList = new CopyOnWriteArrayList();

// Label of the file currently being written/uploaded.
private String fileName;
+
+ public SelectDBCopyInto(SelectDBConfig selectdbConfig,
+ LabelGenerator labelGenerator,
+ CloseableHttpClient httpClient) {
+ this.hostPort = selectdbConfig.getLoadUrl();
+ String[] tableInfo = selectdbConfig.getTableIdentifier().split("\\.");
+ this.db = tableInfo[0];
+ this.table = tableInfo[1];
+ this.user = selectdbConfig.getUsername();
+ this.passwd = selectdbConfig.getPassword();
+ this.labelGenerator = labelGenerator;
+ this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
+ this.enable2PC = selectdbConfig.getEnable2PC();
+ this.streamLoadProp = selectdbConfig.getStreamLoadProps();
+ this.httpClient = httpClient;
+ this.executorService = new ThreadPoolExecutor(1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(), new
ThreadFactoryBuilder().setNameFormat("file-load-upload").build());
+ this.recordStream = new RecordStream(selectdbConfig.getBufferSize(),
selectdbConfig.getBufferCount());
+ lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT).getBytes();
+ loadBatchFirstRecord = true;
+ }
+
/** @return the database part of {@code table.identifier}. */
public String getDb() {
    return db;
}

/** @return the warehouse HTTP address ("host:port"). */
public String getHostPort() {
    return hostPort;
}

/** @return the future of the in-flight upload, or null if no upload was started. */
public Future<CloseableHttpResponse> getPendingLoadFuture() {
    return pendingLoadFuture;
}

/** @return the label of the file currently being written/uploaded. */
public String getFileName() {
    return fileName;
}

/** @return the live list of files uploaded since the last checkpoint (not a copy). */
public List<String> getFileList() {
    return fileList;
}

/** Clears the uploaded-file list; invoked after a checkpoint snapshot. */
public void clearFileList() {
    fileList.clear();
}
+
+ public void writeRecord(byte[] record) throws IOException {
+ if (loadBatchFirstRecord) {
+ loadBatchFirstRecord = false;
+ } else {
+ recordStream.write(lineDelimiter);
+ }
+ recordStream.write(record);
+ }
+
/** Exposes the internal record stream for tests only. */
@VisibleForTesting
public RecordStream getRecordStream() {
    return recordStream;
}
+
+ public BaseResponse<HashMap<String, String>>
handleResponse(CloseableHttpResponse response) throws IOException {
+ try {
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity()
!= null) {
+ String loadResult = EntityUtils.toString(response.getEntity());
+ if (StringUtil.isNullOrWhitespaceOnly(loadResult)) {
+ return null;
+ }
+ log.info("response result {}", loadResult);
+ BaseResponse<HashMap<String, String>> baseResponse =
OBJECT_MAPPER.readValue(loadResult, new
TypeReference<BaseResponse<HashMap<String, String>>>() {
+ });
+ if (baseResponse.getCode() == 0) {
+ return baseResponse;
+ } else {
+ throw new RuntimeException("upload file error: " +
baseResponse.getMsg());
+ }
+ }
+ throw new RuntimeException("upload file error: " +
response.getStatusLine().toString());
Review Comment:
Recommend using the connector's uniform exception type (SelectDBConnectorException) here instead of a bare RuntimeException.
##########
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBSinkWriter.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connector.selectdb.sink.writer;
+
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.FORMAT_KEY;
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connector.selectdb.serialize.SelectDBCsvSerializer;
+import
org.apache.seatunnel.connector.selectdb.serialize.SelectDBJsonSerializer;
+import org.apache.seatunnel.connector.selectdb.serialize.SelectDBSerializer;
+import
org.apache.seatunnel.connector.selectdb.sink.committer.SelectDBCommitInfo;
+import org.apache.seatunnel.connector.selectdb.util.HttpUtil;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class SelectDBSinkWriter implements SinkWriter<SeaTunnelRow,
SelectDBCommitInfo, SelectDBSinkState> {
private final SelectDBConfig selectdbConfig;
// Checkpoint id this writer considers itself restored at.
// NOTE(review): initialized from context.getIndexOfSubtask() in the constructor,
// which is a subtask index, not a checkpoint id -- verify this is intended.
private final long lastCheckpointId;
private volatile long currentCheckpointId;
private SelectDBCopyInto selectdbCopyInto;
// True while a file upload is in progress (set by startLoad, cleared by stopLoad).
volatile boolean loading;
private final String labelPrefix;
// Delimiter inserted between cached records when the cache is drained into an upload.
private final byte[] lineDelimiter;
private final LabelGenerator labelGenerator;
// Interval in ms between checkDone() runs.
private final int intervalTime;
private final SelectDBSinkState selectdbSinkState;
private final SelectDBSerializer serializer;
// Periodically finalizes files because S3 cannot keep long HTTP connections open.
private final transient ScheduledExecutorService scheduledExecutorService;
// First exception raised by the background checker; surfaced to write() callers.
private transient volatile Exception loadException = null;
// Sequence number of files created within the current checkpoint.
private final AtomicInteger fileNum;

// Rows buffered before a load is started; drained into the upload by startLoad().
private final ArrayList<byte[]> cache = new ArrayList<>();
private int cacheSize = 0;
private int cacheCnt = 0;

// Flush threshold for the local cache, in bytes (1 MB).
private static final long MAX_CACHE_SIZE = 1024 * 1024L;
// Delay in ms before the first checkDone() run.
private static final int INITIAL_DELAY = 1000;
/**
 * Creates a sink writer that caches serialized rows locally and uploads them
 * as files to SelectDB Cloud for a later COPY INTO commit.
 *
 * @param context          sink context (provides the subtask index)
 * @param state            restored writer state -- currently not consulted; TODO confirm
 * @param seaTunnelRowType row schema used to build the serializer
 * @param pluginConfig     raw connector configuration
 */
public SelectDBSinkWriter(SinkWriter.Context context,
                          List<SelectDBSinkState> state,
                          SeaTunnelRowType seaTunnelRowType,
                          Config pluginConfig) {
    this.selectdbConfig = SelectDBConfig.loadConfig(pluginConfig);
    // NOTE(review): this stores the subtask index, not a restored checkpoint id --
    // verify against the SinkWriter restore contract.
    this.lastCheckpointId = context.getIndexOfSubtask();
    log.info("restore checkpointId {}", lastCheckpointId);
    this.currentCheckpointId = lastCheckpointId;
    // Parameterized logging instead of string concatenation (SLF4J idiom).
    log.info("labelPrefix {}", selectdbConfig.getLabelPrefix());
    this.selectdbSinkState = new SelectDBSinkState(selectdbConfig.getLabelPrefix());
    this.labelPrefix = selectdbConfig.getLabelPrefix() + "_" + context.getIndexOfSubtask();
    this.lineDelimiter = selectdbConfig.getStreamLoadProps()
            .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT).getBytes();
    this.labelGenerator = new LabelGenerator(labelPrefix, selectdbConfig.getEnable2PC());
    this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
            new ThreadFactoryBuilder()
                    .setNameFormat("file-load-check-" + context.getIndexOfSubtask()).build());
    this.serializer = createSerializer(selectdbConfig, seaTunnelRowType);
    this.intervalTime = selectdbConfig.getCheckInterval();
    this.loading = false;
    this.fileNum = new AtomicInteger();
}
+
/**
 * Lazily creates the CopyInto uploader and starts the periodic checker that
 * finalizes long-running uploads.
 *
 * @param state restored state; currently not consulted -- TODO confirm intended
 */
public void initializeLoad(List<SelectDBSinkState> state) throws IOException {
    this.selectdbCopyInto = new SelectDBCopyInto(selectdbConfig,
            labelGenerator, new HttpUtil().getHttpClient());
    currentCheckpointId = lastCheckpointId + 1;
    // S3 cannot keep long HTTP connections open, so cut files regularly (see checkDone()).
    scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, INITIAL_DELAY, intervalTime, TimeUnit.MILLISECONDS);
    serializer.open();
}
+
/**
 * Serializes one row and either buffers it locally or, once the cache
 * threshold is exceeded, streams it straight into the in-flight upload.
 */
@Override
public synchronized void write(SeaTunnelRow element) throws IOException {
    // Surface any failure recorded by the background upload checker before accepting more rows.
    checkLoadException();
    byte[] serialize = serializer.serialize(element);
    if (Objects.isNull(serialize)) {
        //schema change is null
        return;
    }
    if (cacheSize > MAX_CACHE_SIZE) {
        // NOTE(review): cacheSize is not reset on this path, so once the
        // threshold is crossed every subsequent row bypasses the cache until
        // stopLoad() clears it -- confirm this is the intended behavior.
        flush(serialize);
    } else {
        cacheSize += serialize.length;
        cacheCnt++;
        cache.add(serialize);
    }
}
+
/**
 * Writes one serialized row directly to the upload stream, first starting a
 * new load (which drains the local cache) if none is in progress.
 */
public synchronized void flush(byte[] serialize) throws IOException {
    if (!loading) {
        log.info("start load by cache full, cnt {}, size {}", cacheCnt, cacheSize);
        startLoad();
    }
    this.selectdbCopyInto.writeRecord(serialize);
}
+
/**
 * Finalizes the current file at checkpoint time and produces the commit info
 * (host, cluster, COPY INTO SQL) that the committer will execute.
 */
@Override
public synchronized Optional<SelectDBCommitInfo> prepareCommit() throws IOException {
    checkState(selectdbCopyInto != null);
    if (!loading) {
        //No data was written during the entire checkpoint period
        log.info("start load by checkpoint, cnt {} size {} ", cacheCnt, cacheSize);
        startLoad();
    }
    log.info("stop load by checkpoint");
    stopLoad();
    // Build one COPY INTO statement covering every file uploaded in this checkpoint.
    CopySQLBuilder copySQLBuilder = new CopySQLBuilder(selectdbConfig, selectdbCopyInto.getFileList());
    String copySql = copySQLBuilder.buildCopySQL();
    return Optional.of(new SelectDBCommitInfo(selectdbCopyInto.getHostPort(),
            selectdbConfig.getClusterName(), copySql));
}
+
/**
 * Resets per-checkpoint bookkeeping (file counter and uploaded-file list)
 * and returns the writer state to snapshot.
 */
@Override
public synchronized List<SelectDBSinkState> snapshotState(long checkpointId) throws IOException {
    checkState(selectdbCopyInto != null);
    this.currentCheckpointId = checkpointId + 1;

    // Log before clearing so the snapshot record shows which files were committed.
    log.info("clear the file list {}", selectdbCopyInto.getFileList());
    this.fileNum.set(0);
    this.selectdbCopyInto.clearFileList();
    return Collections.singletonList(selectdbSinkState);
}
+
@Override
public void abortPrepare() {
    // Intentionally empty: uploaded files only become visible through the
    // committer's COPY INTO, so there is no prepared state to roll back here.
    // TODO(review): confirm whether already-uploaded files should be cleaned up on abort.
}
+
/**
 * Starts a new file upload and drains the locally cached rows into it,
 * joining them with the configured line delimiter.
 */
private synchronized void startLoad() throws IOException {
    //If not started writing, make a streaming request
    this.selectdbCopyInto.startLoad(labelGenerator.generateLabel(currentCheckpointId, fileNum.getAndIncrement()));
    if (!cache.isEmpty()) {
        //add line delimiter
        // cacheSize is the sum of the cached records' lengths, so this buffer
        // is sized exactly for all records plus (n - 1) delimiters.
        ByteBuffer buf = ByteBuffer.allocate(cacheSize + (cache.size() - 1) * lineDelimiter.length);
        for (int i = 0; i < cache.size(); i++) {
            if (i > 0) {
                buf.put(lineDelimiter);
            }
            buf.put(cache.get(i));
        }
        this.selectdbCopyInto.writeRecord(buf.array());
    }
    this.loading = true;
}
+
/**
 * Finalizes the in-flight upload and resets the local cache for the next file.
 */
private synchronized void stopLoad() throws IOException {
    this.loading = false;
    this.selectdbCopyInto.stopLoad();
    this.cacheSize = 0;
    this.cacheCnt = 0;
    this.cache.clear();
}
+
/**
 * Periodic background check: if an upload is still open after the check
 * interval, finalize the file so the S3 connection is not held too long.
 * Any failure is stored in {@code loadException} and surfaced by write().
 */
private synchronized void checkDone() {
    // s3 can't keep http long links, generate data files regularly
    log.info("start timer checker, interval {} ms", intervalTime);
    try {
        if (!loading) {
            log.info("not loading, skip timer checker");
            return;
        }
        // A not-yet-done future means the current file upload is still open --
        // cut it off now; the next write()/flush() starts a fresh file.
        if (selectdbCopyInto.getPendingLoadFuture() != null
                && !selectdbCopyInto.getPendingLoadFuture().isDone()) {
            log.info("stop load by timer checker");
            stopLoad();
        }
    } catch (Exception ex) {
        log.error("upload file failed, thread exited:", ex);
        loadException = ex;
    }
}
+
+ private void checkLoadException() {
+ if (loadException != null) {
+ throw new RuntimeException("error while loading data.",
loadException);
Review Comment:
Use the connector's uniform exception type (SelectDBConnectorException) here as well.
##########
seatunnel-connectors-v2/connector-selectdb-cloud/src/main/java/org/apache/seatunnel/connector/selectdb/sink/writer/SelectDBCopyInto.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connector.selectdb.sink.writer;
+
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static
org.apache.seatunnel.connector.selectdb.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.connector.selectdb.config.SelectDBConfig;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorErrorCode;
+import
org.apache.seatunnel.connector.selectdb.exception.SelectDBConnectorException;
+import org.apache.seatunnel.connector.selectdb.rest.BaseResponse;
+import org.apache.seatunnel.connector.selectdb.util.HttpPutBuilder;
+import org.apache.seatunnel.connector.selectdb.util.HttpUtil;
+import org.apache.seatunnel.connector.selectdb.util.StringUtil;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class SelectDBCopyInto implements Serializable {
// Shared JSON mapper for decoding REST responses.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
// NOTE(review): these two constant names appear swapped -- HTTP 200 is "OK" and
// HTTP 307 is "Temporary Redirect", yet 200 is bound to HTTP_TEMPORARY_REDIRECT
// and 307 to HTTP_SUCCESS. The *values* are used consistently below (200 =
// upload success in handleResponse, 307 = redirect in getUploadAddress), so
// behavior is correct, but the names mislead and should be fixed.
private static final int HTTP_TEMPORARY_REDIRECT = 200;
private static final int HTTP_SUCCESS = 307;
// Generates unique per-checkpoint/per-file labels.
private final LabelGenerator labelGenerator;
// Delimiter inserted between records of one uploaded file (from stream-load properties).
private final byte[] lineDelimiter;
private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";

// Resolved "http://<hostPort>/copy/upload" endpoint.
private String uploadUrl;
// Warehouse HTTP address ("host:port") from the connector config.
private String hostPort;
private final String user;
private final String passwd;
private final String db;
private final String table;
// Whether two-phase commit is enabled (assigned from config; not read in the visible code).
private final boolean enable2PC;
private final Properties streamLoadProp;
// In-memory pipe between writeRecord() and the asynchronous HTTP PUT upload.
private final RecordStream recordStream;
// Future of the in-flight upload request, set by startLoad().
private Future<CloseableHttpResponse> pendingLoadFuture;
private final CloseableHttpClient httpClient;
// Single-threaded executor that performs the blocking upload PUT.
private final ExecutorService executorService;
// True until the first record of the current batch is written (controls delimiter insertion).
private boolean loadBatchFirstRecord;
// Files uploaded since the last checkpoint; consumed by the committer's COPY INTO.
// NOTE(review): raw type -- should be new CopyOnWriteArrayList<>().
private List<String> fileList = new CopyOnWriteArrayList();

// Label of the file currently being written/uploaded.
private String fileName;
+
+ public SelectDBCopyInto(SelectDBConfig selectdbConfig,
+ LabelGenerator labelGenerator,
+ CloseableHttpClient httpClient) {
+ this.hostPort = selectdbConfig.getLoadUrl();
+ String[] tableInfo = selectdbConfig.getTableIdentifier().split("\\.");
+ this.db = tableInfo[0];
+ this.table = tableInfo[1];
+ this.user = selectdbConfig.getUsername();
+ this.passwd = selectdbConfig.getPassword();
+ this.labelGenerator = labelGenerator;
+ this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
+ this.enable2PC = selectdbConfig.getEnable2PC();
+ this.streamLoadProp = selectdbConfig.getStreamLoadProps();
+ this.httpClient = httpClient;
+ this.executorService = new ThreadPoolExecutor(1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(), new
ThreadFactoryBuilder().setNameFormat("file-load-upload").build());
+ this.recordStream = new RecordStream(selectdbConfig.getBufferSize(),
selectdbConfig.getBufferCount());
+ lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT).getBytes();
+ loadBatchFirstRecord = true;
+ }
+
/** @return the database part of {@code table.identifier}. */
public String getDb() {
    return db;
}

/** @return the warehouse HTTP address ("host:port"). */
public String getHostPort() {
    return hostPort;
}

/** @return the future of the in-flight upload, or null if no upload was started. */
public Future<CloseableHttpResponse> getPendingLoadFuture() {
    return pendingLoadFuture;
}

/** @return the label of the file currently being written/uploaded. */
public String getFileName() {
    return fileName;
}

/** @return the live list of files uploaded since the last checkpoint (not a copy). */
public List<String> getFileList() {
    return fileList;
}

/** Clears the uploaded-file list; invoked after a checkpoint snapshot. */
public void clearFileList() {
    fileList.clear();
}
+
+ public void writeRecord(byte[] record) throws IOException {
+ if (loadBatchFirstRecord) {
+ loadBatchFirstRecord = false;
+ } else {
+ recordStream.write(lineDelimiter);
+ }
+ recordStream.write(record);
+ }
+
/** Exposes the internal record stream for tests only. */
@VisibleForTesting
public RecordStream getRecordStream() {
    return recordStream;
}
+
+ public BaseResponse<HashMap<String, String>>
handleResponse(CloseableHttpResponse response) throws IOException {
+ try {
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity()
!= null) {
+ String loadResult = EntityUtils.toString(response.getEntity());
+ if (StringUtil.isNullOrWhitespaceOnly(loadResult)) {
+ return null;
+ }
+ log.info("response result {}", loadResult);
+ BaseResponse<HashMap<String, String>> baseResponse =
OBJECT_MAPPER.readValue(loadResult, new
TypeReference<BaseResponse<HashMap<String, String>>>() {
+ });
+ if (baseResponse.getCode() == 0) {
+ return baseResponse;
+ } else {
+ throw new RuntimeException("upload file error: " +
baseResponse.getMsg());
+ }
+ }
+ throw new RuntimeException("upload file error: " +
response.getStatusLine().toString());
+ } finally {
+ if (response != null) {
+ response.close();
+ }
+ }
+ }
+
/**
 * Finishes the current file: closes the record stream, waits for the async
 * upload to complete, validates the server response, and records the file
 * name for the committer's COPY INTO statement.
 *
 * @throws SelectDBConnectorException if the upload failed or the response is invalid
 */
public void stopLoad() throws IOException {
    // Signal end-of-stream so the pending HTTP PUT can complete.
    recordStream.endInput();
    log.info("file {} write stopped.", fileName);
    checkState(pendingLoadFuture != null);
    try {
        // Blocks until the upload finishes; handleResponse throws on any error.
        handleResponse(pendingLoadFuture.get());
        log.info("upload file {} finished", fileName);
        fileList.add(fileName);
    } catch (Exception e) {
        throw new SelectDBConnectorException(SelectDBConnectorErrorCode.UPLOAD_FAILED, e);
    }
}
+
/**
 * Starts an asynchronous upload of a new file: asks the warehouse for the
 * redirected S3 address, then submits an HTTP PUT that streams whatever is
 * written to {@code recordStream} until {@link #stopLoad()} is called.
 *
 * @param fileName unique label for the file being uploaded
 */
public void startLoad(String fileName) throws IOException {
    this.fileName = fileName;
    loadBatchFirstRecord = true;
    recordStream.startInput();
    log.info("file write started for {}", fileName);
    try {
        String address = getUploadAddress(fileName);
        log.info("redirect to s3 address:{}", address);
        InputStreamEntity entity = new InputStreamEntity(recordStream);
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        putBuilder.setUrl(address)
                .addCommonHeader()
                .setEntity(entity);
        // NOTE(review): a new HttpClient is created for every upload and is never
        // closed -- consider reusing the injected httpClient or closing this one.
        pendingLoadFuture = executorService.submit(() -> {
            log.info("start execute load {}", fileName);
            return new HttpUtil().getHttpClient().execute(putBuilder.build());
        });
    } catch (Exception e) {
        // NOTE(review): logging and rethrowing duplicates the error report;
        // prefer attaching this context to the exception instead.
        String err = "failed to write data with fileName: " + fileName;
        log.warn(err, e);
        throw e;
    }
}
+
+ /**
+ * Get the redirected s3 address
+ */
+ public String getUploadAddress(String fileName) throws IOException {
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder.setUrl(uploadUrl)
+ .addFileName(fileName)
+ .addCommonHeader()
+ .setEmptyEntity()
+ .baseAuth(user, passwd);
+
+ try (CloseableHttpResponse execute =
httpClient.execute(putBuilder.build())) {
+ int statusCode = execute.getStatusLine().getStatusCode();
+ String reason = execute.getStatusLine().getReasonPhrase();
+ if (statusCode == HTTP_SUCCESS) {
+ Header location = execute.getFirstHeader("location");
+ String uploadAddress = location.getValue();
+ return uploadAddress;
+ } else {
+ HttpEntity entity = execute.getEntity();
+ String result = entity == null ? null :
EntityUtils.toString(entity);
+ log.error("Failed get the redirected address, status {},
reason {}, response {}", statusCode, reason, result);
+ throw new RuntimeException("Could not get the redirected
address.");
Review Comment:
Same as above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]