Repository: hbase Updated Branches: refs/heads/master a3c5a7448 -> 3e426b2f8
HBASE-18099 FlushSnapshotSubprocedure should wait for concurrent Region#flush() to finish Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3e426b2f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3e426b2f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3e426b2f Branch: refs/heads/master Commit: 3e426b2f851f40dbc4e8d53c607cddebdb8a73e0 Parents: a3c5a74 Author: tedyu <[email protected]> Authored: Thu May 25 04:41:29 2017 -0700 Committer: tedyu <[email protected]> Committed: Thu May 25 04:41:29 2017 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HRegion.java | 31 ++++++++++++++++++++ .../hadoop/hbase/regionserver/Region.java | 3 ++ .../snapshot/FlushSnapshotSubprocedure.java | 8 ++++- 3 files changed, 41 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/3e426b2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a4a7537..f58729d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1757,6 +1757,37 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + @Override + public void waitForFlushes() { + synchronized (writestate) { + if (this.writestate.readOnly) { + // we should not wait for replayed flushes if we are read only (for example in case the + // region is a secondary replica). 
+ return; + } + if (!writestate.flushing) return; + long start = System.currentTimeMillis(); + boolean interrupted = false; + try { + while (writestate.flushing) { + LOG.debug("waiting for cache flush to complete for region " + this); + try { + writestate.wait(); + } catch (InterruptedException iex) { + // essentially ignore and propagate the interrupt back up + LOG.warn("Interrupted while waiting"); + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + long duration = System.currentTimeMillis() - start; + LOG.debug("Waited " + duration + " ms for flush to complete"); + } + } protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool( final String threadNamePrefix) { int numStores = Math.max(1, this.htableDescriptor.getFamilies().size()); http://git-wip-us.apache.org/repos/asf/hbase/blob/3e426b2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 295b825..5ff5e52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -745,4 +745,7 @@ public interface Region extends ConfigurationObserver { /** Wait for all current flushes and compactions of the region to complete */ void waitForFlushesAndCompactions(); + /** Wait for all current flushes of the region to complete + */ + void waitForFlushes(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/3e426b2f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java index 9c42e4d..22df895 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/FlushSnapshotSubprocedure.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.procedure.ProcedureMember; import org.apache.hadoop.hbase.procedure.Subprocedure; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; @@ -95,7 +96,12 @@ public class FlushSnapshotSubprocedure extends Subprocedure { LOG.debug("take snapshot without flush memstore first"); } else { LOG.debug("Flush Snapshotting region " + region.toString() + " started..."); - region.flush(true); + FlushResult res = region.flush(true); + if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) { + // CANNOT_FLUSH may mean that a flush is already on-going + // we need to wait for that flush to complete + region.waitForFlushes(); + } } ((HRegion)region).addRegionToSnapshot(snapshot, monitor); if (snapshotSkipFlush) {
