http://git-wip-us.apache.org/repos/asf/hbase-site/blob/5a2158f2/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html
----------------------------------------------------------------------
diff --git
a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html
b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html
index 5243b06..af3ba51 100644
---
a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html
+++
b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HStore.StoreFlusherImpl.html
@@ -2128,464 +2128,463 @@
<span class="sourceLineNo">2120</span> return this.region;<a
name="line.2120"></a>
<span class="sourceLineNo">2121</span> }<a name="line.2121"></a>
<span class="sourceLineNo">2122</span><a name="line.2122"></a>
-<span class="sourceLineNo">2123</span> @Override<a name="line.2123"></a>
-<span class="sourceLineNo">2124</span> public RegionCoprocessorHost
getCoprocessorHost() {<a name="line.2124"></a>
-<span class="sourceLineNo">2125</span> return
this.region.getCoprocessorHost();<a name="line.2125"></a>
-<span class="sourceLineNo">2126</span> }<a name="line.2126"></a>
-<span class="sourceLineNo">2127</span><a name="line.2127"></a>
-<span class="sourceLineNo">2128</span> @Override<a name="line.2128"></a>
-<span class="sourceLineNo">2129</span> public RegionInfo getRegionInfo() {<a
name="line.2129"></a>
-<span class="sourceLineNo">2130</span> return this.fs.getRegionInfo();<a
name="line.2130"></a>
-<span class="sourceLineNo">2131</span> }<a name="line.2131"></a>
-<span class="sourceLineNo">2132</span><a name="line.2132"></a>
-<span class="sourceLineNo">2133</span> @Override<a name="line.2133"></a>
-<span class="sourceLineNo">2134</span> public boolean areWritesEnabled() {<a
name="line.2134"></a>
-<span class="sourceLineNo">2135</span> return
this.region.areWritesEnabled();<a name="line.2135"></a>
-<span class="sourceLineNo">2136</span> }<a name="line.2136"></a>
-<span class="sourceLineNo">2137</span><a name="line.2137"></a>
-<span class="sourceLineNo">2138</span> @Override<a name="line.2138"></a>
-<span class="sourceLineNo">2139</span> public long getSmallestReadPoint() {<a
name="line.2139"></a>
-<span class="sourceLineNo">2140</span> return
this.region.getSmallestReadPoint();<a name="line.2140"></a>
-<span class="sourceLineNo">2141</span> }<a name="line.2141"></a>
-<span class="sourceLineNo">2142</span><a name="line.2142"></a>
-<span class="sourceLineNo">2143</span> /**<a name="line.2143"></a>
-<span class="sourceLineNo">2144</span> * Adds or replaces the specified
KeyValues.<a name="line.2144"></a>
-<span class="sourceLineNo">2145</span> * <p><a name="line.2145"></a>
-<span class="sourceLineNo">2146</span> * For each KeyValue specified, if a
cell with the same row, family, and qualifier exists in<a name="line.2146"></a>
-<span class="sourceLineNo">2147</span> * MemStore, it will be replaced.
Otherwise, it will just be inserted to MemStore.<a name="line.2147"></a>
-<span class="sourceLineNo">2148</span> * <p><a name="line.2148"></a>
-<span class="sourceLineNo">2149</span> * This operation is atomic on each
KeyValue (row/family/qualifier) but not necessarily atomic<a
name="line.2149"></a>
-<span class="sourceLineNo">2150</span> * across all of them.<a
name="line.2150"></a>
-<span class="sourceLineNo">2151</span> * @param cells<a name="line.2151"></a>
-<span class="sourceLineNo">2152</span> * @param readpoint readpoint below
which we can safely remove duplicate KVs<a name="line.2152"></a>
-<span class="sourceLineNo">2153</span> * @param memstoreSize<a
name="line.2153"></a>
-<span class="sourceLineNo">2154</span> * @throws IOException<a
name="line.2154"></a>
-<span class="sourceLineNo">2155</span> */<a name="line.2155"></a>
-<span class="sourceLineNo">2156</span> public void
upsert(Iterable<Cell> cells, long readpoint, MemStoreSize memstoreSize)<a
name="line.2156"></a>
-<span class="sourceLineNo">2157</span> throws IOException {<a
name="line.2157"></a>
-<span class="sourceLineNo">2158</span> this.lock.readLock().lock();<a
name="line.2158"></a>
-<span class="sourceLineNo">2159</span> try {<a name="line.2159"></a>
-<span class="sourceLineNo">2160</span> this.memstore.upsert(cells,
readpoint, memstoreSize);<a name="line.2160"></a>
-<span class="sourceLineNo">2161</span> } finally {<a name="line.2161"></a>
-<span class="sourceLineNo">2162</span> this.lock.readLock().unlock();<a
name="line.2162"></a>
-<span class="sourceLineNo">2163</span> }<a name="line.2163"></a>
-<span class="sourceLineNo">2164</span> }<a name="line.2164"></a>
-<span class="sourceLineNo">2165</span><a name="line.2165"></a>
-<span class="sourceLineNo">2166</span> public StoreFlushContext
createFlushContext(long cacheFlushId) {<a name="line.2166"></a>
-<span class="sourceLineNo">2167</span> return new
StoreFlusherImpl(cacheFlushId);<a name="line.2167"></a>
-<span class="sourceLineNo">2168</span> }<a name="line.2168"></a>
-<span class="sourceLineNo">2169</span><a name="line.2169"></a>
-<span class="sourceLineNo">2170</span> private final class StoreFlusherImpl
implements StoreFlushContext {<a name="line.2170"></a>
-<span class="sourceLineNo">2171</span><a name="line.2171"></a>
-<span class="sourceLineNo">2172</span> private long cacheFlushSeqNum;<a
name="line.2172"></a>
-<span class="sourceLineNo">2173</span> private MemStoreSnapshot snapshot;<a
name="line.2173"></a>
-<span class="sourceLineNo">2174</span> private List<Path>
tempFiles;<a name="line.2174"></a>
-<span class="sourceLineNo">2175</span> private List<Path>
committedFiles;<a name="line.2175"></a>
-<span class="sourceLineNo">2176</span> private long cacheFlushCount;<a
name="line.2176"></a>
-<span class="sourceLineNo">2177</span> private long cacheFlushSize;<a
name="line.2177"></a>
-<span class="sourceLineNo">2178</span> private long outputFileSize;<a
name="line.2178"></a>
-<span class="sourceLineNo">2179</span><a name="line.2179"></a>
-<span class="sourceLineNo">2180</span> private StoreFlusherImpl(long
cacheFlushSeqNum) {<a name="line.2180"></a>
-<span class="sourceLineNo">2181</span> this.cacheFlushSeqNum =
cacheFlushSeqNum;<a name="line.2181"></a>
-<span class="sourceLineNo">2182</span> }<a name="line.2182"></a>
-<span class="sourceLineNo">2183</span><a name="line.2183"></a>
-<span class="sourceLineNo">2184</span> /**<a name="line.2184"></a>
-<span class="sourceLineNo">2185</span> * This is not thread safe. The
caller should have a lock on the region or the store.<a name="line.2185"></a>
-<span class="sourceLineNo">2186</span> * If necessary, the lock can be
added with the patch provided in HBASE-10087<a name="line.2186"></a>
-<span class="sourceLineNo">2187</span> */<a name="line.2187"></a>
-<span class="sourceLineNo">2188</span> @Override<a name="line.2188"></a>
-<span class="sourceLineNo">2189</span> public void prepare() {<a
name="line.2189"></a>
-<span class="sourceLineNo">2190</span> // passing the current sequence
number of the wal - to allow bookkeeping in the memstore<a name="line.2190"></a>
-<span class="sourceLineNo">2191</span> this.snapshot =
memstore.snapshot();<a name="line.2191"></a>
-<span class="sourceLineNo">2192</span> this.cacheFlushCount =
snapshot.getCellsCount();<a name="line.2192"></a>
-<span class="sourceLineNo">2193</span> this.cacheFlushSize =
snapshot.getDataSize();<a name="line.2193"></a>
-<span class="sourceLineNo">2194</span> committedFiles = new
ArrayList<>(1);<a name="line.2194"></a>
-<span class="sourceLineNo">2195</span> }<a name="line.2195"></a>
-<span class="sourceLineNo">2196</span><a name="line.2196"></a>
-<span class="sourceLineNo">2197</span> @Override<a name="line.2197"></a>
-<span class="sourceLineNo">2198</span> public void flushCache(MonitoredTask
status) throws IOException {<a name="line.2198"></a>
-<span class="sourceLineNo">2199</span> RegionServerServices rsService =
region.getRegionServerServices();<a name="line.2199"></a>
-<span class="sourceLineNo">2200</span> ThroughputController
throughputController =<a name="line.2200"></a>
-<span class="sourceLineNo">2201</span> rsService == null ? null :
rsService.getFlushThroughputController();<a name="line.2201"></a>
-<span class="sourceLineNo">2202</span> tempFiles =
HStore.this.flushCache(cacheFlushSeqNum, snapshot, status,
throughputController);<a name="line.2202"></a>
-<span class="sourceLineNo">2203</span> }<a name="line.2203"></a>
-<span class="sourceLineNo">2204</span><a name="line.2204"></a>
-<span class="sourceLineNo">2205</span> @Override<a name="line.2205"></a>
-<span class="sourceLineNo">2206</span> public boolean commit(MonitoredTask
status) throws IOException {<a name="line.2206"></a>
-<span class="sourceLineNo">2207</span> if (this.tempFiles == null ||
this.tempFiles.isEmpty()) {<a name="line.2207"></a>
-<span class="sourceLineNo">2208</span> return false;<a
name="line.2208"></a>
-<span class="sourceLineNo">2209</span> }<a name="line.2209"></a>
-<span class="sourceLineNo">2210</span> List<HStoreFile> storeFiles
= new ArrayList<>(this.tempFiles.size());<a name="line.2210"></a>
-<span class="sourceLineNo">2211</span> for (Path storeFilePath :
tempFiles) {<a name="line.2211"></a>
-<span class="sourceLineNo">2212</span> try {<a name="line.2212"></a>
-<span class="sourceLineNo">2213</span> HStoreFile sf =
HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);<a
name="line.2213"></a>
-<span class="sourceLineNo">2214</span> outputFileSize +=
sf.getReader().length();<a name="line.2214"></a>
-<span class="sourceLineNo">2215</span> storeFiles.add(sf);<a
name="line.2215"></a>
-<span class="sourceLineNo">2216</span> } catch (IOException ex) {<a
name="line.2216"></a>
-<span class="sourceLineNo">2217</span> LOG.error("Failed to commit
store file " + storeFilePath, ex);<a name="line.2217"></a>
-<span class="sourceLineNo">2218</span> // Try to delete the files we
have committed before.<a name="line.2218"></a>
-<span class="sourceLineNo">2219</span> for (HStoreFile sf :
storeFiles) {<a name="line.2219"></a>
-<span class="sourceLineNo">2220</span> Path pathToDelete =
sf.getPath();<a name="line.2220"></a>
-<span class="sourceLineNo">2221</span> try {<a name="line.2221"></a>
-<span class="sourceLineNo">2222</span> sf.deleteStoreFile();<a
name="line.2222"></a>
-<span class="sourceLineNo">2223</span> } catch (IOException
deleteEx) {<a name="line.2223"></a>
-<span class="sourceLineNo">2224</span> LOG.fatal("Failed to
delete store file we committed, halting " + pathToDelete, ex);<a
name="line.2224"></a>
-<span class="sourceLineNo">2225</span>
Runtime.getRuntime().halt(1);<a name="line.2225"></a>
-<span class="sourceLineNo">2226</span> }<a name="line.2226"></a>
-<span class="sourceLineNo">2227</span> }<a name="line.2227"></a>
-<span class="sourceLineNo">2228</span> throw new IOException("Failed
to commit the flush", ex);<a name="line.2228"></a>
-<span class="sourceLineNo">2229</span> }<a name="line.2229"></a>
-<span class="sourceLineNo">2230</span> }<a name="line.2230"></a>
-<span class="sourceLineNo">2231</span><a name="line.2231"></a>
-<span class="sourceLineNo">2232</span> for (HStoreFile sf : storeFiles)
{<a name="line.2232"></a>
-<span class="sourceLineNo">2233</span> if
(HStore.this.getCoprocessorHost() != null) {<a name="line.2233"></a>
-<span class="sourceLineNo">2234</span>
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);<a
name="line.2234"></a>
-<span class="sourceLineNo">2235</span> }<a name="line.2235"></a>
-<span class="sourceLineNo">2236</span>
committedFiles.add(sf.getPath());<a name="line.2236"></a>
-<span class="sourceLineNo">2237</span> }<a name="line.2237"></a>
-<span class="sourceLineNo">2238</span><a name="line.2238"></a>
-<span class="sourceLineNo">2239</span> HStore.this.flushedCellsCount +=
cacheFlushCount;<a name="line.2239"></a>
-<span class="sourceLineNo">2240</span> HStore.this.flushedCellsSize +=
cacheFlushSize;<a name="line.2240"></a>
-<span class="sourceLineNo">2241</span> HStore.this.flushedOutputFileSize
+= outputFileSize;<a name="line.2241"></a>
-<span class="sourceLineNo">2242</span><a name="line.2242"></a>
-<span class="sourceLineNo">2243</span> // Add new file to store files.
Clear snapshot too while we have the Store write lock.<a name="line.2243"></a>
-<span class="sourceLineNo">2244</span> return
HStore.this.updateStorefiles(storeFiles, snapshot.getId());<a
name="line.2244"></a>
-<span class="sourceLineNo">2245</span> }<a name="line.2245"></a>
-<span class="sourceLineNo">2246</span><a name="line.2246"></a>
-<span class="sourceLineNo">2247</span> @Override<a name="line.2247"></a>
-<span class="sourceLineNo">2248</span> public long getOutputFileSize() {<a
name="line.2248"></a>
-<span class="sourceLineNo">2249</span> return outputFileSize;<a
name="line.2249"></a>
-<span class="sourceLineNo">2250</span> }<a name="line.2250"></a>
-<span class="sourceLineNo">2251</span><a name="line.2251"></a>
-<span class="sourceLineNo">2252</span> @Override<a name="line.2252"></a>
-<span class="sourceLineNo">2253</span> public List<Path>
getCommittedFiles() {<a name="line.2253"></a>
-<span class="sourceLineNo">2254</span> return committedFiles;<a
name="line.2254"></a>
-<span class="sourceLineNo">2255</span> }<a name="line.2255"></a>
-<span class="sourceLineNo">2256</span><a name="line.2256"></a>
-<span class="sourceLineNo">2257</span> /**<a name="line.2257"></a>
-<span class="sourceLineNo">2258</span> * Similar to commit, but called in
secondary region replicas for replaying the<a name="line.2258"></a>
-<span class="sourceLineNo">2259</span> * flush cache from primary region.
Adds the new files to the store, and drops the<a name="line.2259"></a>
-<span class="sourceLineNo">2260</span> * snapshot depending on
dropMemstoreSnapshot argument.<a name="line.2260"></a>
-<span class="sourceLineNo">2261</span> * @param fileNames names of the
flushed files<a name="line.2261"></a>
-<span class="sourceLineNo">2262</span> * @param dropMemstoreSnapshot
whether to drop the prepared memstore snapshot<a name="line.2262"></a>
-<span class="sourceLineNo">2263</span> * @throws IOException<a
name="line.2263"></a>
-<span class="sourceLineNo">2264</span> */<a name="line.2264"></a>
-<span class="sourceLineNo">2265</span> @Override<a name="line.2265"></a>
-<span class="sourceLineNo">2266</span> public void
replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)<a
name="line.2266"></a>
-<span class="sourceLineNo">2267</span> throws IOException {<a
name="line.2267"></a>
-<span class="sourceLineNo">2268</span> List<HStoreFile> storeFiles
= new ArrayList<>(fileNames.size());<a name="line.2268"></a>
-<span class="sourceLineNo">2269</span> for (String file : fileNames) {<a
name="line.2269"></a>
-<span class="sourceLineNo">2270</span> // open the file as a store file
(hfile link, etc)<a name="line.2270"></a>
-<span class="sourceLineNo">2271</span> StoreFileInfo storeFileInfo =
fs.getStoreFileInfo(getColumnFamilyName(), file);<a name="line.2271"></a>
-<span class="sourceLineNo">2272</span> HStoreFile storeFile =
createStoreFileAndReader(storeFileInfo);<a name="line.2272"></a>
-<span class="sourceLineNo">2273</span> storeFiles.add(storeFile);<a
name="line.2273"></a>
-<span class="sourceLineNo">2274</span> HStore.this.storeSize +=
storeFile.getReader().length();<a name="line.2274"></a>
-<span class="sourceLineNo">2275</span>
HStore.this.totalUncompressedBytes +=
storeFile.getReader().getTotalUncompressedBytes();<a name="line.2275"></a>
-<span class="sourceLineNo">2276</span> if (LOG.isInfoEnabled()) {<a
name="line.2276"></a>
-<span class="sourceLineNo">2277</span> LOG.info("Region: " +
HStore.this.getRegionInfo().getEncodedName() +<a name="line.2277"></a>
-<span class="sourceLineNo">2278</span> " added " + storeFile + ",
entries=" + storeFile.getReader().getEntries() +<a name="line.2278"></a>
-<span class="sourceLineNo">2279</span> ", sequenceid=" +
+storeFile.getReader().getSequenceID() + ", filesize="<a name="line.2279"></a>
-<span class="sourceLineNo">2280</span> +
TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));<a
name="line.2280"></a>
-<span class="sourceLineNo">2281</span> }<a name="line.2281"></a>
-<span class="sourceLineNo">2282</span> }<a name="line.2282"></a>
-<span class="sourceLineNo">2283</span><a name="line.2283"></a>
-<span class="sourceLineNo">2284</span> long snapshotId = -1; // -1 means
do not drop<a name="line.2284"></a>
-<span class="sourceLineNo">2285</span> if (dropMemstoreSnapshot
&& snapshot != null) {<a name="line.2285"></a>
-<span class="sourceLineNo">2286</span> snapshotId = snapshot.getId();<a
name="line.2286"></a>
-<span class="sourceLineNo">2287</span> }<a name="line.2287"></a>
-<span class="sourceLineNo">2288</span>
HStore.this.updateStorefiles(storeFiles, snapshotId);<a name="line.2288"></a>
-<span class="sourceLineNo">2289</span> }<a name="line.2289"></a>
-<span class="sourceLineNo">2290</span><a name="line.2290"></a>
-<span class="sourceLineNo">2291</span> /**<a name="line.2291"></a>
-<span class="sourceLineNo">2292</span> * Abort the snapshot preparation.
Drops the snapshot if any.<a name="line.2292"></a>
-<span class="sourceLineNo">2293</span> * @throws IOException<a
name="line.2293"></a>
-<span class="sourceLineNo">2294</span> */<a name="line.2294"></a>
-<span class="sourceLineNo">2295</span> @Override<a name="line.2295"></a>
-<span class="sourceLineNo">2296</span> public void abort() throws
IOException {<a name="line.2296"></a>
-<span class="sourceLineNo">2297</span> if (snapshot == null) {<a
name="line.2297"></a>
-<span class="sourceLineNo">2298</span> return;<a name="line.2298"></a>
-<span class="sourceLineNo">2299</span> }<a name="line.2299"></a>
-<span class="sourceLineNo">2300</span> HStore.this.updateStorefiles(new
ArrayList<>(0), snapshot.getId());<a name="line.2300"></a>
-<span class="sourceLineNo">2301</span> }<a name="line.2301"></a>
-<span class="sourceLineNo">2302</span> }<a name="line.2302"></a>
-<span class="sourceLineNo">2303</span><a name="line.2303"></a>
-<span class="sourceLineNo">2304</span> @Override<a name="line.2304"></a>
-<span class="sourceLineNo">2305</span> public boolean needsCompaction() {<a
name="line.2305"></a>
-<span class="sourceLineNo">2306</span> return
this.storeEngine.needsCompaction(this.filesCompacting);<a name="line.2306"></a>
-<span class="sourceLineNo">2307</span> }<a name="line.2307"></a>
-<span class="sourceLineNo">2308</span><a name="line.2308"></a>
-<span class="sourceLineNo">2309</span> /**<a name="line.2309"></a>
-<span class="sourceLineNo">2310</span> * Used for tests.<a
name="line.2310"></a>
-<span class="sourceLineNo">2311</span> * @return cache configuration for
this Store.<a name="line.2311"></a>
-<span class="sourceLineNo">2312</span> */<a name="line.2312"></a>
-<span class="sourceLineNo">2313</span> @VisibleForTesting<a
name="line.2313"></a>
-<span class="sourceLineNo">2314</span> public CacheConfig getCacheConfig()
{<a name="line.2314"></a>
-<span class="sourceLineNo">2315</span> return this.cacheConf;<a
name="line.2315"></a>
-<span class="sourceLineNo">2316</span> }<a name="line.2316"></a>
-<span class="sourceLineNo">2317</span><a name="line.2317"></a>
-<span class="sourceLineNo">2318</span> public static final long
FIXED_OVERHEAD =<a name="line.2318"></a>
-<span class="sourceLineNo">2319</span> ClassSize.align(ClassSize.OBJECT +
(17 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG)<a name="line.2319"></a>
-<span class="sourceLineNo">2320</span> + (5 * Bytes.SIZEOF_INT) +
(2 * Bytes.SIZEOF_BOOLEAN));<a name="line.2320"></a>
-<span class="sourceLineNo">2321</span><a name="line.2321"></a>
-<span class="sourceLineNo">2322</span> public static final long DEEP_OVERHEAD
= ClassSize.align(FIXED_OVERHEAD<a name="line.2322"></a>
-<span class="sourceLineNo">2323</span> + ClassSize.OBJECT +
ClassSize.REENTRANT_LOCK<a name="line.2323"></a>
-<span class="sourceLineNo">2324</span> +
ClassSize.CONCURRENT_SKIPLISTMAP<a name="line.2324"></a>
-<span class="sourceLineNo">2325</span> +
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT<a
name="line.2325"></a>
-<span class="sourceLineNo">2326</span> + ScanInfo.FIXED_OVERHEAD);<a
name="line.2326"></a>
-<span class="sourceLineNo">2327</span><a name="line.2327"></a>
-<span class="sourceLineNo">2328</span> @Override<a name="line.2328"></a>
-<span class="sourceLineNo">2329</span> public long heapSize() {<a
name="line.2329"></a>
-<span class="sourceLineNo">2330</span> MemStoreSize memstoreSize =
this.memstore.size();<a name="line.2330"></a>
-<span class="sourceLineNo">2331</span> return DEEP_OVERHEAD +
memstoreSize.getHeapSize();<a name="line.2331"></a>
-<span class="sourceLineNo">2332</span> }<a name="line.2332"></a>
-<span class="sourceLineNo">2333</span><a name="line.2333"></a>
-<span class="sourceLineNo">2334</span> @Override<a name="line.2334"></a>
-<span class="sourceLineNo">2335</span> public CellComparator getComparator()
{<a name="line.2335"></a>
-<span class="sourceLineNo">2336</span> return comparator;<a
name="line.2336"></a>
-<span class="sourceLineNo">2337</span> }<a name="line.2337"></a>
-<span class="sourceLineNo">2338</span><a name="line.2338"></a>
-<span class="sourceLineNo">2339</span> public ScanInfo getScanInfo() {<a
name="line.2339"></a>
-<span class="sourceLineNo">2340</span> return scanInfo;<a
name="line.2340"></a>
-<span class="sourceLineNo">2341</span> }<a name="line.2341"></a>
-<span class="sourceLineNo">2342</span><a name="line.2342"></a>
-<span class="sourceLineNo">2343</span> /**<a name="line.2343"></a>
-<span class="sourceLineNo">2344</span> * Set scan info, used by test<a
name="line.2344"></a>
-<span class="sourceLineNo">2345</span> * @param scanInfo new scan info to
use for test<a name="line.2345"></a>
-<span class="sourceLineNo">2346</span> */<a name="line.2346"></a>
-<span class="sourceLineNo">2347</span> void setScanInfo(ScanInfo scanInfo)
{<a name="line.2347"></a>
-<span class="sourceLineNo">2348</span> this.scanInfo = scanInfo;<a
name="line.2348"></a>
-<span class="sourceLineNo">2349</span> }<a name="line.2349"></a>
-<span class="sourceLineNo">2350</span><a name="line.2350"></a>
-<span class="sourceLineNo">2351</span> @Override<a name="line.2351"></a>
-<span class="sourceLineNo">2352</span> public boolean hasTooManyStoreFiles()
{<a name="line.2352"></a>
-<span class="sourceLineNo">2353</span> return getStorefilesCount() >
this.blockingFileCount;<a name="line.2353"></a>
-<span class="sourceLineNo">2354</span> }<a name="line.2354"></a>
-<span class="sourceLineNo">2355</span><a name="line.2355"></a>
-<span class="sourceLineNo">2356</span> @Override<a name="line.2356"></a>
-<span class="sourceLineNo">2357</span> public long getFlushedCellsCount() {<a
name="line.2357"></a>
-<span class="sourceLineNo">2358</span> return flushedCellsCount;<a
name="line.2358"></a>
-<span class="sourceLineNo">2359</span> }<a name="line.2359"></a>
-<span class="sourceLineNo">2360</span><a name="line.2360"></a>
-<span class="sourceLineNo">2361</span> @Override<a name="line.2361"></a>
-<span class="sourceLineNo">2362</span> public long getFlushedCellsSize() {<a
name="line.2362"></a>
-<span class="sourceLineNo">2363</span> return flushedCellsSize;<a
name="line.2363"></a>
-<span class="sourceLineNo">2364</span> }<a name="line.2364"></a>
-<span class="sourceLineNo">2365</span><a name="line.2365"></a>
-<span class="sourceLineNo">2366</span> @Override<a name="line.2366"></a>
-<span class="sourceLineNo">2367</span> public long getFlushedOutputFileSize()
{<a name="line.2367"></a>
-<span class="sourceLineNo">2368</span> return flushedOutputFileSize;<a
name="line.2368"></a>
-<span class="sourceLineNo">2369</span> }<a name="line.2369"></a>
-<span class="sourceLineNo">2370</span><a name="line.2370"></a>
-<span class="sourceLineNo">2371</span> @Override<a name="line.2371"></a>
-<span class="sourceLineNo">2372</span> public long getCompactedCellsCount()
{<a name="line.2372"></a>
-<span class="sourceLineNo">2373</span> return compactedCellsCount;<a
name="line.2373"></a>
-<span class="sourceLineNo">2374</span> }<a name="line.2374"></a>
-<span class="sourceLineNo">2375</span><a name="line.2375"></a>
-<span class="sourceLineNo">2376</span> @Override<a name="line.2376"></a>
-<span class="sourceLineNo">2377</span> public long getCompactedCellsSize()
{<a name="line.2377"></a>
-<span class="sourceLineNo">2378</span> return compactedCellsSize;<a
name="line.2378"></a>
-<span class="sourceLineNo">2379</span> }<a name="line.2379"></a>
-<span class="sourceLineNo">2380</span><a name="line.2380"></a>
-<span class="sourceLineNo">2381</span> @Override<a name="line.2381"></a>
-<span class="sourceLineNo">2382</span> public long
getMajorCompactedCellsCount() {<a name="line.2382"></a>
-<span class="sourceLineNo">2383</span> return majorCompactedCellsCount;<a
name="line.2383"></a>
-<span class="sourceLineNo">2384</span> }<a name="line.2384"></a>
-<span class="sourceLineNo">2385</span><a name="line.2385"></a>
-<span class="sourceLineNo">2386</span> @Override<a name="line.2386"></a>
-<span class="sourceLineNo">2387</span> public long
getMajorCompactedCellsSize() {<a name="line.2387"></a>
-<span class="sourceLineNo">2388</span> return majorCompactedCellsSize;<a
name="line.2388"></a>
-<span class="sourceLineNo">2389</span> }<a name="line.2389"></a>
-<span class="sourceLineNo">2390</span><a name="line.2390"></a>
-<span class="sourceLineNo">2391</span> /**<a name="line.2391"></a>
-<span class="sourceLineNo">2392</span> * Returns the StoreEngine that is
backing this concrete implementation of Store.<a name="line.2392"></a>
-<span class="sourceLineNo">2393</span> * @return Returns the {@link
StoreEngine} object used internally inside this HStore object.<a
name="line.2393"></a>
-<span class="sourceLineNo">2394</span> */<a name="line.2394"></a>
-<span class="sourceLineNo">2395</span> @VisibleForTesting<a
name="line.2395"></a>
-<span class="sourceLineNo">2396</span> public StoreEngine<?, ?, ?, ?>
getStoreEngine() {<a name="line.2396"></a>
-<span class="sourceLineNo">2397</span> return this.storeEngine;<a
name="line.2397"></a>
-<span class="sourceLineNo">2398</span> }<a name="line.2398"></a>
-<span class="sourceLineNo">2399</span><a name="line.2399"></a>
-<span class="sourceLineNo">2400</span> protected OffPeakHours
getOffPeakHours() {<a name="line.2400"></a>
-<span class="sourceLineNo">2401</span> return this.offPeakHours;<a
name="line.2401"></a>
-<span class="sourceLineNo">2402</span> }<a name="line.2402"></a>
-<span class="sourceLineNo">2403</span><a name="line.2403"></a>
-<span class="sourceLineNo">2404</span> /**<a name="line.2404"></a>
-<span class="sourceLineNo">2405</span> * {@inheritDoc}<a
name="line.2405"></a>
-<span class="sourceLineNo">2406</span> */<a name="line.2406"></a>
-<span class="sourceLineNo">2407</span> @Override<a name="line.2407"></a>
-<span class="sourceLineNo">2408</span> public void
onConfigurationChange(Configuration conf) {<a name="line.2408"></a>
-<span class="sourceLineNo">2409</span> this.conf = new
CompoundConfiguration()<a name="line.2409"></a>
-<span class="sourceLineNo">2410</span> .add(conf)<a
name="line.2410"></a>
-<span class="sourceLineNo">2411</span>
.addBytesMap(family.getValues());<a name="line.2411"></a>
-<span class="sourceLineNo">2412</span>
this.storeEngine.compactionPolicy.setConf(conf);<a name="line.2412"></a>
-<span class="sourceLineNo">2413</span> this.offPeakHours =
OffPeakHours.getInstance(conf);<a name="line.2413"></a>
-<span class="sourceLineNo">2414</span> }<a name="line.2414"></a>
-<span class="sourceLineNo">2415</span><a name="line.2415"></a>
-<span class="sourceLineNo">2416</span> /**<a name="line.2416"></a>
-<span class="sourceLineNo">2417</span> * {@inheritDoc}<a
name="line.2417"></a>
-<span class="sourceLineNo">2418</span> */<a name="line.2418"></a>
-<span class="sourceLineNo">2419</span> @Override<a name="line.2419"></a>
-<span class="sourceLineNo">2420</span> public void
registerChildren(ConfigurationManager manager) {<a name="line.2420"></a>
-<span class="sourceLineNo">2421</span> // No children to register<a
name="line.2421"></a>
-<span class="sourceLineNo">2422</span> }<a name="line.2422"></a>
-<span class="sourceLineNo">2423</span><a name="line.2423"></a>
-<span class="sourceLineNo">2424</span> /**<a name="line.2424"></a>
-<span class="sourceLineNo">2425</span> * {@inheritDoc}<a
name="line.2425"></a>
-<span class="sourceLineNo">2426</span> */<a name="line.2426"></a>
-<span class="sourceLineNo">2427</span> @Override<a name="line.2427"></a>
-<span class="sourceLineNo">2428</span> public void
deregisterChildren(ConfigurationManager manager) {<a name="line.2428"></a>
-<span class="sourceLineNo">2429</span> // No children to deregister<a
name="line.2429"></a>
-<span class="sourceLineNo">2430</span> }<a name="line.2430"></a>
-<span class="sourceLineNo">2431</span><a name="line.2431"></a>
-<span class="sourceLineNo">2432</span> @Override<a name="line.2432"></a>
-<span class="sourceLineNo">2433</span> public double getCompactionPressure()
{<a name="line.2433"></a>
-<span class="sourceLineNo">2434</span> return
storeEngine.getStoreFileManager().getCompactionPressure();<a
name="line.2434"></a>
-<span class="sourceLineNo">2435</span> }<a name="line.2435"></a>
-<span class="sourceLineNo">2436</span><a name="line.2436"></a>
-<span class="sourceLineNo">2437</span> @Override<a name="line.2437"></a>
-<span class="sourceLineNo">2438</span> public boolean isPrimaryReplicaStore()
{<a name="line.2438"></a>
-<span class="sourceLineNo">2439</span> return
getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;<a
name="line.2439"></a>
-<span class="sourceLineNo">2440</span> }<a name="line.2440"></a>
-<span class="sourceLineNo">2441</span><a name="line.2441"></a>
-<span class="sourceLineNo">2442</span> /**<a name="line.2442"></a>
-<span class="sourceLineNo">2443</span> * Sets the store up for a region
level snapshot operation.<a name="line.2443"></a>
-<span class="sourceLineNo">2444</span> * @see #postSnapshotOperation()<a
name="line.2444"></a>
-<span class="sourceLineNo">2445</span> */<a name="line.2445"></a>
-<span class="sourceLineNo">2446</span> public void preSnapshotOperation() {<a
name="line.2446"></a>
-<span class="sourceLineNo">2447</span> archiveLock.lock();<a
name="line.2447"></a>
-<span class="sourceLineNo">2448</span> }<a name="line.2448"></a>
-<span class="sourceLineNo">2449</span><a name="line.2449"></a>
-<span class="sourceLineNo">2450</span> /**<a name="line.2450"></a>
-<span class="sourceLineNo">2451</span> * Perform tasks needed after the
completion of snapshot operation.<a name="line.2451"></a>
-<span class="sourceLineNo">2452</span> * @see #preSnapshotOperation()<a
name="line.2452"></a>
-<span class="sourceLineNo">2453</span> */<a name="line.2453"></a>
-<span class="sourceLineNo">2454</span> public void postSnapshotOperation()
{<a name="line.2454"></a>
-<span class="sourceLineNo">2455</span> archiveLock.unlock();<a
name="line.2455"></a>
-<span class="sourceLineNo">2456</span> }<a name="line.2456"></a>
-<span class="sourceLineNo">2457</span><a name="line.2457"></a>
-<span class="sourceLineNo">2458</span> /**<a name="line.2458"></a>
-<span class="sourceLineNo">2459</span> * Closes and archives the compacted
files under this store<a name="line.2459"></a>
-<span class="sourceLineNo">2460</span> */<a name="line.2460"></a>
-<span class="sourceLineNo">2461</span> public synchronized void
closeAndArchiveCompactedFiles() throws IOException {<a name="line.2461"></a>
-<span class="sourceLineNo">2462</span> // ensure other threads do not
attempt to archive the same files on close()<a name="line.2462"></a>
-<span class="sourceLineNo">2463</span> archiveLock.lock();<a
name="line.2463"></a>
-<span class="sourceLineNo">2464</span> try {<a name="line.2464"></a>
-<span class="sourceLineNo">2465</span> lock.readLock().lock();<a
name="line.2465"></a>
-<span class="sourceLineNo">2466</span> Collection<HStoreFile>
copyCompactedfiles = null;<a name="line.2466"></a>
-<span class="sourceLineNo">2467</span> try {<a name="line.2467"></a>
-<span class="sourceLineNo">2468</span> Collection<HStoreFile>
compactedfiles =<a name="line.2468"></a>
-<span class="sourceLineNo">2469</span>
this.getStoreEngine().getStoreFileManager().getCompactedfiles();<a
name="line.2469"></a>
-<span class="sourceLineNo">2470</span> if (compactedfiles != null
&& compactedfiles.size() != 0) {<a name="line.2470"></a>
-<span class="sourceLineNo">2471</span> // Do a copy under read lock<a
name="line.2471"></a>
-<span class="sourceLineNo">2472</span> copyCompactedfiles = new
ArrayList<>(compactedfiles);<a name="line.2472"></a>
-<span class="sourceLineNo">2473</span> } else {<a name="line.2473"></a>
-<span class="sourceLineNo">2474</span> if (LOG.isTraceEnabled()) {<a
name="line.2474"></a>
-<span class="sourceLineNo">2475</span> LOG.trace("No compacted
files to archive");<a name="line.2475"></a>
-<span class="sourceLineNo">2476</span> return;<a
name="line.2476"></a>
-<span class="sourceLineNo">2477</span> }<a name="line.2477"></a>
-<span class="sourceLineNo">2478</span> }<a name="line.2478"></a>
-<span class="sourceLineNo">2479</span> } finally {<a name="line.2479"></a>
-<span class="sourceLineNo">2480</span> lock.readLock().unlock();<a
name="line.2480"></a>
-<span class="sourceLineNo">2481</span> }<a name="line.2481"></a>
-<span class="sourceLineNo">2482</span> if (copyCompactedfiles != null
&& !copyCompactedfiles.isEmpty()) {<a name="line.2482"></a>
-<span class="sourceLineNo">2483</span>
removeCompactedfiles(copyCompactedfiles);<a name="line.2483"></a>
-<span class="sourceLineNo">2484</span> }<a name="line.2484"></a>
-<span class="sourceLineNo">2485</span> } finally {<a name="line.2485"></a>
-<span class="sourceLineNo">2486</span> archiveLock.unlock();<a
name="line.2486"></a>
-<span class="sourceLineNo">2487</span> }<a name="line.2487"></a>
-<span class="sourceLineNo">2488</span> }<a name="line.2488"></a>
-<span class="sourceLineNo">2489</span><a name="line.2489"></a>
-<span class="sourceLineNo">2490</span> /**<a name="line.2490"></a>
-<span class="sourceLineNo">2491</span> * Archives and removes the compacted
files<a name="line.2491"></a>
-<span class="sourceLineNo">2492</span> * @param compactedfiles The compacted
files in this store that are not active in reads<a name="line.2492"></a>
-<span class="sourceLineNo">2493</span> * @throws IOException<a
name="line.2493"></a>
-<span class="sourceLineNo">2494</span> */<a name="line.2494"></a>
-<span class="sourceLineNo">2495</span> private void
removeCompactedfiles(Collection<HStoreFile> compactedfiles)<a
name="line.2495"></a>
-<span class="sourceLineNo">2496</span> throws IOException {<a
name="line.2496"></a>
-<span class="sourceLineNo">2497</span> final List<HStoreFile>
filesToRemove = new ArrayList<>(compactedfiles.size());<a
name="line.2497"></a>
-<span class="sourceLineNo">2498</span> for (final HStoreFile file :
compactedfiles) {<a name="line.2498"></a>
-<span class="sourceLineNo">2499</span> synchronized (file) {<a
name="line.2499"></a>
-<span class="sourceLineNo">2500</span> try {<a name="line.2500"></a>
-<span class="sourceLineNo">2501</span> StoreFileReader r =
file.getReader();<a name="line.2501"></a>
-<span class="sourceLineNo">2502</span> if (r == null) {<a
name="line.2502"></a>
-<span class="sourceLineNo">2503</span> if (LOG.isDebugEnabled())
{<a name="line.2503"></a>
-<span class="sourceLineNo">2504</span> LOG.debug("The file " +
file + " was closed but still not archived.");<a name="line.2504"></a>
-<span class="sourceLineNo">2505</span> }<a name="line.2505"></a>
-<span class="sourceLineNo">2506</span> filesToRemove.add(file);<a
name="line.2506"></a>
-<span class="sourceLineNo">2507</span> continue;<a
name="line.2507"></a>
-<span class="sourceLineNo">2508</span> }<a name="line.2508"></a>
-<span class="sourceLineNo">2509</span> if (file.isCompactedAway()
&& !file.isReferencedInReads()) {<a name="line.2509"></a>
-<span class="sourceLineNo">2510</span> // Even if deleting fails we
need not bother as any new scanners won't be<a name="line.2510"></a>
-<span class="sourceLineNo">2511</span> // able to use the compacted
file as the status is already compactedAway<a name="line.2511"></a>
-<span class="sourceLineNo">2512</span> if (LOG.isTraceEnabled())
{<a name="line.2512"></a>
-<span class="sourceLineNo">2513</span> LOG.trace("Closing and
archiving the file " + file.getPath());<a name="line.2513"></a>
-<span class="sourceLineNo">2514</span> }<a name="line.2514"></a>
-<span class="sourceLineNo">2515</span> r.close(true);<a
name="line.2515"></a>
-<span class="sourceLineNo">2516</span> // Just close and return<a
name="line.2516"></a>
-<span class="sourceLineNo">2517</span> filesToRemove.add(file);<a
name="line.2517"></a>
-<span class="sourceLineNo">2518</span> }<a name="line.2518"></a>
-<span class="sourceLineNo">2519</span> } catch (Exception e) {<a
name="line.2519"></a>
-<span class="sourceLineNo">2520</span> LOG.error(<a
name="line.2520"></a>
-<span class="sourceLineNo">2521</span> "Exception while trying to
close the compacted store file " + file.getPath().getName());<a
name="line.2521"></a>
-<span class="sourceLineNo">2522</span> }<a name="line.2522"></a>
-<span class="sourceLineNo">2523</span> }<a name="line.2523"></a>
-<span class="sourceLineNo">2524</span> }<a name="line.2524"></a>
-<span class="sourceLineNo">2525</span> if (this.isPrimaryReplicaStore())
{<a name="line.2525"></a>
-<span class="sourceLineNo">2526</span> // Only the primary region is
allowed to move the file to archive.<a name="line.2526"></a>
-<span class="sourceLineNo">2527</span> // The secondary region does not
move the files to archive. Any active reads from<a name="line.2527"></a>
-<span class="sourceLineNo">2528</span> // the secondary region will still
work because the file as such has active readers on it.<a name="line.2528"></a>
-<span class="sourceLineNo">2529</span> if (!filesToRemove.isEmpty()) {<a
name="line.2529"></a>
-<span class="sourceLineNo">2530</span> if (LOG.isDebugEnabled()) {<a
name="line.2530"></a>
-<span class="sourceLineNo">2531</span> LOG.debug("Moving the files "
+ filesToRemove + " to archive");<a name="line.2531"></a>
-<span class="sourceLineNo">2532</span> }<a name="line.2532"></a>
-<span class="sourceLineNo">2533</span> // Only if this is successful it
has to be removed<a name="line.2533"></a>
-<span class="sourceLineNo">2534</span> try {<a name="line.2534"></a>
-<span class="sourceLineNo">2535</span>
this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
filesToRemove);<a name="line.2535"></a>
-<span class="sourceLineNo">2536</span> } catch (FailedArchiveException
fae) {<a name="line.2536"></a>
-<span class="sourceLineNo">2537</span> // Even if archiving some
files failed, we still need to clear out any of the<a name="line.2537"></a>
-<span class="sourceLineNo">2538</span> // files which were
successfully archived. Otherwise we will receive a<a name="line.2538"></a>
-<span class="sourceLineNo">2539</span> // FileNotFoundException when
we attempt to re-archive them in the next go around.<a name="line.2539"></a>
-<span class="sourceLineNo">2540</span> Collection<Path>
failedFiles = fae.getFailedFiles();<a name="line.2540"></a>
-<span class="sourceLineNo">2541</span> Iterator<HStoreFile>
iter = filesToRemove.iterator();<a name="line.2541"></a>
-<span class="sourceLineNo">2542</span> while (iter.hasNext()) {<a
name="line.2542"></a>
-<span class="sourceLineNo">2543</span> if
(failedFiles.contains(iter.next().getPath())) {<a name="line.2543"></a>
-<span class="sourceLineNo">2544</span> iter.remove();<a
name="line.2544"></a>
-<span class="sourceLineNo">2545</span> }<a name="line.2545"></a>
-<span class="sourceLineNo">2546</span> }<a name="line.2546"></a>
-<span class="sourceLineNo">2547</span> if (!filesToRemove.isEmpty())
{<a name="line.2547"></a>
-<span class="sourceLineNo">2548</span>
clearCompactedfiles(filesToRemove);<a name="line.2548"></a>
-<span class="sourceLineNo">2549</span> }<a name="line.2549"></a>
-<span class="sourceLineNo">2550</span> throw fae;<a
name="line.2550"></a>
-<span class="sourceLineNo">2551</span> }<a name="line.2551"></a>
-<span class="sourceLineNo">2552</span> }<a name="line.2552"></a>
-<span class="sourceLineNo">2553</span> }<a name="line.2553"></a>
-<span class="sourceLineNo">2554</span> if (!filesToRemove.isEmpty()) {<a
name="line.2554"></a>
-<span class="sourceLineNo">2555</span> // Clear the compactedfiles from
the store file manager<a name="line.2555"></a>
-<span class="sourceLineNo">2556</span>
clearCompactedfiles(filesToRemove);<a name="line.2556"></a>
-<span class="sourceLineNo">2557</span> }<a name="line.2557"></a>
-<span class="sourceLineNo">2558</span> }<a name="line.2558"></a>
-<span class="sourceLineNo">2559</span><a name="line.2559"></a>
-<span class="sourceLineNo">2560</span> public Long preFlushSeqIDEstimation()
{<a name="line.2560"></a>
-<span class="sourceLineNo">2561</span> return
memstore.preFlushSeqIDEstimation();<a name="line.2561"></a>
-<span class="sourceLineNo">2562</span> }<a name="line.2562"></a>
-<span class="sourceLineNo">2563</span><a name="line.2563"></a>
-<span class="sourceLineNo">2564</span> @Override<a name="line.2564"></a>
-<span class="sourceLineNo">2565</span> public boolean isSloppyMemStore() {<a
name="line.2565"></a>
-<span class="sourceLineNo">2566</span> return this.memstore.isSloppy();<a
name="line.2566"></a>
-<span class="sourceLineNo">2567</span> }<a name="line.2567"></a>
-<span class="sourceLineNo">2568</span><a name="line.2568"></a>
-<span class="sourceLineNo">2569</span> private void
clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException
{<a name="line.2569"></a>
-<span class="sourceLineNo">2570</span> if (LOG.isTraceEnabled()) {<a
name="line.2570"></a>
-<span class="sourceLineNo">2571</span> LOG.trace("Clearing the compacted
file " + filesToRemove + " from this store");<a name="line.2571"></a>
-<span class="sourceLineNo">2572</span> }<a name="line.2572"></a>
-<span class="sourceLineNo">2573</span> try {<a name="line.2573"></a>
-<span class="sourceLineNo">2574</span> lock.writeLock().lock();<a
name="line.2574"></a>
-<span class="sourceLineNo">2575</span>
this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);<a
name="line.2575"></a>
-<span class="sourceLineNo">2576</span> } finally {<a name="line.2576"></a>
-<span class="sourceLineNo">2577</span> lock.writeLock().unlock();<a
name="line.2577"></a>
-<span class="sourceLineNo">2578</span> }<a name="line.2578"></a>
-<span class="sourceLineNo">2579</span> }<a name="line.2579"></a>
-<span class="sourceLineNo">2580</span>}<a name="line.2580"></a>
+<span class="sourceLineNo">2123</span> public RegionCoprocessorHost
getCoprocessorHost() {<a name="line.2123"></a>
+<span class="sourceLineNo">2124</span> return
this.region.getCoprocessorHost();<a name="line.2124"></a>
+<span class="sourceLineNo">2125</span> }<a name="line.2125"></a>
+<span class="sourceLineNo">2126</span><a name="line.2126"></a>
+<span class="sourceLineNo">2127</span> @Override<a name="line.2127"></a>
+<span class="sourceLineNo">2128</span> public RegionInfo getRegionInfo() {<a
name="line.2128"></a>
+<span class="sourceLineNo">2129</span> return this.fs.getRegionInfo();<a
name="line.2129"></a>
+<span class="sourceLineNo">2130</span> }<a name="line.2130"></a>
+<span class="sourceLineNo">2131</span><a name="line.2131"></a>
+<span class="sourceLineNo">2132</span> @Override<a name="line.2132"></a>
+<span class="sourceLineNo">2133</span> public boolean areWritesEnabled() {<a
name="line.2133"></a>
+<span class="sourceLineNo">2134</span> return
this.region.areWritesEnabled();<a name="line.2134"></a>
+<span class="sourceLineNo">2135</span> }<a name="line.2135"></a>
+<span class="sourceLineNo">2136</span><a name="line.2136"></a>
+<span class="sourceLineNo">2137</span> @Override<a name="line.2137"></a>
+<span class="sourceLineNo">2138</span> public long getSmallestReadPoint() {<a
name="line.2138"></a>
+<span class="sourceLineNo">2139</span> return
this.region.getSmallestReadPoint();<a name="line.2139"></a>
+<span class="sourceLineNo">2140</span> }<a name="line.2140"></a>
+<span class="sourceLineNo">2141</span><a name="line.2141"></a>
+<span class="sourceLineNo">2142</span> /**<a name="line.2142"></a>
+<span class="sourceLineNo">2143</span> * Adds or replaces the specified
KeyValues.<a name="line.2143"></a>
+<span class="sourceLineNo">2144</span> * <p><a name="line.2144"></a>
+<span class="sourceLineNo">2145</span> * For each KeyValue specified, if a
cell with the same row, family, and qualifier exists in<a name="line.2145"></a>
+<span class="sourceLineNo">2146</span> * MemStore, it will be replaced.
Otherwise, it will just be inserted to MemStore.<a name="line.2146"></a>
+<span class="sourceLineNo">2147</span> * <p><a name="line.2147"></a>
+<span class="sourceLineNo">2148</span> * This operation is atomic on each
KeyValue (row/family/qualifier) but not necessarily atomic<a
name="line.2148"></a>
+<span class="sourceLineNo">2149</span> * across all of them.<a
name="line.2149"></a>
+<span class="sourceLineNo">2150</span> * @param cells<a name="line.2150"></a>
+<span class="sourceLineNo">2151</span> * @param readpoint readpoint below
which we can safely remove duplicate KVs<a name="line.2151"></a>
+<span class="sourceLineNo">2152</span> * @param memstoreSize<a
name="line.2152"></a>
+<span class="sourceLineNo">2153</span> * @throws IOException<a
name="line.2153"></a>
+<span class="sourceLineNo">2154</span> */<a name="line.2154"></a>
+<span class="sourceLineNo">2155</span> public void
upsert(Iterable<Cell> cells, long readpoint, MemStoreSize memstoreSize)<a
name="line.2155"></a>
+<span class="sourceLineNo">2156</span> throws IOException {<a
name="line.2156"></a>
+<span class="sourceLineNo">2157</span> this.lock.readLock().lock();<a
name="line.2157"></a>
+<span class="sourceLineNo">2158</span> try {<a name="line.2158"></a>
+<span class="sourceLineNo">2159</span> this.memstore.upsert(cells,
readpoint, memstoreSize);<a name="line.2159"></a>
+<span class="sourceLineNo">2160</span> } finally {<a name="line.2160"></a>
+<span class="sourceLineNo">2161</span> this.lock.readLock().unlock();<a
name="line.2161"></a>
+<span class="sourceLineNo">2162</span> }<a name="line.2162"></a>
+<span class="sourceLineNo">2163</span> }<a name="line.2163"></a>
+<span class="sourceLineNo">2164</span><a name="line.2164"></a>
+<span class="sourceLineNo">2165</span> public StoreFlushContext
createFlushContext(long cacheFlushId) {<a name="line.2165"></a>
+<span class="sourceLineNo">2166</span> return new
StoreFlusherImpl(cacheFlushId);<a name="line.2166"></a>
+<span class="sourceLineNo">2167</span> }<a name="line.2167"></a>
+<span class="sourceLineNo">2168</span><a name="line.2168"></a>
+<span class="sourceLineNo">2169</span> private final class StoreFlusherImpl
implements StoreFlushContext {<a name="line.2169"></a>
+<span class="sourceLineNo">2170</span><a name="line.2170"></a>
+<span class="sourceLineNo">2171</span> private long cacheFlushSeqNum;<a
name="line.2171"></a>
+<span class="sourceLineNo">2172</span> private MemStoreSnapshot snapshot;<a
name="line.2172"></a>
+<span class="sourceLineNo">2173</span> private List<Path>
tempFiles;<a name="line.2173"></a>
+<span class="sourceLineNo">2174</span> private List<Path>
committedFiles;<a name="line.2174"></a>
+<span class="sourceLineNo">2175</span> private long cacheFlushCount;<a
name="line.2175"></a>
+<span class="sourceLineNo">2176</span> private long cacheFlushSize;<a
name="line.2176"></a>
+<span class="sourceLineNo">2177</span> private long outputFileSize;<a
name="line.2177"></a>
+<span class="sourceLineNo">2178</span><a name="line.2178"></a>
+<span class="sourceLineNo">2179</span> private StoreFlusherImpl(long
cacheFlushSeqNum) {<a name="line.2179"></a>
+<span class="sourceLineNo">2180</span> this.cacheFlushSeqNum =
cacheFlushSeqNum;<a name="line.2180"></a>
+<span class="sourceLineNo">2181</span> }<a name="line.2181"></a>
+<span class="sourceLineNo">2182</span><a name="line.2182"></a>
+<span class="sourceLineNo">2183</span> /**<a name="line.2183"></a>
+<span class="sourceLineNo">2184</span> * This is not thread safe. The
caller should have a lock on the region or the store.<a name="line.2184"></a>
+<span class="sourceLineNo">2185</span> * If necessary, the lock can be
added with the patch provided in HBASE-10087<a name="line.2185"></a>
+<span class="sourceLineNo">2186</span> */<a name="line.2186"></a>
+<span class="sourceLineNo">2187</span> @Override<a name="line.2187"></a>
+<span class="sourceLineNo">2188</span> public void prepare() {<a
name="line.2188"></a>
+<span class="sourceLineNo">2189</span> // passing the current sequence
number of the wal - to allow bookkeeping in the memstore<a name="line.2189"></a>
+<span class="sourceLineNo">2190</span> this.snapshot =
memstore.snapshot();<a name="line.2190"></a>
+<span class="sourceLineNo">2191</span> this.cacheFlushCount =
snapshot.getCellsCount();<a name="line.2191"></a>
+<span class="sourceLineNo">2192</span> this.cacheFlushSize =
snapshot.getDataSize();<a name="line.2192"></a>
+<span class="sourceLineNo">2193</span> committedFiles = new
ArrayList<>(1);<a name="line.2193"></a>
+<span class="sourceLineNo">2194</span> }<a name="line.2194"></a>
+<span class="sourceLineNo">2195</span><a name="line.2195"></a>
+<span class="sourceLineNo">2196</span> @Override<a name="line.2196"></a>
+<span class="sourceLineNo">2197</span> public void flushCache(MonitoredTask
status) throws IOException {<a name="line.2197"></a>
+<span class="sourceLineNo">2198</span> RegionServerServices rsService =
region.getRegionServerServices();<a name="line.2198"></a>
+<span class="sourceLineNo">2199</span> ThroughputController
throughputController =<a name="line.2199"></a>
+<span class="sourceLineNo">2200</span> rsService == null ? null :
rsService.getFlushThroughputController();<a name="line.2200"></a>
+<span class="sourceLineNo">2201</span> tempFiles =
HStore.this.flushCache(cacheFlushSeqNum, snapshot, status,
throughputController);<a name="line.2201"></a>
+<span class="sourceLineNo">2202</span> }<a name="line.2202"></a>
+<span class="sourceLineNo">2203</span><a name="line.2203"></a>
+<span class="sourceLineNo">2204</span> @Override<a name="line.2204"></a>
+<span class="sourceLineNo">2205</span> public boolean commit(MonitoredTask
status) throws IOException {<a name="line.2205"></a>
+<span class="sourceLineNo">2206</span> if (this.tempFiles == null ||
this.tempFiles.isEmpty()) {<a name="line.2206"></a>
+<span class="sourceLineNo">2207</span> return false;<a
name="line.2207"></a>
+<span class="sourceLineNo">2208</span> }<a name="line.2208"></a>
+<span class="sourceLineNo">2209</span> List<HStoreFile> storeFiles
= new ArrayList<>(this.tempFiles.size());<a name="line.2209"></a>
+<span class="sourceLineNo">2210</span> for (Path storeFilePath :
tempFiles) {<a name="line.2210"></a>
+<span class="sourceLineNo">2211</span> try {<a name="line.2211"></a>
+<span class="sourceLineNo">2212</span> HStoreFile sf =
HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);<a
name="line.2212"></a>
+<span class="sourceLineNo">2213</span> outputFileSize +=
sf.getReader().length();<a name="line.2213"></a>
+<span class="sourceLineNo">2214</span> storeFiles.add(sf);<a
name="line.2214"></a>
+<span class="sourceLineNo">2215</span> } catch (IOException ex) {<a
name="line.2215"></a>
+<span class="sourceLineNo">2216</span> LOG.error("Failed to commit
store file " + storeFilePath, ex);<a name="line.2216"></a>
+<span class="sourceLineNo">2217</span> // Try to delete the files we
have committed before.<a name="line.2217"></a>
+<span class="sourceLineNo">2218</span> for (HStoreFile sf :
storeFiles) {<a name="line.2218"></a>
+<span class="sourceLineNo">2219</span> Path pathToDelete =
sf.getPath();<a name="line.2219"></a>
+<span class="sourceLineNo">2220</span> try {<a name="line.2220"></a>
+<span class="sourceLineNo">2221</span> sf.deleteStoreFile();<a
name="line.2221"></a>
+<span class="sourceLineNo">2222</span> } catch (IOException
deleteEx) {<a name="line.2222"></a>
+<span class="sourceLineNo">2223</span> LOG.fatal("Failed to
delete store file we committed, halting " + pathToDelete, ex);<a
name="line.2223"></a>
+<span class="sourceLineNo">2224</span>
Runtime.getRuntime().halt(1);<a name="line.2224"></a>
+<span class="sourceLineNo">2225</span> }<a name="line.2225"></a>
+<span class="sourceLineNo">2226</span> }<a name="line.2226"></a>
+<span class="sourceLineNo">2227</span> throw new IOException("Failed
to commit the flush", ex);<a name="line.2227"></a>
+<span class="sourceLineNo">2228</span> }<a name="line.2228"></a>
+<span class="sourceLineNo">2229</span> }<a name="line.2229"></a>
+<span class="sourceLineNo">2230</span><a name="line.2230"></a>
+<span class="sourceLineNo">2231</span> for (HStoreFile sf : storeFiles)
{<a name="line.2231"></a>
+<span class="sourceLineNo">2232</span> if
(HStore.this.getCoprocessorHost() != null) {<a name="line.2232"></a>
+<span class="sourceLineNo">2233</span>
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);<a
name="line.2233"></a>
+<span class="sourceLineNo">2234</span> }<a name="line.2234"></a>
+<span class="sourceLineNo">2235</span>
committedFiles.add(sf.getPath());<a name="line.2235"></a>
+<span class="sourceLineNo">2236</span> }<a name="line.2236"></a>
+<span class="sourceLineNo">2237</span><a name="line.2237"></a>
+<span class="sourceLineNo">2238</span> HStore.this.flushedCellsCount +=
cacheFlushCount;<a name="line.2238"></a>
+<span class="sourceLineNo">2239</span> HStore.this.flushedCellsSize +=
cacheFlushSize;<a name="line.2239"></a>
+<span class="sourceLineNo">2240</span> HStore.this.flushedOutputFileSize
+= outputFileSize;<a name="line.2240"></a>
+<span class="sourceLineNo">2241</span><a name="line.2241"></a>
+<span class="sourceLineNo">2242</span> // Add new file to store files.
Clear snapshot too while we have the Store write lock.<a name="line.2242"></a>
+<span class="sourceLineNo">2243</span> return
HStore.this.updateStorefiles(storeFiles, snapshot.getId());<a
name="line.2243"></a>
+<span class="sourceLineNo">2244</span> }<a name="line.2244"></a>
+<span class="sourceLineNo">2245</span><a name="line.2245"></a>
+<span class="sourceLineNo">2246</span> @Override<a name="line.2246"></a>
+<span class="sourceLineNo">2247</span> public long getOutputFileSize() {<a
name="line.2247"></a>
+<span class="sourceLineNo">2248</span> return outputFileSize;<a
name="line.2248"></a>
+<span class="sourceLineNo">2249</span> }<a name="line.2249"></a>
+<span class="sourceLineNo">2250</span><a name="line.2250"></a>
+<span class="sourceLineNo">2251</span> @Override<a name="line.2251"></a>
+<span class="sourceLineNo">2252</span> public List<Path>
getCommittedFiles() {<a name="line.2252"></a>
+<span class="sourceLineNo">2253</span> return committedFiles;<a
name="line.2253"></a>
+<span class="sourceLineNo">2254</span> }<a name="line.2254"></a>
+<span class="sourceLineNo">2255</span><a name="line.2255"></a>
+<span class="sourceLineNo">2256</span> /**<a name="line.2256"></a>
+<span class="sourceLineNo">2257</span> * Similar to commit, but called in
secondary region replicas for replaying the<a name="line.2257"></a>
+<span class="sourceLineNo">2258</span> * flush cache from primary region.
Adds the new files to the store, and drops the<a name="line.2258"></a>
+<span class="sourceLineNo">2259</span> * snapshot depending on
dropMemstoreSnapshot argument.<a name="line.2259"></a>
+<span class="sourceLineNo">2260</span> * @param fileNames names of the
flushed files<a name="line.2260"></a>
+<span class="sourceLineNo">2261</span> * @param dropMemstoreSnapshot
whether to drop the prepared memstore snapshot<a name="line.2261"></a>
+<span class="sourceLineNo">2262</span> * @throws IOException<a
name="line.2262"></a>
+<span class="sourceLineNo">2263</span> */<a name="line.2263"></a>
+<span class="sourceLineNo">2264</span> @Override<a name="line.2264"></a>
+<span class="sourceLineNo">2265</span> public void
replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)<a
name="line.2265"></a>
+<span class="sourceLineNo">2266</span> throws IOException {<a
name="line.2266"></a>
+<span class="sourceLineNo">2267</span> List<HStoreFile> storeFiles
= new ArrayList<>(fileNames.size());<a name="line.2267"></a>
+<span class="sourceLineNo">2268</span> for (String file : fileNames) {<a
name="line.2268"></a>
+<span class="sourceLineNo">2269</span> // open the file as a store file
(hfile link, etc)<a name="line.2269"></a>
+<span class="sourceLineNo">2270</span> StoreFileInfo storeFileInfo =
fs.getStoreFileInfo(getColumnFamilyName(), file);<a name="line.2270"></a>
+<span class="sourceLineNo">2271</span> HStoreFile storeFile =
createStoreFileAndReader(storeFileInfo);<a name="line.2271"></a>
+<span class="sourceLineNo">2272</span> storeFiles.add(storeFile);<a
name="line.2272"></a>
+<span class="sourceLineNo">2273</span> HStore.this.storeSize +=
storeFile.getReader().length();<a name="line.2273"></a>
+<span class="sourceLineNo">2274</span>
HStore.this.totalUncompressedBytes +=
storeFile.getReader().getTotalUncompressedBytes();<a name="line.2274"></a>
+<span class="sourceLineNo">2275</span> if (LOG.isInfoEnabled()) {<a
name="line.2275"></a>
+<span class="sourceLineNo">2276</span> LOG.info("Region: " +
HStore.this.getRegionInfo().getEncodedName() +<a name="line.2276"></a>
+<span class="sourceLineNo">2277</span> " added " + storeFile + ",
entries=" + storeFile.getReader().getEntries() +<a name="line.2277"></a>
+<span class="sourceLineNo">2278</span> ", sequenceid=" +
+storeFile.getReader().getSequenceID() + ", filesize="<a name="line.2278"></a>
+<span class="sourceLineNo">2279</span> +
TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));<a
name="line.2279"></a>
+<span class="sourceLineNo">2280</span> }<a name="line.2280"></a>
+<span class="sourceLineNo">2281</span> }<a name="line.2281"></a>
+<span class="sourceLineNo">2282</span><a name="line.2282"></a>
+<span class="sourceLineNo">2283</span> long snapshotId = -1; // -1 means
do not drop<a name="line.2283"></a>
+<span class="sourceLineNo">2284</span> if (dropMemstoreSnapshot
&& snapshot != null) {<a name="line.2284"></a>
+<span class="sourceLineNo">2285</span> snapshotId = snapshot.getId();<a
name="line.2285"></a>
+<span class="sourceLineNo">2286</span> }<a name="line.2286"></a>
+<span class="sourceLineNo">2287</span>
HStore.this.updateStorefiles(storeFiles, snapshotId);<a name="line.2287"></a>
+<span class="sourceLineNo">2288</span> }<a name="line.2288"></a>
+<span class="sourceLineNo">2289</span><a name="line.2289"></a>
+<span class="sourceLineNo">2290</span> /**<a name="line.2290"></a>
+<span class="sourceLineNo">2291</span> * Abort the snapshot preparation.
Drops the snapshot if any.<a name="line.2291"></a>
+<span class="sourceLineNo">2292</span> * @throws IOException<a
name="line.2292"></a>
+<span class="sourceLineNo">2293</span> */<a name="line.2293"></a>
+<span class="sourceLineNo">2294</span> @Override<a name="line.2294"></a>
+<span class="sourceLineNo">2295</span> public void abort() throws
IOException {<a name="line.2295"></a>
+<span class="sourceLineNo">2296</span> if (snapshot == null) {<a
name="line.2296"></a>
+<span class="sourceLineNo">2297</span> return;<a name="line.2297"></a>
+<span class="sourceLineNo">2298</span> }<a name="line.2298"></a>
+<span class="sourceLineNo">2299</span> HStore.this.updateStorefiles(new
ArrayList<>(0), snapshot.getId());<a name="line.2299"></a>
+<span class="sourceLineNo">2300</span> }<a name="line.2300"></a>
+<span class="sourceLineNo">2301</span> }<a name="line.2301"></a>
+<span class="sourceLineNo">2302</span><a name="line.2302"></a>
+<span class="sourceLineNo">2303</span> @Override<a name="line.2303"></a>
+<span class="sourceLineNo">2304</span> public boolean needsCompaction() {<a
name="line.2304"></a>
+<span class="sourceLineNo">2305</span> return
this.storeEngine.needsCompaction(this.filesCompacting);<a name="line.2305"></a>
+<span class="sourceLineNo">2306</span> }<a name="line.2306"></a>
+<span class="sourceLineNo">2307</span><a name="line.2307"></a>
+<span class="sourceLineNo">2308</span> /**<a name="line.2308"></a>
+<span class="sourceLineNo">2309</span> * Used for tests.<a
name="line.2309"></a>
+<span class="sourceLineNo">2310</span> * @return cache configuration for
this Store.<a name="line.2310"></a>
+<span class="sourceLineNo">2311</span> */<a name="line.2311"></a>
+<span class="sourceLineNo">2312</span> @VisibleForTesting<a
name="line.2312"></a>
+<span class="sourceLineNo">2313</span> public CacheConfig getCacheConfig()
{<a name="line.2313"></a>
+<span class="sourceLineNo">2314</span> return this.cacheConf;<a
name="line.2314"></a>
+<span class="sourceLineNo">2315</span> }<a name="line.2315"></a>
+<span class="sourceLineNo">2316</span><a name="line.2316"></a>
+<span class="sourceLineNo">2317</span> public static final long
FIXED_OVERHEAD =<a name="line.2317"></a>
+<span class="sourceLineNo">2318</span> ClassSize.align(ClassSize.OBJECT +
(17 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG)<a name="line.2318"></a>
+<span class="sourceLineNo">2319</span> + (5 * Bytes.SIZEOF_INT) +
(2 * Bytes.SIZEOF_BOOLEAN));<a name="line.2319"></a>
+<span class="sourceLineNo">2320</span><a name="line.2320"></a>
+<span class="sourceLineNo">2321</span> public static final long DEEP_OVERHEAD
= ClassSize.align(FIXED_OVERHEAD<a name="line.2321"></a>
+<span class="sourceLineNo">2322</span> + ClassSize.OBJECT +
ClassSize.REENTRANT_LOCK<a name="line.2322"></a>
+<span class="sourceLineNo">2323</span> +
ClassSize.CONCURRENT_SKIPLISTMAP<a name="line.2323"></a>
+<span class="sourceLineNo">2324</span> +
ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT<a
name="line.2324"></a>
+<span class="sourceLineNo">2325</span> + ScanInfo.FIXED_OVERHEAD);<a
name="line.2325"></a>
+<span class="sourceLineNo">2326</span><a name="line.2326"></a>
+<span class="sourceLineNo">2327</span> @Override<a name="line.2327"></a>
+<span class="sourceLineNo">2328</span> public long heapSize() {<a
name="line.2328"></a>
+<span class="sourceLineNo">2329</span> MemStoreSize memstoreSize =
this.memstore.size();<a name="line.2329"></a>
+<span class="sourceLineNo">2330</span> return DEEP_OVERHEAD +
memstoreSize.getHeapSize();<a name="line.2330"></a>
+<span class="sourceLineNo">2331</span> }<a name="line.2331"></a>
+<span class="sourceLineNo">2332</span><a name="line.2332"></a>
+<span class="sourceLineNo">2333</span> @Override<a name="line.2333"></a>
+<span class="sourceLineNo">2334</span> public CellComparator getComparator()
{<a name="line.2334"></a>
+<span class="sourceLineNo">2335</span> return comparator;<a
name="line.2335"></a>
+<span class="sourceLineNo">2336</span> }<a name="line.2336"></a>
+<span class="sourceLineNo">2337</span><a name="line.2337"></a>
+<span class="sourceLineNo">2338</span> public ScanInfo getScanInfo() {<a
name="line.2338"></a>
+<span class="sourceLineNo">2339</span> return scanInfo;<a
name="line.2339"></a>
+<span class="sourceLineNo">2340</span> }<a name="line.2340"></a>
+<span class="sourceLineNo">2341</span><a name="line.2341"></a>
+<span class="sourceLineNo">2342</span> /**<a name="line.2342"></a>
+<span class="sourceLineNo">2343</span> * Set scan info, used by test<a
name="line.2343"></a>
+<span class="sourceLineNo">2344</span> * @param scanInfo new scan info to
use for test<a name="line.2344"></a>
+<span class="sourceLineNo">2345</span> */<a name="line.2345"></a>
+<span class="sourceLineNo">2346</span> void setScanInfo(ScanInfo scanInfo)
{<a name="line.2346"></a>
+<span class="sourceLineNo">2347</span> this.scanInfo = scanInfo;<a
name="line.2347"></a>
+<span class="sourceLineNo">2348</span> }<a name="line.2348"></a>
+<span class="sourceLineNo">2349</span><a name="line.2349"></a>
+<span class="sourceLineNo">2350</span> @Override<a name="line.2350"></a>
+<span class="sourceLineNo">2351</span> public boolean hasTooManyStoreFiles()
{<a name="line.2351"></a>
+<span class="sourceLineNo">2352</span> return getStorefilesCount() >
this.blockingFileCount;<a name="line.2352"></a>
+<span class="sourceLineNo">2353</span> }<a name="line.2353"></a>
+<span class="sourceLineNo">2354</span><a name="line.2354"></a>
+<span class="sourceLineNo">2355</span> @Override<a name="line.2355"></a>
+<span class="sourceLineNo">2356</span> public long getFlushedCellsCount() {<a
name="line.2356"></a>
+<span class="sourceLineNo">2357</span> return flushedCellsCount;<a
name="line.2357"></a>
+<span class="sourceLineNo">2358</span> }<a name="line.2358"></a>
+<span class="sourceLineNo">2359</span><a name="line.2359"></a>
+<span class="sourceLineNo">2360</span> @Override<a name="line.2360"></a>
+<span class="sourceLineNo">2361</span> public long getFlushedCellsSize() {<a
name="line.2361"></a>
+<span class="sourceLineNo">2362</span> return flushedCellsSize;<a
name="line.2362"></a>
+<span class="sourceLineNo">2363</span> }<a name="line.2363"></a>
+<span class="sourceLineNo">2364</span><a name="line.2364"></a>
+<span class="sourceLineNo">2365</span> @Override<a name="line.2365"></a>
+<span class="sourceLineNo">2366</span> public long getFlushedOutputFileSize()
{<a name="line.2366"></a>
+<span class="sourceLineNo">2367</span> return flushedOutputFileSize;<a
name="line.2367"></a>
+<span class="sourceLineNo">2368</span> }<a name="line.2368"></a>
+<span class="sourceLineNo">2369</span><a name="line.2369"></a>
+<span class="sourceLineNo">2370</span> @Override<a name="line.2370"></a>
+<span class="sourceLineNo">2371</span> public long getCompactedCellsCount()
{<a name="line.2371"></a>
+<span class="sourceLineNo">2372</span> return compactedCellsCount;<a
name="line.2372"></a>
+<span class="sourceLineNo">2373</span> }<a name="line.2373"></a>
+<span class="sourceLineNo">2374</span><a name="line.2374"></a>
+<span class="sourceLineNo">2375</span> @Override<a name="line.2375"></a>
+<span class="sourceLineNo">2376</span> public long getCompactedCellsSize()
{<a name="line.2376"></a>
+<span class="sourceLineNo">2377</span> return compactedCellsSize;<a
name="line.2377"></a>
+<span class="sourceLineNo">2378</span> }<a name="line.2378"></a>
+<span class="sourceLineNo">2379</span><a name="line.2379"></a>
+<span class="sourceLineNo">2380</span> @Override<a name="line.2380"></a>
+<span class="sourceLineNo">2381</span> public long
getMajorCompactedCellsCount() {<a name="line.2381"></a>
+<span class="sourceLineNo">2382</span> return majorCompactedCellsCount;<a
name="line.2382"></a>
+<span class="sourceLineNo">2383</span> }<a name="line.2383"></a>
+<span class="sourceLineNo">2384</span><a name="line.2384"></a>
+<span class="sourceLineNo">2385</span> @Override<a name="line.2385"></a>
+<span class="sourceLineNo">2386</span> public long
getMajorCompactedCellsSize() {<a name="line.2386"></a>
+<span class="sourceLineNo">2387</span> return majorCompactedCellsSize;<a
name="line.2387"></a>
+<span class="sourceLineNo">2388</span> }<a name="line.2388"></a>
+<span class="sourceLineNo">2389</span><a name="line.2389"></a>
+<span class="sourceLineNo">2390</span> /**<a name="line.2390"></a>
+<span class="sourceLineNo">2391</span> * Returns the StoreEngine that is
backing this concrete implementation of Store.<a name="line.2391"></a>
+<span class="sourceLineNo">2392</span> * @return Returns the {@link
StoreEngine} object used internally inside this HStore object.<a
name="line.2392"></a>
+<span class="sourceLineNo">2393</span> */<a name="line.2393"></a>
+<span class="sourceLineNo">2394</span> @VisibleForTesting<a
name="line.2394"></a>
+<span class="sourceLineNo">2395</span> public StoreEngine<?, ?, ?, ?>
getStoreEngine() {<a name="line.2395"></a>
+<span class="sourceLineNo">2396</span> return this.storeEngine;<a
name="line.2396"></a>
+<span class="sourceLineNo">2397</span> }<a name="line.2397"></a>
+<span class="sourceLineNo">2398</span><a name="line.2398"></a>
+<span class="sourceLineNo">2399</span> protected OffPeakHours
getOffPeakHours() {<a name="line.2399"></a>
+<span class="sourceLineNo">2400</span> return this.offPeakHours;<a
name="line.2400"></a>
+<span class="sourceLineNo">2401</span> }<a name="line.2401"></a>
+<span class="sourceLineNo">2402</span><a name="line.2402"></a>
+<span class="sourceLineNo">2403</span> /**<a name="line.2403"></a>
+<span class="sourceLineNo">2404</span> * {@inheritDoc}<a
name="line.2404"></a>
+<span class="sourceLineNo">2405</span> */<a name="line.2405"></a>
+<span class="sourceLineNo">2406</span> @Override<a name="line.2406"></a>
+<span class="sourceLineNo">2407</span> public void
onConfigurationChange(Configuration conf) {<a name="line.2407"></a>
+<span class="sourceLineNo">2408</span> this.conf = new
CompoundConfiguration()<a name="line.2408"></a>
+<span class="sourceLineNo">2409</span> .add(conf)<a
name="line.2409"></a>
+<span class="sourceLineNo">2410</span>
.addBytesMap(family.getValues());<a name="line.2410"></a>
+<span class="sourceLineNo">2411</span>
this.storeEngine.compactionPolicy.setConf(conf);<a name="line.2411"></a>
+<span class="sourceLineNo">2412</span> this.offPeakHours =
OffPeakHours.getInstance(conf);<a name="line.2412"></a>
+<span class="sourceLineNo">2413</span> }<a name="line.2413"></a>
+<span class="sourceLineNo">2414</span><a name="line.2414"></a>
+<span class="sourceLineNo">2415</span> /**<a name="line.2415"></a>
+<span class="sourceLineNo">2416</span> * {@inheritDoc}<a
name="line.2416"></a>
+<span class="sourceLineNo">2417</span> */<a name="line.2417"></a>
+<span class="sourceLineNo">2418</span> @Override<a name="line.2418"></a>
+<span class="sourceLineNo">2419</span> public void
registerChildren(ConfigurationManager manager) {<a name="line.2419"></a>
+<span class="sourceLineNo">2420</span> // No children to register<a
name="line.2420"></a>
+<span class="sourceLineNo">2421</span> }<a name="line.2421"></a>
+<span class="sourceLineNo">2422</span><a name="line.2422"></a>
+<span class="sourceLineNo">2423</span> /**<a name="line.2423"></a>
+<span class="sourceLineNo">2424</span> * {@inheritDoc}<a
name="line.2424"></a>
+<span class="sourceLineNo">2425</span> */<a name="line.2425"></a>
+<span class="sourceLineNo">2426</span> @Override<a name="line.2426"></a>
+<span class="sourceLineNo">2427</span> public void
deregisterChildren(ConfigurationManager manager) {<a name="line.2427"></a>
+<span class="sourceLineNo">2428</span> // No children to deregister<a
name="line.2428"></a>
+<span class="sourceLineNo">2429</span> }<a name="line.2429"></a>
+<span class="sourceLineNo">2430</span><a name="line.2430"></a>
+<span class="sourceLineNo">2431</span> @Override<a name="line.2431"></a>
+<span class="sourceLineNo">2432</span> public double getCompactionPressure()
{<a name="line.2432"></a>
+<span class="sourceLineNo">2433</span> return
storeEngine.getStoreFileManager().getCompactionPressure();<a
name="line.2433"></a>
+<span class="sourceLineNo">2434</span> }<a name="line.2434"></a>
+<span class="sourceLineNo">2435</span><a name="line.2435"></a>
+<span class="sourceLineNo">2436</span> @Override<a name="line.2436"></a>
+<span class="sourceLineNo">2437</span> public boolean isPrimaryReplicaStore()
{<a name="line.2437"></a>
+<span class="sourceLineNo">2438</span> return
getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;<a
name="line.2438"></a>
+<span class="sourceLineNo">2439</span> }<a name="line.2439"></a>
+<span class="sourceLineNo">2440</span><a name="line.2440"></a>
+<span class="sourceLineNo">2441</span> /**<a name="line.2441"></a>
+<span class="sourceLineNo">2442</span> * Sets the store up for a region
level snapshot operation.<a name="line.2442"></a>
+<span class="sourceLineNo">2443</span> * @see #postSnapshotOperation()<a
name="line.2443"></a>
+<span class="sourceLineNo">2444</span> */<a name="line.2444"></a>
+<span class="sourceLineNo">2445</span> public void preSnapshotOperation() {<a
name="line.2445"></a>
+<span class="sourceLineNo">2446</span> archiveLock.lock();<a
name="line.2446"></a>
+<span class="sourceLineNo">2447</span> }<a name="line.2447"></a>
+<span class="sourceLineNo">2448</span><a name="line.2448"></a>
+<span class="sourceLineNo">2449</span> /**<a name="line.2449"></a>
+<span class="sourceLineNo">2450</span> * Perform tasks needed after the
completion of snapshot operation.<a name="line.2450"></a>
+<span class="sourceLineNo">2451</span> * @see #preSnapshotOperation()<a
name="line.2451"></a>
+<span class="sourceLineNo">2452</span> */<a name="line.2452"></a>
+<span class="sourceLineNo">2453</span> public void postSnapshotOperation()
{<a name="line.2453"></a>
+<span class="sourceLineNo">2454</span> archiveLock.unlock();<a
name="line.2454"></a>
+<span class="sourceLineNo">2455</span> }<a name="line.2455"></a>
+<span class="sourceLineNo">2456</span><a name="line.2456"></a>
+<span class="sourceLineNo">2457</span> /**<a name="line.2457"></a>
+<span class="sourceLineNo">2458</span> * Closes and archives the compacted
files under this store<a name="line.2458"></a>
+<span class="sourceLineNo">2459</span> */<a name="line.2459"></a>
+<span class="sourceLineNo">2460</span> public synchronized void
closeAndArchiveCompactedFiles() throws IOException {<a name="line.2460"></a>
+<span class="sourceLineNo">2461</span> // ensure other threads do not
attempt to archive the same files on close()<a name="line.2461"></a>
+<span class="sourceLineNo">2462</span> archiveLock.lock();<a
name="line.2462"></a>
+<span class="sourceLineNo">2463</span> try {<a name="line.2463"></a>
+<span class="sourceLineNo">2464</span> lock.readLock().lock();<a
name="line.2464"></a>
+<span class="sourceLineNo">2465</span> Collection<HStoreFile>
copyCompactedfiles = null;<a name="line.2465"></a>
+<span class="sourceLineNo">2466</span> try {<a name="line.2466"></a>
+<span class="sourceLineNo">2467</span> Collection<HStoreFile>
compactedfiles =<a name="line.2467"></a>
+<span class="sourceLineNo">2468</span>
this.getStoreEngine().getStoreFileManager().getCompactedfiles();<a
name="line.2468"></a>
+<span class="sourceLineNo">2469</span> if (compactedfiles != null
&& compactedfiles.size() != 0) {<a name="line.2469"></a>
+<span class="sourceLineNo">2470</span> // Do a copy under read lock<a
name="line.2470"></a>
+<span class="sourceLineNo">2471</span> copyCompactedfiles = new
ArrayList<>(compactedfiles);<a name="line.2471"></a>
+<span class="sourceLineNo">2472</span> } else {<a name="line.2472"></a>
+<span class="sourceLineNo">2473</span> if (LOG.isTraceEnabled()) {<a
name="line.2473"></a>
+<span class="sourceLineNo">2474</span> LOG.trace("No compacted
files to archive");<a name="line.2474"></a>
+<span class="sourceLineNo">2475</span> return;<a
name="line.2475"></a>
+<span class="sourceLineNo">2476</span> }<a name="line.2476"></a>
+<span class="sourceLineNo">2477</span> }<a name="line.2477"></a>
+<span class="sourceLineNo">2478</span> } finally {<a name="line.2478"></a>
+<span class="sourceLineNo">2479</span> lock.readLock().unlock();<a
name="line.2479"></a>
+<span class="sourceLineNo">2480</span> }<a name="line.2480"></a>
+<span class="sourceLineNo">2481</span> if (copyCompactedfiles != null
&& !copyCompactedfiles.isEmpty()) {<a name="line.2481"></a>
+<span class="sourceLineNo">2482</span>
removeCompactedfiles(copyCompactedfiles);<a name="line.2482"></a>
+<span class="sourceLineNo">2483</span> }<a name="line.2483"></a>
+<span class="sourceLineNo">2484</span> } finally {<a name="line.2484"></a>
+<span class="sourceLineNo">2485</span> archiveLock.unlock();<a
name="line.2485"></a>
+<span class="sourceLineNo">2486</span> }<a name="line.2486"></a>
+<span class="sourceLineNo">2487</span> }<a name="line.2487"></a>
+<span class="sourceLineNo">2488</span><a name="line.2488"></a>
+<span class="sourceLineNo">2489</span> /**<a name="line.2489"></a>
+<span class="sourceLineNo">2490</span> * Archives and removes the compacted
files<a name="line.2490"></a>
+<span class="sourceLineNo">2491</span> * @param compactedfiles The compacted
files in this store that are not active in reads<a name="line.2491"></a>
+<span class="sourceLineNo">2492</span> * @throws IOException<a
name="line.2492"></a>
+<span class="sourceLineNo">2493</span> */<a name="line.2493"></a>
+<span class="sourceLineNo">2494</span> private void
removeCompactedfiles(Collection<HStoreFile> compactedfiles)<a
name="line.2494"></a>
+<span class="sourceLineNo">2495</span> throws IOException {<a
name="line.2495"></a>
+<span class="sourceLineNo">2496</span> final List<HStoreFile>
filesToRemove = new ArrayList<>(compactedfiles.size());<a
name="line.2496"></a>
+<span class="sourceLineNo">2497</span> for (final HStoreFile file :
compactedfiles) {<a name="line.2497"></a>
+<span class="sourceLineNo">2498</span> synchronized (file) {<a
name="line.2498"></a>
+<span class="sourceLineNo">2499</span> try {<a name="line.2499"></a>
+<span class="sourceLineNo">2500</span> StoreFileReader r =
file.getReader();<a name="line.2500"></a>
+<span class="sourceLineNo">2501</span> if (r == null) {<a
name="line.2501"></a>
+<span class="sourceLineNo">2502</span> if (LOG.isDebugEnabled())
{<a name="line.2502"></a>
+<span class="sourceLineNo">2503</span> LOG.debug("The file " +
file + " was closed but still not archived.");<a name="line.2503"></a>
+<span class="sourceLineNo">2504</span> }<a name="line.2504"></a>
+<span class="sourceLineNo">2505</span> filesToRemove.add(file);<a
name="line.2505"></a>
+<span class="sourceLineNo">2506</span> continue;<a
name="line.2506"></a>
+<span class="sourceLineNo">2507</span> }<a name="line.2507"></a>
+<span class="sourceLineNo">2508</span> if (file.isCompactedAway()
&& !file.isReferencedInReads()) {<a name="line.2508"></a>
+<span class="sourceLineNo">2509</span> // Even if deleting fails we
need not bother as any new scanners won't be<a name="line.2509"></a>
+<span class="sourceLineNo">2510</span> // able to use the compacted
file as the status is already compactedAway<a name="line.2510"></a>
+<span class="sourceLineNo">2511</span> if (LOG.isTraceEnabled())
{<a name="line.2511"></a>
+<span class="sourceLineNo">2512</span> LOG.trace("Closing and
archiving the file " + file.getPath());<a name="line.2512"></a>
+<span class="sourceLineNo">2513</span> }<a name="line.2513"></a>
+<span class="sourceLineNo">2514</span> r.close(true);<a
name="line.2514"></a>
+<span class="sourceLineNo">2515</span> // Just close and return<a
name="line.2515"></a>
+<span class="sourceLineNo">2516</span> filesToRemove.add(file);<a
name="line.2516"></a>
+<span class="sourceLineNo">2517</span> }<a name="line.2517"></a>
+<span class="sourceLineNo">2518</span> } catch (Exception e) {<a
name="line.2518"></a>
+<span class="sourceLineNo">2519</span> LOG.error(<a
name="line.2519"></a>
+<span class="sourceLineNo">2520</span> "Exception while trying to
close the compacted store file " + file.getPath().getName());<a
name="line.2520"></a>
+<span class="sourceLineNo">2521</span> }<a name="line.2521"></a>
+<span class="sourceLineNo">2522</span> }<a name="line.2522"></a>
+<span class="sourceLineNo">2523</span> }<a name="line.2523"></a>
+<span class="sourceLineNo">2524</span> if (this.isPrimaryReplicaStore())
{<a name="line.2524"></a>
+<span class="sourceLineNo">2525</span> // Only the primary region is
allowed to move the file to archive.<a name="line.2525"></a>
+<span class="sourceLineNo">2526</span> // The secondary region does not
move the files to archive. Any active reads from<a name="line.2526"></a>
+<span class="sourceLineNo">2527</span> // the secondary region will still
work because the file as such has active readers on it.<a name="line.2527"></a>
+<span class="sourceLineNo">2528</span> if (!filesToRemove.isEmpty()) {<a
name="line.2528"></a>
+<span class="sourceLineNo">2529</span> if (LOG.isDebugEnabled()) {<a
name="line.2529"></a>
+<span class="sourceLineNo">2530</span> LOG.debug("Moving the files "
+ filesToRemove + " to archive");<a name="line.2530"></a>
+<span class="sourceLineNo">2531</span> }<a name="line.2531"></a>
+<span class="sourceLineNo">2532</span> // Only if this is successful it
has to be removed<a name="line.2532"></a>
+<span class="sourceLineNo">2533</span> try {<a name="line.2533"></a>
+<span class="sourceLineNo">2534</span>
this.fs.removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
filesToRemove);<a name="line.2534"></a>
+<span class="sourceLineNo">2535</span> } catch (FailedArchiveException
fae) {<a name="line.2535"></a>
+<span class="sourceLineNo">2536</span> // Even if archiving some
files failed, we still need to clear out any of the<a name="line.2536"></a>
+<span class="sourceLineNo">2537</span> // files which were
successfully archived. Otherwise we will receive a<a name="line.2537"></a>
+<span class="sourceLineNo">2538</span> // FileNotFoundException when
we attempt to re-archive them in the next go around.<a name="line.2538"></a>
+<span class="sourceLineNo">2539</span> Collection<Path>
failedFiles = fae.getFailedFiles();<a name="line.2539"></a>
+<span class="sourceLineNo">2540</span> Iterator<HStoreFile>
iter = filesToRemove.iterator();<a name="line.2540"></a>
+<span class="sourceLineNo">2541</span> while (iter.hasNext()) {<a
name="line.2541"></a>
+<span class="sourceLineNo">2542</span> if
(failedFiles.contains(iter.next().getPath())) {<a name="line.2542"></a>
+<span class="sourceLineNo">2543</span> iter.remove();<a
name="line.2543"></a>
+<span class="sourceLineNo">2544</span> }<a name="line.2544"></a>
+<span class="sourceLineNo">2545</span> }<a name="line.2545"></a>
+<span class="sourceLineNo">2546</span> if (!filesToRemove.isEmpty())
{<a name="line.2546"></a>
+<span class="sourceLineNo">2547</span>
clearCompactedfiles(filesToRemove);<a name="line.2547"></a>
+<span class="sourceLineNo">2548</span> }<a name="line.2548"></a>
+<span class="sourceLineNo">2549</span> throw fae;<a
name="line.2549"></a>
+<span class="sourceLineNo">2550</span> }<a name="line.2550"></a>
+<span class="sourceLineNo">2551</span> }<a name="line.2551"></a>
+<span class="sourceLineNo">2552</span> }<a name="line.2552"></a>
+<span class="sourceLineNo">2553</span> if (!filesToRemove.isEmpty()) {<a
name="line.2553"></a>
+<span class="sourceLineNo">2554</span> // Clear the compactedfiles from
the store file manager<a name="line.2554"></a>
+<span class="sourceLineNo">2555</span>
clearCompactedfiles(filesToRemove);<a name="line.2555"></a>
+<span class="sourceLineNo">2556</span> }<a name="line.2556"></a>
+<span class="sourceLineNo">2557</span> }<a name="line.2557"></a>
+<span class="sourceLineNo">2558</span><a name="line.2558"></a>
+<span class="sourceLineNo">2559</span> public Long preFlushSeqIDEstimation()
{<a name="line.2559"></a>
+<span class="sourceLineNo">2560</span> return
memstore.preFlushSeqIDEstimation();<a name="line.2560"></a>
+<span class="sourceLineNo">2561</span> }<a name="line.2561"></a>
+<span class="sourceLineNo">2562</span><a name="line.2562"></a>
+<span class="sourceLineNo">2563</span> @Override<a name="line.2563"></a>
+<span class="sourceLineNo">2564</span> public boolean isSloppyMemStore() {<a
name="line.2564"></a>
+<span class="sourceLineNo">2565</span> return this.memstore.isSloppy();<a
name="line.2565"></a>
+<span class="sourceLineNo">2566</span> }<a name="line.2566"></a>
+<span class="sourceLineNo">2567</span><a name="line.2567"></a>
+<span class="sourceLineNo">2568</span> private void
clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException
{<a name="line.2568"></a>
+<span class="sourceLineNo">2569</span> if (LOG.isTraceEnabled()) {<a
name="line.2569"></a>
+<span class="sourceLineNo">2570</span> LOG.trace("Clearing the compacted
file " + filesToRemove + " from this store");<a name="line.2570"></a>
+<span class="sourceLineNo">2571</span> }<a name="line.2571"></a>
+<span class="sourceLineNo">2572</span> try {<a name="line.2572"></a>
+<span class="sourceLineNo">2573</span> lock.writeLock().lock();<a
name="line.2573"></a>
+<span class="sourceLineNo">2574</span>
this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);<a
name="line.2574"></a>
+<span class="sourceLineNo">2575</span> } finally {<a name="line.2575"></a>
+<span class="sourceLineNo">2576</span> lock.writeLock().unlock();<a
name="line.2576"></a>
+<span class="sourceLineNo">2577</span> }<a name="line.2577"></a>
+<span class="sourceLineNo">2578</span> }<a name="line.2578"></a>
+<span class="sourceLineNo">2579</span>}<a name="line.2579"></a>