This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new aef022d  Fix race in taskMaster (#6388)
aef022d is described below

commit aef022de98bc350231af38c7ce6959b7a61b5320
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Sep 26 21:48:02 2018 -0700

    Fix race in taskMaster (#6388)
---
 .../apache/druid/indexing/overlord/TaskMaster.java | 27 ++++++++++++++++------
 .../druid/server/coordinator/DruidCoordinator.java | 11 +++++----
 2 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 4c6d5ec..4428661 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -25,6 +25,7 @@ import com.google.inject.Inject;
 import org.apache.druid.client.indexing.IndexingService;
 import org.apache.druid.curator.discovery.ServiceAnnouncer;
 import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.discovery.DruidLeaderSelector.Listener;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -49,6 +50,8 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public class TaskMaster
 {
+  private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
+
   private final DruidLeaderSelector overlordLeaderSelector;
   private final DruidLeaderSelector.Listener leadershipListener;
 
@@ -61,7 +64,12 @@ public class TaskMaster
   private volatile TaskRunner taskRunner;
   private volatile TaskQueue taskQueue;
 
-  private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
+  /**
+   * This flag indicates that all services have been started and should be true before calling
+   * {@link ServiceAnnouncer#announce}. This is set to false immediately once {@link Listener#stopBeingLeader()} is
+   * called.
+   */
+  private volatile boolean initialized;
 
   @Inject
   public TaskMaster(
@@ -127,6 +135,7 @@ public class TaskMaster
                 @Override
                 public void start()
                 {
+                  initialized = true;
                   serviceAnnouncer.announce(node);
                 }
 
@@ -153,6 +162,7 @@ public class TaskMaster
       {
         giant.lock();
         try {
+          initialized = false;
           final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null);
           if (leaderLifecycle != null) {
             leaderLifecycle.stop();
@@ -198,9 +208,12 @@ public class TaskMaster
     }
   }
 
+  /**
+   * Returns true if this node is the leader and all of its services have been properly initialized.
+   */
   public boolean isLeader()
   {
-    return overlordLeaderSelector.isLeader();
+    return overlordLeaderSelector.isLeader() && initialized;
   }
 
   public String getCurrentLeader()
@@ -210,7 +223,7 @@ public class TaskMaster
 
   public Optional<TaskRunner> getTaskRunner()
   {
-    if (overlordLeaderSelector.isLeader()) {
+    if (isLeader()) {
       return Optional.of(taskRunner);
     } else {
       return Optional.absent();
@@ -219,7 +232,7 @@ public class TaskMaster
 
   public Optional<TaskQueue> getTaskQueue()
   {
-    if (overlordLeaderSelector.isLeader()) {
+    if (isLeader()) {
       return Optional.of(taskQueue);
     } else {
       return Optional.absent();
@@ -228,7 +241,7 @@ public class TaskMaster
 
   public Optional<TaskActionClient> getTaskActionClient(Task task)
   {
-    if (overlordLeaderSelector.isLeader()) {
+    if (isLeader()) {
       return Optional.of(taskActionClientFactory.create(task));
     } else {
       return Optional.absent();
@@ -237,7 +250,7 @@ public class TaskMaster
 
   public Optional<ScalingStats> getScalingStats()
   {
-    if (overlordLeaderSelector.isLeader()) {
+    if (isLeader()) {
       return taskRunner.getScalingStats();
     } else {
       return Optional.absent();
@@ -246,7 +259,7 @@ public class TaskMaster
 
   public Optional<SupervisorManager> getSupervisorManager()
   {
-    if (overlordLeaderSelector.isLeader()) {
+    if (isLeader()) {
       return Optional.of(supervisorManager);
     } else {
       return Optional.absent();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 6e11833..40a9739 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -102,6 +102,7 @@ public class DruidCoordinator
                                                                      
.reverse();
 
   private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
+
   private final Object lock = new Object();
   private final DruidCoordinatorConfig config;
   private final ZkPathsConfig zkPaths;
@@ -118,14 +119,15 @@ public class DruidCoordinator
   private final ServiceAnnouncer serviceAnnouncer;
   private final DruidNode self;
   private final Set<DruidCoordinatorHelper> indexingServiceHelpers;
-  private volatile boolean started = false;
-  private volatile SegmentReplicantLookup segmentReplicantLookup = null;
   private final BalancerStrategyFactory factory;
   private final LookupCoordinatorManager lookupCoordinatorManager;
   private final DruidLeaderSelector coordLeaderSelector;
 
   private final DruidCoordinatorSegmentCompactor segmentCompactor;
 
+  private volatile boolean started = false;
+  private volatile SegmentReplicantLookup segmentReplicantLookup = null;
+
   @Inject
   public DruidCoordinator(
       DruidCoordinatorConfig config,
@@ -532,6 +534,7 @@ public class DruidCoordinator
 
       metadataSegmentManager.start();
       metadataRuleManager.start();
+      lookupCoordinatorManager.start();
       serviceAnnouncer.announce(self);
       final int startingLeaderCounter = coordLeaderSelector.localTerm();
 
@@ -579,8 +582,6 @@ public class DruidCoordinator
             }
         );
       }
-
-      lookupCoordinatorManager.start();
     }
   }
 
@@ -597,9 +598,9 @@ public class DruidCoordinator
       loadManagementPeons.clear();
 
       serviceAnnouncer.unannounce(self);
+      lookupCoordinatorManager.stop();
       metadataRuleManager.stop();
       metadataSegmentManager.stop();
-      lookupCoordinatorManager.stop();
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to