This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab2f825 [Bug-fix] Pause routine load when data size exceeds quota
(#5749)
ab2f825 is described below
commit ab2f825a88359dab6bd581c39a5f04da54827e1e
Author: EmmyMiao87 <[email protected]>
AuthorDate: Fri May 7 11:19:36 2021 +0800
[Bug-fix] Pause routine load when data size exceeds quota (#5749)
In the previous code, the routine load task did not catch the exception of
opening the transaction.
As a result, although the task cannot be executed,
no exceptions can be seen during show routine load, only the routine load
job is stuck.
The PR catch the QuotaExceedException when opening a transaction.
If the routine load task cannot be executed due to the exhaustion of the
quota,
the routine load will be paused and an error message will be presented to
the user.
Similarly, other load method will also catch similar exceptions and cancel
job.
---
.../org/apache/doris/common/MetaNotFoundException.java | 2 +-
...NotFoundException.java => QuotaExceedException.java} | 17 +++++------------
.../java/org/apache/doris/common/util/DebugUtil.java | 7 +++++++
.../org/apache/doris/load/loadv2/BrokerLoadJob.java | 4 +++-
.../main/java/org/apache/doris/load/loadv2/LoadJob.java | 7 ++++---
.../org/apache/doris/load/loadv2/LoadJobScheduler.java | 4 +++-
.../java/org/apache/doris/load/loadv2/MiniLoadJob.java | 4 +++-
.../java/org/apache/doris/load/loadv2/SparkLoadJob.java | 4 +++-
.../doris/load/routineload/RoutineLoadTaskInfo.java | 8 +++++++-
.../doris/transaction/DatabaseTransactionMgr.java | 14 ++++++--------
.../apache/doris/transaction/GlobalTransactionMgr.java | 8 ++++++--
.../java/org/apache/doris/load/loadv2/LoadJobTest.java | 4 +++-
.../doris/transaction/GlobalTransactionMgrTest.java | 8 +++++++-
13 files changed, 58 insertions(+), 33 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/MetaNotFoundException.java
b/fe/fe-core/src/main/java/org/apache/doris/common/MetaNotFoundException.java
index 9626997..9fbd6b7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/MetaNotFoundException.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/MetaNotFoundException.java
@@ -22,7 +22,7 @@ package org.apache.doris.common;
*/
public class MetaNotFoundException extends UserException {
public MetaNotFoundException(String msg) {
- super(msg);
+ super(InternalErrorCode.META_NOT_FOUND_ERR, msg);
}
public MetaNotFoundException(InternalErrorCode errcode, String msg) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/MetaNotFoundException.java
b/fe/fe-core/src/main/java/org/apache/doris/common/QuotaExceedException.java
similarity index 67%
copy from
fe/fe-core/src/main/java/org/apache/doris/common/MetaNotFoundException.java
copy to
fe/fe-core/src/main/java/org/apache/doris/common/QuotaExceedException.java
index 9626997..454161d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/MetaNotFoundException.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/QuotaExceedException.java
@@ -17,19 +17,12 @@
package org.apache.doris.common;
-/**
- * Exception for meta info is null, like db table partition tablet replica job
- */
-public class MetaNotFoundException extends UserException {
- public MetaNotFoundException(String msg) {
- super(msg);
- }
+import org.apache.doris.common.util.DebugUtil;
- public MetaNotFoundException(InternalErrorCode errcode, String msg) {
- super(errcode, msg);
- }
+public class QuotaExceedException extends UserException {
- public MetaNotFoundException(String msg, Throwable e) {
- super(msg, e);
+ public QuotaExceedException(String databaseName, long dataQuotaBytes) {
+ super("Database[" + databaseName + "] data size exceeds quota["
+ + DebugUtil.printByteWithUnit(dataQuotaBytes) + "]");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
index c8eca0b..8d4b960 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
@@ -119,6 +119,13 @@ public class DebugUtil {
return returnValue;
}
+ public static String printByteWithUnit(long value) {
+ Pair<Double, String> quotaUnitPair = getByteUint(value);
+ String readableQuota =
DebugUtil.DECIMAL_FORMAT_SCALE_3.format(quotaUnitPair.first) + " "
+ + quotaUnitPair.second;
+ return readableQuota;
+ }
+
public static String printId(final TUniqueId id) {
if (id == null) {
return "";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 5027ecc..bb3188e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
@@ -93,7 +94,8 @@ public class BrokerLoadJob extends BulkLoadJob {
@Override
public void beginTxn()
- throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException {
+ throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException,
+ QuotaExceedException, MetaNotFoundException {
MetricRepo.COUNTER_LOAD_ADD.increase(1L);
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId,
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 85a1a46..86e914a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -388,7 +389,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
isJobTypeRead = jobTypeRead;
}
- public void beginTxn() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException, DuplicatedRequestException {
+ public void beginTxn() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException, DuplicatedRequestException,
QuotaExceedException, MetaNotFoundException {
}
/**
@@ -401,7 +402,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
* @throws DuplicatedRequestException
*/
public void execute() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException,
- DuplicatedRequestException, LoadException {
+ DuplicatedRequestException, LoadException, QuotaExceedException,
MetaNotFoundException {
writeLock();
try {
unprotectedExecute();
@@ -411,7 +412,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
}
public void unprotectedExecute() throws LabelAlreadyUsedException,
BeginTransactionException, AnalysisException,
- DuplicatedRequestException, LoadException {
+ DuplicatedRequestException, LoadException, QuotaExceedException,
MetaNotFoundException {
// check if job state is pending
if (state != JobState.PENDING) {
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
index 4875052..e8d23b8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
@@ -23,6 +23,8 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.MasterDaemon;
@@ -79,7 +81,7 @@ public class LoadJobScheduler extends MasterDaemon {
// schedule job
try {
loadJob.execute();
- } catch (LabelAlreadyUsedException | AnalysisException e) {
+ } catch (LabelAlreadyUsedException | AnalysisException |
MetaNotFoundException | QuotaExceedException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
.add("error_msg", "There are error properties
in job. Job will be cancelled")
.build(), e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
index 382867a..705e3f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.io.Text;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.service.FrontendOptions;
@@ -91,7 +92,8 @@ public class MiniLoadJob extends LoadJob {
@Override
public void beginTxn()
- throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException {
+ throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException,
+ QuotaExceedException, MetaNotFoundException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(tableId), label,
requestId,
new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 726d5a3..a9ab2d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -52,6 +52,7 @@ import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
@@ -191,7 +192,8 @@ public class SparkLoadJob extends BulkLoadJob {
@Override
public void beginTxn()
- throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException {
+ throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException,
+ QuotaExceedException, MetaNotFoundException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId,
Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
new TxnCoordinator(TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 43966a0..71bdb99 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
@@ -162,7 +164,7 @@ public abstract class RoutineLoadTaskInfo {
// begin the txn of this task
// return true if begin successfully, return false if begin failed.
// throw exception if unrecoverable errors happen.
- public boolean beginTxn() throws LabelAlreadyUsedException {
+ public boolean beginTxn() throws UserException {
// begin a txn for task
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
try {
@@ -182,6 +184,10 @@ public abstract class RoutineLoadTaskInfo {
} catch (AnalysisException | BeginTransactionException e) {
LOG.debug("begin txn failed for routine load task: {}, {}",
DebugUtil.printId(id), e.getMessage());
return false;
+ } catch (MetaNotFoundException | QuotaExceedException e) {
+ LOG.warn("failed to begin txn for routine load task: {}, job id:
{}",
+ DebugUtil.printId(id), jobId, e);
+ throw e;
}
return true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9d240c4..45d15dc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -38,6 +38,7 @@ import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
@@ -257,7 +258,8 @@ public class DatabaseTransactionMgr {
public long beginTransaction(List<Long> tableIdList, String label,
TUniqueId requestId,
TransactionState.TxnCoordinator coordinator,
TransactionState.LoadJobSourceType sourceType, long listenerId, long
timeoutSecond)
- throws DuplicatedRequestException, LabelAlreadyUsedException,
BeginTransactionException, AnalysisException {
+ throws DuplicatedRequestException, LabelAlreadyUsedException,
BeginTransactionException, AnalysisException,
+ QuotaExceedException, MetaNotFoundException {
checkDatabaseDataQuota();
writeLock();
try {
@@ -324,10 +326,10 @@ public class DatabaseTransactionMgr {
}
- private void checkDatabaseDataQuota() throws AnalysisException {
+ private void checkDatabaseDataQuota() throws MetaNotFoundException,
QuotaExceedException {
Database db = catalog.getDb(dbId);
if (db == null) {
- throw new AnalysisException("Database[" + dbId + "] does not
exist");
+ throw new MetaNotFoundException("Database[" + dbId + "] does not
exist");
}
if (usedQuotaDataBytes == -1) {
@@ -336,11 +338,7 @@ public class DatabaseTransactionMgr {
long dataQuotaBytes = db.getDataQuota();
if (usedQuotaDataBytes >= dataQuotaBytes) {
- Pair<Double, String> quotaUnitPair =
DebugUtil.getByteUint(dataQuotaBytes);
- String readableQuota =
DebugUtil.DECIMAL_FORMAT_SCALE_3.format(quotaUnitPair.first) + " "
- + quotaUnitPair.second;
- throw new AnalysisException("Database[" + db.getFullName()
- + "] data size exceeds quota[" + readableQuota + "]");
+ throw new QuotaExceedException(db.getFullName(), dataQuotaBytes);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index f71d2be..6444b6e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -24,7 +24,9 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MetaLockUtils;
@@ -98,7 +100,8 @@ public class GlobalTransactionMgr implements Writable {
public long beginTransaction(long dbId, List<Long> tableIdList, String
label, TxnCoordinator coordinator, LoadJobSourceType sourceType,
long timeoutSecond)
- throws AnalysisException, LabelAlreadyUsedException,
BeginTransactionException, DuplicatedRequestException {
+ throws AnalysisException, LabelAlreadyUsedException,
BeginTransactionException, DuplicatedRequestException,
+ QuotaExceedException, MetaNotFoundException {
return beginTransaction(dbId, tableIdList, label, null, coordinator,
sourceType, -1, timeoutSecond);
}
@@ -117,7 +120,8 @@ public class GlobalTransactionMgr implements Writable {
*/
public long beginTransaction(long dbId, List<Long> tableIdList, String
label, TUniqueId requestId,
TxnCoordinator coordinator, LoadJobSourceType
sourceType, long listenerId, long timeoutSecond)
- throws AnalysisException, LabelAlreadyUsedException,
BeginTransactionException, DuplicatedRequestException {
+ throws AnalysisException, LabelAlreadyUsedException,
BeginTransactionException, DuplicatedRequestException,
+ QuotaExceedException, MetaNotFoundException {
if (Config.disable_load_job) {
throw new AnalysisException("disable_load_job is set to true, all
load jobs are prevented");
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index c25cd6d..5afe467 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.metric.LongCounterMetric;
import org.apache.doris.metric.MetricRepo;
@@ -111,7 +112,8 @@ public class LoadJobTest {
@Test
public void testExecute(@Mocked GlobalTransactionMgr globalTransactionMgr,
@Mocked MasterTaskExecutor masterTaskExecutor)
- throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException {
+ throws LabelAlreadyUsedException, BeginTransactionException,
AnalysisException, DuplicatedRequestException,
+ QuotaExceedException, MetaNotFoundException {
LoadJob loadJob = new BrokerLoadJob();
new Expectations() {
{
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index c2e3d6e..d891f48 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -30,6 +30,8 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.load.routineload.KafkaProgress;
@@ -104,7 +106,7 @@ public class GlobalTransactionMgrTest {
@Test
public void testBeginTransaction() throws LabelAlreadyUsedException,
AnalysisException,
- BeginTransactionException, DuplicatedRequestException {
+ BeginTransactionException, DuplicatedRequestException,
QuotaExceedException, MetaNotFoundException {
FakeCatalog.setCatalog(masterCatalog);
long transactionId =
masterTransMgr.beginTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(CatalogTestUtil.testTableId1),
CatalogTestUtil.testTxnLabel1,
@@ -130,6 +132,10 @@ public class GlobalTransactionMgrTest {
LoadJobSourceType.FRONTEND,
Config.stream_load_default_timeout_second);
} catch (AnalysisException | LabelAlreadyUsedException e) {
e.printStackTrace();
+ } catch (MetaNotFoundException e) {
+ e.printStackTrace();
+ } catch (QuotaExceedException e) {
+ e.printStackTrace();
}
TransactionState transactionState =
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId);
assertNotNull(transactionState);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]