morningman commented on code in PR #16073:
URL: https://github.com/apache/doris/pull/16073#discussion_r1101018493
##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -43,55 +59,297 @@
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class MysqlLoadManager {
private static final Logger LOG =
LogManager.getLogger(MysqlLoadManager.class);
private final ThreadPoolExecutor mysqlLoadPool;
+ private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new
ConcurrentHashMap<>();
+ private final ScheduledExecutorService authCleaner;
public MysqlLoadManager() {
this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4,
"Mysql Load", true);
+ this.authCleaner = Executors.newScheduledThreadPool(1);
+ this.authCleaner.scheduleAtFixedRate(() -> {
+ synchronized (mysqlAuthTokens) {
+ for (String key : mysqlAuthTokens.keySet()) {
+ if (System.currentTimeMillis() - mysqlAuthTokens.get(key)
>= 3600 * 1000) {
Review Comment:
If the load takes longer than 1 hour, will it fail because the auth code
has expired?
##########
fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java:
##########
@@ -98,6 +98,7 @@ public class DataDescription {
private final String tableName;
private String dbName;
+ private String fullDatabaseName;
Review Comment:
What is the difference between `dbName` and `fullDatabaseName`?
Please add a comment to explain it, or I think we can merge them into one.
##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -43,55 +59,297 @@
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class MysqlLoadManager {
private static final Logger LOG =
LogManager.getLogger(MysqlLoadManager.class);
private final ThreadPoolExecutor mysqlLoadPool;
+ private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new
ConcurrentHashMap<>();
+ private final ScheduledExecutorService authCleaner;
public MysqlLoadManager() {
this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4,
"Mysql Load", true);
+ this.authCleaner = Executors.newScheduledThreadPool(1);
+ this.authCleaner.scheduleAtFixedRate(() -> {
+ synchronized (mysqlAuthTokens) {
+ for (String key : mysqlAuthTokens.keySet()) {
+ if (System.currentTimeMillis() - mysqlAuthTokens.get(key)
>= 3600 * 1000) {
+ mysqlAuthTokens.remove(key);
+ }
+ }
+ }
+ }, 1, 1, TimeUnit.DAYS);
+ }
+
+ // this method only will be called in master node, since stream load only
send message to master.
+ public boolean checkAuthToken(String token) {
+ return mysqlAuthTokens.containsKey(token);
+ }
+
+ // context only use in no master branch.
+ public String acquireToken(ConnectContext context) {
+ if (Env.getCurrentEnv().isMaster()) {
+ String token = UUID.randomUUID().toString();
+ long createTime = System.currentTimeMillis();
+ mysqlAuthTokens.put(token, createTime);
+ return token;
+ } else {
+ MasterTxnExecutor masterTxnExecutor = new
MasterTxnExecutor(context);
+ try {
+ return masterTxnExecutor.acquireToken();
+ } catch (TException e) {
+ LOG.error("acquire token error", e);
Review Comment:
```suggestion
LOG.warn("acquire token error", e);
```
##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -43,55 +59,297 @@
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class MysqlLoadManager {
private static final Logger LOG =
LogManager.getLogger(MysqlLoadManager.class);
private final ThreadPoolExecutor mysqlLoadPool;
+ private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new
ConcurrentHashMap<>();
+ private final ScheduledExecutorService authCleaner;
public MysqlLoadManager() {
this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4,
"Mysql Load", true);
+ this.authCleaner = Executors.newScheduledThreadPool(1);
+ this.authCleaner.scheduleAtFixedRate(() -> {
+ synchronized (mysqlAuthTokens) {
+ for (String key : mysqlAuthTokens.keySet()) {
+ if (System.currentTimeMillis() - mysqlAuthTokens.get(key)
>= 3600 * 1000) {
+ mysqlAuthTokens.remove(key);
+ }
+ }
+ }
+ }, 1, 1, TimeUnit.DAYS);
+ }
+
+ // this method only will be called in master node, since stream load only
send message to master.
+ public boolean checkAuthToken(String token) {
+ return mysqlAuthTokens.containsKey(token);
+ }
+
+ // context only use in no master branch.
+ public String acquireToken(ConnectContext context) {
+ if (Env.getCurrentEnv().isMaster()) {
+ String token = UUID.randomUUID().toString();
+ long createTime = System.currentTimeMillis();
+ mysqlAuthTokens.put(token, createTime);
Review Comment:
If we generate a token for every load job, there may be too many tokens in
memory (in a high-concurrency load scenario).
Maybe we can make it simpler:
1. On the master, there is a scheduled timer thread and a fixed-size FIFO queue.
2. Every 12 hours (a larger interval to keep long-running jobs happy), the timer
will generate a token and push it to the queue. Because this is a fixed-size
queue, the oldest token will be removed automatically.
3. When calling `acquireToken`, it will always return the token at the end
of the queue (the latest token).
4. When calling `checkToken`, it can simply check whether the token exists in
the queue.
Therefore, we only need to keep several tokens, and these tokens can be used
by all load jobs.
There is also no need to save the timestamp: because expired tokens will be
removed automatically, we only need to control the interval and the queue size.
And there is no need to call `releaseToken()` manually.
##########
fe/fe-core/src/main/java/org/apache/doris/qe/StreamLoadTxnExecutor.java:
##########
@@ -42,20 +42,31 @@
import org.apache.thrift.TException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public class InsertStreamTxnExecutor {
+public class StreamLoadTxnExecutor {
private long txnId;
private TUniqueId loadId;
- private TransactionEntry txnEntry;
+ private final TransactionEntry txnEntry;
+ private final TFileFormatType formatType;
- public InsertStreamTxnExecutor(TransactionEntry txnEntry) {
- this.txnEntry = txnEntry;
+ private final TFileCompressType compressType;
+
+ public StreamLoadTxnExecutor(TransactionEntry txnEntry) {
+ this(txnEntry, TFileFormatType.FORMAT_PROTO, TFileCompressType.PLAIN);
+ }
+
+ public StreamLoadTxnExecutor(TransactionEntry entry, TFileFormatType
formatType, TFileCompressType compressType) {
+ this.txnEntry = entry;
+ this.formatType = formatType;
+ this.compressType = compressType;
}
public void beginTransaction(TStreamLoadPutRequest request) throws
UserException, TException, TimeoutException,
Review Comment:
How about naming it `executeTransaction`?
##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -43,55 +59,297 @@
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
public class MysqlLoadManager {
private static final Logger LOG =
LogManager.getLogger(MysqlLoadManager.class);
private final ThreadPoolExecutor mysqlLoadPool;
+ private final ConcurrentHashMap<String, Long> mysqlAuthTokens = new
ConcurrentHashMap<>();
+ private final ScheduledExecutorService authCleaner;
public MysqlLoadManager() {
this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4,
"Mysql Load", true);
+ this.authCleaner = Executors.newScheduledThreadPool(1);
+ this.authCleaner.scheduleAtFixedRate(() -> {
+ synchronized (mysqlAuthTokens) {
+ for (String key : mysqlAuthTokens.keySet()) {
+ if (System.currentTimeMillis() - mysqlAuthTokens.get(key)
>= 3600 * 1000) {
+ mysqlAuthTokens.remove(key);
+ }
+ }
+ }
+ }, 1, 1, TimeUnit.DAYS);
+ }
+
+ // this method only will be called in master node, since stream load only
send message to master.
+ public boolean checkAuthToken(String token) {
+ return mysqlAuthTokens.containsKey(token);
+ }
+
+ // context only use in no master branch.
+ public String acquireToken(ConnectContext context) {
+ if (Env.getCurrentEnv().isMaster()) {
+ String token = UUID.randomUUID().toString();
+ long createTime = System.currentTimeMillis();
+ mysqlAuthTokens.put(token, createTime);
+ return token;
+ } else {
+ MasterTxnExecutor masterTxnExecutor = new
MasterTxnExecutor(context);
+ try {
+ return masterTxnExecutor.acquireToken();
+ } catch (TException e) {
+ LOG.error("acquire token error", e);
+ return null;
+ }
+ }
+ }
+
+ // context only use in no master branch.
+ public void releaseToken(ConnectContext context, String token) {
+ if (Env.getCurrentEnv().isMaster()) {
+ mysqlAuthTokens.remove(token);
+ } else {
+ MasterTxnExecutor masterTxnExecutor = new
MasterTxnExecutor(context);
+ try {
+ masterTxnExecutor.releaseToken(token);
+ } catch (TException e) {
+ LOG.error("release token error", e);
+ }
+ }
}
public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext
context, LoadStmt stmt)
throws IOException, LoadException {
LoadJobRowResult loadResult = new LoadJobRowResult();
// Mysql data load only have one data desc
DataDescription dataDesc = stmt.getDataDescriptions().get(0);
- String database = dataDesc.getDbName();
- String table = dataDesc.getTableName();
List<String> filePaths = dataDesc.getFilePaths();
- try (final CloseableHttpClient httpclient =
HttpClients.createDefault()) {
- for (String file : filePaths) {
- InputStreamEntity entity = getInputStreamEntity(context,
dataDesc.isClientLocal(), file);
- HttpPut request = generateRequestForMySqlLoad(entity,
dataDesc, database, table);
- try (final CloseableHttpResponse response =
httpclient.execute(request)) {
- JsonObject result =
JsonParser.parseString(EntityUtils.toString(response.getEntity()))
- .getAsJsonObject();
- if
(!result.get("Status").getAsString().equalsIgnoreCase("Success")) {
- LOG.warn("Execute stream load for mysql data load
failed with message: " + request);
- throw new
LoadException(result.get("Message").getAsString());
+ if (Config.use_http_mysql_load_job) {
+ String database = dataDesc.getDbName();
+ String table = dataDesc.getTableName();
+ String token = acquireToken(context);
+ if (token == null) {
+ throw new LoadException("Can't get the load token in mysql
load");
+ }
+ try (final CloseableHttpClient httpclient =
HttpClients.createDefault()) {
+ for (String file : filePaths) {
+ InputStreamEntity entity = getInputStreamEntity(context,
dataDesc.isClientLocal(), file);
+ HttpPut request = generateRequestForMySqlLoad(entity,
dataDesc, database, table, token);
+ try (final CloseableHttpResponse response =
httpclient.execute(request)) {
+ JsonObject result =
JsonParser.parseString(EntityUtils.toString(response.getEntity()))
+ .getAsJsonObject();
+ if
(!result.get("Status").getAsString().equalsIgnoreCase("Success")) {
+ LOG.warn("Execute stream load for mysql data load
failed with message: " + request);
+ throw new
LoadException(result.get("Message").getAsString());
+ }
+
loadResult.incRecords(result.get("NumberLoadedRows").getAsLong());
+
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
}
-
loadResult.incRecords(result.get("NumberLoadedRows").getAsLong());
-
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
}
}
+ releaseToken(context, token);
+ } else {
+ StreamLoadTxnExecutor executor = null;
+ try {
+ String database = dataDesc.getFullDatabaseName();
+ String table = dataDesc.getTableName();
+ TransactionEntry entry = prepareTransactionEntry(database,
table);
+ openTxn(context, entry);
+ executor = beginTxn(context, entry, dataDesc);
+ // sendData
+ for (String file : filePaths) {
+ sendData(context, executor, file);
+ }
+ executor.commitTransaction();
+ } catch (Exception e) {
+ LOG.error("Failed to load mysql data into doris", e);
+ if (executor != null) {
+ try {
+ executor.abortTransaction();
+ } catch (Exception ex) {
+ throw new LoadException("Failed when abort the
transaction", ex);
+ }
+ }
+ throw new LoadException("Load failed when execute the mysql
data load", e);
+ }
}
return loadResult;
}
+ private TransactionEntry prepareTransactionEntry(String database, String
table)
+ throws TException {
+ TTxnParams txnConf = new TTxnParams();
+
txnConf.setNeedTxn(true).setEnablePipelineTxnLoad(Config.enable_pipeline_load)
+ .setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("");
+ Database dbObj = Env.getCurrentInternalCatalog()
+ .getDbOrException(database, s -> new TException("database is
invalid for dbName: " + s));
+ Table tblObj = dbObj.getTableOrException(table, s -> new
TException("table is invalid: " + s));
+ txnConf.setDbId(dbObj.getId()).setTbl(table).setDb(database);
+
+ TransactionEntry txnEntry = new TransactionEntry();
+ txnEntry.setTxnConf(txnConf);
+ txnEntry.setTable(tblObj);
+ txnEntry.setDb(dbObj);
+ String label = UUID.randomUUID().toString();
+ txnEntry.setLabel(label);
+ return txnEntry;
+ }
+
+ private void openTxn(ConnectContext context, TransactionEntry txnEntry)
throws Exception {
Review Comment:
The method names are confusing:
`openTxn()` actually does the "beginTransaction()" work,
and `beginTxn()` actually does the planning job.
##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -716,6 +721,19 @@ struct TFetchSchemaTableDataResult {
2: optional list<Data.TRow> data_batch;
}
+struct TMySqlLoadAcquireTokenResult {
+ 1: required Status.TStatus status
Review Comment:
use `optional`
##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -483,6 +483,7 @@ struct TLoadTxnBeginRequest {
10: optional i64 timeout
11: optional Types.TUniqueId request_id
12: optional string auth_code_uuid
+ 13: optional string token
Review Comment:
I think we can unify `auth_code_uuid` and `token`.
Currently, `auth_code_uuid` is used for transactional insert operations; it
is generated for each TransactionState on the FE side and passed to the BE.
I think we can replace `auth_code_uuid` with `token`: when creating a
`TransactionState`, get a token from the TokenManager, and when checking the
token, just check whether it is still valid.
##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -716,6 +721,19 @@ struct TFetchSchemaTableDataResult {
2: optional list<Data.TRow> data_batch;
}
+struct TMySqlLoadAcquireTokenResult {
+ 1: required Status.TStatus status
+ 2: optional string token
+}
+
+struct TMySqlLoadReleaseTokenRequest {
+ 1: optional string token
+}
+
+struct TMySqlLoadReleaseTokenResult {
+ 1: required Status.TStatus status
Review Comment:
Use `optional`; it is the convention.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]