This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 184ca99803 [enhancement](stream load pipe) using queryid or load id to
identify stream load pipe instead of fragment instance id (#17439)
184ca99803 is described below
commit 184ca99803b954bfc755732a1ee3de07a0729683
Author: yiguolei <[email protected]>
AuthorDate: Mon Mar 6 11:10:54 2023 +0800
[enhancement](stream load pipe) using queryid or load id to identify stream
load pipe instead of fragment instance id (#17439)
cherry-pick part of #17362
This PR does not affect load behavior, just add load id to Protobuf Message.
So user could upgrade to 2.x from 1.2.3 smoothly.
---
.../org/apache/doris/qe/InsertStreamTxnExecutor.java | 16 +++++++++++++---
.../src/main/java/org/apache/doris/qe/StmtExecutor.java | 6 +++---
.../java/org/apache/doris/rpc/BackendServiceProxy.java | 14 +++++++++-----
.../org/apache/doris/transaction/TransactionEntry.java | 10 ++++++++++
.../apache/doris/load/sync/canal/CanalSyncDataTest.java | 13 +++++++------
gensrc/proto/internal_service.proto | 3 +++
6 files changed, 45 insertions(+), 17 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 033d013ecf..cd50fa6feb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -63,9 +63,11 @@ public class InsertStreamTxnExecutor {
public void beginTransaction(TStreamLoadPutRequest request) throws
UserException, TException, TimeoutException,
InterruptedException, ExecutionException {
TTxnParams txnConf = txnEntry.getTxnConf();
+ // StreamLoadTask's id == request's load_id
StreamLoadTask streamLoadTask =
StreamLoadTask.fromTStreamLoadPutRequest(request);
StreamLoadPlanner planner = new StreamLoadPlanner(
txnEntry.getDb(), (OlapTable) txnEntry.getTable(),
streamLoadTask);
+ // Will using load id as query id in fragment
TExecPlanFragmentParams tRequest =
planner.plan(streamLoadTask.getId());
BeSelectionPolicy policy = new
BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName())
.needLoadAvailable().needQueryAvailable().build();
@@ -90,6 +92,10 @@ public class InsertStreamTxnExecutor {
}
}
txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+ this.loadId = request.getLoadId();
+ this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+ .setHi(loadId.getHi())
+ .setLo(loadId.getLo()).build());
Backend backend =
Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
txnConf.setUserIp(backend.getHost());
@@ -117,11 +123,12 @@ public class InsertStreamTxnExecutor {
.setHi(txnConf.getFragmentInstanceId().getHi())
.setLo(txnConf.getFragmentInstanceId().getLo()).build();
+
Backend backend = txnEntry.getBackend();
TNetworkAddress address = new TNetworkAddress(backend.getHost(),
backend.getBrpcPort());
try {
Future<InternalService.PCommitResult> future = BackendServiceProxy
- .getInstance().commit(address, fragmentInstanceId);
+ .getInstance().commit(address, fragmentInstanceId,
this.txnEntry.getpLoadId());
InternalService.PCommitResult result = future.get(5,
TimeUnit.SECONDS);
TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
@@ -143,7 +150,7 @@ public class InsertStreamTxnExecutor {
TNetworkAddress address = new TNetworkAddress(be.getHost(),
be.getBrpcPort());
try {
Future<InternalService.PRollbackResult> future =
BackendServiceProxy.getInstance().rollback(address,
- fragmentInstanceId);
+ fragmentInstanceId, this.txnEntry.getpLoadId());
InternalService.PRollbackResult result = future.get(5,
TimeUnit.SECONDS);
TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
@@ -169,7 +176,7 @@ public class InsertStreamTxnExecutor {
TNetworkAddress address = new TNetworkAddress(backend.getHost(),
backend.getBrpcPort());
try {
Future<InternalService.PSendDataResult> future =
BackendServiceProxy.getInstance().sendData(
- address, fragmentInstanceId, txnEntry.getDataToSend());
+ address, fragmentInstanceId, this.txnEntry.getpLoadId(),
txnEntry.getDataToSend());
InternalService.PSendDataResult result = future.get(5,
TimeUnit.SECONDS);
TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
@@ -188,6 +195,9 @@ public class InsertStreamTxnExecutor {
public void setLoadId(TUniqueId loadId) {
this.loadId = loadId;
+ this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+ .setHi(loadId.getHi())
+ .setLo(loadId.getLo()).build());
}
public long getTxnId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 70d3fe0848..2f53b3ccb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -586,7 +586,7 @@ public class StmtExecutor implements ProfileWriter {
throw e;
} catch (UserException e) {
// analysis exception only print message, not print the stack
- LOG.warn("execute Exception. {}, {}",
context.getQueryIdentifier(), e.getMessage());
+ LOG.warn("execute Exception. {}", context.getQueryIdentifier(), e);
context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
} catch (Exception e) {
@@ -1253,8 +1253,7 @@ public class StmtExecutor implements ProfileWriter {
if (context.getTxnEntry() == null) {
context.setTxnEntry(new TransactionEntry());
}
- TransactionEntry txnEntry = context.getTxnEntry();
- txnEntry.setTxnConf(txnParams);
+ context.getTxnEntry().setTxnConf(txnParams);
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("',
'status':'")
.append(TransactionStatus.PREPARE.name());
@@ -1300,6 +1299,7 @@ public class StmtExecutor implements ProfileWriter {
.append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
context.getState().setOk(0, 0, sb.toString());
} catch (Exception e) {
+ LOG.warn("Txn commit failed", e);
throw new AnalysisException(e.getMessage());
} finally {
context.setTxnEntry(null);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index a7c2df6e28..5911558e02 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -249,11 +249,13 @@ public class BackendServiceProxy {
}
public Future<InternalService.PSendDataResult> sendData(
- TNetworkAddress address, Types.PUniqueId fragmentInstanceId,
List<InternalService.PDataRow> data)
+ TNetworkAddress address, Types.PUniqueId fragmentInstanceId,
+ Types.PUniqueId loadId, List<InternalService.PDataRow> data)
throws RpcException {
final InternalService.PSendDataRequest.Builder pRequest =
InternalService.PSendDataRequest.newBuilder();
pRequest.setFragmentInstanceId(fragmentInstanceId);
+ pRequest.setLoadId(loadId);
pRequest.addAllData(data);
try {
final BackendServiceClient client = getProxy(address);
@@ -264,10 +266,11 @@ public class BackendServiceProxy {
}
}
- public Future<InternalService.PRollbackResult> rollback(TNetworkAddress
address, Types.PUniqueId fragmentInstanceId)
+ public Future<InternalService.PRollbackResult> rollback(TNetworkAddress
address,
+ Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId)
throws RpcException {
final InternalService.PRollbackRequest pRequest =
InternalService.PRollbackRequest.newBuilder()
- .setFragmentInstanceId(fragmentInstanceId).build();
+
.setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build();
try {
final BackendServiceClient client = getProxy(address);
return client.rollback(pRequest);
@@ -277,10 +280,11 @@ public class BackendServiceProxy {
}
}
- public Future<InternalService.PCommitResult> commit(TNetworkAddress
address, Types.PUniqueId fragmentInstanceId)
+ public Future<InternalService.PCommitResult> commit(TNetworkAddress
address,
+ Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId)
throws RpcException {
final InternalService.PCommitRequest pRequest =
InternalService.PCommitRequest.newBuilder()
- .setFragmentInstanceId(fragmentInstanceId).build();
+
.setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build();
try {
final BackendServiceClient client = getProxy(address);
return client.commit(pRequest);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 4db596dc55..7136871579 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -20,6 +20,7 @@ package org.apache.doris.transaction;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TTxnParams;
@@ -35,6 +36,7 @@ public class TransactionEntry {
private TTxnParams txnConf;
private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
private long rowsInTransaction = 0;
+ private Types.PUniqueId pLoadId;
public TransactionEntry() {
}
@@ -116,4 +118,12 @@ public class TransactionEntry {
public void setRowsInTransaction(long rowsInTransaction) {
this.rowsInTransaction = rowsInTransaction;
}
+
+ public Types.PUniqueId getpLoadId() {
+ return pLoadId;
+ }
+
+ public void setpLoadId(Types.PUniqueId pLoadId) {
+ this.pLoadId = pLoadId;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index 430d8d204e..a1f31b0b34 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -263,11 +263,11 @@ public class CanalSyncDataTest {
minTimes = 0;
result = execFuture;
- backendServiceProxy.commit((TNetworkAddress) any,
(Types.PUniqueId) any);
+ backendServiceProxy.commit((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any);
minTimes = 0;
result = commitFuture;
- backendServiceProxy.sendData((TNetworkAddress) any,
(Types.PUniqueId) any,
+ backendServiceProxy.sendData((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any,
(List<InternalService.PDataRow>) any);
minTimes = 0;
result = sendDataFuture;
@@ -336,7 +336,7 @@ public class CanalSyncDataTest {
minTimes = 0;
result = execFuture;
- backendServiceProxy.rollback((TNetworkAddress) any,
(Types.PUniqueId) any);
+ backendServiceProxy.rollback((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any);
minTimes = 0;
result = abortFuture;
@@ -403,15 +403,16 @@ public class CanalSyncDataTest {
minTimes = 0;
result = execFuture;
- backendServiceProxy.commit((TNetworkAddress) any,
(Types.PUniqueId) any);
+ backendServiceProxy.commit((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any);
minTimes = 0;
result = commitFuture;
- backendServiceProxy.rollback((TNetworkAddress) any,
(Types.PUniqueId) any);
+ backendServiceProxy.rollback((TNetworkAddress) any,
(Types.PUniqueId) any, (Types.PUniqueId) any);
minTimes = 0;
result = abortFuture;
- backendServiceProxy.sendData((TNetworkAddress) any,
(Types.PUniqueId) any, (List<InternalService.PDataRow>) any);
+ backendServiceProxy.sendData((TNetworkAddress) any,
(Types.PUniqueId) any,
+ (Types.PUniqueId) any,
(List<InternalService.PDataRow>) any);
minTimes = 0;
result = sendDataFuture;
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 0396e8944b..95e1887b26 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -351,6 +351,7 @@ message PDataRow {
message PSendDataRequest {
required PUniqueId fragment_instance_id = 1;
repeated PDataRow data = 2;
+ optional PUniqueId load_id = 3; // load_id == query_id in fragment exec
}
message PSendDataResult {
@@ -359,6 +360,7 @@ message PSendDataResult {
message PCommitRequest {
required PUniqueId fragment_instance_id = 1;
+ optional PUniqueId load_id = 2;
}
message PCommitResult {
@@ -367,6 +369,7 @@ message PCommitResult {
message PRollbackRequest {
required PUniqueId fragment_instance_id = 1;
+ optional PUniqueId load_id = 2;
}
message PRollbackResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]