saintstack commented on a change in pull request #2490:
URL: https://github.com/apache/hbase/pull/2490#discussion_r499790081



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -406,11 +396,8 @@ public void run() {
   private final LockManager lockManager = new LockManager(this);
 
   private RSGroupBasedLoadBalancer balancer;
-  // a lock to prevent concurrent normalization actions.
-  private final ReentrantLock normalizationInProgressLock = new 
ReentrantLock();
-  private RegionNormalizer normalizer;
   private BalancerChore balancerChore;
-  private RegionNormalizerChore normalizerChore;
+  private RegionNormalizerManager regionNormalizerManager;

Review comment:
       Good. The normalizer is growing up!

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
##########
@@ -3003,18 +2933,20 @@ public double getAverageLoad() {
     return regionStates.getAverageLoad();
   }
 
-  /*
-   * @return the count of region split plans executed
+  /**
+   * Exposed here for metrics.
+   * @see RegionNormalizerManager#getSplitPlanCount()
    */
   public long getSplitPlanCount() {
-    return splitPlanCount;
+    return regionNormalizerManager.getSplitPlanCount();
   }
 
-  /*
-   * @return the count of region merge plans executed
+  /**
+   * Exposed here for metrics.
+   * @see RegionNormalizerManager#getMergePlanCount()
    */
   public long getMergePlanCount() {
-    return mergePlanCount;
+    return regionNormalizerManager.getMergePlanCount();

Review comment:
       Do these methods need to be public on HMaster?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class encapsulates the details of the {@link RegionNormalizer} 
subsystem.
+ */
+@InterfaceAudience.Private
+public class RegionNormalizerManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RegionNormalizerManager.class);
+
+  private final RegionNormalizerTracker regionNormalizerTracker;
+  private final RegionNormalizerChore regionNormalizerChore;
+  private final RegionNormalizerWorkQueue<TableName> workQueue;
+  private final RegionNormalizerWorker worker;
+  private final ExecutorService pool;
+
+  public RegionNormalizerManager(
+    @NonNull  final RegionNormalizerTracker regionNormalizerTracker,
+    @Nullable final RegionNormalizerChore regionNormalizerChore,
+    @Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
+    @Nullable final RegionNormalizerWorker worker
+  ) {
+    this.regionNormalizerTracker = regionNormalizerTracker;
+    this.regionNormalizerChore = regionNormalizerChore;
+    this.workQueue = workQueue;
+    this.worker = worker;
+    this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat("normalizer-worker-%d")
+      .setUncaughtExceptionHandler(
+        (thread, throwable) ->
+          LOG.error("Uncaught exception, worker thread likely terminated.", 
throwable))
+      .build());
+  }
+
+  public void start() {
+    regionNormalizerTracker.start();
+    if (worker != null) {
+      // worker will be null when master is in maintenance mode.
+      pool.submit(worker);
+    }
+  }
+
+  public void stop() {
+    pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting 
on `take()`

Review comment:
       Is this OK if the manager is already stopped? Is `stop()` idempotent?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class encapsulates the details of the {@link RegionNormalizer} 
subsystem.
+ */
+@InterfaceAudience.Private
+public class RegionNormalizerManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RegionNormalizerManager.class);
+
+  private final RegionNormalizerTracker regionNormalizerTracker;
+  private final RegionNormalizerChore regionNormalizerChore;
+  private final RegionNormalizerWorkQueue<TableName> workQueue;
+  private final RegionNormalizerWorker worker;
+  private final ExecutorService pool;
+
+  public RegionNormalizerManager(
+    @NonNull  final RegionNormalizerTracker regionNormalizerTracker,
+    @Nullable final RegionNormalizerChore regionNormalizerChore,
+    @Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
+    @Nullable final RegionNormalizerWorker worker
+  ) {
+    this.regionNormalizerTracker = regionNormalizerTracker;
+    this.regionNormalizerChore = regionNormalizerChore;
+    this.workQueue = workQueue;
+    this.worker = worker;
+    this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat("normalizer-worker-%d")
+      .setUncaughtExceptionHandler(
+        (thread, throwable) ->
+          LOG.error("Uncaught exception, worker thread likely terminated.", 
throwable))
+      .build());
+  }
+
+  public void start() {
+    regionNormalizerTracker.start();
+    if (worker != null) {
+      // worker will be null when master is in maintenance mode.
+      pool.submit(worker);
+    }
+  }
+
+  public void stop() {
+    pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting 
on `take()`
+    regionNormalizerTracker.stop();
+  }
+
+  public RegionNormalizerChore getRegionNormalizerChore() {
+    return regionNormalizerChore;
+  }
+
+  /**
+   * Return {@code true} if region normalizer is on, {@code false} otherwise
+   */
+  public boolean isNormalizerOn() {
+    return regionNormalizerTracker.isNormalizerOn();
+  }
+
+  /**
+   * Set region normalizer on/off
+   * @param normalizerOn whether normalizer should be on or off
+   */
+  public void setNormalizerOn(boolean normalizerOn) {
+    try {
+      regionNormalizerTracker.setNormalizerOn(normalizerOn);
+    } catch (KeeperException e) {
+      LOG.warn("Error flipping normalizer switch", e);
+    }
+  }
+
+  /**
+   * Call-back for the case where plan couldn't be executed due to constraint 
violation,
+   * such as namespace quota.
+   * @param type type of plan that was skipped.
+   */
+  public void planSkipped(NormalizationPlan.PlanType type) {
+    // TODO: this appears to be used only for testing.
+    if (worker != null) {
+      worker.planSkipped(type);
+    }
+  }
+
+  /**
+   * Retrieve a count of the number of times plans of type {@code type} were 
submitted but skipped.
+   * @param type type of plan for which skipped count is to be returned
+   */
+  public long getSkippedCount(NormalizationPlan.PlanType type) {
+    // TODO: this appears to be used only for testing.
+    return worker == null ? 0 : worker.getSkippedCount(type);
+  }
+
+  /**
+   * Return the number of times a {@link SplitNormalizationPlan} has been 
submitted.
+   */
+  public long getSplitPlanCount() {
+    return worker == null ? 0 : worker.getSplitPlanCount();
+  }
+
+  /**
+   * Return the number of times a {@link MergeNormalizationPlan} has been 
submitted.
+   */
+  public long getMergePlanCount() {
+    return worker == null ? 0 : worker.getMergePlanCount();
+  }
+
+  /**
+   * Submit tables for normalization.
+   * @param tables   a list of tables to submit.
+   * @param priority {@code true} when these requested tables should skip to 
the front of the queue.
+   * @return {@code true} when work was queued, {@code false} otherwise.
+   */
+  public boolean normalizeRegions(List<TableName> tables, boolean priority) {
+    if (workQueue == null) {
+      return false;
+    }
+    if (priority) {
+      workQueue.putAllFirst(tables);
+    } else {
+      workQueue.putAll(tables);
+    }
+    return true;
+  }
+}

Review comment:
       Got this far; I'll be back to review the rest.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
##########
@@ -354,6 +353,13 @@ long splitRegion(
    */
   boolean isInMaintenanceMode();
 
+  /**
+   * Checks master state before initiating action over region topology.
+   * @param action the name of the action under consideration, for logging.
+   * @return {@code true} when the caller should exit early, {@code false} 
otherwise.
+   */
+  boolean skipRegionManagementAction(final String action);

Review comment:
       What is this? Is it substantial enough to warrant adding it to this interface?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import edu.umd.cs.findbugs.annotations.NonNull;

Review comment:
       Are these FindBugs annotation imports OK to use, license-wise? Note:
   
   FindBugs™ is licenced under the LGPL. Copyright © 2006 University of 
Maryland.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerFactory.java
##########
@@ -32,13 +35,30 @@
   private RegionNormalizerFactory() {
   }
 
+  public static RegionNormalizerManager createNormalizerService(

Review comment:
       The method is named createNormalizerService, but it returns a 'manager'.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
##########
@@ -1953,20 +1952,27 @@ public SetNormalizerRunningResponse 
setNormalizerRunning(RpcController controlle
     rpcPreCheck("setNormalizerRunning");
 
     // Sets normalizer on/off flag in ZK.
-    boolean prevValue = master.getRegionNormalizerTracker().isNormalizerOn();
-    boolean newValue = request.getOn();
-    try {
-      master.getRegionNormalizerTracker().setNormalizerOn(newValue);
-    } catch (KeeperException ke) {
-      LOG.warn("Error flipping normalizer switch", ke);
-    }
+    // TODO: this method is totally broken in terms of atomicity of actions 
and values read.
+    //  1. The contract has this RPC returning the previous value. There isn't 
a ZKUtil method
+    //     that lets us retrieve the previous value as part of setting a new 
value, so we simply
+    //     perform a read before issuing the update. Thus we have a data race 
opportunity, between
+    //     when the `prevValue` is read and whatever is actually overwritten.
+    //  2. Down in `setNormalizerOn`, the call to `createAndWatch` inside of 
the catch clause can
+    //     itself fail in the event that the znode already exists. Thus, 
another data race, between
+    //     when the initial `setData` call is notified of the absence of the 
target znode and the
+    //     subsequent `createAndWatch`, with another client creating said node.
+    //  That said, there's supposed to be only one active master and thus 
there's supposed to be
+    //  only one process with the authority to modify the value.
+    final boolean prevValue = 
master.getRegionNormalizerManager().isNormalizerOn();
+    final boolean newValue = request.getOn();
+    master.getRegionNormalizerManager().setNormalizerOn(newValue);
     LOG.info("{} set normalizerSwitch={}", master.getClientIdAuditPrefix(), 
newValue);
     return 
SetNormalizerRunningResponse.newBuilder().setPrevNormalizerValue(prevValue).build();
   }
 
   @Override
   public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController 
controller,

Review comment:
       This changes the signature, but that should be fine since this is IA.Private.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerManager.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.normalizer;
+
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This class encapsulates the details of the {@link RegionNormalizer} 
subsystem.
+ */
+@InterfaceAudience.Private
+public class RegionNormalizerManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RegionNormalizerManager.class);
+
+  private final RegionNormalizerTracker regionNormalizerTracker;
+  private final RegionNormalizerChore regionNormalizerChore;
+  private final RegionNormalizerWorkQueue<TableName> workQueue;
+  private final RegionNormalizerWorker worker;
+  private final ExecutorService pool;
+
+  public RegionNormalizerManager(
+    @NonNull  final RegionNormalizerTracker regionNormalizerTracker,
+    @Nullable final RegionNormalizerChore regionNormalizerChore,
+    @Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
+    @Nullable final RegionNormalizerWorker worker
+  ) {
+    this.regionNormalizerTracker = regionNormalizerTracker;
+    this.regionNormalizerChore = regionNormalizerChore;
+    this.workQueue = workQueue;
+    this.worker = worker;
+    this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat("normalizer-worker-%d")
+      .setUncaughtExceptionHandler(
+        (thread, throwable) ->
+          LOG.error("Uncaught exception, worker thread likely terminated.", 
throwable))
+      .build());
+  }
+
+  public void start() {
+    regionNormalizerTracker.start();

Review comment:
       Should we check whether it's already started, or is that silly?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
##########
@@ -570,8 +569,10 @@ private void preSplitRegion(final MasterProcedureEnv env)
     try {
       
env.getMasterServices().getMasterQuotaManager().onRegionSplit(this.getParentRegion());
     } catch (QuotaExceededException e) {
-      
env.getMasterServices().getRegionNormalizer().planSkipped(this.getParentRegion(),
-          NormalizationPlan.PlanType.SPLIT);
+      // TODO: why is this here? merge requests can be submitted by actors 
other than the normalizer

Review comment:
       merge? Do you mean split?
   
   Doesn't git blame explain why this is here?
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to