HBASE-10201 Port 'Make flush decisions per column family' to trunk
Signed-off-by: stack <[email protected]>
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e55ef7a6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e55ef7a6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e55ef7a6
Branch: refs/heads/branch-1
Commit: e55ef7a663dd9a18fa88a506afd8fe0ced10563d
Parents: 9895604
Author: zhangduo <[email protected]>
Authored: Sat Dec 13 12:49:38 2014 +0800
Committer: stack <[email protected]>
Committed: Thu Dec 18 15:58:55 2014 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 24 +
.../src/main/resources/hbase-default.xml | 19 +-
.../regionserver/FlushAllStoresPolicy.java | 35 +
.../regionserver/FlushLargeStoresPolicy.java | 108 ++++
.../hadoop/hbase/regionserver/FlushPolicy.java | 49 ++
.../hbase/regionserver/FlushPolicyFactory.java | 76 +++
.../hbase/regionserver/FlushRequester.java | 15 +-
.../hadoop/hbase/regionserver/HRegion.java | 314 ++++++---
.../hbase/regionserver/HRegionServer.java | 4 +-
.../hadoop/hbase/regionserver/LogRoller.java | 3 +-
.../hbase/regionserver/MemStoreFlusher.java | 81 ++-
.../hbase/regionserver/RSRpcServices.java | 12 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 242 +++++--
.../hbase/regionserver/wal/FSWALEntry.java | 29 +-
.../hadoop/hbase/wal/DisabledWALProvider.java | 8 +-
.../java/org/apache/hadoop/hbase/wal/WAL.java | 11 +-
.../org/apache/hadoop/hbase/TestIOFencing.java | 4 +-
.../regionserver/TestFlushRegionEntry.java | 4 +-
.../regionserver/TestHeapMemoryManager.java | 16 +-
.../regionserver/TestPerColumnFamilyFlush.java | 644 +++++++++++++++++++
.../hbase/regionserver/wal/TestFSHLog.java | 42 +-
.../hbase/regionserver/wal/TestWALReplay.java | 19 +-
.../hbase/wal/TestDefaultWALProvider.java | 73 ++-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 36 +-
24 files changed, 1549 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index d16e8ba..ed0cec2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -144,6 +144,8 @@ public class HTableDescriptor implements
WritableComparable<HTableDescriptor> {
private static final ImmutableBytesWritable MEMSTORE_FLUSHSIZE_KEY =
new ImmutableBytesWritable(Bytes.toBytes(MEMSTORE_FLUSHSIZE));
+ public static final String FLUSH_POLICY = "FLUSH_POLICY";
+
/**
* <em>INTERNAL</em> Used by rest interface to access this metadata
* attribute which denotes if the table is a -ROOT- region or not
@@ -779,6 +781,28 @@ public class HTableDescriptor implements
WritableComparable<HTableDescriptor> {
}
/**
+ * This sets the class associated with the flush policy which determines
the stores
+ * that need to be flushed when flushing a region. The class used by default is
defined in
+ * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+ * @param clazz the class name
+ */
+ public HTableDescriptor setFlushPolicyClassName(String clazz) {
+ setValue(FLUSH_POLICY, clazz);
+ return this;
+ }
+
+ /**
+ * This gets the class associated with the flush policy which determines the
stores that need to be
+ * flushed when flushing a region. The class used by default is defined in
+ * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+ * @return the class name of the flush policy for this table. If this
returns null, the default
+ * flush policy is used.
+ */
+ public String getFlushPolicyClassName() {
+ return getValue(FLUSH_POLICY);
+ }
+
+ /**
* Adds a column family.
* For the updating purpose please use {@link
#modifyFamily(HColumnDescriptor)} instead.
* @param family HColumnDescriptor of family to add.
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml
b/hbase-common/src/main/resources/hbase-default.xml
index 9730560..9f0c3fe 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -187,7 +187,7 @@ possible configurations would overwhelm and obscure the
important.
A value of 0 means a single queue shared between all the handlers.
A value of 1 means that each handler has its own queue.</description>
</property>
-<property>
+ <property>
<name>hbase.ipc.server.callqueue.read.ratio</name>
<value>0</value>
<description>Split the call queues into read and write queues.
@@ -337,8 +337,8 @@ possible configurations would overwhelm and obscure the
important.
<value>org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy</value>
<description>
A split policy determines when a region should be split. The various
other split policies that
- are available currently are ConstantSizeRegionSplitPolicy,
DisabledRegionSplitPolicy,
- DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.
+ are available currently are ConstantSizeRegionSplitPolicy,
DisabledRegionSplitPolicy,
+ DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.
</description>
</property>
@@ -596,6 +596,19 @@ possible configurations would overwhelm and obscure the
important.
every hbase.server.thread.wakefrequency.</description>
</property>
<property>
+ <name>hbase.hregion.percolumnfamilyflush.size.lower.bound</name>
+ <value>16777216</value>
+ <description>
+ If FlushLargeStoresPolicy is used, then every time that we hit the
+ total memstore limit, we find out all the column families whose memstores
+ exceed this value, and only flush them, while retaining the others whose
+ memstores are lower than this limit. If none of the families have their
+ memstore size more than this, all the memstores will be flushed
+ (just as usual). This value should be less than half of the total memstore
+ threshold (hbase.hregion.memstore.flush.size).
+ </description>
+ </property>
+ <property>
<name>hbase.hregion.preclose.flush.size</name>
<value>5242880</value>
<description>
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
new file mode 100644
index 0000000..0058104
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that always flushes all stores for a given region.
+ */
[email protected]
+public class FlushAllStoresPolicy extends FlushPolicy {
+
+ @Override
+ public Collection<Store> selectStoresToFlush() {
+ return region.stores.values();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
new file mode 100644
index 0000000..7e0e54c
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that only flushes stores larger than a given threshold. If
no store is large
+ * enough, then all stores will be flushed.
+ */
[email protected](HBaseInterfaceAudience.CONFIG)
+public class FlushLargeStoresPolicy extends FlushPolicy {
+
+ private static final Log LOG =
LogFactory.getLog(FlushLargeStoresPolicy.class);
+
+ public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND =
+ "hbase.hregion.percolumnfamilyflush.size.lower.bound";
+
+ private static final long
DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND = 1024 * 1024 * 16L;
+
+ private long flushSizeLowerBound;
+
+ @Override
+ protected void configureForRegion(HRegion region) {
+ super.configureForRegion(region);
+ long flushSizeLowerBound;
+ String flushedSizeLowerBoundString =
+
region.getTableDesc().getValue(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+ if (flushedSizeLowerBoundString == null) {
+ flushSizeLowerBound =
+ getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+ DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND
+ + " is not specified, use global config(" + flushSizeLowerBound +
") instead");
+ }
+ } else {
+ try {
+ flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString);
+ } catch (NumberFormatException nfe) {
+ flushSizeLowerBound =
+ getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+ DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+ LOG.warn("Number format exception when parsing "
+ + HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND + " for table "
+ + region.getTableDesc().getTableName() + ":" +
flushedSizeLowerBoundString + ". " + nfe
+ + ", use global config(" + flushSizeLowerBound + ") instead");
+
+ }
+ }
+ this.flushSizeLowerBound = flushSizeLowerBound;
+ }
+
+ private boolean shouldFlush(Store store) {
+ if (store.getMemStoreSize() > this.flushSizeLowerBound) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column Family: " + store.getColumnFamilyName() + " of
region " + region
+ + " will be flushed because of memstoreSize(" +
store.getMemStoreSize()
+ + ") is larger than lower bound(" + this.flushSizeLowerBound +
")");
+ }
+ return true;
+ }
+ return region.shouldFlushStore(store);
+ }
+
+ @Override
+ public Collection<Store> selectStoresToFlush() {
+ Collection<Store> stores = region.stores.values();
+ Set<Store> specificStoresToFlush = new HashSet<Store>();
+ for (Store store : stores) {
+ if (shouldFlush(store)) {
+ specificStoresToFlush.add(store);
+ }
+ }
+ // Didn't find any CFs which were above the threshold for selection.
+ if (specificStoresToFlush.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Since none of the CFs were above the size, flushing all.");
+ }
+ return stores;
+ } else {
+ return specificStoresToFlush;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
new file mode 100644
index 0000000..d581fee
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A flush policy determines the stores that need to be flushed when flushing
a region.
+ */
[email protected]
+public abstract class FlushPolicy extends Configured {
+
+ /**
+ * The region configured for this flush policy.
+ */
+ protected HRegion region;
+
+ /**
+ * Upon construction, this method will be called with the region to be
governed. It will be called
+ * once and only once.
+ */
+ protected void configureForRegion(HRegion region) {
+ this.region = region;
+ }
+
+ /**
+ * @return the stores that need to be flushed.
+ */
+ public abstract Collection<Store> selectStoresToFlush();
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
new file mode 100644
index 0000000..e80b696
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The class that creates a flush policy from a conf and HTableDescriptor.
+ * <p>
+ * The default flush policy is {@link FlushLargeStoresPolicy}. And for 0.98,
the default flush
+ * policy is {@link FlushAllStoresPolicy}.
+ */
[email protected](HBaseInterfaceAudience.CONFIG)
+public class FlushPolicyFactory {
+
+ private static final Log LOG = LogFactory.getLog(FlushPolicyFactory.class);
+
+ public static final String HBASE_FLUSH_POLICY_KEY =
"hbase.regionserver.flush.policy";
+
+ private static final Class<? extends FlushPolicy> DEFAULT_FLUSH_POLICY_CLASS
=
+ FlushLargeStoresPolicy.class;
+
+ /**
+ * Create the FlushPolicy configured for the given table.
+ */
+ public static FlushPolicy create(HRegion region, Configuration conf) throws
IOException {
+ Class<? extends FlushPolicy> clazz =
getFlushPolicyClass(region.getTableDesc(), conf);
+ FlushPolicy policy = ReflectionUtils.newInstance(clazz, conf);
+ policy.configureForRegion(region);
+ return policy;
+ }
+
+ /**
+ * Get FlushPolicy class for the given table.
+ */
+ public static Class<? extends FlushPolicy>
getFlushPolicyClass(HTableDescriptor htd,
+ Configuration conf) throws IOException {
+ String className = htd.getFlushPolicyClassName();
+ if (className == null) {
+ className = conf.get(HBASE_FLUSH_POLICY_KEY,
DEFAULT_FLUSH_POLICY_CLASS.getName());
+ }
+ try {
+ Class<? extends FlushPolicy> clazz =
Class.forName(className).asSubclass(FlushPolicy.class);
+ return clazz;
+ } catch (Exception e) {
+ LOG.warn(
+ "Unable to load configured flush policy '" + className + "' for table
'"
+ + htd.getTableName() + "', load default flush policy "
+ + DEFAULT_FLUSH_POLICY_CLASS.getName() + " instead", e);
+ return DEFAULT_FLUSH_POLICY_CLASS;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index e1c3144..7517454 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -30,26 +30,31 @@ public interface FlushRequester {
* Tell the listener the cache needs to be flushed.
*
* @param region the HRegion requesting the cache flush
+ * @param forceFlushAllStores whether we want to flush all stores. e.g.,
when the request comes from log
+ * rolling.
*/
- void requestFlush(HRegion region);
+ void requestFlush(HRegion region, boolean forceFlushAllStores);
+
/**
* Tell the listener the cache needs to be flushed after a delay
*
* @param region the HRegion requesting the cache flush
* @param delay after how much time should the flush happen
+ * @param forceFlushAllStores whether we want to flush all stores. e.g.,
when the request comes from log
+ * rolling.
*/
- void requestDelayedFlush(HRegion region, long delay);
+ void requestDelayedFlush(HRegion region, long delay, boolean
forceFlushAllStores);
/**
* Register a FlushRequestListener
- *
+ *
* @param listener
*/
void registerFlushRequestListener(final FlushRequestListener listener);
/**
* Unregister the given FlushRequestListener
- *
+ *
* @param listener
* @return true when passed listener is unregistered successfully.
*/
@@ -57,7 +62,7 @@ public interface FlushRequester {
/**
* Sets the global memstore limit to a new size.
- *
+ *
* @param globalMemStoreSize
*/
public void setGlobalMemstoreLimit(long globalMemStoreSize);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 33aa8de..7ada09a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -41,6 +42,7 @@ import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -62,7 +64,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -132,14 +134,9 @@ import
org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.Write
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
-import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
-import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
@@ -155,6 +152,11 @@ import org.apache.hadoop.hbase.util.HashedBytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils;
@@ -228,10 +230,10 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
final AtomicBoolean closing = new AtomicBoolean(false);
/**
- * The sequence id of the last flush on this region. Used doing some rough
calculations on
+ * The max sequence id of flushed data on this region. Used doing some
rough calculations on
* whether time to flush or not.
*/
- protected volatile long lastFlushSeqId = -1L;
+ protected volatile long maxFlushedSeqId = -1L;
/**
* Region scoped edit sequence Id. Edits to this region are GUARANTEED to
appear in the WAL
@@ -516,7 +518,11 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
long memstoreFlushSize;
final long timestampSlop;
final long rowProcessorTimeout;
- private volatile long lastFlushTime;
+
+ // Last flush time for each Store. Useful when we are flushing for each
column family.
+ private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
+ new ConcurrentHashMap<Store, Long>();
+
final RegionServerServices rsServices;
private RegionServerAccounting rsAccounting;
private long flushCheckInterval;
@@ -542,6 +548,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
private HTableDescriptor htableDescriptor = null;
private RegionSplitPolicy splitPolicy;
+ private FlushPolicy flushPolicy;
private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper;
@@ -618,7 +625,6 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can
not exceed "
+ MAX_FLUSH_PER_CHANGES);
}
-
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
@@ -777,8 +783,15 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
// Initialize split policy
this.splitPolicy = RegionSplitPolicy.create(this, conf);
- this.lastFlushTime = EnvironmentEdgeManager.currentTime();
- // Use maximum of wal sequenceid or that which was found in stores
+ // Initialize flush policy
+ this.flushPolicy = FlushPolicyFactory.create(this, conf);
+
+ long lastFlushTime = EnvironmentEdgeManager.currentTime();
+ for (Store store: stores.values()) {
+ this.lastStoreFlushTimeMap.put(store, lastFlushTime);
+ }
+
+ // Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = maxSeqId;
@@ -1316,10 +1329,10 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
status.setStatus("Running coprocessor post-close hooks");
this.coprocessorHost.postClose(abort);
}
- if ( this.metricsRegion != null) {
+ if (this.metricsRegion != null) {
this.metricsRegion.close();
}
- if ( this.metricsRegionWrapper != null) {
+ if (this.metricsRegionWrapper != null) {
Closeables.closeQuietly(this.metricsRegionWrapper);
}
status.markComplete("Closed");
@@ -1458,9 +1471,14 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
return this.fs;
}
- /** @return the last time the region was flushed */
- public long getLastFlushTime() {
- return this.lastFlushTime;
+ /**
+ * @return Returns the earliest time a store in the region was flushed. All
+ * other stores in the region would have been flushed either at, or
+ * after this time.
+ */
+ @VisibleForTesting
+ public long getEarliestFlushTimeForAllStores() {
+ return Collections.min(lastStoreFlushTimeMap.values());
}
//////////////////////////////////////////////////////////////////////////////
@@ -1626,6 +1644,18 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
}
/**
+ * Flush all stores.
+ * <p>
+ * See {@link #flushcache(boolean)}.
+ *
+ * @return whether the flush succeeded and whether the region needs
compacting
+ * @throws IOException
+ */
+ public FlushResult flushcache() throws IOException {
+ return flushcache(true);
+ }
+
+ /**
* Flush the cache.
*
* When this method is called the cache will be flushed unless:
@@ -1638,14 +1668,14 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
*
* <p>This method may block for some time, so it should not be called from a
* time-sensitive thread.
- *
- * @return true if the region needs compacting
+ * @param forceFlushAllStores whether we want to flush all stores
+ * @return whether the flush succeeded and whether the region needs
compacting
*
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of wal is required
* because a Snapshot was not properly persisted.
*/
- public FlushResult flushcache() throws IOException {
+ public FlushResult flushcache(boolean forceFlushAllStores) throws
IOException {
// fail-fast instead of waiting on the lock
if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing";
@@ -1687,8 +1717,11 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
}
+
try {
- FlushResult fs = internalFlushcache(status);
+ Collection<Store> specificStoresToFlush =
+ forceFlushAllStores ? stores.values() :
flushPolicy.selectStoresToFlush();
+ FlushResult fs = internalFlushcache(specificStoresToFlush, status);
if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks");
@@ -1711,12 +1744,47 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
}
/**
+ * Should the store be flushed because it is old enough.
+ * <p>
+ * Every FlushPolicy should call this to determine whether a store is old
enough to flush (unless
+ * the policy always flushes all stores). Otherwise the {@link #shouldFlush()}
method will always
+ * return true, which will generate a lot of flush requests.
+ */
+ boolean shouldFlushStore(Store store) {
+ long maxFlushedSeqId =
+
this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
store
+ .getFamily().getName()) - 1;
+ if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges <
sequenceId.get()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column Family: " + store.getColumnFamilyName() + " of
region " + this
+ + " will be flushed because its max flushed seqId(" +
maxFlushedSeqId
+ + ") is far away from current(" + sequenceId.get() + "), max
allowed is "
+ + flushPerChanges);
+ }
+ return true;
+ }
+ if (flushCheckInterval <= 0) {
+ return false;
+ }
+ long now = EnvironmentEdgeManager.currentTime();
+ if (store.timeOfOldestEdit() < now - flushCheckInterval) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column Family: " + store.getColumnFamilyName() + " of
region " + this
+ + " will be flushed because time of its oldest edit (" +
store.timeOfOldestEdit()
+ + ") is far away from now(" + now + "), max allowed is " +
flushCheckInterval);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Should the memstore be flushed now
*/
boolean shouldFlush() {
// This is a rough measure.
- if (this.lastFlushSeqId > 0
- && (this.lastFlushSeqId + this.flushPerChanges <
this.sequenceId.get())) {
+ if (this.maxFlushedSeqId > 0
+ && (this.maxFlushedSeqId + this.flushPerChanges <
this.sequenceId.get())) {
return true;
}
if (flushCheckInterval <= 0) { //disabled
@@ -1724,7 +1792,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
}
long now = EnvironmentEdgeManager.currentTime();
//if we flushed in the recent past, we don't need to do again now
- if ((now - getLastFlushTime() < flushCheckInterval)) {
+ if ((now - getEarliestFlushTimeForAllStores() < flushCheckInterval)) {
return false;
}
//since we didn't flush in the recent past, flush now if certain conditions
@@ -1739,35 +1807,56 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
}
/**
- * Flush the memstore. Flushing the memstore is a little tricky. We have a
lot of updates in the
- * memstore, all of which have also been written to the wal. We need to
write those updates in the
- * memstore out to disk, while being able to process reads/writes as much as
possible during the
- * flush operation.
- * <p>This method may block for some time. Every time you call it, we up
the regions
- * sequence id even if we don't flush; i.e. the returned region id will be
at least one larger
- * than the last edit applied to this region. The returned id does not refer
to an actual edit.
- * The returned id can be used for say installing a bulk loaded file just
ahead of the last hfile
- * that was the result of this flush, etc.
- * @return object describing the flush's state
+ * Flushing all stores.
*
- * @throws IOException general io exceptions
- * @throws DroppedSnapshotException Thrown when replay of wal is required
- * because a Snapshot was not properly persisted.
+ * @see #internalFlushcache(Collection, MonitoredTask)
*/
- protected FlushResult internalFlushcache(MonitoredTask status)
+ private FlushResult internalFlushcache(MonitoredTask status)
throws IOException {
- return internalFlushcache(this.wal, -1, status);
+ return internalFlushcache(stores.values(), status);
+ }
+
+ /**
+ * Flushing given stores.
+ *
+ * @see #internalFlushcache(WAL, long, Collection, MonitoredTask)
+ */
+ private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
+ MonitoredTask status) throws IOException {
+ return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
+ status);
}
/**
- * @param wal Null if we're NOT to go via wal.
- * @param myseqid The seqid to use if <code>wal</code> is null writing out
flush file.
+ * Flush the memstore. Flushing the memstore is a little tricky. We have a
lot
+ * of updates in the memstore, all of which have also been written to the
wal.
+ * We need to write those updates in the memstore out to disk, while being
+ * able to process reads/writes as much as possible during the flush
+ * operation.
+ * <p>
+ * This method may block for some time. Every time you call it, we up the
+ * regions sequence id even if we don't flush; i.e. the returned region id
+ * will be at least one larger than the last edit applied to this region. The
+ * returned id does not refer to an actual edit. The returned id can be used
+ * for say installing a bulk loaded file just ahead of the last hfile that
was
+ * the result of this flush, etc.
+ *
+ * @param wal
+ * Null if we're NOT to go via wal.
+ * @param myseqid
+ * The seqid to use if <code>wal</code> is null writing out flush
+ * file.
+ * @param storesToFlush
+ * The list of stores to flush.
* @return object describing the flush's state
* @throws IOException
- * @see #internalFlushcache(MonitoredTask)
+ * general io exceptions
+ * @throws DroppedSnapshotException
+ * Thrown when replay of wal is required because a Snapshot was not
+ * properly persisted.
*/
- protected FlushResult internalFlushcache(
- final WAL wal, final long myseqid, MonitoredTask status) throws
IOException {
+ protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+ final Collection<Store> storesToFlush, MonitoredTask status) throws
IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -1809,63 +1898,86 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
}
}
- LOG.info("Started memstore flush for " + this +
- ", current region memstore size " +
- StringUtils.byteDesc(this.memstoreSize.get()) +
- ((wal != null)? "": "; wal is null, using passed sequenceid=" +
myseqid));
-
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Started memstore flush for " + this + ", current region
memstore size "
+ + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " +
storesToFlush.size() + "/"
+ + stores.size() + " column families' memstores are being flushed."
+ + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" +
myseqid));
+ // only log when we are not flushing all stores.
+ if (this.stores.size() > storesToFlush.size()) {
+ for (Store store: storesToFlush) {
+ LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
+ + " which was occupying "
+ + StringUtils.byteDesc(store.getMemStoreSize()) + " of
memstore.");
+ }
+ }
+ }
// Stop updates while we snapshot the memstore of all of these regions'
stores. We only have
// to do this for a moment. It is quick. We also set the memstore size to
zero here before we
// allow updates again so its value will represent the size of the updates
received
// during flush
MultiVersionConsistencyControl.WriteEntry w = null;
-
// We have to take an update lock during snapshot, or else a write could
end up in both snapshot
// and memstore (makes it difficult to do atomic rows then)
status.setStatus("Obtaining lock to block concurrent updates");
// block waiting for the lock for internal flush
this.updatesLock.writeLock().lock();
- long totalFlushableSize = 0;
status.setStatus("Preparing to flush by snapshotting stores in " +
getRegionInfo().getEncodedName());
+ long totalFlushableSizeOfFlushableStores = 0;
+
+ Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
+ for (Store store: storesToFlush) {
+ flushedFamilyNames.add(store.getFamily().getName());
+ }
+
List<StoreFlushContext> storeFlushCtxs = new
ArrayList<StoreFlushContext>(stores.size());
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[],
List<Path>>(
Bytes.BYTES_COMPARATOR);
- long flushSeqId = -1L;
+ // The sequence id of this flush operation which is used to log
FlushMarker and pass to
+ // createFlushContext to use as the store file's sequence id.
+ long flushOpSeqId = HConstants.NO_SEQNUM;
+ // The max flushed sequence id after this flush operation. Used as
completeSequenceId which is
+ // passed to HMaster.
+ long flushedSeqId = HConstants.NO_SEQNUM;
+ byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
long trxId = 0;
try {
try {
w = mvcc.beginMemstoreInsert();
if (wal != null) {
- if
(!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
+ if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
// This should never happen.
String msg = "Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL
is closing.";
status.setStatus(msg);
return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
- // Get a sequence id that we can use to denote the flush. It will be
one beyond the last
- // edit that made it into the hfile (the below does not add an edit,
it just asks the
- // WAL system to return next sequence edit).
- flushSeqId = getNextSequenceId(wal);
+ flushOpSeqId = getNextSequenceId(wal);
+ long oldestUnflushedSeqId =
wal.getEarliestMemstoreSeqNum(encodedRegionName);
+ // No oldestUnflushedSeqId means either that we flushed all stores
+ // or that the unflushed stores are all empty.
+ flushedSeqId =
+ oldestUnflushedSeqId == HConstants.NO_SEQNUM ? flushOpSeqId :
oldestUnflushedSeqId - 1;
} else {
// use the provided sequence Id as WAL is not being used for this
flush.
- flushSeqId = myseqid;
+ flushedSeqId = flushOpSeqId = myseqid;
}
- for (Store s : stores.values()) {
- totalFlushableSize += s.getFlushableSize();
- storeFlushCtxs.add(s.createFlushContext(flushSeqId));
+ for (Store s : storesToFlush) {
+ totalFlushableSizeOfFlushableStores += s.getFlushableSize();
+ storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
committedFiles.put(s.getFamily().getName(), null); // for writing
stores to WAL
}
// write the snapshot start to WAL
if (wal != null) {
FlushDescriptor desc =
ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
- getRegionInfo(), flushSeqId, committedFiles);
+ getRegionInfo(), flushOpSeqId, committedFiles);
+ // no sync. Sync is below where we do not hold the updates lock
trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor,
getRegionInfo(),
- desc, sequenceId, false); // no sync. Sync is below where we do
not hold the updates lock
+ desc, sequenceId, false);
}
// Prepare flush (take a snapshot)
@@ -1877,7 +1989,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
if (trxId > 0) { // check whether we have already written
START_FLUSH to WAL
try {
FlushDescriptor desc =
ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
- getRegionInfo(), flushSeqId, committedFiles);
+ getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor,
getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable t) {
@@ -1894,7 +2006,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
this.updatesLock.writeLock().unlock();
}
String s = "Finished memstore snapshotting " + this +
- ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
+ ", syncing WAL and waiting on mvcc, flushsize=" +
totalFlushableSizeOfFlushableStores;
status.setStatus(s);
if (LOG.isTraceEnabled()) LOG.trace(s);
// sync unflushed WAL changes
@@ -1913,7 +2025,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
// uncommitted transactions from being written into HFiles.
// We have to block before we start the flush, otherwise keys that
// were removed via a rollbackMemstore could be written to Hfiles.
- w.setWriteNumber(flushSeqId);
+ w.setWriteNumber(flushOpSeqId);
mvcc.waitForPreviousTransactionsComplete(w);
// set w to null to prevent mvcc.advanceMemstore from being called again
inside finally block
w = null;
@@ -1944,8 +2056,8 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
- Iterator<Store> it = stores.values().iterator(); // stores.values() and
storeFlushCtxs have
- // same order
+ Iterator<Store> it = storesToFlush.iterator();
+ // storesToFlush and storeFlushCtxs have the same order
for (StoreFlushContext flush : storeFlushCtxs) {
boolean needsCompaction = flush.commit(status);
if (needsCompaction) {
@@ -1956,12 +2068,12 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
storeFlushCtxs.clear();
// Set down the memstore size by amount of flush.
- this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
+ this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
if (wal != null) {
// write flush marker to WAL. If fail, we should throw
DroppedSnapshotException
FlushDescriptor desc =
ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
- getRegionInfo(), flushSeqId, committedFiles);
+ getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, true);
}
@@ -1975,7 +2087,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
if (wal != null) {
try {
FlushDescriptor desc =
ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
- getRegionInfo(), flushSeqId, committedFiles);
+ getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
desc, sequenceId, false);
} catch (Throwable ex) {
@@ -1998,10 +2110,12 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
}
// Record latest flush time
- this.lastFlushTime = EnvironmentEdgeManager.currentTime();
+ for (Store store: storesToFlush) {
+ this.lastStoreFlushTimeMap.put(store, startTime);
+ }
- // Update the last flushed sequence id for region. TODO: This is dup'd
inside the WAL/FSHlog.
- this.lastFlushSeqId = flushSeqId;
+ // Update the max flushed sequence id for region.
+ this.maxFlushedSeqId = flushedSeqId;
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
@@ -2011,18 +2125,18 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
long time = EnvironmentEdgeManager.currentTime() - startTime;
long memstoresize = this.memstoreSize.get();
- String msg = "Finished memstore flush of ~" +
- StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
- ", currentsize=" +
- StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
- " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
- ", compaction requested=" + compactionRequested +
- ((wal == null)? "; wal=null": "");
+ String msg = "Finished memstore flush of ~"
+ + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
+ + totalFlushableSizeOfFlushableStores + ", currentsize="
+ + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+ + " for region " + this + " in " + time + "ms, sequenceid="
+ + flushOpSeqId + ", compaction requested=" + compactionRequested
+ + ((wal == null) ? "; wal=null" : "");
LOG.info(msg);
status.setStatus(msg);
return new FlushResult(compactionRequested ?
FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
- FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
+ FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
}
/**
@@ -2153,7 +2267,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
if(delete.getFamilyCellMap().isEmpty()){
for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
// Don't eat the timestamp
- delete.deleteFamily(family, delete.getTimeStamp());
+ delete.addFamily(family, delete.getTimeStamp());
}
} else {
for(byte [] family : delete.getFamilyCellMap().keySet()) {
@@ -2804,6 +2918,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
coprocessorHost.postBatchMutate(miniBatchOp);
}
+
// ------------------------------------------------------------------
// STEP 8. Advance mvcc. This will make this put visible to scanners and
getters.
// ------------------------------------------------------------------
@@ -2835,7 +2950,6 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
success = true;
return addedSize;
} finally {
-
// if the wal sync was unsuccessful, remove keys from memstore
if (doRollBackMemstore) {
rollbackMemstore(memstoreCells);
@@ -3194,8 +3308,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
* We throw RegionTooBusyException if above memstore limit
* and expect client to retry using some kind of backoff
*/
- private void checkResources()
- throws RegionTooBusyException {
+ private void checkResources() throws RegionTooBusyException {
// If catalog region, do not impose resource constraints or block updates.
if (this.getRegionInfo().isMetaRegion()) return;
@@ -3391,7 +3504,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
writestate.flushRequested = true;
}
// Make request outside of synchronize block; HBASE-818.
- this.rsServices.getFlushRequester().requestFlush(this);
+ this.rsServices.getFlushRequester().requestFlush(this, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this);
}
@@ -3512,7 +3625,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
}
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit
files.
- internalFlushcache(null, seqid, status);
+ internalFlushcache(null, seqid, stores.values(), status);
}
// Now delete the content of recovered edits. We're done w/ them.
for (Path file: files) {
@@ -3666,7 +3779,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
editsCount++;
}
if (flush) {
- internalFlushcache(null, currentEditSeqId, status);
+ internalFlushcache(null, currentEditSeqId, stores.values(),
status);
}
if (coprocessorHost != null) {
@@ -4014,7 +4127,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
// guaranteed to be one beyond the file made when we flushed (or if
nothing to flush, it is
// a sequence id that we can be sure is beyond the last hfile written).
if (assignSeqId) {
- FlushResult fs = this.flushcache();
+ FlushResult fs = this.flushcache(true);
if (fs.isFlushSucceeded()) {
seqId = fs.flushSequenceId;
} else if (fs.result ==
FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@@ -5057,8 +5170,8 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
FileSystem fs = a.getRegionFileSystem().getFileSystem();
// Make sure each region's cache is empty
- a.flushcache();
- b.flushcache();
+ a.flushcache(true);
+ b.flushcache(true);
// Compact each region so we only have one store file per family
a.compactStores(true);
@@ -5172,7 +5285,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
// do after lock
if (this.metricsRegion != null) {
- long totalSize = 0l;
+ long totalSize = 0L;
for (Cell cell : results) {
totalSize += CellUtil.estimatedSerializedSizeOf(cell);
}
@@ -5340,7 +5453,6 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
// to get a sequence id assigned which is done by
FSWALEntry#stampRegionSequenceId
walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
-
// 9. Release region lock
if (locked) {
this.updatesLock.readLock().unlock();
@@ -5468,7 +5580,6 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
WALEdit walEdits = null;
List<Cell> allKVs = new ArrayList<Cell>(append.size());
Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-
long size = 0;
long txid = 0;
@@ -5671,7 +5782,6 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
// Append a faked WALEdit in order for SKIP_WAL updates to get
mvcc assigned
walKey = this.appendEmptyEdit(this.wal, memstoreCells);
}
-
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
@@ -5968,8 +6078,8 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
- (12 * Bytes.SIZEOF_LONG) +
+ 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ (11 * Bytes.SIZEOF_LONG) +
4 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:
@@ -6539,6 +6649,12 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
return this.maxSeqIdInStores;
}
+ @VisibleForTesting
+ public long getOldestSeqIdOfStore(byte[] familyName) {
+ return wal.getEarliestMemstoreSeqNum(getRegionInfo()
+ .getEncodedNameAsBytes(), familyName);
+ }
+
/**
* @return if a given region is in compaction now.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ddeacd3..873168c 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1379,7 +1379,7 @@ public class HRegionServer extends HasThread implements
.setWriteRequestsCount(r.writeRequestsCount.get())
.setTotalCompactingKVs(totalCompactingKVs)
.setCurrentCompactedKVs(currentCompactedKVs)
- .setCompleteSequenceId(r.lastFlushSeqId)
+ .setCompleteSequenceId(r.maxFlushedSeqId)
.setDataLocality(dataLocality);
return regionLoadBldr.build();
@@ -1475,7 +1475,7 @@ public class HRegionServer extends HasThread implements
//Throttle the flushes by putting a delay. If we don't throttle,
and there
//is a balanced write-load on the regions in a table, we might end
up
//overwhelming the filesystem with too many flushes at once.
- requester.requestDelayedFlush(r, randomDelay);
+ requester.requestDelayedFlush(r, randomDelay, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index aa5998b..0c5af84 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -170,7 +170,8 @@ class LogRoller extends HasThread {
if (r != null) {
requester = this.services.getFlushRequester();
if (requester != null) {
- requester.requestFlush(r);
+ // force flushing all stores to clean old logs
+ requester.requestFlush(r, true);
scheduled = true;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index b2820dd..87821ec 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -39,17 +39,17 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.htrace.Trace;
import org.htrace.TraceScope;
import org.apache.hadoop.hbase.util.Counter;
@@ -105,20 +105,20 @@ class MemStoreFlusher implements FlushRequester {
long max =
ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
float globalMemStorePercent =
HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
- this.globalMemStoreLimitLowMarkPercent =
+ this.globalMemStoreLimitLowMarkPercent =
HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf,
globalMemStorePercent);
- this.globalMemStoreLimitLowMark =
+ this.globalMemStoreLimitLowMark =
(long) (this.globalMemStoreLimit *
this.globalMemStoreLimitLowMarkPercent);
this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
90000);
int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
this.flushHandlers = new FlushHandler[handlerCount];
- LOG.info("globalMemStoreLimit=" +
- StringUtils.humanReadableInt(this.globalMemStoreLimit) +
- ", globalMemStoreLimitLowMark=" +
- StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
- ", maxHeap=" + StringUtils.humanReadableInt(max));
+ LOG.info("globalMemStoreLimit="
+ + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
+ + ", globalMemStoreLimitLowMark="
+ + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark,
"", 1)
+ + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
}
public Counter getUpdatesBlockedMsHighWater() {
@@ -160,13 +160,12 @@ class MemStoreFlusher implements FlushRequester {
// lots of little flushes and cause lots of compactions, etc, which
just makes
// life worse!
if (LOG.isDebugEnabled()) {
- LOG.debug("Under global heap pressure: " +
- "Region " + bestAnyRegion.getRegionNameAsString() + " has too many
" +
- "store files, but is " +
- StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
- " vs best flushable region's " +
-
StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
- ". Choosing the bigger.");
+ LOG.debug("Under global heap pressure: " + "Region "
+ + bestAnyRegion.getRegionNameAsString() + " has too many " +
"store files, but is "
+ +
TraditionalBinaryPrefix.long2String(bestAnyRegion.memstoreSize.get(), "", 1)
+ + " vs best flushable region's "
+ +
TraditionalBinaryPrefix.long2String(bestFlushableRegion.memstoreSize.get(), "",
1)
+ + ". Choosing the bigger.");
}
regionToFlush = bestAnyRegion;
} else {
@@ -180,7 +179,7 @@ class MemStoreFlusher implements FlushRequester {
Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
LOG.info("Flush of region " + regionToFlush + " due to global heap
pressure");
- flushedOne = flushRegion(regionToFlush, true);
+ flushedOne = flushRegion(regionToFlush, true, true);
if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush +
" - trying to find a different region to flush.");
@@ -206,7 +205,7 @@ class MemStoreFlusher implements FlushRequester {
if (fqe == null || fqe instanceof WakeupFlushThread) {
if (isAboveLowWaterMark()) {
LOG.debug("Flush thread woke up because memory above low water="
- + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+ +
TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
if (!flushOneForGlobalPressure()) {
// Wasn't able to flush any region, but we're above low water
mark
// This is unlikely to happen, but might happen when closing
the
@@ -293,23 +292,23 @@ class MemStoreFlusher implements FlushRequester {
getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
}
- public void requestFlush(HRegion r) {
+ public void requestFlush(HRegion r, boolean forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
- FlushRegionEntry fqe = new FlushRegionEntry(r);
+ FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
}
}
}
- public void requestDelayedFlush(HRegion r, long delay) {
+ public void requestDelayedFlush(HRegion r, long delay, boolean
forceFlushAllStores) {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has some delay
- FlushRegionEntry fqe = new FlushRegionEntry(r);
+ FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
fqe.requeue(delay);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
@@ -362,7 +361,7 @@ class MemStoreFlusher implements FlushRequester {
}
}
- /*
+ /**
* A flushRegion that checks store file count. If too many, puts the flush
* on delay queue to retry later.
* @param fqe
@@ -404,22 +403,23 @@ class MemStoreFlusher implements FlushRequester {
return true;
}
}
- return flushRegion(region, false);
+ return flushRegion(region, false, fqe.isForceFlushAllStores());
}
- /*
+ /**
* Flush a region.
* @param region Region to flush.
* @param emergencyFlush Set if we are being force flushed. If true the
region
* needs to be removed from the flush queue. If false, when we were called
* from the main flusher run loop and we got the entry to flush by calling
* poll on the flush queue (which removed it).
- *
+ * @param forceFlushAllStores whether we want to flush all stores.
* @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the log was
* not flushed.
*/
- private boolean flushRegion(final HRegion region, final boolean
emergencyFlush) {
+ private boolean flushRegion(final HRegion region, final boolean
emergencyFlush,
+ boolean forceFlushAllStores) {
long startTime = 0;
synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
@@ -442,7 +442,7 @@ class MemStoreFlusher implements FlushRequester {
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
- HRegion.FlushResult flushResult = region.flushcache();
+ HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores);
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
@@ -524,11 +524,12 @@ class MemStoreFlusher implements FlushRequester {
while (isAboveHighWaterMark() && !server.isStopped()) {
if (!blocked) {
startTime = EnvironmentEdgeManager.currentTime();
- LOG.info("Blocking updates on " + server.toString() +
- ": the global memstore size " +
-
StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
+
- " is >= than blocking " +
- StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
+ LOG.info("Blocking updates on "
+ + server.toString()
+ + ": the global memstore size "
+ +
TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
+ .getGlobalMemstoreSize(), "", 1) + " is >= than blocking
"
+ + TraditionalBinaryPrefix.long2String(globalMemStoreLimit,
"", 1) + " size");
}
blocked = true;
wakeupFlushThread();
@@ -605,7 +606,7 @@ class MemStoreFlusher implements FlushRequester {
*/
public void setGlobalMemstoreLimit(long globalMemStoreSize) {
this.globalMemStoreLimit = globalMemStoreSize;
- this.globalMemStoreLimitLowMark =
+ this.globalMemStoreLimitLowMark =
(long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
reclaimMemStoreMemory();
}
@@ -652,10 +653,13 @@ class MemStoreFlusher implements FlushRequester {
private long whenToExpire;
private int requeueCount = 0;
- FlushRegionEntry(final HRegion r) {
+ private boolean forceFlushAllStores;
+
+ FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) {
this.region = r;
this.createTime = EnvironmentEdgeManager.currentTime();
this.whenToExpire = this.createTime;
+ this.forceFlushAllStores = forceFlushAllStores;
}
/**
@@ -675,6 +679,13 @@ class MemStoreFlusher implements FlushRequester {
}
/**
+ * @return whether we need to flush all stores.
+ */
+ public boolean isForceFlushAllStores() {
+ return forceFlushAllStores;
+ }
+
+ /**
* @param when When to expire, when to come up out of the queue.
* Specify in milliseconds. This method adds
EnvironmentEdgeManager.currentTime()
* to whatever you pass.
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 06e51c6..fec3030 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -72,7 +72,6 @@ import
org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -150,8 +149,6 @@ import
org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -159,6 +156,8 @@ import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
@@ -688,7 +687,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
final List<WALSplitter.MutationReplay> mutations, long replaySeqId)
throws IOException {
-
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
@@ -1069,7 +1067,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
LOG.info("Flushing " + region.getRegionNameAsString());
boolean shouldFlush = true;
if (request.hasIfOlderThanTs()) {
- shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
+ shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
}
FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
if (shouldFlush) {
@@ -1086,7 +1084,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
builder.setFlushed(result);
}
- builder.setLastFlushTime(region.getLastFlushTime());
+ builder.setLastFlushTime( region.getEarliestFlushTimeForAllStores());
return builder.build();
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
@@ -2123,7 +2121,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} else {
addResults(builder, results, controller,
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
}
- } finally {
+ } finally {
// We're done. On way out re-add the above removed lease.
// Adding resets expiration time on lease.
if (scanners.containsKey(scannerName)) {