http://git-wip-us.apache.org/repos/asf/hbase/blob/ef7355e1/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html index fe65cea..b625de4 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.RowLockImpl.html @@ -1409,10 +1409,10 @@ <span class="sourceLineNo">1401</span> }<a name="line.1401"></a> <span class="sourceLineNo">1402</span> }<a name="line.1402"></a> <span class="sourceLineNo">1403</span><a name="line.1403"></a> -<span class="sourceLineNo">1404</span> this.closing.set(true);<a name="line.1404"></a> -<span class="sourceLineNo">1405</span> status.setStatus("Disabling writes for close");<a name="line.1405"></a> -<span class="sourceLineNo">1406</span> // block waiting for the lock for closing<a name="line.1406"></a> -<span class="sourceLineNo">1407</span> lock.writeLock().lock();<a name="line.1407"></a> +<span class="sourceLineNo">1404</span> // block waiting for the lock for closing<a name="line.1404"></a> +<span class="sourceLineNo">1405</span> lock.writeLock().lock();<a name="line.1405"></a> +<span class="sourceLineNo">1406</span> this.closing.set(true);<a name="line.1406"></a> +<span class="sourceLineNo">1407</span> status.setStatus("Disabling writes for close");<a name="line.1407"></a> <span class="sourceLineNo">1408</span> try {<a name="line.1408"></a> <span class="sourceLineNo">1409</span> if (this.isClosed()) {<a name="line.1409"></a> <span class="sourceLineNo">1410</span> status.abort("Already got closed by another process");<a name="line.1410"></a> @@ -5242,7 +5242,7 @@ <span class="sourceLineNo">5234</span> * Determines whether multiple column families are present<a 
name="line.5234"></a> <span class="sourceLineNo">5235</span> * Precondition: familyPaths is not null<a name="line.5235"></a> <span class="sourceLineNo">5236</span> *<a name="line.5236"></a> -<span class="sourceLineNo">5237</span> * @param familyPaths List of Pair<byte[] column family, String hfilePath><a name="line.5237"></a> +<span class="sourceLineNo">5237</span> * @param familyPaths List of (column family, hfilePath)<a name="line.5237"></a> <span class="sourceLineNo">5238</span> */<a name="line.5238"></a> <span class="sourceLineNo">5239</span> private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) {<a name="line.5239"></a> <span class="sourceLineNo">5240</span> boolean multipleFamilies = false;<a name="line.5240"></a> @@ -5955,10 +5955,10 @@ <span class="sourceLineNo">5947</span><a name="line.5947"></a> <span class="sourceLineNo">5948</span> /**<a name="line.5948"></a> <span class="sourceLineNo">5949</span> * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines<a name="line.5949"></a> -<span class="sourceLineNo">5950</span> * both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may<a name="line.5950"></a> -<span class="sourceLineNo">5951</span> * not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns<a name="line.5951"></a> -<span class="sourceLineNo">5952</span> * true when filterRow(List<KeyValue> kvs) is overridden not the filterRow(). Therefore, the<a name="line.5952"></a> -<span class="sourceLineNo">5953</span> * filterRow() will be skipped.<a name="line.5953"></a> +<span class="sourceLineNo">5950</span> * both filterRow & filterRow({@code List<KeyValue> kvs}) functions. 
While 0.94 code or older,<a name="line.5950"></a> +<span class="sourceLineNo">5951</span> * it may not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only<a name="line.5951"></a> +<span class="sourceLineNo">5952</span> * returns true when filterRow({@code List<KeyValue> kvs}) is overridden not the filterRow().<a name="line.5952"></a> +<span class="sourceLineNo">5953</span> * Therefore, the filterRow() will be skipped.<a name="line.5953"></a> <span class="sourceLineNo">5954</span> */<a name="line.5954"></a> <span class="sourceLineNo">5955</span> private boolean filterRow() throws IOException {<a name="line.5955"></a> <span class="sourceLineNo">5956</span> // when hasFilterRow returns true, filter.filterRow() will be called automatically inside<a name="line.5956"></a> @@ -6958,1155 +6958,1148 @@ <span class="sourceLineNo">6950</span> }<a name="line.6950"></a> <span class="sourceLineNo">6951</span><a name="line.6951"></a> <span class="sourceLineNo">6952</span> /**<a name="line.6952"></a> -<span class="sourceLineNo">6953</span> * @param cell<a name="line.6953"></a> -<span class="sourceLineNo">6954</span> * @param tags<a name="line.6954"></a> -<span class="sourceLineNo">6955</span> * @return The passed-in List<Tag> but with the tags from <code>cell</code> added.<a name="line.6955"></a> -<span class="sourceLineNo">6956</span> */<a name="line.6956"></a> -<span class="sourceLineNo">6957</span> private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {<a name="line.6957"></a> -<span class="sourceLineNo">6958</span> if (cell.getTagsLength() <= 0) return tags;<a name="line.6958"></a> -<span class="sourceLineNo">6959</span> List<Tag> newTags = tags == null? 
new ArrayList<Tag>(): /*Append Tags*/tags; <a name="line.6959"></a> -<span class="sourceLineNo">6960</span> Iterator<Tag> i =<a name="line.6960"></a> -<span class="sourceLineNo">6961</span> CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());<a name="line.6961"></a> -<span class="sourceLineNo">6962</span> while (i.hasNext()) newTags.add(i.next());<a name="line.6962"></a> -<span class="sourceLineNo">6963</span> return newTags;<a name="line.6963"></a> -<span class="sourceLineNo">6964</span> }<a name="line.6964"></a> -<span class="sourceLineNo">6965</span><a name="line.6965"></a> -<span class="sourceLineNo">6966</span> /**<a name="line.6966"></a> -<span class="sourceLineNo">6967</span> * Run a Get against passed in <code>store</code> on passed <code>row</code>, etc.<a name="line.6967"></a> -<span class="sourceLineNo">6968</span> * @param store<a name="line.6968"></a> -<span class="sourceLineNo">6969</span> * @param row<a name="line.6969"></a> -<span class="sourceLineNo">6970</span> * @param family<a name="line.6970"></a> -<span class="sourceLineNo">6971</span> * @param tr<a name="line.6971"></a> -<span class="sourceLineNo">6972</span> * @return Get result.<a name="line.6972"></a> -<span class="sourceLineNo">6973</span> * @throws IOException<a name="line.6973"></a> -<span class="sourceLineNo">6974</span> */<a name="line.6974"></a> -<span class="sourceLineNo">6975</span> private List<Cell> doGet(final Store store, final byte [] row,<a name="line.6975"></a> -<span class="sourceLineNo">6976</span> final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)<a name="line.6976"></a> -<span class="sourceLineNo">6977</span> throws IOException {<a name="line.6977"></a> -<span class="sourceLineNo">6978</span> // Sort the cells so that they match the order that they<a name="line.6978"></a> -<span class="sourceLineNo">6979</span> // appear in the Get results. 
Otherwise, we won't be able to<a name="line.6979"></a> -<span class="sourceLineNo">6980</span> // find the existing values if the cells are not specified<a name="line.6980"></a> -<span class="sourceLineNo">6981</span> // in order by the client since cells are in an array list.<a name="line.6981"></a> -<span class="sourceLineNo">6982</span> Collections.sort(family.getValue(), store.getComparator());<a name="line.6982"></a> -<span class="sourceLineNo">6983</span> // Get previous values for all columns in this family<a name="line.6983"></a> -<span class="sourceLineNo">6984</span> Get get = new Get(row);<a name="line.6984"></a> -<span class="sourceLineNo">6985</span> for (Cell cell : family.getValue()) {<a name="line.6985"></a> -<span class="sourceLineNo">6986</span> get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));<a name="line.6986"></a> -<span class="sourceLineNo">6987</span> }<a name="line.6987"></a> -<span class="sourceLineNo">6988</span> if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());<a name="line.6988"></a> -<span class="sourceLineNo">6989</span> return get(get, false);<a name="line.6989"></a> -<span class="sourceLineNo">6990</span> }<a name="line.6990"></a> -<span class="sourceLineNo">6991</span><a name="line.6991"></a> -<span class="sourceLineNo">6992</span> public Result append(Append append) throws IOException {<a name="line.6992"></a> -<span class="sourceLineNo">6993</span> return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.6993"></a> -<span class="sourceLineNo">6994</span> }<a name="line.6994"></a> -<span class="sourceLineNo">6995</span><a name="line.6995"></a> -<span class="sourceLineNo">6996</span> // TODO: There's a lot of boiler plate code identical to increment.<a name="line.6996"></a> -<span class="sourceLineNo">6997</span> // We should refactor append and increment as local get-mutate-put<a name="line.6997"></a> -<span class="sourceLineNo">6998</span> // transactions, so all stores only go through 
one code path for puts.<a name="line.6998"></a> -<span class="sourceLineNo">6999</span><a name="line.6999"></a> -<span class="sourceLineNo">7000</span> @Override<a name="line.7000"></a> -<span class="sourceLineNo">7001</span> public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {<a name="line.7001"></a> -<span class="sourceLineNo">7002</span> Operation op = Operation.APPEND;<a name="line.7002"></a> -<span class="sourceLineNo">7003</span> byte[] row = mutate.getRow();<a name="line.7003"></a> -<span class="sourceLineNo">7004</span> checkRow(row, op.toString());<a name="line.7004"></a> -<span class="sourceLineNo">7005</span> checkFamilies(mutate.getFamilyCellMap().keySet());<a name="line.7005"></a> -<span class="sourceLineNo">7006</span> boolean flush = false;<a name="line.7006"></a> -<span class="sourceLineNo">7007</span> Durability durability = getEffectiveDurability(mutate.getDurability());<a name="line.7007"></a> -<span class="sourceLineNo">7008</span> boolean writeToWAL = durability != Durability.SKIP_WAL;<a name="line.7008"></a> -<span class="sourceLineNo">7009</span> WALEdit walEdits = null;<a name="line.7009"></a> -<span class="sourceLineNo">7010</span> List<Cell> allKVs = new ArrayList<Cell>(mutate.size());<a name="line.7010"></a> -<span class="sourceLineNo">7011</span> Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();<a name="line.7011"></a> -<span class="sourceLineNo">7012</span> long size = 0;<a name="line.7012"></a> -<span class="sourceLineNo">7013</span> long txid = 0;<a name="line.7013"></a> -<span class="sourceLineNo">7014</span> checkReadOnly();<a name="line.7014"></a> -<span class="sourceLineNo">7015</span> checkResources();<a name="line.7015"></a> -<span class="sourceLineNo">7016</span> // Lock row<a name="line.7016"></a> -<span class="sourceLineNo">7017</span> startRegionOperation(op);<a name="line.7017"></a> -<span class="sourceLineNo">7018</span> this.writeRequestsCount.increment();<a 
name="line.7018"></a> -<span class="sourceLineNo">7019</span> RowLock rowLock = null;<a name="line.7019"></a> -<span class="sourceLineNo">7020</span> WALKey walKey = null;<a name="line.7020"></a> -<span class="sourceLineNo">7021</span> MultiVersionConcurrencyControl.WriteEntry writeEntry = null;<a name="line.7021"></a> -<span class="sourceLineNo">7022</span> boolean doRollBackMemstore = false;<a name="line.7022"></a> -<span class="sourceLineNo">7023</span> try {<a name="line.7023"></a> -<span class="sourceLineNo">7024</span> rowLock = getRowLock(row);<a name="line.7024"></a> -<span class="sourceLineNo">7025</span> assert rowLock != null;<a name="line.7025"></a> -<span class="sourceLineNo">7026</span> try {<a name="line.7026"></a> -<span class="sourceLineNo">7027</span> lock(this.updatesLock.readLock());<a name="line.7027"></a> -<span class="sourceLineNo">7028</span> try {<a name="line.7028"></a> -<span class="sourceLineNo">7029</span> // Wait for all prior MVCC transactions to finish - while we hold the row lock<a name="line.7029"></a> -<span class="sourceLineNo">7030</span> // (so that we are guaranteed to see the latest state when we do our Get)<a name="line.7030"></a> -<span class="sourceLineNo">7031</span> mvcc.await();<a name="line.7031"></a> -<span class="sourceLineNo">7032</span> if (this.coprocessorHost != null) {<a name="line.7032"></a> -<span class="sourceLineNo">7033</span> Result r = this.coprocessorHost.preAppendAfterRowLock(mutate);<a name="line.7033"></a> -<span class="sourceLineNo">7034</span> if (r!= null) {<a name="line.7034"></a> -<span class="sourceLineNo">7035</span> return r;<a name="line.7035"></a> -<span class="sourceLineNo">7036</span> }<a name="line.7036"></a> -<span class="sourceLineNo">7037</span> }<a name="line.7037"></a> -<span class="sourceLineNo">7038</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7038"></a> -<span class="sourceLineNo">7039</span> // Process each family<a name="line.7039"></a> -<span 
class="sourceLineNo">7040</span> for (Map.Entry<byte[], List<Cell>> family : mutate.getFamilyCellMap().entrySet()) {<a name="line.7040"></a> -<span class="sourceLineNo">7041</span> Store store = stores.get(family.getKey());<a name="line.7041"></a> -<span class="sourceLineNo">7042</span> List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());<a name="line.7042"></a> -<span class="sourceLineNo">7043</span><a name="line.7043"></a> -<span class="sourceLineNo">7044</span> List<Cell> results = doGet(store, row, family, null);<a name="line.7044"></a> -<span class="sourceLineNo">7045</span><a name="line.7045"></a> -<span class="sourceLineNo">7046</span> // Iterate the input columns and update existing values if they were<a name="line.7046"></a> -<span class="sourceLineNo">7047</span> // found, otherwise add new column initialized to the append value<a name="line.7047"></a> -<span class="sourceLineNo">7048</span><a name="line.7048"></a> -<span class="sourceLineNo">7049</span> // Avoid as much copying as possible. We may need to rewrite and<a name="line.7049"></a> -<span class="sourceLineNo">7050</span> // consolidate tags. 
Bytes are only copied once.<a name="line.7050"></a> -<span class="sourceLineNo">7051</span> // Would be nice if KeyValue had scatter/gather logic<a name="line.7051"></a> -<span class="sourceLineNo">7052</span> int idx = 0;<a name="line.7052"></a> -<span class="sourceLineNo">7053</span> for (Cell cell : family.getValue()) {<a name="line.7053"></a> -<span class="sourceLineNo">7054</span> Cell newCell;<a name="line.7054"></a> -<span class="sourceLineNo">7055</span> Cell oldCell = null;<a name="line.7055"></a> -<span class="sourceLineNo">7056</span> if (idx < results.size()<a name="line.7056"></a> -<span class="sourceLineNo">7057</span> && CellUtil.matchingQualifier(results.get(idx), cell)) {<a name="line.7057"></a> -<span class="sourceLineNo">7058</span> oldCell = results.get(idx);<a name="line.7058"></a> -<span class="sourceLineNo">7059</span> long ts = Math.max(now, oldCell.getTimestamp());<a name="line.7059"></a> +<span class="sourceLineNo">6953</span> * @return The passed-in {@code tags} but with the tags from {@code cell} added.<a name="line.6953"></a> +<span class="sourceLineNo">6954</span> */<a name="line.6954"></a> +<span class="sourceLineNo">6955</span> private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {<a name="line.6955"></a> +<span class="sourceLineNo">6956</span> if (cell.getTagsLength() <= 0) return tags;<a name="line.6956"></a> +<span class="sourceLineNo">6957</span> List<Tag> newTags = tags == null? 
new ArrayList<Tag>(): /*Append Tags*/tags; <a name="line.6957"></a> +<span class="sourceLineNo">6958</span> Iterator<Tag> i =<a name="line.6958"></a> +<span class="sourceLineNo">6959</span> CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());<a name="line.6959"></a> +<span class="sourceLineNo">6960</span> while (i.hasNext()) newTags.add(i.next());<a name="line.6960"></a> +<span class="sourceLineNo">6961</span> return newTags;<a name="line.6961"></a> +<span class="sourceLineNo">6962</span> }<a name="line.6962"></a> +<span class="sourceLineNo">6963</span><a name="line.6963"></a> +<span class="sourceLineNo">6964</span> /**<a name="line.6964"></a> +<span class="sourceLineNo">6965</span> * Run a Get against passed in <code>store</code> on passed <code>row</code>, etc.<a name="line.6965"></a> +<span class="sourceLineNo">6966</span> * @return Get result.<a name="line.6966"></a> +<span class="sourceLineNo">6967</span> */<a name="line.6967"></a> +<span class="sourceLineNo">6968</span> private List<Cell> doGet(final Store store, final byte [] row,<a name="line.6968"></a> +<span class="sourceLineNo">6969</span> final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)<a name="line.6969"></a> +<span class="sourceLineNo">6970</span> throws IOException {<a name="line.6970"></a> +<span class="sourceLineNo">6971</span> // Sort the cells so that they match the order that they<a name="line.6971"></a> +<span class="sourceLineNo">6972</span> // appear in the Get results. 
Otherwise, we won't be able to<a name="line.6972"></a> +<span class="sourceLineNo">6973</span> // find the existing values if the cells are not specified<a name="line.6973"></a> +<span class="sourceLineNo">6974</span> // in order by the client since cells are in an array list.<a name="line.6974"></a> +<span class="sourceLineNo">6975</span> Collections.sort(family.getValue(), store.getComparator());<a name="line.6975"></a> +<span class="sourceLineNo">6976</span> // Get previous values for all columns in this family<a name="line.6976"></a> +<span class="sourceLineNo">6977</span> Get get = new Get(row);<a name="line.6977"></a> +<span class="sourceLineNo">6978</span> for (Cell cell : family.getValue()) {<a name="line.6978"></a> +<span class="sourceLineNo">6979</span> get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));<a name="line.6979"></a> +<span class="sourceLineNo">6980</span> }<a name="line.6980"></a> +<span class="sourceLineNo">6981</span> if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());<a name="line.6981"></a> +<span class="sourceLineNo">6982</span> return get(get, false);<a name="line.6982"></a> +<span class="sourceLineNo">6983</span> }<a name="line.6983"></a> +<span class="sourceLineNo">6984</span><a name="line.6984"></a> +<span class="sourceLineNo">6985</span> public Result append(Append append) throws IOException {<a name="line.6985"></a> +<span class="sourceLineNo">6986</span> return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.6986"></a> +<span class="sourceLineNo">6987</span> }<a name="line.6987"></a> +<span class="sourceLineNo">6988</span><a name="line.6988"></a> +<span class="sourceLineNo">6989</span> // TODO: There's a lot of boiler plate code identical to increment.<a name="line.6989"></a> +<span class="sourceLineNo">6990</span> // We should refactor append and increment as local get-mutate-put<a name="line.6990"></a> +<span class="sourceLineNo">6991</span> // transactions, so all stores only go through 
one code path for puts.<a name="line.6991"></a> +<span class="sourceLineNo">6992</span><a name="line.6992"></a> +<span class="sourceLineNo">6993</span> @Override<a name="line.6993"></a> +<span class="sourceLineNo">6994</span> public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {<a name="line.6994"></a> +<span class="sourceLineNo">6995</span> Operation op = Operation.APPEND;<a name="line.6995"></a> +<span class="sourceLineNo">6996</span> byte[] row = mutate.getRow();<a name="line.6996"></a> +<span class="sourceLineNo">6997</span> checkRow(row, op.toString());<a name="line.6997"></a> +<span class="sourceLineNo">6998</span> checkFamilies(mutate.getFamilyCellMap().keySet());<a name="line.6998"></a> +<span class="sourceLineNo">6999</span> boolean flush = false;<a name="line.6999"></a> +<span class="sourceLineNo">7000</span> Durability durability = getEffectiveDurability(mutate.getDurability());<a name="line.7000"></a> +<span class="sourceLineNo">7001</span> boolean writeToWAL = durability != Durability.SKIP_WAL;<a name="line.7001"></a> +<span class="sourceLineNo">7002</span> WALEdit walEdits = null;<a name="line.7002"></a> +<span class="sourceLineNo">7003</span> List<Cell> allKVs = new ArrayList<Cell>(mutate.size());<a name="line.7003"></a> +<span class="sourceLineNo">7004</span> Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();<a name="line.7004"></a> +<span class="sourceLineNo">7005</span> long size = 0;<a name="line.7005"></a> +<span class="sourceLineNo">7006</span> long txid = 0;<a name="line.7006"></a> +<span class="sourceLineNo">7007</span> checkReadOnly();<a name="line.7007"></a> +<span class="sourceLineNo">7008</span> checkResources();<a name="line.7008"></a> +<span class="sourceLineNo">7009</span> // Lock row<a name="line.7009"></a> +<span class="sourceLineNo">7010</span> startRegionOperation(op);<a name="line.7010"></a> +<span class="sourceLineNo">7011</span> this.writeRequestsCount.increment();<a 
name="line.7011"></a> +<span class="sourceLineNo">7012</span> RowLock rowLock = null;<a name="line.7012"></a> +<span class="sourceLineNo">7013</span> WALKey walKey = null;<a name="line.7013"></a> +<span class="sourceLineNo">7014</span> MultiVersionConcurrencyControl.WriteEntry writeEntry = null;<a name="line.7014"></a> +<span class="sourceLineNo">7015</span> boolean doRollBackMemstore = false;<a name="line.7015"></a> +<span class="sourceLineNo">7016</span> try {<a name="line.7016"></a> +<span class="sourceLineNo">7017</span> rowLock = getRowLock(row);<a name="line.7017"></a> +<span class="sourceLineNo">7018</span> assert rowLock != null;<a name="line.7018"></a> +<span class="sourceLineNo">7019</span> try {<a name="line.7019"></a> +<span class="sourceLineNo">7020</span> lock(this.updatesLock.readLock());<a name="line.7020"></a> +<span class="sourceLineNo">7021</span> try {<a name="line.7021"></a> +<span class="sourceLineNo">7022</span> // Wait for all prior MVCC transactions to finish - while we hold the row lock<a name="line.7022"></a> +<span class="sourceLineNo">7023</span> // (so that we are guaranteed to see the latest state when we do our Get)<a name="line.7023"></a> +<span class="sourceLineNo">7024</span> mvcc.await();<a name="line.7024"></a> +<span class="sourceLineNo">7025</span> if (this.coprocessorHost != null) {<a name="line.7025"></a> +<span class="sourceLineNo">7026</span> Result r = this.coprocessorHost.preAppendAfterRowLock(mutate);<a name="line.7026"></a> +<span class="sourceLineNo">7027</span> if (r!= null) {<a name="line.7027"></a> +<span class="sourceLineNo">7028</span> return r;<a name="line.7028"></a> +<span class="sourceLineNo">7029</span> }<a name="line.7029"></a> +<span class="sourceLineNo">7030</span> }<a name="line.7030"></a> +<span class="sourceLineNo">7031</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7031"></a> +<span class="sourceLineNo">7032</span> // Process each family<a name="line.7032"></a> +<span 
class="sourceLineNo">7033</span> for (Map.Entry<byte[], List<Cell>> family : mutate.getFamilyCellMap().entrySet()) {<a name="line.7033"></a> +<span class="sourceLineNo">7034</span> Store store = stores.get(family.getKey());<a name="line.7034"></a> +<span class="sourceLineNo">7035</span> List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());<a name="line.7035"></a> +<span class="sourceLineNo">7036</span><a name="line.7036"></a> +<span class="sourceLineNo">7037</span> List<Cell> results = doGet(store, row, family, null);<a name="line.7037"></a> +<span class="sourceLineNo">7038</span><a name="line.7038"></a> +<span class="sourceLineNo">7039</span> // Iterate the input columns and update existing values if they were<a name="line.7039"></a> +<span class="sourceLineNo">7040</span> // found, otherwise add new column initialized to the append value<a name="line.7040"></a> +<span class="sourceLineNo">7041</span><a name="line.7041"></a> +<span class="sourceLineNo">7042</span> // Avoid as much copying as possible. We may need to rewrite and<a name="line.7042"></a> +<span class="sourceLineNo">7043</span> // consolidate tags. 
Bytes are only copied once.<a name="line.7043"></a> +<span class="sourceLineNo">7044</span> // Would be nice if KeyValue had scatter/gather logic<a name="line.7044"></a> +<span class="sourceLineNo">7045</span> int idx = 0;<a name="line.7045"></a> +<span class="sourceLineNo">7046</span> for (Cell cell : family.getValue()) {<a name="line.7046"></a> +<span class="sourceLineNo">7047</span> Cell newCell;<a name="line.7047"></a> +<span class="sourceLineNo">7048</span> Cell oldCell = null;<a name="line.7048"></a> +<span class="sourceLineNo">7049</span> if (idx < results.size()<a name="line.7049"></a> +<span class="sourceLineNo">7050</span> && CellUtil.matchingQualifier(results.get(idx), cell)) {<a name="line.7050"></a> +<span class="sourceLineNo">7051</span> oldCell = results.get(idx);<a name="line.7051"></a> +<span class="sourceLineNo">7052</span> long ts = Math.max(now, oldCell.getTimestamp());<a name="line.7052"></a> +<span class="sourceLineNo">7053</span><a name="line.7053"></a> +<span class="sourceLineNo">7054</span> // Process cell tags<a name="line.7054"></a> +<span class="sourceLineNo">7055</span> // Make a union of the set of tags in the old and new KVs<a name="line.7055"></a> +<span class="sourceLineNo">7056</span> List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>());<a name="line.7056"></a> +<span class="sourceLineNo">7057</span> newTags = carryForwardTags(cell, newTags);<a name="line.7057"></a> +<span class="sourceLineNo">7058</span><a name="line.7058"></a> +<span class="sourceLineNo">7059</span> // Cell TTL handling<a name="line.7059"></a> <span class="sourceLineNo">7060</span><a name="line.7060"></a> -<span class="sourceLineNo">7061</span> // Process cell tags<a name="line.7061"></a> -<span class="sourceLineNo">7062</span> // Make a union of the set of tags in the old and new KVs<a name="line.7062"></a> -<span class="sourceLineNo">7063</span> List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>());<a name="line.7063"></a> -<span 
class="sourceLineNo">7064</span> newTags = carryForwardTags(cell, newTags);<a name="line.7064"></a> +<span class="sourceLineNo">7061</span> if (mutate.getTTL() != Long.MAX_VALUE) {<a name="line.7061"></a> +<span class="sourceLineNo">7062</span> // Add the new TTL tag<a name="line.7062"></a> +<span class="sourceLineNo">7063</span> newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));<a name="line.7063"></a> +<span class="sourceLineNo">7064</span> }<a name="line.7064"></a> <span class="sourceLineNo">7065</span><a name="line.7065"></a> -<span class="sourceLineNo">7066</span> // Cell TTL handling<a name="line.7066"></a> -<span class="sourceLineNo">7067</span><a name="line.7067"></a> -<span class="sourceLineNo">7068</span> if (mutate.getTTL() != Long.MAX_VALUE) {<a name="line.7068"></a> -<span class="sourceLineNo">7069</span> // Add the new TTL tag<a name="line.7069"></a> -<span class="sourceLineNo">7070</span> newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));<a name="line.7070"></a> -<span class="sourceLineNo">7071</span> }<a name="line.7071"></a> -<span class="sourceLineNo">7072</span><a name="line.7072"></a> -<span class="sourceLineNo">7073</span> // Rebuild tags<a name="line.7073"></a> -<span class="sourceLineNo">7074</span> byte[] tagBytes = Tag.fromList(newTags);<a name="line.7074"></a> -<span class="sourceLineNo">7075</span><a name="line.7075"></a> -<span class="sourceLineNo">7076</span> // allocate an empty cell once<a name="line.7076"></a> -<span class="sourceLineNo">7077</span> newCell = new KeyValue(row.length, cell.getFamilyLength(),<a name="line.7077"></a> -<span class="sourceLineNo">7078</span> cell.getQualifierLength(), ts, KeyValue.Type.Put,<a name="line.7078"></a> -<span class="sourceLineNo">7079</span> oldCell.getValueLength() + cell.getValueLength(),<a name="line.7079"></a> -<span class="sourceLineNo">7080</span> tagBytes.length);<a name="line.7080"></a> -<span class="sourceLineNo">7081</span> // copy in 
row, family, and qualifier<a name="line.7081"></a> -<span class="sourceLineNo">7082</span> System.arraycopy(cell.getRowArray(), cell.getRowOffset(),<a name="line.7082"></a> -<span class="sourceLineNo">7083</span> newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());<a name="line.7083"></a> -<span class="sourceLineNo">7084</span> System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),<a name="line.7084"></a> -<span class="sourceLineNo">7085</span> newCell.getFamilyArray(), newCell.getFamilyOffset(),<a name="line.7085"></a> -<span class="sourceLineNo">7086</span> cell.getFamilyLength());<a name="line.7086"></a> -<span class="sourceLineNo">7087</span> System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),<a name="line.7087"></a> -<span class="sourceLineNo">7088</span> newCell.getQualifierArray(), newCell.getQualifierOffset(),<a name="line.7088"></a> -<span class="sourceLineNo">7089</span> cell.getQualifierLength());<a name="line.7089"></a> -<span class="sourceLineNo">7090</span> // copy in the value<a name="line.7090"></a> -<span class="sourceLineNo">7091</span> CellUtil.copyValueTo(oldCell, newCell.getValueArray(), newCell.getValueOffset());<a name="line.7091"></a> -<span class="sourceLineNo">7092</span> System.arraycopy(cell.getValueArray(), cell.getValueOffset(),<a name="line.7092"></a> -<span class="sourceLineNo">7093</span> newCell.getValueArray(),<a name="line.7093"></a> -<span class="sourceLineNo">7094</span> newCell.getValueOffset() + oldCell.getValueLength(),<a name="line.7094"></a> -<span class="sourceLineNo">7095</span> cell.getValueLength());<a name="line.7095"></a> -<span class="sourceLineNo">7096</span> // Copy in tag data<a name="line.7096"></a> -<span class="sourceLineNo">7097</span> System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),<a name="line.7097"></a> -<span class="sourceLineNo">7098</span> tagBytes.length);<a name="line.7098"></a> -<span class="sourceLineNo">7099</span> idx++;<a 
name="line.7099"></a> -<span class="sourceLineNo">7100</span> } else {<a name="line.7100"></a> -<span class="sourceLineNo">7101</span> // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP<a name="line.7101"></a> -<span class="sourceLineNo">7102</span> CellUtil.updateLatestStamp(cell, now);<a name="line.7102"></a> -<span class="sourceLineNo">7103</span><a name="line.7103"></a> -<span class="sourceLineNo">7104</span> // Cell TTL handling<a name="line.7104"></a> -<span class="sourceLineNo">7105</span><a name="line.7105"></a> -<span class="sourceLineNo">7106</span> if (mutate.getTTL() != Long.MAX_VALUE) {<a name="line.7106"></a> -<span class="sourceLineNo">7107</span> List<Tag> newTags = new ArrayList<Tag>(1);<a name="line.7107"></a> -<span class="sourceLineNo">7108</span> newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));<a name="line.7108"></a> -<span class="sourceLineNo">7109</span> // Add the new TTL tag<a name="line.7109"></a> -<span class="sourceLineNo">7110</span> newCell = new TagRewriteCell(cell, Tag.fromList(newTags));<a name="line.7110"></a> -<span class="sourceLineNo">7111</span> } else {<a name="line.7111"></a> -<span class="sourceLineNo">7112</span> newCell = cell;<a name="line.7112"></a> -<span class="sourceLineNo">7113</span> }<a name="line.7113"></a> -<span class="sourceLineNo">7114</span> }<a name="line.7114"></a> +<span class="sourceLineNo">7066</span> // Rebuild tags<a name="line.7066"></a> +<span class="sourceLineNo">7067</span> byte[] tagBytes = Tag.fromList(newTags);<a name="line.7067"></a> +<span class="sourceLineNo">7068</span><a name="line.7068"></a> +<span class="sourceLineNo">7069</span> // allocate an empty cell once<a name="line.7069"></a> +<span class="sourceLineNo">7070</span> newCell = new KeyValue(row.length, cell.getFamilyLength(),<a name="line.7070"></a> +<span class="sourceLineNo">7071</span> cell.getQualifierLength(), ts, KeyValue.Type.Put,<a name="line.7071"></a> +<span 
class="sourceLineNo">7072</span> oldCell.getValueLength() + cell.getValueLength(),<a name="line.7072"></a> +<span class="sourceLineNo">7073</span> tagBytes.length);<a name="line.7073"></a> +<span class="sourceLineNo">7074</span> // copy in row, family, and qualifier<a name="line.7074"></a> +<span class="sourceLineNo">7075</span> System.arraycopy(cell.getRowArray(), cell.getRowOffset(),<a name="line.7075"></a> +<span class="sourceLineNo">7076</span> newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());<a name="line.7076"></a> +<span class="sourceLineNo">7077</span> System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),<a name="line.7077"></a> +<span class="sourceLineNo">7078</span> newCell.getFamilyArray(), newCell.getFamilyOffset(),<a name="line.7078"></a> +<span class="sourceLineNo">7079</span> cell.getFamilyLength());<a name="line.7079"></a> +<span class="sourceLineNo">7080</span> System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),<a name="line.7080"></a> +<span class="sourceLineNo">7081</span> newCell.getQualifierArray(), newCell.getQualifierOffset(),<a name="line.7081"></a> +<span class="sourceLineNo">7082</span> cell.getQualifierLength());<a name="line.7082"></a> +<span class="sourceLineNo">7083</span> // copy in the value<a name="line.7083"></a> +<span class="sourceLineNo">7084</span> CellUtil.copyValueTo(oldCell, newCell.getValueArray(), newCell.getValueOffset());<a name="line.7084"></a> +<span class="sourceLineNo">7085</span> System.arraycopy(cell.getValueArray(), cell.getValueOffset(),<a name="line.7085"></a> +<span class="sourceLineNo">7086</span> newCell.getValueArray(),<a name="line.7086"></a> +<span class="sourceLineNo">7087</span> newCell.getValueOffset() + oldCell.getValueLength(),<a name="line.7087"></a> +<span class="sourceLineNo">7088</span> cell.getValueLength());<a name="line.7088"></a> +<span class="sourceLineNo">7089</span> // Copy in tag data<a name="line.7089"></a> +<span 
class="sourceLineNo">7090</span> System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),<a name="line.7090"></a> +<span class="sourceLineNo">7091</span> tagBytes.length);<a name="line.7091"></a> +<span class="sourceLineNo">7092</span> idx++;<a name="line.7092"></a> +<span class="sourceLineNo">7093</span> } else {<a name="line.7093"></a> +<span class="sourceLineNo">7094</span> // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP<a name="line.7094"></a> +<span class="sourceLineNo">7095</span> CellUtil.updateLatestStamp(cell, now);<a name="line.7095"></a> +<span class="sourceLineNo">7096</span><a name="line.7096"></a> +<span class="sourceLineNo">7097</span> // Cell TTL handling<a name="line.7097"></a> +<span class="sourceLineNo">7098</span><a name="line.7098"></a> +<span class="sourceLineNo">7099</span> if (mutate.getTTL() != Long.MAX_VALUE) {<a name="line.7099"></a> +<span class="sourceLineNo">7100</span> List<Tag> newTags = new ArrayList<Tag>(1);<a name="line.7100"></a> +<span class="sourceLineNo">7101</span> newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));<a name="line.7101"></a> +<span class="sourceLineNo">7102</span> // Add the new TTL tag<a name="line.7102"></a> +<span class="sourceLineNo">7103</span> newCell = new TagRewriteCell(cell, Tag.fromList(newTags));<a name="line.7103"></a> +<span class="sourceLineNo">7104</span> } else {<a name="line.7104"></a> +<span class="sourceLineNo">7105</span> newCell = cell;<a name="line.7105"></a> +<span class="sourceLineNo">7106</span> }<a name="line.7106"></a> +<span class="sourceLineNo">7107</span> }<a name="line.7107"></a> +<span class="sourceLineNo">7108</span><a name="line.7108"></a> +<span class="sourceLineNo">7109</span> // Give coprocessors a chance to update the new cell<a name="line.7109"></a> +<span class="sourceLineNo">7110</span> if (coprocessorHost != null) {<a name="line.7110"></a> +<span class="sourceLineNo">7111</span> newCell = 
coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,<a name="line.7111"></a> +<span class="sourceLineNo">7112</span> mutate, oldCell, newCell);<a name="line.7112"></a> +<span class="sourceLineNo">7113</span> }<a name="line.7113"></a> +<span class="sourceLineNo">7114</span> kvs.add(newCell);<a name="line.7114"></a> <span class="sourceLineNo">7115</span><a name="line.7115"></a> -<span class="sourceLineNo">7116</span> // Give coprocessors a chance to update the new cell<a name="line.7116"></a> -<span class="sourceLineNo">7117</span> if (coprocessorHost != null) {<a name="line.7117"></a> -<span class="sourceLineNo">7118</span> newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,<a name="line.7118"></a> -<span class="sourceLineNo">7119</span> mutate, oldCell, newCell);<a name="line.7119"></a> -<span class="sourceLineNo">7120</span> }<a name="line.7120"></a> -<span class="sourceLineNo">7121</span> kvs.add(newCell);<a name="line.7121"></a> -<span class="sourceLineNo">7122</span><a name="line.7122"></a> -<span class="sourceLineNo">7123</span> // Append update to WAL<a name="line.7123"></a> -<span class="sourceLineNo">7124</span> if (writeToWAL) {<a name="line.7124"></a> -<span class="sourceLineNo">7125</span> if (walEdits == null) {<a name="line.7125"></a> -<span class="sourceLineNo">7126</span> walEdits = new WALEdit();<a name="line.7126"></a> -<span class="sourceLineNo">7127</span> }<a name="line.7127"></a> -<span class="sourceLineNo">7128</span> walEdits.add(newCell);<a name="line.7128"></a> -<span class="sourceLineNo">7129</span> }<a name="line.7129"></a> -<span class="sourceLineNo">7130</span> }<a name="line.7130"></a> -<span class="sourceLineNo">7131</span><a name="line.7131"></a> -<span class="sourceLineNo">7132</span> //store the kvs to the temporary memstore before writing WAL<a name="line.7132"></a> -<span class="sourceLineNo">7133</span> tempMemstore.put(store, kvs);<a name="line.7133"></a> -<span 
class="sourceLineNo">7134</span> }<a name="line.7134"></a> -<span class="sourceLineNo">7135</span><a name="line.7135"></a> -<span class="sourceLineNo">7136</span> // Actually write to WAL now<a name="line.7136"></a> -<span class="sourceLineNo">7137</span> if (walEdits != null && !walEdits.isEmpty()) {<a name="line.7137"></a> -<span class="sourceLineNo">7138</span> if (writeToWAL) {<a name="line.7138"></a> -<span class="sourceLineNo">7139</span> // Using default cluster id, as this can only happen in the originating<a name="line.7139"></a> -<span class="sourceLineNo">7140</span> // cluster. A slave cluster receives the final value (not the delta)<a name="line.7140"></a> -<span class="sourceLineNo">7141</span> // as a Put.<a name="line.7141"></a> -<span class="sourceLineNo">7142</span> // we use HLogKey here instead of WALKey directly to support legacy coprocessors.<a name="line.7142"></a> -<span class="sourceLineNo">7143</span> walKey = new HLogKey(<a name="line.7143"></a> -<span class="sourceLineNo">7144</span> getRegionInfo().getEncodedNameAsBytes(),<a name="line.7144"></a> -<span class="sourceLineNo">7145</span> this.htableDescriptor.getTableName(),<a name="line.7145"></a> -<span class="sourceLineNo">7146</span> WALKey.NO_SEQUENCE_ID,<a name="line.7146"></a> -<span class="sourceLineNo">7147</span> nonceGroup,<a name="line.7147"></a> -<span class="sourceLineNo">7148</span> nonce,<a name="line.7148"></a> -<span class="sourceLineNo">7149</span> mvcc);<a name="line.7149"></a> -<span class="sourceLineNo">7150</span> txid =<a name="line.7150"></a> -<span class="sourceLineNo">7151</span> this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);<a name="line.7151"></a> -<span class="sourceLineNo">7152</span> } else {<a name="line.7152"></a> -<span class="sourceLineNo">7153</span> recordMutationWithoutWal(mutate.getFamilyCellMap());<a name="line.7153"></a> -<span class="sourceLineNo">7154</span> }<a name="line.7154"></a> -<span 
class="sourceLineNo">7155</span> }<a name="line.7155"></a> -<span class="sourceLineNo">7156</span> if (walKey == null) {<a name="line.7156"></a> -<span class="sourceLineNo">7157</span> // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned<a name="line.7157"></a> -<span class="sourceLineNo">7158</span> walKey = this.appendEmptyEdit(this.wal);<a name="line.7158"></a> -<span class="sourceLineNo">7159</span> }<a name="line.7159"></a> -<span class="sourceLineNo">7160</span><a name="line.7160"></a> -<span class="sourceLineNo">7161</span> // now start my own transaction<a name="line.7161"></a> -<span class="sourceLineNo">7162</span> writeEntry = walKey.getWriteEntry();<a name="line.7162"></a> -<span class="sourceLineNo">7163</span><a name="line.7163"></a> -<span class="sourceLineNo">7164</span><a name="line.7164"></a> -<span class="sourceLineNo">7165</span> // Actually write to Memstore now<a name="line.7165"></a> -<span class="sourceLineNo">7166</span> if (!tempMemstore.isEmpty()) {<a name="line.7166"></a> -<span class="sourceLineNo">7167</span> for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {<a name="line.7167"></a> -<span class="sourceLineNo">7168</span> Store store = entry.getKey();<a name="line.7168"></a> -<span class="sourceLineNo">7169</span> if (store.getFamily().getMaxVersions() == 1) {<a name="line.7169"></a> -<span class="sourceLineNo">7170</span> // upsert if VERSIONS for this CF == 1<a name="line.7170"></a> -<span class="sourceLineNo">7171</span> // Is this right? It immediately becomes visible? 
St.Ack 20150907<a name="line.7171"></a> -<span class="sourceLineNo">7172</span> size += store.upsert(entry.getValue(), getSmallestReadPoint());<a name="line.7172"></a> -<span class="sourceLineNo">7173</span> } else {<a name="line.7173"></a> -<span class="sourceLineNo">7174</span> // otherwise keep older versions around<a name="line.7174"></a> -<span class="sourceLineNo">7175</span> for (Cell cell: entry.getValue()) {<a name="line.7175"></a> -<span class="sourceLineNo">7176</span> CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());<a name="line.7176"></a> -<span class="sourceLineNo">7177</span> size += store.add(cell);<a name="line.7177"></a> -<span class="sourceLineNo">7178</span> doRollBackMemstore = true;<a name="line.7178"></a> -<span class="sourceLineNo">7179</span> }<a name="line.7179"></a> -<span class="sourceLineNo">7180</span> }<a name="line.7180"></a> -<span class="sourceLineNo">7181</span> // We add to all KVs here whereas when doing increment, we do it<a name="line.7181"></a> -<span class="sourceLineNo">7182</span> // earlier... 
why?<a name="line.7182"></a> -<span class="sourceLineNo">7183</span> allKVs.addAll(entry.getValue());<a name="line.7183"></a> -<span class="sourceLineNo">7184</span> }<a name="line.7184"></a> +<span class="sourceLineNo">7116</span> // Append update to WAL<a name="line.7116"></a> +<span class="sourceLineNo">7117</span> if (writeToWAL) {<a name="line.7117"></a> +<span class="sourceLineNo">7118</span> if (walEdits == null) {<a name="line.7118"></a> +<span class="sourceLineNo">7119</span> walEdits = new WALEdit();<a name="line.7119"></a> +<span class="sourceLineNo">7120</span> }<a name="line.7120"></a> +<span class="sourceLineNo">7121</span> walEdits.add(newCell);<a name="line.7121"></a> +<span class="sourceLineNo">7122</span> }<a name="line.7122"></a> +<span class="sourceLineNo">7123</span> }<a name="line.7123"></a> +<span class="sourceLineNo">7124</span><a name="line.7124"></a> +<span class="sourceLineNo">7125</span> //store the kvs to the temporary memstore before writing WAL<a name="line.7125"></a> +<span class="sourceLineNo">7126</span> tempMemstore.put(store, kvs);<a name="line.7126"></a> +<span class="sourceLineNo">7127</span> }<a name="line.7127"></a> +<span class="sourceLineNo">7128</span><a name="line.7128"></a> +<span class="sourceLineNo">7129</span> // Actually write to WAL now<a name="line.7129"></a> +<span class="sourceLineNo">7130</span> if (walEdits != null && !walEdits.isEmpty()) {<a name="line.7130"></a> +<span class="sourceLineNo">7131</span> if (writeToWAL) {<a name="line.7131"></a> +<span class="sourceLineNo">7132</span> // Using default cluster id, as this can only happen in the originating<a name="line.7132"></a> +<span class="sourceLineNo">7133</span> // cluster. 
A slave cluster receives the final value (not the delta)<a name="line.7133"></a> +<span class="sourceLineNo">7134</span> // as a Put.<a name="line.7134"></a> +<span class="sourceLineNo">7135</span> // we use HLogKey here instead of WALKey directly to support legacy coprocessors.<a name="line.7135"></a> +<span class="sourceLineNo">7136</span> walKey = new HLogKey(<a name="line.7136"></a> +<span class="sourceLineNo">7137</span> getRegionInfo().getEncodedNameAsBytes(),<a name="line.7137"></a> +<span class="sourceLineNo">7138</span> this.htableDescriptor.getTableName(),<a name="line.7138"></a> +<span class="sourceLineNo">7139</span> WALKey.NO_SEQUENCE_ID,<a name="line.7139"></a> +<span class="sourceLineNo">7140</span> nonceGroup,<a name="line.7140"></a> +<span class="sourceLineNo">7141</span> nonce,<a name="line.7141"></a> +<span class="sourceLineNo">7142</span> mvcc);<a name="line.7142"></a> +<span class="sourceLineNo">7143</span> txid =<a name="line.7143"></a> +<span class="sourceLineNo">7144</span> this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);<a name="line.7144"></a> +<span class="sourceLineNo">7145</span> } else {<a name="line.7145"></a> +<span class="sourceLineNo">7146</span> recordMutationWithoutWal(mutate.getFamilyCellMap());<a name="line.7146"></a> +<span class="sourceLineNo">7147</span> }<a name="line.7147"></a> +<span class="sourceLineNo">7148</span> }<a name="line.7148"></a> +<span class="sourceLineNo">7149</span> if (walKey == null) {<a name="line.7149"></a> +<span class="sourceLineNo">7150</span> // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned<a name="line.7150"></a> +<span class="sourceLineNo">7151</span> walKey = this.appendEmptyEdit(this.wal);<a name="line.7151"></a> +<span class="sourceLineNo">7152</span> }<a name="line.7152"></a> +<span class="sourceLineNo">7153</span><a name="line.7153"></a> +<span class="sourceLineNo">7154</span> // now start my own transaction<a name="line.7154"></a> 
+<span class="sourceLineNo">7155</span> writeEntry = walKey.getWriteEntry();<a name="line.7155"></a> +<span class="sourceLineNo">7156</span><a name="line.7156"></a> +<span class="sourceLineNo">7157</span><a name="line.7157"></a> +<span class="sourceLineNo">7158</span> // Actually write to Memstore now<a name="line.7158"></a> +<span class="sourceLineNo">7159</span> if (!tempMemstore.isEmpty()) {<a name="line.7159"></a> +<span class="sourceLineNo">7160</span> for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {<a name="line.7160"></a> +<span class="sourceLineNo">7161</span> Store store = entry.getKey();<a name="line.7161"></a> +<span class="sourceLineNo">7162</span> if (store.getFamily().getMaxVersions() == 1) {<a name="line.7162"></a> +<span class="sourceLineNo">7163</span> // upsert if VERSIONS for this CF == 1<a name="line.7163"></a> +<span class="sourceLineNo">7164</span> // Is this right? It immediately becomes visible? St.Ack 20150907<a name="line.7164"></a> +<span class="sourceLineNo">7165</span> size += store.upsert(entry.getValue(), getSmallestReadPoint());<a name="line.7165"></a> +<span class="sourceLineNo">7166</span> } else {<a name="line.7166"></a> +<span class="sourceLineNo">7167</span> // otherwise keep older versions around<a name="line.7167"></a> +<span class="sourceLineNo">7168</span> for (Cell cell: entry.getValue()) {<a name="line.7168"></a> +<span class="sourceLineNo">7169</span> CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());<a name="line.7169"></a> +<span class="sourceLineNo">7170</span> size += store.add(cell);<a name="line.7170"></a> +<span class="sourceLineNo">7171</span> doRollBackMemstore = true;<a name="line.7171"></a> +<span class="sourceLineNo">7172</span> }<a name="line.7172"></a> +<span class="sourceLineNo">7173</span> }<a name="line.7173"></a> +<span class="sourceLineNo">7174</span> // We add to all KVs here whereas when doing increment, we do it<a name="line.7174"></a> +<span 
class="sourceLineNo">7175</span> // earlier... why?<a name="line.7175"></a> +<span class="sourceLineNo">7176</span> allKVs.addAll(entry.getValue());<a name="line.7176"></a> +<span class="sourceLineNo">7177</span> }<a name="line.7177"></a> +<span class="sourceLineNo">7178</span><a name="line.7178"></a> +<span class="sourceLineNo">7179</span> size = this.addAndGetGlobalMemstoreSize(size);<a name="line.7179"></a> +<span class="sourceLineNo">7180</span> flush = isFlushSize(size);<a name="line.7180"></a> +<span class="sourceLineNo">7181</span> }<a name="line.7181"></a> +<span class="sourceLineNo">7182</span> } finally {<a name="line.7182"></a> +<span class="sourceLineNo">7183</span> this.updatesLock.readLock().unlock();<a name="line.7183"></a> +<span class="sourceLineNo">7184</span> }<a name="line.7184"></a> <span class="sourceLineNo">7185</span><a name="line.7185"></a> -<span class="sourceLineNo">7186</span> size = this.addAndGetGlobalMemstoreSize(size);<a name="line.7186"></a> -<span class="sourceLineNo">7187</span> flush = isFlushSize(size);<a name="line.7187"></a> -<span class="sourceLineNo">7188</span> }<a name="line.7188"></a> -<span class="sourceLineNo">7189</span> } finally {<a name="line.7189"></a> -<span class="sourceLineNo">7190</span> this.updatesLock.readLock().unlock();<a name="line.7190"></a> -<span class="sourceLineNo">7191</span> }<a name="line.7191"></a> -<span class="sourceLineNo">7192</span><a name="line.7192"></a> -<span class="sourceLineNo">7193</span> } finally {<a name="line.7193"></a> -<span class="sourceLineNo">7194</span> rowLock.release();<a name="line.7194"></a> -<span class="sourceLineNo">7195</span> rowLock = null;<a name="line.7195"></a> -<span class="sourceLineNo">7196</span> }<a name="line.7196"></a> -<span class="sourceLineNo">7197</span> // sync the transaction log outside the rowlock<a name="line.7197"></a> -<span class="sourceLineNo">7198</span> if(txid != 0){<a name="line.7198"></a> -<span class="sourceLineNo">7199</span> 
syncOrDefer(txid, durability);<a name="line.7199"></a> -<span class="sourceLineNo">7200</span> }<a name="line.7200"></a> -<span class="sourceLineNo">7201</span> doRollBackMemstore = false;<a name="line.7201"></a> -<span class="sourceLineNo">7202</span> } finally {<a name="line.7202"></a> -<span class="sourceLineNo">7203</span> if (rowLock != null) {<a name="line.7203"></a> -<span class="sourceLineNo">7204</span> rowLock.release();<a name="line.7204"></a> +<span class="sourceLineNo">7186</span> } finally {<a name="line.7186"></a> +<span class="sourceLineNo">7187</span> rowLock.release();<a name="line.7187"></a> +<span class="sourceLineNo">7188</span> rowLock = null;<a name="line.7188"></a> +<span class="sourceLineNo">7189</span> }<a name="line.7189"></a> +<span class="sourceLineNo">7190</span> // sync the transaction log outside the rowlock<a name="line.7190"></a> +<span class="sourceLineNo">7191</span> if(txid != 0){<a name="line.7191"></a> +<span class="sourceLineNo">7192</span> syncOrDefer(txid, durability);<a name="line.7192"></a> +<span class="sourceLineNo">7193</span> }<a name="line.7193"></a> +<span class="sourceLineNo">7194</span> doRollBackMemstore = false;<a name="line.7194"></a> +<span class="sourceLineNo">7195</span> } finally {<a name="line.7195"></a> +<span class="sourceLineNo">7196</span> if (rowLock != null) {<a name="line.7196"></a> +<span class="sourceLineNo">7197</span> rowLock.release();<a name="line.7197"></a> +<span class="sourceLineNo">7198</span> }<a name="line.7198"></a> +<span class="sourceLineNo">7199</span> // if the wal sync was unsuccessful, remove keys from memstore<a name="line.7199"></a> +<span class="sourceLineNo">7200</span> if (doRollBackMemstore) {<a name="line.7200"></a> +<span class="sourceLineNo">7201</span> rollbackMemstore(allKVs);<a name="line.7201"></a> +<span class="sourceLineNo">7202</span> if (writeEntry != null) mvcc.complete(writeEntry);<a name="line.7202"></a> +<span class="sourceLineNo">7203</span> } else if 
(writeEntry != null) {<a name="line.7203"></a> +<span class="sourceLineNo">7204</span> mvcc.completeAndWait(writeEntry);<a name="line.7204"></a> <span class="sourceLineNo">7205</span> }<a name="line.7205"></a> -<span class="sourceLineNo">7206</span> // if the wal sync was unsuccessful, remove keys from memstore<a name="line.7206"></a> -<span class="sourceLineNo">7207</span> if (doRollBackMemstore) {<a name="line.7207"></a> -<span class="sourceLineNo">7208</span> rollbackMemstore(allKVs);<a name="line.7208"></a> -<span class="sourceLineNo">7209</span> if (writeEntry != null) mvcc.complete(writeEntry);<a name="line.7209"></a> -<span class="sourceLineNo">7210</span> } else if (writeEntry != null) {<a name="line.7210"></a> -<span class="sourceLineNo">7211</span> mvcc.completeAndWait(writeEntry);<a name="line.7211"></a> -<span class="sourceLineNo">7212</span> }<a name="line.7212"></a> +<span class="sourceLineNo">7206</span><a name="line.7206"></a> +<span class="sourceLineNo">7207</span> closeRegionOperation(op);<a name="line.7207"></a> +<span class="sourceLineNo">7208</span> }<a name="line.7208"></a> +<span class="sourceLineNo">7209</span><a name="line.7209"></a> +<span class="sourceLineNo">7210</span> if (this.metricsRegion != null) {<a name="line.7210"></a> +<span class="sourceLineNo">7211</span> this.metricsRegion.updateAppend();<a name="line.7211"></a> +<span class="sourceLineNo">7212</span> }<a name="line.7212"></a> <span class="sourceLineNo">7213</span><a name="line.7213"></a> -<span class="sourceLineNo">7214</span> closeRegionOperation(op);<a name="line.7214"></a> -<span class="sourceLineNo">7215</span> }<a name="line.7215"></a> -<span class="sourceLineNo">7216</span><a name="line.7216"></a> -<span class="sourceLineNo">7217</span> if (this.metricsRegion != null) {<a name="line.7217"></a> -<span class="sourceLineNo">7218</span> this.metricsRegion.updateAppend();<a name="line.7218"></a> -<span class="sourceLineNo">7219</span> }<a name="line.7219"></a> -<span 
class="sourceLineNo">7220</span><a name="line.7220"></a> -<span class="sourceLineNo">7221</span> if (flush) {<a name="line.7221"></a> -<span class="sourceLineNo">7222</span> // Request a cache flush. Do it outside update lock.<a name="line.7222"></a> -<span class="sourceLineNo">7223</span> requestFlush();<a name="line.7223"></a> -<span class="sourceLineNo">7224</span> }<a name="line.7224"></a> +<span class="sourceLineNo">7214</span> if (flush) {<a name="line.7214"></a> +<span class="sourceLineNo">7215</span> // Request a cache flush. Do it outside update lock.<a name="line.7215"></a> +<span class="sourceLineNo">7216</span> requestFlush();<a name="line.7216"></a> +<span class="sourceLineNo">7217</span> }<a name="line.7217"></a> +<span class="sourceLineNo">7218</span><a name="line.7218"></a> +<span class="sourceLineNo">7219</span> return mutate.isReturnResults() ? Result.create(allKVs) : null;<a name="line.7219"></a> +<span class="sourceLineNo">7220</span> }<a name="line.7220"></a> +<span class="sourceLineNo">7221</span><a name="line.7221"></a> +<span class="sourceLineNo">7222</span> public Result increment(Increment increment) throws IOException {<a name="line.7222"></a> +<span class="sourceLineNo">7223</span> return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.7223"></a> +<span class="sourceLineNo">7224</span> }<a name="line.7224"></a> <span class="sourceLineNo">7225</span><a name="line.7225"></a> -<span class="sourceLineNo">7226</span> return mutate.isReturnResults() ? 
Result.create(allKVs) : null;<a name="line.7226"></a> -<span class="sourceLineNo">7227</span> }<a name="line.7227"></a> -<span class="sourceLineNo">7228</span><a name="line.7228"></a> -<span class="sourceLineNo">7229</span> public Result increment(Increment increment) throws IOException {<a name="line.7229"></a> -<span class="sourceLineNo">7230</span> return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);<a name="line.7230"></a> -<span class="sourceLineNo">7231</span> }<a name="line.7231"></a> -<span class="sourceLineNo">7232</span><a name="line.7232"></a> -<span class="sourceLineNo">7233</span> // TODO: There's a lot of boiler plate code identical to append.<a name="line.7233"></a> -<span class="sourceLineNo">7234</span> // We should refactor append and increment as local get-mutate-put<a name="line.7234"></a> -<span class="sourceLineNo">7235</span> // transactions, so all stores only go through one code path for puts.<a name="line.7235"></a> -<span class="sourceLineNo">7236</span><a name="line.7236"></a> -<span class="sourceLineNo">7237</span> // They are subtley different in quiet a few ways. This came out only<a name="line.7237"></a> -<span class="sourceLineNo">7238</span> // after study. 
I am not sure that many of the differences are intentional.<a name="line.7238"></a> -<span class="sourceLineNo">7239</span> // TODO: St.Ack 20150907 <a name="line.7239"></a> -<span class="sourceLineNo">7240</span><a name="line.7240"></a> -<span class="sourceLineNo">7241</span> @Override<a name="line.7241"></a> -<span class="sourceLineNo">7242</span> public Result increment(Increment mutation, long nonceGroup, long nonce)<a name="line.7242"></a> -<span class="sourceLineNo">7243</span> throws IOException {<a name="line.7243"></a> -<span class="sourceLineNo">7244</span> Operation op = Operation.INCREMENT;<a name="line.7244"></a> -<span class="sourceLineNo">7245</span> byte [] row = mutation.getRow();<a name="line.7245"></a> -<span class="sourceLineNo">7246</span> checkRow(row, op.toString());<a name="line.7246"></a> -<span class="sourceLineNo">7247</span> checkFamilies(mutation.getFamilyCellMap().keySet());<a name="line.7247"></a> -<span class="sourceLineNo">7248</span> boolean flush = false;<a name="line.7248"></a> -<span class="sourceLineNo">7249</span> Durability durability = getEffectiveDurability(mutation.getDurability());<a name="line.7249"></a> -<span class="sourceLineNo">7250</span> boolean writeToWAL = durability != Durability.SKIP_WAL;<a name="line.7250"></a> -<span class="sourceLineNo">7251</span> WALEdit walEdits = null;<a name="line.7251"></a> -<span class="sourceLineNo">7252</span> List<Cell> allKVs = new ArrayList<Cell>(mutation.size());<a name="line.7252"></a> -<span class="sourceLineNo">7253</span> <a name="line.7253"></a> -<span class="sourceLineNo">7254</span> Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();<a name="line.7254"></a> -<span class="sourceLineNo">7255</span> long size = 0;<a name="line.7255"></a> -<span class="sourceLineNo">7256</span> long txid = 0;<a name="line.7256"></a> -<span class="sourceLineNo">7257</span> checkReadOnly();<a name="line.7257"></a> -<span class="sourceLineNo">7258</span> checkResources();<a 
name="line.7258"></a> -<span class="sourceLineNo">7259</span> // Lock row<a name="line.7259"></a> -<span class="sourceLineNo">7260</span> startRegionOperation(op);<a name="line.7260"></a> -<span class="sourceLineNo">7261</span> this.writeRequestsCount.increment();<a name="line.7261"></a> -<span class="sourceLineNo">7262</span> RowLock rowLock = null;<a name="line.7262"></a> -<span class="sourceLineNo">7263</span> WALKey walKey = null;<a name="line.7263"></a> -<span class="sourceLineNo">7264</span> MultiVersionConcurrencyControl.WriteEntry writeEntry = null;<a name="line.7264"></a> -<span class="sourceLineNo">7265</span> boolean doRollBackMemstore = false;<a name="line.7265"></a> -<span class="sourceLineNo">7266</span> TimeRange tr = mutation.getTimeRange();<a name="line.7266"></a> -<span class="sourceLineNo">7267</span> try {<a name="line.7267"></a> -<span class="sourceLineNo">7268</span> rowLock = getRowLock(row);<a name="line.7268"></a> -<span class="sourceLineNo">7269</span> assert rowLock != null;<a name="line.7269"></a> -<span class="sourceLineNo">7270</span> try {<a name="line.7270"></a> -<span class="sourceLineNo">7271</span> lock(this.updatesLock.readLock());<a name="line.7271"></a> -<span class="sourceLineNo">7272</span> try {<a name="line.7272"></a> -<span class="sourceLineNo">7273</span> // wait for all prior MVCC transactions to finish - while we hold the row lock<a name="line.7273"></a> -<span class="sourceLineNo">7274</span> // (so that we are guaranteed to see the latest state)<a name="line.7274"></a> -<span class="sourceLineNo">7275</span> mvcc.await();<a name="line.7275"></a> -<span class="sourceLineNo">7276</span> if (this.coprocessorHost != null) {<a name="line.7276"></a> -<span class="sourceLineNo">7277</span> Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation);<a name="line.7277"></a> -<span class="sourceLineNo">7278</span> if (r != null) {<a name="line.7278"></a> -<span class="sourceLineNo">7279</span> return r;<a 
name="line.7279"></a> -<span class="sourceLineNo">7280</span> }<a name="line.7280"></a> -<span class="sourceLineNo">7281</span> }<a name="line.7281"></a> -<span class="sourceLineNo">7282</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7282"></a> -<span class="sourceLineNo">7283</span> // Process each family<a name="line.7283"></a> -<span class="sourceLineNo">7284</span> for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {<a name="line.7284"></a> -<span class="sourceLineNo">7285</span> Store store = stores.get(family.getKey());<a name="line.7285"></a> -<span class="sourceLineNo">7286</span> List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());<a name="line.7286"></a> -<span class="sourceLineNo">7287</span><a name="line.7287"></a> -<span class="sourceLineNo">7288</span> List<Cell> results = doGet(store, row, family, tr);<a name="line.7288"></a> -<span class="sourceLineNo">7289</span><a name="line.7289"></a> -<span class="sourceLineNo">7290</span> // Iterate the input columns and update existing values if they were<a name="line.7290"></a> -<span class="sourceLineNo">7291</span> // found, otherwise add new column initialized to the increment amount<a name="line.7291"></a> -<span class="sourceLineNo">7292</span><a name="line.7292"></a> -<span class="sourceLineNo">7293</span> // Avoid as much copying as possible. We may need to rewrite and<a name="line.7293"></a> -<span class="sourceLineNo">7294</span> // consolidate tags. 
Bytes are only copied once.<a name="line.7294"></a> -<span class="sourceLineNo">7295</span> // Would be nice if KeyValue had scatter/gather logic<a name="line.7295"></a> -<span class="sourceLineNo">7296</span> int idx = 0;<a name="line.7296"></a> -<span class="sourceLineNo">7297</span> // HERE WE DIVERGE FROM APPEND<a name="line.7297"></a> -<span class="sourceLineNo">7298</span> List<Cell> edits = family.getValue();<a name="line.7298"></a> -<span class="sourceLineNo">7299</span> for (int i = 0; i < edits.size(); i++) {<a name="line.7299"></a> -<span class="sourceLineNo">7300</span> Cell cell = edits.get(i);<a name="line.7300"></a> -<span class="sourceLineNo">7301</span> long amount = Bytes.toLong(CellUtil.cloneValue(cell));<a name="line.7301"></a> -<span class="sourceLineNo">7302</span> boolean noWriteBack = (amount == 0);<a name="line.7302"></a> -<span class="sourceLineNo">7303</span><a name="line.7303"></a> -<span class="sourceLineNo">7304</span> List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>());<a name="line.7304"></a> -<span class="sourceLineNo">7305</span><a name="line.7305"></a> -<span class="sourceLineNo">7306</span> Cell c = null;<a name="line.7306"></a> -<span class="sourceLineNo">7307</span> long ts = now;<a name="line.7307"></a> -<span class="sourceLineNo">7308</span> if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {<a name="line.7308"></a> -<span class="sourceLineNo">7309</span> c = results.get(idx);<a name="line.7309"></a> -<span class="sourceLineNo">7310</span> ts = Math.max(now, c.getTimestamp());<a name="line.7310"></a> -<span class="sourceLineNo">7311</span> if(c.getValueLength() == Bytes.SIZEOF_LONG) {<a name="line.7311"></a> -<span class="sourceLineNo">7312</span> amount += CellUtil.getValueAsLong(c);<a name="line.7312"></a> -<span class="sourceLineNo">7313</span> } else {<a name="line.7313"></a> -<span class="sourceLineNo">7314</span> // throw DoNotRetryIOException instead of 
IllegalArgumentException<a name="line.7314"></a> -<span class="sourceLineNo">7315</span> throw new org.apache.hadoop.hbase.DoNotRetryIOException(<a name="line.7315"></a> -<span class="sourceLineNo">7316</span> "Attempted to increment field that isn't 64 bits wide");<a name="line.7316"></a> -<span class="sourceLineNo">7317</span> }<a name="line.7317"></a> -<span class="sourceLineNo">7318</span> // Carry tags forward from previous version<a name="line.7318"></a> -<span class="sourceLineNo">7319</span> newTags = carryForwardTags(c, newTags);<a name="line.7319"></a> -<span class="sourceLineNo">7320</span> if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) {<a name="line.7320"></a> -<span class="sourceLineNo">7321</span> idx++;<a name="line.7321"></a> -<span class="sourceLineNo">7322</span> }<a name="line.7322"></a> -<span class="sourceLineNo">7323</span> }<a name="line.7323"></a> -<span class="sourceLineNo">7324</span><a name="line.7324"></a> -<span class="sourceLineNo">7325</span> // Append new incremented KeyValue to list<a name="line.7325"></a> -<span class="sourceLineNo">7326</span> byte[] q = CellUtil.cloneQualifier(cell);<a name="line.7326"></a> -<span class="sourceLineNo">7327</span> byte[] val = Bytes.toBytes(amount);<a name="line.7327"></a> -<span class="sourceLineNo">7328</span><a name="line.7328"></a> -<span class="sourceLineNo">7329</span> // Add the TTL tag if the mutation carried one<a name="line.7329"></a> -<span class="sourceLineNo">7330</span> if (mutation.getTTL() != Long.MAX_VALUE) {<a name="line.7330"></a> -<span class="sourceLineNo">7331</span> newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL())));<a name="line.7331"></a> -<span class="sourceLineNo">7332</span> }<a name="line.7332"></a> -<span class="sourceLineNo">7333</span><a name="line.7333"></a> -<span class="sourceLineNo">7334</span> Cell newKV = new KeyValue(row, 0, row.length,<a name="line.7334"></a> -<span 
class="sourceLineNo">7335</span> family.getKey(), 0, family.getKey().length,<a name="line.7335"></a> -<span class="sourceLineNo">7336</span> q, 0, q.length,<a name="line.7336"></a> -<span class="sourceLineNo">7337</span> ts,<a name="line.7337"></a> -<span class="sourceLineNo">7338</span> KeyValue.Type.Put,<a name="line.7338"></a> -<span class="sourceLineNo">7339</span> val, 0, val.length,<a name="line.7339"></a> -<span class="sourceLineNo">7340</span> newTags);<a name="line.7340"></a> +<span class="sourceLineNo">7226</span> // TODO: There's a lot of boiler plate code identical to append.<a name="line.7226"></a> +<span class="sourceLineNo">7227</span> // We should refactor append and increment as local get-mutate-put<a name="line.7227"></a> +<span class="sourceLineNo">7228</span> // transactions, so all stores only go through one code path for puts.<a name="line.7228"></a> +<span class="sourceLineNo">7229</span><a name="line.7229"></a> +<span class="sourceLineNo">7230</span> // They are subtly different in quite a few ways. This came out only<a name="line.7230"></a> +<span class="sourceLineNo">7231</span> // after study. 
I am not sure that many of the differences are intentional.<a name="line.7231"></a> +<span class="sourceLineNo">7232</span> // TODO: St.Ack 20150907 <a name="line.7232"></a> +<span class="sourceLineNo">7233</span><a name="line.7233"></a> +<span class="sourceLineNo">7234</span> @Override<a name="line.7234"></a> +<span class="sourceLineNo">7235</span> public Result increment(Increment mutation, long nonceGroup, long nonce)<a name="line.7235"></a> +<span class="sourceLineNo">7236</span> throws IOException {<a name="line.7236"></a> +<span class="sourceLineNo">7237</span> Operation op = Operation.INCREMENT;<a name="line.7237"></a> +<span class="sourceLineNo">7238</span> byte [] row = mutation.getRow();<a name="line.7238"></a> +<span class="sourceLineNo">7239</span> checkRow(row, op.toString());<a name="line.7239"></a> +<span class="sourceLineNo">7240</span> checkFamilies(mutation.getFamilyCellMap().keySet());<a name="line.7240"></a> +<span class="sourceLineNo">7241</span> boolean flush = false;<a name="line.7241"></a> +<span class="sourceLineNo">7242</span> Durability durability = getEffectiveDurability(mutation.getDurability());<a name="line.7242"></a> +<span class="sourceLineNo">7243</span> boolean writeToWAL = durability != Durability.SKIP_WAL;<a name="line.7243"></a> +<span class="sourceLineNo">7244</span> WALEdit walEdits = null;<a name="line.7244"></a> +<span class="sourceLineNo">7245</span> List<Cell> allKVs = new ArrayList<Cell>(mutation.size());<a name="line.7245"></a> +<span class="sourceLineNo">7246</span> <a name="line.7246"></a> +<span class="sourceLineNo">7247</span> Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();<a name="line.7247"></a> +<span class="sourceLineNo">7248</span> long size = 0;<a name="line.7248"></a> +<span class="sourceLineNo">7249</span> long txid = 0;<a name="line.7249"></a> +<span class="sourceLineNo">7250</span> checkReadOnly();<a name="line.7250"></a> +<span class="sourceLineNo">7251</span> checkResources();<a 
name="line.7251"></a> +<span class="sourceLineNo">7252</span> // Lock row<a name="line.7252"></a> +<span class="sourceLineNo">7253</span> startRegionOperation(op);<a name="line.7253"></a> +<span class="sourceLineNo">7254</span> this.writeRequestsCount.increment();<a name="line.7254"></a> +<span class="sourceLineNo">7255</span> RowLock rowLock = null;<a name="line.7255"></a> +<span class="sourceLineNo">7256</span> WALKey walKey = null;<a name="line.7256"></a> +<span class="sourceLineNo">7257</span> MultiVersionConcurrencyControl.WriteEntry writeEntry = null;<a name="line.7257"></a> +<span class="sourceLineNo">7258</span> boolean doRollBackMemstore = false;<a name="line.7258"></a> +<span class="sourceLineNo">7259</span> TimeRange tr = mutation.getTimeRange();<a name="line.7259"></a> +<span class="sourceLineNo">7260</span> try {<a name="line.7260"></a> +<span class="sourceLineNo">7261</span> rowLock = getRowLock(row);<a name="line.7261"></a> +<span class="sourceLineNo">7262</span> assert rowLock != null;<a name="line.7262"></a> +<span class="sourceLineNo">7263</span> try {<a name="line.7263"></a> +<span class="sourceLineNo">7264</span> lock(this.updatesLock.readLock());<a name="line.7264"></a> +<span class="sourceLineNo">7265</span> try {<a name="line.7265"></a> +<span class="sourceLineNo">7266</span> // wait for all prior MVCC transactions to finish - while we hold the row lock<a name="line.7266"></a> +<span class="sourceLineNo">7267</span> // (so that we are guaranteed to see the latest state)<a name="line.7267"></a> +<span class="sourceLineNo">7268</span> mvcc.await();<a name="line.7268"></a> +<span class="sourceLineNo">7269</span> if (this.coprocessorHost != null) {<a name="line.7269"></a> +<span class="sourceLineNo">7270</span> Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation);<a name="line.7270"></a> +<span class="sourceLineNo">7271</span> if (r != null) {<a name="line.7271"></a> +<span class="sourceLineNo">7272</span> return r;<a 
name="line.7272"></a> +<span class="sourceLineNo">7273</span> }<a name="line.7273"></a> +<span class="sourceLineNo">7274</span> }<a name="line.7274"></a> +<span class="sourceLineNo">7275</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.7275"></a> +<span class="sourceLineNo">7276</span> // Process each family<a name="line.7276"></a> +<span class="sourceLineNo">7277</span> for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {<a name="line.7277"></a> +<span class="sourceLineNo">7278</span> Store store = stores.get(family.getKey());<a name="line.7278"></a> +<span class="sourceLineNo">7279</span> List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());<a name="line.7279"></a> +<span class="sourceLineNo">7280</span><a name="line.7280"></a> +<span class="sourceLineNo">7281</span> List<Cell> results = doGet(store, row, family, tr);<a name="line.7281"></a> +<span class="sourceLineNo">7282</span><a name="line.7282"></a> +<span class="sourceLineNo">7283</span> // Iterate the input columns and update existing values if they were<a name="line.7283"></a> +<span class="sourceLineNo">7284</span> // found, otherwise add new column initialized to the increment amount<a name="line.7284"></a> +<span class="sourceLineNo">7285</span><a name="line.7285"></a> +<span class="sourceLineNo">7286</span> // Avoid as much copying as possible. We may need to rewrite and<a name="line.7286"></a> +<span class="sourceLineNo">7287</span> // consolidate tags. 
Bytes are only copied once.<a name="line.7287"></a> +<span class="sourceLineNo">7288</span> // Would be nice if KeyValue had scatter/gather logic<a name="line.7288"></a> +<span class="sourceLineNo">7289</span> int idx = 0;<a name="line.7289"></a> +<span class="sourceLineNo">7290</span> // HERE WE DIVERGE FROM APPEND<a name="line.7290"></a> +<span class="sourceLineNo">7291</span> List<Cell> edits = family.getValue();<a name="line.7291"></a> +<span class="sourceLineNo">7292</span> for (int i = 0; i < edits.size(); i++) {<a name="line.7292"></a> +<span class="sourceLineNo">7293</span> Cell cell = edits.get(i);<a name="line.7293"></a> +<span class="sourceLineNo">7294</span> long amount = Bytes.toLong(CellUtil.cloneValue(cell));<a name="line.7294"></a> +<span class="sourceLineNo">7295</span> boolean noWriteBack = (amount == 0);<a name="line.7295"></a> +<span class="sourceLineNo">7296</span><a name="line.7296"></a> +<span class="sourceLineNo">7297</span> List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>());<a name="line.7297"></a> +<span class="sourceLineNo">7298</span><a name="line.7298"></a> +<span class="sourceLineNo">7299</span> Cell c = null;<a name="line.7299"></a> +<span class="sourceLineNo">7300</span> long ts = now;<a name="line.7300"></a> +<span class="sourceLineNo">7301</span> if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {<a name="line.7301"></a> +<span class="sourceLineNo">7302</span> c = results.get(idx);<a name="line.7302"></a> +<span class="sourceLineNo">7303</span> ts = Math.max(now, c.getTimestamp());<a name="line.7303"></a> +<span class="sourceLineNo">7304</span> if(c.getValueLength() == Bytes.SIZEOF_LONG) {<a name="line.7304"></a> +<span class="sourceLineNo">7305</span> amount += CellUtil.getValueAsLong(c);<a name="line.7305"></a> +<span class="sourceLineNo">7306</span> } else {<a name="line.7306"></a> +<span class="sourceLineNo">7307</span> // throw DoNotRetryIOException instead of 
IllegalArgumentException<a name="line.7307"></a> +<span class="sourceLineNo">7308</span> throw new org.apache.hadoop.hbase.DoNotRetryIOException(<a name="line.7308"></a> +<span class="sourceLineNo">7309</span> "Attempted to increment field that isn't 64 bits wide");<a name="line.7309"></a> +<span class="sourceLineNo">7310</span> }<a name="line.7310"></a> +<span class="sourceLineNo">7311</span> // Carry tags forward from previous version<a name="line.7311"></a> +<span class="sourceLineNo">7312</span> newTags = carryForwardTags(c, newTags);<a name="line.7312"></a> +<span class="sourceLineNo">7313</span> if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) {<a name="line.7313"></a> +<span class="sourceLineNo">7314</span> idx++;<a name="line.7314"></a> +<span class="sourceLineNo">7315</span> }<a name="line.7315"></a> +<span class="sourceLineNo">7316</span> }<a name="line.7316"></a> +<span class="sourceLineNo">7317</span><a name="line.7317"></a> +<span class="sourceLineNo">7318</span> // Append new incremented KeyValue to list<a name="line.7318"></a> +<span class="sourceLineNo">7319</span> byte[] q = CellUtil.cloneQualifier(cell);<a name="line.7319"></a> +<span class="sourceLineNo">7320</span> byte[] val = Bytes.toBytes(amount);<a name="line.7320"></a> +<span class="sourceLineNo">7321</span><a name="line.7321"></a> +<span class="sourceLineNo">7322</span> // Add the TTL tag if the mutation carried one<a name="line.7322"></a> +<span class="sourceLineNo">7323</span> if (mutation.getTTL() != Long.MAX_VALUE) {<a name="line.7323"></a> +<span class="sourceLineNo">7324</span> newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL())));<a name="line.7324"></a> +<span class="sourceLineNo">7325</span> }<a name="line.7325"></a> +<span class="sourceLineNo">7326</span><a name="line.7326"></a> +<span class="sourceLineNo">7327</span> Cell newKV = new KeyValue(row, 0, row.length,<a name="line.7327"></a> +<span 
class="sourceLineNo">7328</span> family.getKey(), 0, family.getKey().length,<a name="line.7328"></a> +<span class="sourceLineNo">7329</span> q, 0, q.length,<a name="line.7329"></a> +<span class="sourceLineNo">7330</span> ts,<a name="line.7330"></a> +<span class="sourceLineNo">7331</span> KeyValue.Type.Put,<a name="line.7331"></a> +<span class="sourceLineNo">7332</span> val, 0, val.length,<a name="line.7332"></a> +<span class="sourceLineNo">7333</span> newTags);<a name="line.7333"></a> +<span class="sourceLineNo">7334</span><a name="line.7334"></a> +<span class="sourceLineNo">7335</span> // Give coprocessors a chance to update the new cell<a name="line.7335"></a> +<span class="sourceLineNo">7336</span> if (coprocessorHost != null) {<a name="line.7336"></a> +<span class="sourceLineNo">7337</span> newKV = coprocessorHost.postMutationBeforeWAL(<a name="line.7337"></a> +<span class="sourceLineNo">7338</span> RegionObserver.MutationType.INCREMENT, mutation, c, newKV);<a name="line.7338"></a> +<span class="sourceLineNo">7339</span> }<a name="line.7339"></a> +<span class="sourceLineNo">7340</span> allKVs.add(newKV);<a name="line.7340"></a> <span class="sourceLineNo">7341</span><a name="line.7341"></a> -<span class="sourceLineNo">7342</span> // Give coprocessors a chance to update the new cell<a name="line.7342"></a> -<span class="sourceLineNo">7343</span> if (coprocessorHost != null) {<a name="line.7343"></a> -<span class="sourceLineNo">7344</span> newKV = coprocessorHost.postMutationBeforeWAL(<a name="line.7344"></a> -<span class="sourceLineNo">7345</span> RegionObserver.MutationType.INCREMENT, mutation, c, newKV);<a name="line.7345"></a> -<span class="sourceLineNo">7346</span> }<a name="line.7346"></a> -<span class="sourceLineNo">7347</span> allKVs.add(newKV);<a name="line.7347"></a> -<span class="sourceLineNo">7348</span><a name="line.7348"></a> -<span class="sourceLineNo">7349</span> if (!noWriteBack) {<a name="line.7349"></a> -<span class="sourceLineNo">7350</span> 
kvs.add(newKV);<a name="line.7350"></a> -<span class="sourceLineNo">7351</span><a name="line.7351"></a> -<span class="sourceLineNo">7352</span> // Prepare WAL updates<a name="line.7352"></a> -<span class="sourceLineNo">7353</span> if (writeToWAL) {<a name="line.7353"></a> -<span class="sourceLineNo">7354</span> if (walEdits == null) {<a name="line.7354"></a> -<span class="sourceLineNo">7355</span> walEdits = new WALEdit();<a name="line.7355"></a> -<span class="sourceLineNo">7356</span> }<a name="line.7356"></a> -<span class="sourceLineNo">7357</span> walEdits.add(newKV);<a name="line.7357"></a> -<span class="sourceLineNo">7358</span> }<a name="line.7358"></a> -<span class="sourceLineNo">7359</span> }<a name="line.7359"></a> -<span class="sourceLineNo">7360</span> }<a name="line.7360"></a> -<span class="sourceLineNo">7361</span><a name="line.7361"></a> -<span class="sourceLineNo">7362</span> //store the kvs to the temporary memstore before writing WAL<a name="line.7362"></a> -<span class="sourceLineNo">7363</span> if (!kvs.isEmpty()) {<a name="line.7363"></a> -<span class="sourceLineNo">7364</span> tempMemstore.put(store, kvs);<a name="line.7364"></a> -<span class="sourceLineNo">7365</span> }<a name="line.7365"></a> -<span class="sourceLineNo">7366</span> }<a name="line.7366"></a> -<span class="sourceLineNo">7367</span><a name="line.7367"></a> -<span class="sourceLineNo">7368</span> // Actually write to WAL now<a name="line.7368"></a> -<span class="sourceLineNo">7369</span> if (walEdits != null && !walEdits.isEmpty()) {<a name="line.7369"></a> -<span class="sourceLineNo">7370</span> if (writeToWAL) {<a name="line.7370"></a> -<span class="sourceLineNo">7371</span> // Using default cluster id, as this can only happen in the originating<a name="line.7371"></a> -<span class="sourceLineNo">7372</span> // cluster. 
A slave cluster receives the final value (not the delta)<a name="line.7372"></a> -<span class="sourceLineNo">7373</span> // as a Put.<a name="line.7373"></a> -<span class="sourceLineNo">7374</span> // we use HLogKey here instead of WALKey directly to support legacy coprocessors.<a name="line.7374"></a> -<span class="sourceLineNo">7375</span> walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="
<TRUNCATED>
