morningman closed pull request #438: Check meta context when update partition 
version
URL: https://github.com/apache/incubator-doris/pull/438
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/fe/src/main/java/org/apache/doris/catalog/Partition.java 
b/fe/src/main/java/org/apache/doris/catalog/Partition.java
index 4359f682..0910a8d1 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Partition.java
@@ -22,6 +22,7 @@
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.meta.MetaContext;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -113,17 +114,20 @@ public void setState(PartitionState state) {
     public void updateVisibleVersionAndVersionHash(long visibleVersion, long 
visibleVersionHash) {
         this.visibleVersion = visibleVersion;
         this.visibleVersionHash = visibleVersionHash;
-        // if it is upgrade from old palo cluster, then should update next 
version info
-        if (Catalog.getCurrentCatalogJournalVersion() < 
FeMetaVersion.VERSION_45) {
-            // the partition is created and not import any data
-            if (visibleVersion == PARTITION_INIT_VERSION + 1 && 
visibleVersionHash == PARTITION_INIT_VERSION_HASH) {
-                this.nextVersion = PARTITION_INIT_VERSION + 1;
-                this.nextVersionHash = Util.generateVersionHash();
-                this.committedVersionHash = PARTITION_INIT_VERSION_HASH;
-            } else {
-                this.nextVersion = visibleVersion + 1;
-                this.nextVersionHash = Util.generateVersionHash();
-                this.committedVersionHash = visibleVersionHash;
+        if (MetaContext.get() != null) {
+            // MetaContext is not null means we are in a edit log replay 
thread.
+            // if it is upgrade from old palo cluster, then should update next 
version info
+            if (Catalog.getCurrentCatalogJournalVersion() < 
FeMetaVersion.VERSION_45) {
+                // the partition is created and not import any data
+                if (visibleVersion == PARTITION_INIT_VERSION + 1 && 
visibleVersionHash == PARTITION_INIT_VERSION_HASH) {
+                    this.nextVersion = PARTITION_INIT_VERSION + 1;
+                    this.nextVersionHash = Util.generateVersionHash();
+                    this.committedVersionHash = PARTITION_INIT_VERSION_HASH;
+                } else {
+                    this.nextVersion = visibleVersion + 1;
+                    this.nextVersionHash = Util.generateVersionHash();
+                    this.committedVersionHash = visibleVersionHash;
+                }
             }
         }
     }
diff --git a/fe/src/main/java/org/apache/doris/load/Load.java 
b/fe/src/main/java/org/apache/doris/load/Load.java
index ad334924..2dca3de1 100644
--- a/fe/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/src/main/java/org/apache/doris/load/Load.java
@@ -17,15 +17,6 @@
 
 package org.apache.doris.load;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.gson.Gson;
-
-import org.apache.commons.lang.StringUtils;
 import org.apache.doris.analysis.BinaryPredicate;
 import org.apache.doris.analysis.CancelLoadStmt;
 import org.apache.doris.analysis.ColumnSeparator;
@@ -95,6 +86,16 @@
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+
+import org.apache.commons.lang.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -2228,6 +2229,8 @@ public boolean updateLoadJobState(LoadJob job, JobState 
destState, CancelType ca
                                 }
                             }
                             MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
+                            // job will transfer from LOADING to FINISHED, 
skip QUORUM_FINISHED
+                            idToLoadingLoadJob.remove(jobId);
                             idToQuorumFinishedLoadJob.remove(jobId);
                             job.setState(destState);
 
diff --git a/fe/src/main/java/org/apache/doris/load/LoadChecker.java 
b/fe/src/main/java/org/apache/doris/load/LoadChecker.java
index 0b812bd8..91a8ba34 100644
--- a/fe/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -20,23 +20,13 @@
 import org.apache.doris.alter.RollupJob;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.Replica.ReplicaState;
-import org.apache.doris.transaction.GlobalTransactionMgr;
-import org.apache.doris.transaction.TabletCommitInfo;
-import org.apache.doris.transaction.TransactionCommitFailedException;
-import org.apache.doris.transaction.TransactionException;
-import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionStatus;
+import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexState;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
-import org.apache.doris.clone.Clone;
-import org.apache.doris.clone.CloneJob.JobPriority;
-import org.apache.doris.clone.CloneJob.JobType;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.util.Daemon;
@@ -48,22 +38,28 @@
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.HadoopLoadEtlTask;
-import org.apache.doris.task.MiniLoadEtlTask;
-import org.apache.doris.task.MiniLoadPendingTask;
 import org.apache.doris.task.HadoopLoadPendingTask;
 import org.apache.doris.task.InsertLoadEtlTask;
 import org.apache.doris.task.MasterTask;
 import org.apache.doris.task.MasterTaskExecutor;
+import org.apache.doris.task.MiniLoadEtlTask;
+import org.apache.doris.task.MiniLoadPendingTask;
 import org.apache.doris.task.PullLoadEtlTask;
 import org.apache.doris.task.PullLoadPendingTask;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushType;
 import org.apache.doris.thrift.TTaskType;
+import org.apache.doris.transaction.GlobalTransactionMgr;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionException;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
+
 import com.google.common.collect.Maps;
 
-import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -302,7 +298,10 @@ private void runOneLoadingJob(LoadJob job) {
             // if could not commit successfully and commit again until job is 
timeout
             if (job.getQuorumFinishTimeMs() < 0) {
                 job.setQuorumFinishTimeMs(System.currentTimeMillis());
-            } else if (System.currentTimeMillis() - 
job.getQuorumFinishTimeMs() > stragglerTimeout 
+            }
+
+            // if all tablets are finished or stay in quorum finished for long 
time, try to commit it.
+            if (System.currentTimeMillis() - job.getQuorumFinishTimeMs() > 
stragglerTimeout
                     || job.getFullTablets().containsAll(jobTotalTablets)) {
                 tryCommitJob(job, db);
             }
diff --git 
a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
index e64a47a3..c4437087 100644
--- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -381,16 +381,21 @@ public void setTxnCommitAttachment(TxnCommitAttachment 
txnCommitAttachment) {
     
     @Override
     public String toString() {
-        return "TransactionState [transactionId=" + transactionId
-                + ", label=" + label
-                + ", dbId=" + dbId
-                + ", coordinator=" + coordinator
-                + ", loadjobsource=" + sourceType
-                + ", transactionStatus=" + transactionStatus
-                + ", errorReplicas=" + errorReplicas
-                + ", prepareTime="
-                + prepareTime + ", commitTime=" + commitTime + ", finishTime="
-                + finishTime + ", reason=" + reason + ", txnCommitAttachment=" 
+ txnCommitAttachment.toString() + "]";
+        StringBuilder sb = new StringBuilder("TransactionState. ");
+        sb.append("transaction id: ").append(transactionId);
+        sb.append(", label: ").append(label);
+        sb.append(", db id: ").append(dbId);
+        sb.append(", coordinator: ").append(coordinator);
+        sb.append(", transaction status: ").append(transactionStatus);
+        sb.append(", error replicas num: ").append(errorReplicas.size());
+        sb.append(", prepare time: ").append(prepareTime);
+        sb.append(", commit time: ").append(commitTime);
+        sb.append(", finish time: ").append(finishTime);
+        sb.append(", reason: ").append(reason);
+        if (txnCommitAttachment != null) {
+            sb.append(" attactment: ").append(txnCommitAttachment);
+        }
+        return sb.toString();
     }
     
     public LoadJobSourceType getSourceType() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to