This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b2a21e6533c [bugfix](external)Memory leak problem for external table
with insert operation (#40440)
b2a21e6533c is described below
commit b2a21e6533cabdf800ea436ed33f9764c47445d6
Author: wuwenchi <[email protected]>
AuthorDate: Sat Sep 21 08:52:00 2024 +0800
[bugfix](external)Memory leak problem for external table with insert
operation (#40440)
## Proposed changes
Get the corresponding transaction through `txnId`, and then update the
file list returned from be according to the transaction.
---
.../main/java/org/apache/doris/catalog/Env.java | 8 ++++
.../commands/insert/AbstractInsertExecutor.java | 9 ++++-
.../insert/BaseExternalTableInsertExecutor.java | 13 ------
.../plans/commands/insert/HiveInsertExecutor.java | 8 +---
.../commands/insert/IcebergInsertExecutor.java | 8 +---
.../commands/insert/InsertIntoTableCommand.java | 1 +
.../plans/commands/insert/JdbcInsertExecutor.java | 5 ---
.../plans/commands/insert/OlapInsertExecutor.java | 7 ----
.../main/java/org/apache/doris/qe/Coordinator.java | 32 ++++++---------
...ava => AbstractExternalTransactionManager.java} | 40 ++++++++++--------
....java => GlobalExternalTransactionInfoMgr.java} | 26 ++++++++----
.../doris/transaction/HiveTransactionManager.java | 47 ++--------------------
.../transaction/IcebergTransactionManager.java | 47 ++--------------------
.../doris/transaction/TransactionManager.java | 2 +-
14 files changed, 82 insertions(+), 171 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 1b55a966958..4c4c3089bab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -290,6 +290,7 @@ import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
+import org.apache.doris.transaction.GlobalExternalTransactionInfoMgr;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.PublishVersionDaemon;
@@ -568,6 +569,8 @@ public class Env {
private final SplitSourceManager splitSourceManager;
+ private final GlobalExternalTransactionInfoMgr
globalExternalTransactionInfoMgr;
+
private final List<String> forceSkipJournalIds =
Arrays.asList(Config.force_skip_journal_ids);
// if a config is relative to a daemon thread. record the relation here.
we will proactively change interval of it.
@@ -816,6 +819,7 @@ public class Env {
this.dnsCache = new DNSCache();
this.sqlCacheManager = new NereidsSqlCacheManager();
this.splitSourceManager = new SplitSourceManager();
+ this.globalExternalTransactionInfoMgr = new
GlobalExternalTransactionInfoMgr();
}
public static void destroyCheckpoint() {
@@ -6591,6 +6595,10 @@ public class Env {
return splitSourceManager;
}
+ public GlobalExternalTransactionInfoMgr
getGlobalExternalTransactionInfoMgr() {
+ return globalExternalTransactionInfoMgr;
+ }
+
public StatisticsJobAppender getStatisticsJobAppender() {
return statisticsJobAppender;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index cafffab295e..de3fc5eb953 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -48,7 +48,9 @@ import java.util.Optional;
* The derived class should implement the abstract method for certain type of
target table
*/
public abstract class AbstractInsertExecutor {
+ protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG =
LogManager.getLogger(AbstractInsertExecutor.class);
+
protected long jobId;
protected final ConnectContext ctx;
protected final Coordinator coordinator;
@@ -62,6 +64,7 @@ public abstract class AbstractInsertExecutor {
protected String errMsg = "";
protected Optional<InsertCommandContext> insertCtx;
protected final boolean emptyInsert;
+ protected long txnId = INVALID_TXN_ID;
/**
* Constructor
@@ -93,7 +96,9 @@ public abstract class AbstractInsertExecutor {
return labelName;
}
- public abstract long getTxnId();
+ public long getTxnId() {
+ return txnId;
+ }
/**
* begin transaction if necessary
@@ -108,7 +113,7 @@ public abstract class AbstractInsertExecutor {
/**
* Do something before exec
*/
- protected abstract void beforeExec();
+ protected abstract void beforeExec() throws UserException;
/**
* Do something after exec finished
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
index e456d171df5..082f1bab7d6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BaseExternalTableInsertExecutor.java
@@ -46,9 +46,7 @@ import java.util.Optional;
* Insert executor for base external table
*/
public abstract class BaseExternalTableInsertExecutor extends
AbstractInsertExecutor {
- protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG =
LogManager.getLogger(BaseExternalTableInsertExecutor.class);
- protected long txnId = INVALID_TXN_ID;
protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
protected final TransactionManager transactionManager;
protected final String catalogName;
@@ -70,16 +68,6 @@ public abstract class BaseExternalTableInsertExecutor
extends AbstractInsertExec
}
}
- @Override
- public long getTxnId() {
- return txnId;
- }
-
- /**
- * collect commit infos from BEs
- */
- protected abstract void setCollectCommitInfoFunc();
-
/**
* At this time, FE has successfully collected all commit information from
BEs.
* Before commit this txn, commit information need to be analyzed and
processed.
@@ -94,7 +82,6 @@ public abstract class BaseExternalTableInsertExecutor extends
AbstractInsertExec
@Override
public void beginTransaction() {
txnId = transactionManager.begin();
- setCollectCommitInfoFunc();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
index 10ff27add86..99464ccfc01 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java
@@ -49,13 +49,7 @@ public class HiveInsertExecutor extends
BaseExternalTableInsertExecutor {
}
@Override
- public void setCollectCommitInfoFunc() {
- HMSTransaction transaction = (HMSTransaction)
transactionManager.getTransaction(txnId);
-
coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates);
- }
-
- @Override
- protected void beforeExec() {
+ protected void beforeExec() throws UserException {
// check params
HMSTransaction transaction = (HMSTransaction)
transactionManager.getTransaction(txnId);
Preconditions.checkArgument(insertCtx.isPresent(), "insert context
must be present");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
index 86b1f1ef0b7..fe8ff063571 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
@@ -47,13 +47,7 @@ public class IcebergInsertExecutor extends
BaseExternalTableInsertExecutor {
}
@Override
- public void setCollectCommitInfoFunc() {
- IcebergTransaction transaction = (IcebergTransaction)
transactionManager.getTransaction(txnId);
-
coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData);
- }
-
- @Override
- protected void beforeExec() {
+ protected void beforeExec() throws UserException {
String dbName = ((IcebergExternalTable) table).getDbName();
String tbName = table.getName();
SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 38d0d838630..74f75d2d7d5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -237,6 +237,7 @@ public class InsertIntoTableCommand extends Command
implements ForwardWithSync,
executor.setProfileType(ProfileType.LOAD);
// We exposed @StmtExecutor#cancel as a unified entry point for
statement interruption,
// so we need to set this here
+ insertExecutor.getCoordinator().setTxnId(insertExecutor.getTxnId());
executor.setCoord(insertExecutor.getCoordinator());
return insertExecutor;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
index 928b17edf38..fb41f71083a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/JdbcInsertExecutor.java
@@ -90,11 +90,6 @@ public class JdbcInsertExecutor extends
BaseExternalTableInsertExecutor {
// do nothing
}
- @Override
- protected void setCollectCommitInfoFunc() {
- // do nothing
- }
-
@Override
protected void doBeforeCommit() throws UserException {
// do nothing
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index e38ee40bc9a..658b154b017 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -72,9 +72,7 @@ import java.util.stream.Collectors;
* Insert executor for olap table
*/
public class OlapInsertExecutor extends AbstractInsertExecutor {
- protected static final long INVALID_TXN_ID = -1L;
private static final Logger LOG =
LogManager.getLogger(OlapInsertExecutor.class);
- protected long txnId = INVALID_TXN_ID;
protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
/**
@@ -85,11 +83,6 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}
- @Override
- public long getTxnId() {
- return txnId;
- }
-
@Override
public void beginTransaction() {
if (isGroupCommitHttpStream()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b1ea3772deb..39cdb051378 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -36,6 +36,8 @@ import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.datasource.hive.HMSTransaction;
+import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlCommand;
@@ -94,8 +96,6 @@ import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFragmentInstanceReport;
-import org.apache.doris.thrift.THivePartitionUpdate;
-import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -251,12 +251,6 @@ public class Coordinator implements CoordInterface {
private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
private final List<TErrorTabletInfo> errorTabletInfos =
Lists.newArrayList();
- // Collect all hivePartitionUpdates obtained from be
- Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc;
-
- // Collect all icebergCommitData obtained from be
- Consumer<List<TIcebergCommitData>> icebergCommitDataFunc;
-
// Input parameter
private long jobId = -1; // job which this task belongs to
private TUniqueId queryId;
@@ -484,6 +478,10 @@ public class Coordinator implements CoordInterface {
return txnId;
}
+ public void setTxnId(long txnId) {
+ this.txnId = txnId;
+ }
+
public String getLabel() {
return label;
}
@@ -2381,14 +2379,6 @@ public class Coordinator implements CoordInterface {
// TODO: more ranges?
}
- public void
setHivePartitionUpdateFunc(Consumer<List<THivePartitionUpdate>>
hivePartitionUpdateFunc) {
- this.hivePartitionUpdateFunc = hivePartitionUpdateFunc;
- }
-
- public void setIcebergCommitDataFunc(Consumer<List<TIcebergCommitData>>
icebergCommitDataFunc) {
- this.icebergCommitDataFunc = icebergCommitDataFunc;
- }
-
// update job progress from BE
public void updateFragmentExecStatus(TReportExecStatusParams params) {
PipelineExecContext ctx =
pipelineExecContexts.get(Pair.of(params.getFragmentId(),
params.getBackendId()));
@@ -2441,11 +2431,13 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
- if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc !=
null) {
- hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
+ if (params.isSetHivePartitionUpdates()) {
+ ((HMSTransaction)
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
+ .updateHivePartitionUpdates(params.getHivePartitionUpdates());
}
- if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null)
{
- icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
+ if (params.isSetIcebergCommitDatas()) {
+ ((IcebergTransaction)
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().getTxnById(txnId))
+ .updateIcebergCommitData(params.getIcebergCommitDatas());
}
if (ctx.done) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AbstractExternalTransactionManager.java
similarity index 53%
copy from
fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
copy to
fe/fe-core/src/main/java/org/apache/doris/transaction/AbstractExternalTransactionManager.java
index f373c133685..da80b8f77bd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/AbstractExternalTransactionManager.java
@@ -17,29 +17,33 @@
package org.apache.doris.transaction;
-
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
-import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergTransaction;
+import org.apache.doris.datasource.operations.ExternalMetadataOps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-public class IcebergTransactionManager implements TransactionManager {
+public abstract class AbstractExternalTransactionManager<T extends
Transaction> implements TransactionManager {
+ private static final Logger LOG =
LogManager.getLogger(AbstractExternalTransactionManager.class);
+ private final Map<Long, T> transactions = new ConcurrentHashMap<>();
+ protected final ExternalMetadataOps ops;
- private final Map<Long, IcebergTransaction> transactions = new
ConcurrentHashMap<>();
- private final IcebergMetadataOps ops;
-
- public IcebergTransactionManager(IcebergMetadataOps ops) {
+ public AbstractExternalTransactionManager(ExternalMetadataOps ops) {
this.ops = ops;
}
+ abstract T createTransaction();
+
@Override
public long begin() {
long id = Env.getCurrentEnv().getNextId();
- IcebergTransaction icebergTransaction = new IcebergTransaction(ops);
- transactions.put(id, icebergTransaction);
+ T transaction = createTransaction();
+ transactions.put(id, transaction);
+
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().putTxnById(id,
transaction);
return id;
}
@@ -47,27 +51,31 @@ public class IcebergTransactionManager implements
TransactionManager {
public void commit(long id) throws UserException {
getTransactionWithException(id).commit();
transactions.remove(id);
+
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id);
}
@Override
public void rollback(long id) {
try {
getTransactionWithException(id).rollback();
+ } catch (TransactionNotFoundException e) {
+ LOG.warn(e.getMessage(), e);
} finally {
transactions.remove(id);
+
Env.getCurrentEnv().getGlobalExternalTransactionInfoMgr().removeTxnById(id);
}
}
@Override
- public IcebergTransaction getTransaction(long id) {
+ public Transaction getTransaction(long id) throws UserException {
return getTransactionWithException(id);
}
- public IcebergTransaction getTransactionWithException(long id) {
- IcebergTransaction icebergTransaction = transactions.get(id);
- if (icebergTransaction == null) {
- throw new RuntimeException("Can't find transaction for " + id);
+ private Transaction getTransactionWithException(long id) throws
TransactionNotFoundException {
+ Transaction txn = transactions.get(id);
+ if (txn == null) {
+ throw new TransactionNotFoundException("Can't find transaction for
" + id);
}
- return icebergTransaction;
+ return txn;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalExternalTransactionInfoMgr.java
similarity index 54%
copy from
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
copy to
fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalExternalTransactionInfoMgr.java
index ca9cbb917ec..e516c648dff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalExternalTransactionInfoMgr.java
@@ -17,15 +17,27 @@
package org.apache.doris.transaction;
-import org.apache.doris.common.UserException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-public interface TransactionManager {
+public class GlobalExternalTransactionInfoMgr {
+ public Map<Long, Transaction> idToTxn = new ConcurrentHashMap<>();
- long begin();
+ public Transaction getTxnById(long txnId) {
+ if (idToTxn.containsKey(txnId)) {
+ return idToTxn.get(txnId);
+ }
+ throw new RuntimeException("Can't find txn for " + txnId);
+ }
- void commit(long id) throws UserException;
+ public void putTxnById(long txnId, Transaction txn) {
+ if (idToTxn.containsKey(txnId)) {
+ throw new RuntimeException("Duplicate txnId for " + txnId);
+ }
+ idToTxn.put(txnId, txn);
+ }
- void rollback(long id);
-
- Transaction getTransaction(long id);
+ public void removeTxnById(long txnId) {
+ idToTxn.remove(txnId);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
index c48210ad452..65f0c2bd5e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
@@ -17,65 +17,26 @@
package org.apache.doris.transaction;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.datasource.hive.HiveMetadataOps;
import org.apache.doris.fs.FileSystemProvider;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-public class HiveTransactionManager implements TransactionManager {
-
- private final Map<Long, HMSTransaction> transactions = new
ConcurrentHashMap<>();
- private final HiveMetadataOps ops;
+public class HiveTransactionManager extends
AbstractExternalTransactionManager<HMSTransaction> {
private final FileSystemProvider fileSystemProvider;
-
private final Executor fileSystemExecutor;
public HiveTransactionManager(HiveMetadataOps ops, FileSystemProvider
fileSystemProvider,
Executor fileSystemExecutor) {
- this.ops = ops;
+ super(ops);
this.fileSystemProvider = fileSystemProvider;
this.fileSystemExecutor = fileSystemExecutor;
}
@Override
- public long begin() {
- long id = Env.getCurrentEnv().getNextId();
- HMSTransaction hiveTransaction = new HMSTransaction(ops,
fileSystemProvider, fileSystemExecutor);
- transactions.put(id, hiveTransaction);
- return id;
- }
-
- @Override
- public void commit(long id) throws UserException {
- getTransactionWithException(id).commit();
- transactions.remove(id);
- }
-
- @Override
- public void rollback(long id) {
- try {
- getTransactionWithException(id).rollback();
- } finally {
- transactions.remove(id);
- }
- }
-
- @Override
- public HMSTransaction getTransaction(long id) {
- return getTransactionWithException(id);
- }
-
- public HMSTransaction getTransactionWithException(long id) {
- HMSTransaction hiveTransaction = transactions.get(id);
- if (hiveTransaction == null) {
- throw new RuntimeException("Can't find transaction for " + id);
- }
- return hiveTransaction;
+ HMSTransaction createTransaction() {
+ return new HMSTransaction((HiveMetadataOps) ops, fileSystemProvider,
fileSystemExecutor);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
index f373c133685..8f4d25a19b3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
@@ -18,56 +18,17 @@
package org.apache.doris.transaction;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class IcebergTransactionManager implements TransactionManager {
-
- private final Map<Long, IcebergTransaction> transactions = new
ConcurrentHashMap<>();
- private final IcebergMetadataOps ops;
+public class IcebergTransactionManager extends
AbstractExternalTransactionManager<IcebergTransaction> {
public IcebergTransactionManager(IcebergMetadataOps ops) {
- this.ops = ops;
+ super(ops);
}
@Override
- public long begin() {
- long id = Env.getCurrentEnv().getNextId();
- IcebergTransaction icebergTransaction = new IcebergTransaction(ops);
- transactions.put(id, icebergTransaction);
- return id;
- }
-
- @Override
- public void commit(long id) throws UserException {
- getTransactionWithException(id).commit();
- transactions.remove(id);
- }
-
- @Override
- public void rollback(long id) {
- try {
- getTransactionWithException(id).rollback();
- } finally {
- transactions.remove(id);
- }
- }
-
- @Override
- public IcebergTransaction getTransaction(long id) {
- return getTransactionWithException(id);
- }
-
- public IcebergTransaction getTransactionWithException(long id) {
- IcebergTransaction icebergTransaction = transactions.get(id);
- if (icebergTransaction == null) {
- throw new RuntimeException("Can't find transaction for " + id);
- }
- return icebergTransaction;
+ IcebergTransaction createTransaction() {
+ return new IcebergTransaction((IcebergMetadataOps) ops);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
index ca9cbb917ec..fbff324ae91 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManager.java
@@ -27,5 +27,5 @@ public interface TransactionManager {
void rollback(long id);
- Transaction getTransaction(long id);
+ Transaction getTransaction(long id) throws UserException;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]