This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2586f09 [Bug] Fix bug that SHOW DELETE not return Delete job info
(#3515)
2586f09 is described below
commit 2586f095486ee7019916c320a63af4c9fd712d66
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri May 8 13:04:20 2020 +0800
[Bug] Fix bug that SHOW DELETE not return Delete job info (#3515)
The callback added to the CallbackFactory should not be removed until the
transaction is aborted or visible. Otherwise, some callback methods may fail
to be called.
---
.../apache/doris/common/MarkedCountDownLatch.java | 8 +++--
.../java/org/apache/doris/load/DeleteHandler.java | 40 ++++++++++++++--------
.../main/java/org/apache/doris/load/DeleteJob.java | 19 ++++++++--
.../doris/transaction/TxnStateCallbackFactory.java | 6 ++--
4 files changed, 51 insertions(+), 22 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
b/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
index 74a42ee..53aa426 100644
--- a/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
+++ b/fe/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
@@ -56,11 +56,13 @@ public class MarkedCountDownLatch<K, V> extends
CountDownLatch {
}
public synchronized void countDownToZero(Status status) {
- while(getCount() > 0) {
- super.countDown();
- }
+ // update status first before countDown.
+ // so that the waiting thread will get the correct status.
if (st.ok()) {
st = status;
}
+ while(getCount() > 0) {
+ super.countDown();
+ }
}
}
diff --git a/fe/src/main/java/org/apache/doris/load/DeleteHandler.java
b/fe/src/main/java/org/apache/doris/load/DeleteHandler.java
index c6eee8e..621cffc 100644
--- a/fe/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -17,11 +17,6 @@
package org.apache.doris.load;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.gson.annotations.SerializedName;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.DeleteStmt;
import org.apache.doris.analysis.IsNullPredicate;
@@ -55,16 +50,16 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.DeleteJob.DeleteState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.QueryStateException;
import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.QueryStateException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.load.DeleteJob.DeleteState;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
@@ -72,9 +67,16 @@ import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -255,6 +257,15 @@ public class DeleteHandler implements Writable {
}
if (!ok) {
+ String errMsg = "";
+ List<Entry<Long, Long>> unfinishedMarks =
countDownLatch.getLeftMarks();
+ // only show at most 5 results
+ List<Entry<Long, Long>> subList = unfinishedMarks.subList(0,
Math.min(unfinishedMarks.size(), 5));
+ if (!subList.isEmpty()) {
+ errMsg = "unfinished replicas: " + Joiner.on(",
").join(subList);
+ }
+ LOG.warn(errMsg);
+
try {
deleteJob.checkAndUpdateQuorum();
} catch (MetaNotFoundException e) {
@@ -264,14 +275,10 @@ public class DeleteHandler implements Writable {
DeleteState state = deleteJob.getState();
switch (state) {
case UN_QUORUM:
- List<Entry<Long, Long>> unfinishedMarks =
countDownLatch.getLeftMarks();
- // only show at most 5 results
- List<Entry<Long, Long>> subList =
unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 5));
- String errMsg = "Unfinished replicas:" + Joiner.on(",
").join(subList);
LOG.warn("delete job timeout: transactionId {},
timeout {}, {}", transactionId, timeoutMs, errMsg);
cancelJob(deleteJob, CancelType.TIMEOUT, "delete job
timeout");
- throw new DdlException("failed to delete replicas from
job: transactionId " + transactionId +
- ", timeout " + timeoutMs + ", " + errMsg);
+ throw new DdlException("failed to execute delete.
transaction id " + transactionId +
+ ", timeout(ms) " + timeoutMs + ", " + errMsg);
case QUORUM_FINISHED:
case FINISHED:
try {
@@ -282,6 +289,7 @@ public class DeleteHandler implements Writable {
deleteJob.checkAndUpdateQuorum();
Thread.sleep(1000);
nowQuorumTimeMs = System.currentTimeMillis();
+ LOG.debug("wait for quorum finished delete
job: {}, txn id: {}" + deleteJob.getId(), transactionId);
}
} catch (MetaNotFoundException e) {
cancelJob(deleteJob, CancelType.METADATA_MISSING,
e.getMessage());
@@ -386,7 +394,9 @@ public class DeleteHandler implements Writable {
pushTask.getVersion(), pushTask.getVersionHash(),
pushTask.getPushType(), pushTask.getTaskType());
}
-
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(job.getId());
+
+ // NOT remove callback from GlobalTransactionMgr's callback
factory here.
+ // the callback will be removed after the transaction is aborted or visible.
}
}
diff --git a/fe/src/main/java/org/apache/doris/load/DeleteJob.java
b/fe/src/main/java/org/apache/doris/load/DeleteJob.java
index 972a9cd..957d9f5 100644
--- a/fe/src/main/java/org/apache/doris/load/DeleteJob.java
+++ b/fe/src/main/java/org/apache/doris/load/DeleteJob.java
@@ -17,17 +17,21 @@
package org.apache.doris.load;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Replica;
import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
import org.apache.doris.task.PushTask;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.TransactionState;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -165,6 +169,13 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback {
Catalog.getCurrentCatalog().getEditLog().logFinishDelete(deleteInfo);
}
+ @Override
+ public void afterAborted(TransactionState txnState, boolean txnOperated,
String txnStatusChangeReason)
+ throws UserException {
+ // just to clean the callback
+
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(getId());
+ }
+
public void executeFinish() {
setState(DeleteState.FINISHED);
Catalog.getCurrentCatalog().getDeleteHandler().recordFinishedJob(this);
@@ -180,6 +191,10 @@ public class DeleteJob extends
AbstractTxnStateChangeCallback {
}
public long getTimeoutMs() {
+ if (FeConstants.runningUnitTest) {
+ // for making unit test run fast
+ return 1000;
+ }
// timeout is between 30 seconds to 5 min
long timeout = Math.max(totalTablets.size() *
Config.tablet_delete_timeout_second * 1000L, 30000L);
return Math.min(timeout, Config.load_straggler_wait_second * 1000L);
diff --git
a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
index 28accde..b9d80bd 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TxnStateCallbackFactory.java
@@ -35,13 +35,15 @@ public class TxnStateCallbackFactory {
return false;
}
callbacks.put(callback.getId(), callback);
- LOG.info("add callback of txn state : {}", callback.getId());
+ LOG.info("add callback of txn state : {}. current callback size: {}",
+ callback.getId(), callbacks.size());
return true;
}
public synchronized void removeCallback(long id) {
if (callbacks.remove(id) != null) {
- LOG.info("remove callback of txn state : {}", id);
+ LOG.info("remove callback of txn state : {}. current callback
size: {}",
+ id, callbacks.size());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]