http://git-wip-us.apache.org/repos/asf/hbase-site/blob/670bf1f0/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html index 658fe8f..d266952 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/HRegion.MutationBatch.html @@ -2914,5347 +2914,5340 @@ <span class="sourceLineNo">2906</span> * OperationStatusCode and the exceptionMessage if any.<a name="line.2906"></a> <span class="sourceLineNo">2907</span> * @throws IOException<a name="line.2907"></a> <span class="sourceLineNo">2908</span> */<a name="line.2908"></a> -<span class="sourceLineNo">2909</span> OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp)<a name="line.2909"></a> -<span class="sourceLineNo">2910</span> throws IOException {<a name="line.2910"></a> -<span class="sourceLineNo">2911</span> boolean initialized = false;<a name="line.2911"></a> -<span class="sourceLineNo">2912</span> Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;<a name="line.2912"></a> -<span class="sourceLineNo">2913</span> startRegionOperation(op);<a name="line.2913"></a> -<span class="sourceLineNo">2914</span> int cellCountFromCP = 0;<a name="line.2914"></a> -<span class="sourceLineNo">2915</span> try {<a name="line.2915"></a> -<span class="sourceLineNo">2916</span> while (!batchOp.isDone()) {<a name="line.2916"></a> -<span class="sourceLineNo">2917</span> if (!batchOp.isInReplay()) {<a name="line.2917"></a> -<span class="sourceLineNo">2918</span> checkReadOnly();<a name="line.2918"></a> -<span class="sourceLineNo">2919</span> }<a name="line.2919"></a> -<span class="sourceLineNo">2920</span> checkResources();<a name="line.2920"></a> -<span class="sourceLineNo">2921</span> if (!initialized) {<a name="line.2921"></a> -<span class="sourceLineNo">2922</span> this.writeRequestsCount.add(batchOp.operations.length);<a name="line.2922"></a> -<span class="sourceLineNo">2923</span> if (!batchOp.isInReplay()) {<a name="line.2923"></a> -<span class="sourceLineNo">2924</span> cellCountFromCP = doPreMutationHook(batchOp);<a name="line.2924"></a> -<span class="sourceLineNo">2925</span> }<a name="line.2925"></a> -<span class="sourceLineNo">2926</span> initialized = true;<a name="line.2926"></a> -<span class="sourceLineNo">2927</span> }<a name="line.2927"></a> -<span class="sourceLineNo">2928</span> long addedSize = doMiniBatchMutation(batchOp, cellCountFromCP);<a name="line.2928"></a> -<span class="sourceLineNo">2929</span> long newSize = this.addAndGetGlobalMemstoreSize(addedSize);<a name="line.2929"></a> -<span class="sourceLineNo">2930</span> if (isFlushSize(newSize)) {<a name="line.2930"></a> -<span class="sourceLineNo">2931</span> requestFlush();<a name="line.2931"></a> -<span class="sourceLineNo">2932</span> }<a name="line.2932"></a> -<span class="sourceLineNo">2933</span> }<a name="line.2933"></a> -<span class="sourceLineNo">2934</span> } finally {<a name="line.2934"></a> -<span class="sourceLineNo">2935</span> closeRegionOperation(op);<a name="line.2935"></a> -<span class="sourceLineNo">2936</span> }<a name="line.2936"></a> -<span class="sourceLineNo">2937</span> return batchOp.retCodeDetails;<a name="line.2937"></a> -<span class="sourceLineNo">2938</span> }<a name="line.2938"></a> +<span class="sourceLineNo">2909</span> OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {<a name="line.2909"></a> +<span class="sourceLineNo">2910</span> boolean initialized = false;<a name="line.2910"></a> +<span class="sourceLineNo">2911</span> Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;<a name="line.2911"></a> +<span class="sourceLineNo">2912</span> startRegionOperation(op);<a name="line.2912"></a> +<span class="sourceLineNo">2913</span> try {<a name="line.2913"></a> +<span class="sourceLineNo">2914</span> while (!batchOp.isDone()) {<a name="line.2914"></a> +<span class="sourceLineNo">2915</span> if (!batchOp.isInReplay()) {<a name="line.2915"></a> +<span class="sourceLineNo">2916</span> checkReadOnly();<a name="line.2916"></a> +<span class="sourceLineNo">2917</span> }<a name="line.2917"></a> +<span class="sourceLineNo">2918</span> checkResources();<a name="line.2918"></a> +<span class="sourceLineNo">2919</span><a name="line.2919"></a> +<span class="sourceLineNo">2920</span> if (!initialized) {<a name="line.2920"></a> +<span class="sourceLineNo">2921</span> this.writeRequestsCount.add(batchOp.operations.length);<a name="line.2921"></a> +<span class="sourceLineNo">2922</span> if (!batchOp.isInReplay()) {<a name="line.2922"></a> +<span class="sourceLineNo">2923</span> doPreMutationHook(batchOp);<a name="line.2923"></a> +<span class="sourceLineNo">2924</span> }<a name="line.2924"></a> +<span class="sourceLineNo">2925</span> initialized = true;<a name="line.2925"></a> +<span class="sourceLineNo">2926</span> }<a name="line.2926"></a> +<span class="sourceLineNo">2927</span> long addedSize = doMiniBatchMutation(batchOp);<a name="line.2927"></a> +<span class="sourceLineNo">2928</span> long newSize = this.addAndGetGlobalMemstoreSize(addedSize);<a name="line.2928"></a> +<span class="sourceLineNo">2929</span> if (isFlushSize(newSize)) {<a name="line.2929"></a> +<span class="sourceLineNo">2930</span> requestFlush();<a name="line.2930"></a> +<span class="sourceLineNo">2931</span> }<a name="line.2931"></a> +<span class="sourceLineNo">2932</span> }<a name="line.2932"></a> +<span class="sourceLineNo">2933</span> } finally {<a name="line.2933"></a> +<span class="sourceLineNo">2934</span> closeRegionOperation(op);<a name="line.2934"></a> +<span class="sourceLineNo">2935</span> }<a name="line.2935"></a> +<span class="sourceLineNo">2936</span> return batchOp.retCodeDetails;<a name="line.2936"></a> +<span class="sourceLineNo">2937</span> }<a name="line.2937"></a> +<span class="sourceLineNo">2938</span><a name="line.2938"></a> <span class="sourceLineNo">2939</span><a name="line.2939"></a> -<span class="sourceLineNo">2940</span><a name="line.2940"></a> -<span class="sourceLineNo">2941</span> private int doPreMutationHook(BatchOperationInProgress<?> batchOp)<a name="line.2941"></a> -<span class="sourceLineNo">2942</span> throws IOException {<a name="line.2942"></a> -<span class="sourceLineNo">2943</span> /* Run coprocessor pre hook outside of locks to avoid deadlock */<a name="line.2943"></a> -<span class="sourceLineNo">2944</span> WALEdit walEdit = new WALEdit();<a name="line.2944"></a> -<span class="sourceLineNo">2945</span> int cellCount = 0;<a name="line.2945"></a> -<span class="sourceLineNo">2946</span> if (coprocessorHost != null) {<a name="line.2946"></a> -<span class="sourceLineNo">2947</span> for (int i = 0 ; i < batchOp.operations.length; i++) {<a name="line.2947"></a> -<span class="sourceLineNo">2948</span> Mutation m = batchOp.getMutation(i);<a name="line.2948"></a> -<span class="sourceLineNo">2949</span> if (m instanceof Put) {<a name="line.2949"></a> -<span class="sourceLineNo">2950</span> if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {<a name="line.2950"></a> -<span class="sourceLineNo">2951</span> // pre hook says skip this Put<a name="line.2951"></a> -<span class="sourceLineNo">2952</span> // mark as success and skip in doMiniBatchMutation<a name="line.2952"></a> -<span class="sourceLineNo">2953</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.2953"></a> -<span class="sourceLineNo">2954</span> }<a name="line.2954"></a> -<span class="sourceLineNo">2955</span> } else if (m instanceof Delete) {<a name="line.2955"></a> -<span class="sourceLineNo">2956</span> Delete curDel = (Delete) m;<a name="line.2956"></a> -<span class="sourceLineNo">2957</span> if (curDel.getFamilyCellMap().isEmpty()) {<a name="line.2957"></a> -<span class="sourceLineNo">2958</span> // handle deleting a row case<a name="line.2958"></a> -<span class="sourceLineNo">2959</span> prepareDelete(curDel);<a name="line.2959"></a> -<span class="sourceLineNo">2960</span> }<a name="line.2960"></a> -<span class="sourceLineNo">2961</span> if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {<a name="line.2961"></a> -<span class="sourceLineNo">2962</span> // pre hook says skip this Delete<a name="line.2962"></a> -<span class="sourceLineNo">2963</span> // mark as success and skip in doMiniBatchMutation<a name="line.2963"></a> -<span class="sourceLineNo">2964</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.2964"></a> -<span class="sourceLineNo">2965</span> }<a name="line.2965"></a> -<span class="sourceLineNo">2966</span> } else {<a name="line.2966"></a> -<span class="sourceLineNo">2967</span> // In case of passing Append mutations along with the Puts and Deletes in batchMutate<a name="line.2967"></a> -<span class="sourceLineNo">2968</span> // mark the operation return code as failure so that it will not be considered in<a name="line.2968"></a> -<span class="sourceLineNo">2969</span> // the doMiniBatchMutation<a name="line.2969"></a> -<span class="sourceLineNo">2970</span> batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,<a name="line.2970"></a> -<span class="sourceLineNo">2971</span> "Put/Delete mutations only supported in batchMutate() now");<a name="line.2971"></a> -<span class="sourceLineNo">2972</span> }<a name="line.2972"></a> -<span class="sourceLineNo">2973</span> if (!walEdit.isEmpty()) {<a name="line.2973"></a> -<span class="sourceLineNo">2974</span> batchOp.walEditsFromCoprocessors[i] = walEdit;<a name="line.2974"></a> -<span class="sourceLineNo">2975</span> cellCount += walEdit.size();<a name="line.2975"></a> -<span class="sourceLineNo">2976</span> walEdit = new WALEdit();<a name="line.2976"></a> -<span class="sourceLineNo">2977</span> }<a name="line.2977"></a> -<span class="sourceLineNo">2978</span> }<a name="line.2978"></a> -<span class="sourceLineNo">2979</span> }<a name="line.2979"></a> -<span class="sourceLineNo">2980</span> return cellCount;<a name="line.2980"></a> -<span class="sourceLineNo">2981</span> }<a name="line.2981"></a> -<span class="sourceLineNo">2982</span><a name="line.2982"></a> -<span class="sourceLineNo">2983</span> @SuppressWarnings("unchecked")<a name="line.2983"></a> -<span class="sourceLineNo">2984</span> private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp, int cellCount)<a name="line.2984"></a> -<span class="sourceLineNo">2985</span> throws IOException {<a name="line.2985"></a> -<span class="sourceLineNo">2986</span> boolean isInReplay = batchOp.isInReplay();<a name="line.2986"></a> -<span class="sourceLineNo">2987</span> // variable to note if all Put items are for the same CF -- metrics related<a name="line.2987"></a> -<span class="sourceLineNo">2988</span> boolean putsCfSetConsistent = true;<a name="line.2988"></a> -<span class="sourceLineNo">2989</span> //The set of columnFamilies first seen for Put.<a name="line.2989"></a> -<span class="sourceLineNo">2990</span> Set<byte[]> putsCfSet = null;<a name="line.2990"></a> -<span class="sourceLineNo">2991</span> // variable to note if all Delete items are for the same CF -- metrics related<a name="line.2991"></a> -<span class="sourceLineNo">2992</span> boolean deletesCfSetConsistent = true;<a name="line.2992"></a> -<span class="sourceLineNo">2993</span> //The set of columnFamilies first seen for Delete.<a name="line.2993"></a> -<span class="sourceLineNo">2994</span> Set<byte[]> deletesCfSet = null;<a name="line.2994"></a> -<span class="sourceLineNo">2995</span><a name="line.2995"></a> -<span class="sourceLineNo">2996</span> long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;<a name="line.2996"></a> -<span class="sourceLineNo">2997</span> WALEdit walEdit = null;<a name="line.2997"></a> -<span class="sourceLineNo">2998</span> MultiVersionConcurrencyControl.WriteEntry writeEntry = null;<a name="line.2998"></a> -<span class="sourceLineNo">2999</span> long txid = 0;<a name="line.2999"></a> -<span class="sourceLineNo">3000</span> boolean doRollBackMemstore = false;<a name="line.3000"></a> -<span class="sourceLineNo">3001</span> boolean locked = false;<a name="line.3001"></a> -<span class="sourceLineNo">3002</span><a name="line.3002"></a> -<span class="sourceLineNo">3003</span> /** Keep track of the locks we hold so we can release them in finally clause */<a name="line.3003"></a> -<span class="sourceLineNo">3004</span> List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);<a name="line.3004"></a> -<span class="sourceLineNo">3005</span> // reference family maps directly so coprocessors can mutate them if desired<a name="line.3005"></a> -<span class="sourceLineNo">3006</span> Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];<a name="line.3006"></a> -<span class="sourceLineNo">3007</span> // We try to set up a batch in the range [firstIndex,lastIndexExclusive)<a name="line.3007"></a> -<span class="sourceLineNo">3008</span> int firstIndex = batchOp.nextIndexToProcess;<a name="line.3008"></a> -<span class="sourceLineNo">3009</span> int lastIndexExclusive = firstIndex;<a name="line.3009"></a> -<span class="sourceLineNo">3010</span> boolean success = false;<a name="line.3010"></a> -<span class="sourceLineNo">3011</span> int noOfPuts = 0, noOfDeletes = 0;<a name="line.3011"></a> -<span class="sourceLineNo">3012</span> WALKey walKey = null;<a name="line.3012"></a> -<span class="sourceLineNo">3013</span> long mvccNum = 0;<a name="line.3013"></a> -<span class="sourceLineNo">3014</span> try {<a name="line.3014"></a> -<span class="sourceLineNo">3015</span> // ------------------------------------<a name="line.3015"></a> -<span class="sourceLineNo">3016</span> // STEP 1. Try to acquire as many locks as we can, and ensure<a name="line.3016"></a> -<span class="sourceLineNo">3017</span> // we acquire at least one.<a name="line.3017"></a> -<span class="sourceLineNo">3018</span> // ----------------------------------<a name="line.3018"></a> -<span class="sourceLineNo">3019</span> int numReadyToWrite = 0;<a name="line.3019"></a> -<span class="sourceLineNo">3020</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.3020"></a> -<span class="sourceLineNo">3021</span> while (lastIndexExclusive < batchOp.operations.length) {<a name="line.3021"></a> -<span class="sourceLineNo">3022</span> Mutation mutation = batchOp.getMutation(lastIndexExclusive);<a name="line.3022"></a> -<span class="sourceLineNo">3023</span> boolean isPutMutation = mutation instanceof Put;<a name="line.3023"></a> -<span class="sourceLineNo">3024</span><a name="line.3024"></a> -<span class="sourceLineNo">3025</span> Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();<a name="line.3025"></a> -<span class="sourceLineNo">3026</span> // store the family map reference to allow for mutations<a name="line.3026"></a> -<span class="sourceLineNo">3027</span> familyMaps[lastIndexExclusive] = familyMap;<a name="line.3027"></a> -<span class="sourceLineNo">3028</span> // skip anything that "ran" already<a name="line.3028"></a> -<span class="sourceLineNo">3029</span> if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()<a name="line.3029"></a> -<span class="sourceLineNo">3030</span> != OperationStatusCode.NOT_RUN) {<a name="line.3030"></a> -<span class="sourceLineNo">3031</span> lastIndexExclusive++;<a name="line.3031"></a> -<span class="sourceLineNo">3032</span> continue;<a name="line.3032"></a> -<span class="sourceLineNo">3033</span> }<a name="line.3033"></a> -<span class="sourceLineNo">3034</span><a name="line.3034"></a> -<span class="sourceLineNo">3035</span> try {<a name="line.3035"></a> -<span class="sourceLineNo">3036</span> if (isPutMutation) {<a name="line.3036"></a> -<span class="sourceLineNo">3037</span> // Check the families in the put. If bad, skip this one.<a name="line.3037"></a> -<span class="sourceLineNo">3038</span> if (isInReplay) {<a name="line.3038"></a> -<span class="sourceLineNo">3039</span> removeNonExistentColumnFamilyForReplay(familyMap);<a name="line.3039"></a> -<span class="sourceLineNo">3040</span> } else {<a name="line.3040"></a> -<span class="sourceLineNo">3041</span> checkFamilies(familyMap.keySet());<a name="line.3041"></a> -<span class="sourceLineNo">3042</span> }<a name="line.3042"></a> -<span class="sourceLineNo">3043</span> checkTimestamps(mutation.getFamilyCellMap(), now);<a name="line.3043"></a> -<span class="sourceLineNo">3044</span> } else {<a name="line.3044"></a> -<span class="sourceLineNo">3045</span> prepareDelete((Delete) mutation);<a name="line.3045"></a> -<span class="sourceLineNo">3046</span> }<a name="line.3046"></a> -<span class="sourceLineNo">3047</span> checkRow(mutation.getRow(), "doMiniBatchMutation");<a name="line.3047"></a> -<span class="sourceLineNo">3048</span> } catch (NoSuchColumnFamilyException nscf) {<a name="line.3048"></a> -<span class="sourceLineNo">3049</span> LOG.warn("No such column family in batch mutation", nscf);<a name="line.3049"></a> -<span class="sourceLineNo">3050</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3050"></a> -<span class="sourceLineNo">3051</span> OperationStatusCode.BAD_FAMILY, nscf.getMessage());<a name="line.3051"></a> -<span class="sourceLineNo">3052</span> lastIndexExclusive++;<a name="line.3052"></a> -<span class="sourceLineNo">3053</span> continue;<a name="line.3053"></a> -<span class="sourceLineNo">3054</span> } catch (FailedSanityCheckException fsce) {<a name="line.3054"></a> -<span class="sourceLineNo">3055</span> LOG.warn("Batch Mutation did not pass sanity check", fsce);<a name="line.3055"></a> -<span class="sourceLineNo">3056</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3056"></a> -<span class="sourceLineNo">3057</span> OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());<a name="line.3057"></a> -<span class="sourceLineNo">3058</span> lastIndexExclusive++;<a name="line.3058"></a> -<span class="sourceLineNo">3059</span> continue;<a name="line.3059"></a> -<span class="sourceLineNo">3060</span> } catch (WrongRegionException we) {<a name="line.3060"></a> -<span class="sourceLineNo">3061</span> LOG.warn("Batch mutation had a row that does not belong to this region", we);<a name="line.3061"></a> -<span class="sourceLineNo">3062</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3062"></a> -<span class="sourceLineNo">3063</span> OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());<a name="line.3063"></a> -<span class="sourceLineNo">3064</span> lastIndexExclusive++;<a name="line.3064"></a> -<span class="sourceLineNo">3065</span> continue;<a name="line.3065"></a> -<span class="sourceLineNo">3066</span> }<a name="line.3066"></a> -<span class="sourceLineNo">3067</span><a name="line.3067"></a> -<span class="sourceLineNo">3068</span> // If we haven't got any rows in our batch, we should block to<a name="line.3068"></a> -<span class="sourceLineNo">3069</span> // get the next one.<a name="line.3069"></a> -<span class="sourceLineNo">3070</span> RowLock rowLock = null;<a name="line.3070"></a> -<span class="sourceLineNo">3071</span> try {<a name="line.3071"></a> -<span class="sourceLineNo">3072</span> rowLock = getRowLock(mutation.getRow(), true);<a name="line.3072"></a> -<span class="sourceLineNo">3073</span> } catch (IOException ioe) {<a name="line.3073"></a> -<span class="sourceLineNo">3074</span> LOG.warn("Failed getting lock in batch put, row="<a name="line.3074"></a> -<span class="sourceLineNo">3075</span> + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.3075"></a> -<span class="sourceLineNo">3076</span> }<a name="line.3076"></a> -<span class="sourceLineNo">3077</span> if (rowLock == null) {<a name="line.3077"></a> -<span class="sourceLineNo">3078</span> // We failed to grab another lock<a name="line.3078"></a> -<span class="sourceLineNo">3079</span> break; // stop acquiring more rows for this batch<a name="line.3079"></a> -<span class="sourceLineNo">3080</span> } else {<a name="line.3080"></a> -<span class="sourceLineNo">3081</span> acquiredRowLocks.add(rowLock);<a name="line.3081"></a> -<span class="sourceLineNo">3082</span> }<a name="line.3082"></a> -<span class="sourceLineNo">3083</span><a name="line.3083"></a> -<span class="sourceLineNo">3084</span> lastIndexExclusive++;<a name="line.3084"></a> -<span class="sourceLineNo">3085</span> numReadyToWrite++;<a name="line.3085"></a> -<span class="sourceLineNo">3086</span><a name="line.3086"></a> -<span class="sourceLineNo">3087</span> if (isPutMutation) {<a name="line.3087"></a> -<span class="sourceLineNo">3088</span> // If Column Families stay consistent through out all of the<a name="line.3088"></a> -<span class="sourceLineNo">3089</span> // individual puts then metrics can be reported as a mutliput across<a name="line.3089"></a> -<span class="sourceLineNo">3090</span> // column families in the first put.<a name="line.3090"></a> -<span class="sourceLineNo">3091</span> if (putsCfSet == null) {<a name="line.3091"></a> -<span class="sourceLineNo">3092</span> putsCfSet = mutation.getFamilyCellMap().keySet();<a name="line.3092"></a> -<span class="sourceLineNo">3093</span> } else {<a name="line.3093"></a> -<span class="sourceLineNo">3094</span> putsCfSetConsistent = putsCfSetConsistent<a name="line.3094"></a> -<span class="sourceLineNo">3095</span> && mutation.getFamilyCellMap().keySet().equals(putsCfSet);<a name="line.3095"></a> -<span class="sourceLineNo">3096</span> }<a name="line.3096"></a> -<span class="sourceLineNo">3097</span> } else {<a name="line.3097"></a> -<span class="sourceLineNo">3098</span> if (deletesCfSet == null) {<a name="line.3098"></a> -<span class="sourceLineNo">3099</span> deletesCfSet = mutation.getFamilyCellMap().keySet();<a name="line.3099"></a> -<span class="sourceLineNo">3100</span> } else {<a name="line.3100"></a> -<span class="sourceLineNo">3101</span> deletesCfSetConsistent = deletesCfSetConsistent<a name="line.3101"></a> -<span class="sourceLineNo">3102</span> && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);<a name="line.3102"></a> -<span class="sourceLineNo">3103</span> }<a name="line.3103"></a> -<span class="sourceLineNo">3104</span> }<a name="line.3104"></a> -<span class="sourceLineNo">3105</span> }<a name="line.3105"></a> -<span class="sourceLineNo">3106</span><a name="line.3106"></a> -<span class="sourceLineNo">3107</span> // we should record the timestamp only after we have acquired the rowLock,<a name="line.3107"></a> -<span class="sourceLineNo">3108</span> // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp<a name="line.3108"></a> -<span class="sourceLineNo">3109</span> now = EnvironmentEdgeManager.currentTime();<a name="line.3109"></a> -<span class="sourceLineNo">3110</span> byte[] byteNow = Bytes.toBytes(now);<a name="line.3110"></a> -<span class="sourceLineNo">3111</span><a name="line.3111"></a> -<span class="sourceLineNo">3112</span> // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?<a name="line.3112"></a> -<span class="sourceLineNo">3113</span> if (numReadyToWrite <= 0) return 0L;<a name="line.3113"></a> -<span class="sourceLineNo">3114</span><a name="line.3114"></a> -<span class="sourceLineNo">3115</span> // We've now grabbed as many mutations off the list as we can<a name="line.3115"></a> -<span class="sourceLineNo">3116</span><a name="line.3116"></a> -<span class="sourceLineNo">3117</span> // ------------------------------------<a name="line.3117"></a> -<span class="sourceLineNo">3118</span> // STEP 2. Update any LATEST_TIMESTAMP timestamps<a name="line.3118"></a> -<span class="sourceLineNo">3119</span> // ----------------------------------<a name="line.3119"></a> -<span class="sourceLineNo">3120</span> for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {<a name="line.3120"></a> -<span class="sourceLineNo">3121</span> // skip invalid<a name="line.3121"></a> -<span class="sourceLineNo">3122</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3122"></a> -<span class="sourceLineNo">3123</span> != OperationStatusCode.NOT_RUN) continue;<a name="line.3123"></a> -<span class="sourceLineNo">3124</span><a name="line.3124"></a> -<span class="sourceLineNo">3125</span> Mutation mutation = batchOp.getMutation(i);<a name="line.3125"></a> -<span class="sourceLineNo">3126</span> if (mutation instanceof Put) {<a name="line.3126"></a> -<span class="sourceLineNo">3127</span> updateCellTimestamps(familyMaps[i].values(), byteNow);<a name="line.3127"></a> -<span class="sourceLineNo">3128</span> noOfPuts++;<a name="line.3128"></a> -<span class="sourceLineNo">3129</span> } else {<a name="line.3129"></a> -<span class="sourceLineNo">3130</span> prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);<a name="line.3130"></a> -<span class="sourceLineNo">3131</span> noOfDeletes++;<a name="line.3131"></a> -<span class="sourceLineNo">3132</span> }<a name="line.3132"></a> -<span class="sourceLineNo">3133</span> rewriteCellTags(familyMaps[i], mutation);<a name="line.3133"></a> -<span class="sourceLineNo">3134</span> for (List<Cell> cells : familyMaps[i].values()) {<a name="line.3134"></a> -<span class="sourceLineNo">3135</span> cellCount += cells.size();<a name="line.3135"></a> -<span class="sourceLineNo">3136</span> }<a name="line.3136"></a> -<span class="sourceLineNo">3137</span> }<a name="line.3137"></a> -<span class="sourceLineNo">3138</span> walEdit = new WALEdit(cellCount);<a name="line.3138"></a> -<span class="sourceLineNo">3139</span> lock(this.updatesLock.readLock(), numReadyToWrite);<a name="line.3139"></a> -<span class="sourceLineNo">3140</span> locked = true;<a name="line.3140"></a> -<span class="sourceLineNo">3141</span><a name="line.3141"></a> -<span class="sourceLineNo">3142</span> // calling the pre CP hook for batch mutation<a name="line.3142"></a> -<span class="sourceLineNo">3143</span> if (!isInReplay && coprocessorHost != null) {<a name="line.3143"></a> -<span class="sourceLineNo">3144</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3144"></a> -<span class="sourceLineNo">3145</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3145"></a> -<span class="sourceLineNo">3146</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);<a name="line.3146"></a> -<span class="sourceLineNo">3147</span> if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;<a name="line.3147"></a> -<span class="sourceLineNo">3148</span> }<a name="line.3148"></a> -<span class="sourceLineNo">3149</span><a name="line.3149"></a> -<span class="sourceLineNo">3150</span> // ------------------------------------<a name="line.3150"></a> -<span class="sourceLineNo">3151</span> // STEP 3. Build WAL edit<a name="line.3151"></a> -<span class="sourceLineNo">3152</span> // ----------------------------------<a name="line.3152"></a> -<span class="sourceLineNo">3153</span> Durability durability = Durability.USE_DEFAULT;<a name="line.3153"></a> -<span class="sourceLineNo">3154</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3154"></a> -<span class="sourceLineNo">3155</span> // Skip puts that were determined to be invalid during preprocessing<a name="line.3155"></a> -<span class="sourceLineNo">3156</span> if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {<a name="line.3156"></a> -<span class="sourceLineNo">3157</span> continue;<a name="line.3157"></a> -<span class="sourceLineNo">3158</span> }<a name="line.3158"></a> -<span class="sourceLineNo">3159</span><a name="line.3159"></a> -<span class="sourceLineNo">3160</span> Mutation m = batchOp.getMutation(i);<a name="line.3160"></a> -<span class="sourceLineNo">3161</span> Durability tmpDur = getEffectiveDurability(m.getDurability());<a name="line.3161"></a> -<span class="sourceLineNo">3162</span> if (tmpDur.ordinal() > durability.ordinal()) {<a name="line.3162"></a> -<span class="sourceLineNo">3163</span> durability = tmpDur;<a name="line.3163"></a> -<span class="sourceLineNo">3164</span> }<a name="line.3164"></a> -<span class="sourceLineNo">3165</span> if (tmpDur == Durability.SKIP_WAL) {<a name="line.3165"></a> -<span class="sourceLineNo">3166</span> recordMutationWithoutWal(m.getFamilyCellMap());<a name="line.3166"></a> -<span class="sourceLineNo">3167</span> continue;<a name="line.3167"></a> -<span class="sourceLineNo">3168</span> }<a name="line.3168"></a> -<span class="sourceLineNo">3169</span><a name="line.3169"></a> -<span class="sourceLineNo">3170</span> long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);<a name="line.3170"></a> -<span class="sourceLineNo">3171</span> // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.<a name="line.3171"></a> -<span class="sourceLineNo">3172</span> // Given how nonces are originally written, these should be contiguous.<a name="line.3172"></a> -<span class="sourceLineNo">3173</span> // They don't have to be, it will still work, just write more WALEdits than needed.<a name="line.3173"></a> -<span class="sourceLineNo">3174</span> if (nonceGroup != currentNonceGroup || nonce != currentNonce) {<a name="line.3174"></a> -<span class="sourceLineNo">3175</span> if (walEdit.size() > 0) {<a name="line.3175"></a> -<span class="sourceLineNo">3176</span> assert isInReplay;<a name="line.3176"></a> -<span class="sourceLineNo">3177</span> if (!isInReplay) {<a name="line.3177"></a> -<span class="sourceLineNo">3178</span> throw new IOException("Multiple nonces per batch and not in replay");<a name="line.3178"></a> -<span class="sourceLineNo">3179</span> }<a name="line.3179"></a> -<span class="sourceLineNo">3180</span> // txid should always increase, so having the one from the last call is ok.<a name="line.3180"></a> -<span class="sourceLineNo">3181</span> // we use HLogKey here instead of WALKey directly to support legacy coprocessors.<a name="line.3181"></a> -<span class="sourceLineNo">3182</span> walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3182"></a> -<span class="sourceLineNo">3183</span> this.htableDescriptor.getTableName(), now, m.getClusterIds(),<a name="line.3183"></a> -<span class="sourceLineNo">3184</span> currentNonceGroup, currentNonce, mvcc);<a name="line.3184"></a> -<span class="sourceLineNo">3185</span> txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,<a name="line.3185"></a> -<span class="sourceLineNo">3186</span> walEdit, true);<a name="line.3186"></a> -<span class="sourceLineNo">3187</span> walEdit = new WALEdit(isInReplay);<a name="line.3187"></a> -<span class="sourceLineNo">3188</span> walKey = null;<a name="line.3188"></a> -<span class="sourceLineNo">3189</span> }<a name="line.3189"></a> -<span class="sourceLineNo">3190</span> currentNonceGroup = nonceGroup;<a name="line.3190"></a> -<span class="sourceLineNo">3191</span> currentNonce = nonce;<a name="line.3191"></a> -<span class="sourceLineNo">3192</span> }<a name="line.3192"></a> -<span class="sourceLineNo">3193</span><a name="line.3193"></a> -<span class="sourceLineNo">3194</span> // Add WAL edits by CP<a name="line.3194"></a> -<span class="sourceLineNo">3195</span> WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];<a name="line.3195"></a> -<span class="sourceLineNo">3196</span> if (fromCP != null) {<a name="line.3196"></a> -<span class="sourceLineNo">3197</span> for (Cell cell : fromCP.getCells()) {<a name="line.3197"></a> -<span class="sourceLineNo">3198</span> walEdit.add(cell);<a name="line.3198"></a> -<span class="sourceLineNo">3199</span> }<a name="line.3199"></a> -<span class="sourceLineNo">3200</span> }<a name="line.3200"></a> -<span class="sourceLineNo">3201</span> addFamilyMapToWALEdit(familyMaps[i], walEdit);<a name="line.3201"></a> -<span class="sourceLineNo">3202</span> }<a name="line.3202"></a> -<span class="sourceLineNo">3203</span><a name="line.3203"></a> -<span class="sourceLineNo">3204</span> // -------------------------<a name="line.3204"></a> -<span class="sourceLineNo">3205</span> // STEP 4. Append the final edit to WAL. Do not sync wal.<a name="line.3205"></a> -<span class="sourceLineNo">3206</span> // -------------------------<a name="line.3206"></a> -<span class="sourceLineNo">3207</span> Mutation mutation = batchOp.getMutation(firstIndex);<a name="line.3207"></a> -<span class="sourceLineNo">3208</span> if (isInReplay) {<a name="line.3208"></a> -<span class="sourceLineNo">3209</span> // use wal key from the original<a name="line.3209"></a> -<span class="sourceLineNo">3210</span> walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3210"></a> -<span class="sourceLineNo">3211</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,<a name="line.3211"></a> -<span class="sourceLineNo">3212</span> mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);<a name="line.3212"></a> -<span class="sourceLineNo">3213</span> long replaySeqId = batchOp.getReplaySequenceId();<a name="line.3213"></a> -<span class="sourceLineNo">3214</span> walKey.setOrigLogSeqNum(replaySeqId);<a name="line.3214"></a> -<span class="sourceLineNo">3215</span> }<a name="line.3215"></a> -<span class="sourceLineNo">3216</span> if (walEdit.size() > 0) {<a name="line.3216"></a> -<span class="sourceLineNo">3217</span> if (!isInReplay) {<a name="line.3217"></a> -<span class="sourceLineNo">3218</span> // we use HLogKey here instead of WALKey directly to support legacy coprocessors.<a name="line.3218"></a> -<span class="sourceLineNo">3219</span> walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3219"></a> -<span class="sourceLineNo">3220</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,<a name="line.3220"></a> -<span class="sourceLineNo">3221</span> mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);<a name="line.3221"></a> -<span class="sourceLineNo">3222</span> }<a name="line.3222"></a> -<span class="sourceLineNo">3223</span> txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);<a name="line.3223"></a> +<span class="sourceLineNo">2940</span> private void doPreMutationHook(BatchOperationInProgress<?> batchOp)<a name="line.2940"></a> +<span class="sourceLineNo">2941</span> throws IOException {<a name="line.2941"></a> +<span class="sourceLineNo">2942</span> /* Run coprocessor pre hook outside of locks to avoid deadlock */<a name="line.2942"></a> +<span class="sourceLineNo">2943</span> WALEdit walEdit = new WALEdit();<a name="line.2943"></a> +<span class="sourceLineNo">2944</span> if (coprocessorHost != null) {<a name="line.2944"></a> +<span class="sourceLineNo">2945</span> for (int i = 0 ; i < batchOp.operations.length; i++) {<a name="line.2945"></a> +<span class="sourceLineNo">2946</span> Mutation m = batchOp.getMutation(i);<a name="line.2946"></a> +<span class="sourceLineNo">2947</span> if (m instanceof Put) {<a name="line.2947"></a> +<span class="sourceLineNo">2948</span> if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {<a name="line.2948"></a> +<span class="sourceLineNo">2949</span> // pre hook says skip this Put<a name="line.2949"></a> +<span class="sourceLineNo">2950</span> // mark as success and skip in doMiniBatchMutation<a name="line.2950"></a> +<span class="sourceLineNo">2951</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.2951"></a> +<span class="sourceLineNo">2952</span> }<a name="line.2952"></a> +<span class="sourceLineNo">2953</span> } else if (m instanceof Delete) {<a name="line.2953"></a> +<span class="sourceLineNo">2954</span> Delete curDel = (Delete) m;<a name="line.2954"></a> +<span class="sourceLineNo">2955</span> if (curDel.getFamilyCellMap().isEmpty()) {<a name="line.2955"></a> +<span class="sourceLineNo">2956</span> // handle deleting a row case<a name="line.2956"></a> +<span class="sourceLineNo">2957</span> prepareDelete(curDel);<a name="line.2957"></a> +<span class="sourceLineNo">2958</span> }<a name="line.2958"></a> +<span class="sourceLineNo">2959</span> if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {<a name="line.2959"></a> +<span class="sourceLineNo">2960</span> // pre hook says skip this Delete<a name="line.2960"></a> +<span class="sourceLineNo">2961</span> // mark as success and skip in doMiniBatchMutation<a name="line.2961"></a> +<span class="sourceLineNo">2962</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.2962"></a> +<span class="sourceLineNo">2963</span> }<a name="line.2963"></a> +<span class="sourceLineNo">2964</span> } else {<a name="line.2964"></a> +<span class="sourceLineNo">2965</span> // In case of passing Append mutations along with the Puts and Deletes in batchMutate<a name="line.2965"></a> +<span class="sourceLineNo">2966</span> // mark the operation return code as failure so that it will not be considered in<a name="line.2966"></a> +<span class="sourceLineNo">2967</span> // the doMiniBatchMutation<a name="line.2967"></a> +<span class="sourceLineNo">2968</span> batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,<a name="line.2968"></a> +<span class="sourceLineNo">2969</span> "Put/Delete mutations only supported in batchMutate() now");<a name="line.2969"></a> +<span class="sourceLineNo">2970</span> }<a name="line.2970"></a> +<span class="sourceLineNo">2971</span> if (!walEdit.isEmpty()) {<a name="line.2971"></a> +<span class="sourceLineNo">2972</span> batchOp.walEditsFromCoprocessors[i] = walEdit;<a name="line.2972"></a> +<span class="sourceLineNo">2973</span> walEdit = new WALEdit();<a name="line.2973"></a> +<span class="sourceLineNo">2974</span> }<a name="line.2974"></a> +<span class="sourceLineNo">2975</span> }<a name="line.2975"></a> +<span class="sourceLineNo">2976</span> }<a name="line.2976"></a> +<span class="sourceLineNo">2977</span> }<a name="line.2977"></a> +<span class="sourceLineNo">2978</span><a name="line.2978"></a> +<span class="sourceLineNo">2979</span> @SuppressWarnings("unchecked")<a name="line.2979"></a> +<span class="sourceLineNo">2980</span> private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {<a name="line.2980"></a> +<span class="sourceLineNo">2981</span> boolean isInReplay = batchOp.isInReplay();<a name="line.2981"></a> +<span class="sourceLineNo">2982</span> // variable to note if all Put items are for the same CF -- metrics related<a name="line.2982"></a> +<span class="sourceLineNo">2983</span> boolean putsCfSetConsistent = true;<a name="line.2983"></a> +<span class="sourceLineNo">2984</span> //The set of columnFamilies first seen for Put.<a name="line.2984"></a> +<span class="sourceLineNo">2985</span> Set<byte[]> putsCfSet = null;<a name="line.2985"></a> +<span class="sourceLineNo">2986</span> // variable to note if all Delete items are for the same CF -- metrics related<a name="line.2986"></a> +<span class="sourceLineNo">2987</span> boolean deletesCfSetConsistent = true;<a name="line.2987"></a> +<span class="sourceLineNo">2988</span> //The set of columnFamilies first seen for Delete.<a name="line.2988"></a> +<span class="sourceLineNo">2989</span> Set<byte[]> deletesCfSet = null;<a name="line.2989"></a> +<span class="sourceLineNo">2990</span><a name="line.2990"></a> +<span class="sourceLineNo">2991</span> long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;<a name="line.2991"></a> +<span class="sourceLineNo">2992</span> WALEdit walEdit = new WALEdit(isInReplay);<a name="line.2992"></a> +<span class="sourceLineNo">2993</span> MultiVersionConcurrencyControl.WriteEntry writeEntry = null;<a name="line.2993"></a> +<span class="sourceLineNo">2994</span> long txid = 0;<a name="line.2994"></a> +<span class="sourceLineNo">2995</span> boolean doRollBackMemstore = false;<a name="line.2995"></a> +<span class="sourceLineNo">2996</span> boolean locked = false;<a name="line.2996"></a> +<span class="sourceLineNo">2997</span><a name="line.2997"></a> +<span class="sourceLineNo">2998</span> /** Keep track of the locks we hold so we can release them in finally clause */<a name="line.2998"></a> +<span class="sourceLineNo">2999</span> List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);<a name="line.2999"></a> +<span class="sourceLineNo">3000</span> // reference family maps directly so coprocessors can mutate them if desired<a name="line.3000"></a> +<span class="sourceLineNo">3001</span> Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];<a name="line.3001"></a> +<span class="sourceLineNo">3002</span> // We try to set up a batch in the range [firstIndex,lastIndexExclusive)<a name="line.3002"></a> +<span class="sourceLineNo">3003</span> int firstIndex = batchOp.nextIndexToProcess;<a name="line.3003"></a> +<span class="sourceLineNo">3004</span> int lastIndexExclusive = firstIndex;<a name="line.3004"></a> +<span class="sourceLineNo">3005</span> boolean success = false;<a name="line.3005"></a> +<span class="sourceLineNo">3006</span> int noOfPuts = 0, noOfDeletes = 0;<a name="line.3006"></a> +<span class="sourceLineNo">3007</span> WALKey walKey = null;<a name="line.3007"></a> +<span class="sourceLineNo">3008</span> long mvccNum = 0;<a name="line.3008"></a> +<span class="sourceLineNo">3009</span> try {<a name="line.3009"></a> +<span class="sourceLineNo">3010</span> // ------------------------------------<a name="line.3010"></a> +<span class="sourceLineNo">3011</span> // STEP 1. Try to acquire as many locks as we can, and ensure<a name="line.3011"></a> +<span class="sourceLineNo">3012</span> // we acquire at least one.<a name="line.3012"></a> +<span class="sourceLineNo">3013</span> // ----------------------------------<a name="line.3013"></a> +<span class="sourceLineNo">3014</span> int numReadyToWrite = 0;<a name="line.3014"></a> +<span class="sourceLineNo">3015</span> long now = EnvironmentEdgeManager.currentTime();<a name="line.3015"></a> +<span class="sourceLineNo">3016</span> while (lastIndexExclusive < batchOp.operations.length) {<a name="line.3016"></a> +<span class="sourceLineNo">3017</span> Mutation mutation = batchOp.getMutation(lastIndexExclusive);<a name="line.3017"></a> +<span class="sourceLineNo">3018</span> boolean isPutMutation = mutation instanceof Put;<a name="line.3018"></a> +<span class="sourceLineNo">3019</span><a name="line.3019"></a> +<span class="sourceLineNo">3020</span> Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();<a name="line.3020"></a> +<span class="sourceLineNo">3021</span> // store the family map reference to allow for mutations<a name="line.3021"></a> +<span class="sourceLineNo">3022</span> familyMaps[lastIndexExclusive] = familyMap;<a name="line.3022"></a> +<span class="sourceLineNo">3023</span><a name="line.3023"></a> +<span class="sourceLineNo">3024</span> // skip anything that "ran" already<a name="line.3024"></a> +<span class="sourceLineNo">3025</span> if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()<a name="line.3025"></a> +<span class="sourceLineNo">3026</span> != OperationStatusCode.NOT_RUN) {<a name="line.3026"></a> +<span class="sourceLineNo">3027</span> lastIndexExclusive++;<a name="line.3027"></a> +<span class="sourceLineNo">3028</span> continue;<a name="line.3028"></a> +<span class="sourceLineNo">3029</span> }<a name="line.3029"></a> +<span class="sourceLineNo">3030</span><a name="line.3030"></a> +<span class="sourceLineNo">3031</span> try {<a name="line.3031"></a> +<span class="sourceLineNo">3032</span> if (isPutMutation) {<a name="line.3032"></a> +<span class="sourceLineNo">3033</span> // Check the families in the put. If bad, skip this one.<a name="line.3033"></a> +<span class="sourceLineNo">3034</span> if (isInReplay) {<a name="line.3034"></a> +<span class="sourceLineNo">3035</span> removeNonExistentColumnFamilyForReplay(familyMap);<a name="line.3035"></a> +<span class="sourceLineNo">3036</span> } else {<a name="line.3036"></a> +<span class="sourceLineNo">3037</span> checkFamilies(familyMap.keySet());<a name="line.3037"></a> +<span class="sourceLineNo">3038</span> }<a name="line.3038"></a> +<span class="sourceLineNo">3039</span> checkTimestamps(mutation.getFamilyCellMap(), now);<a name="line.3039"></a> +<span class="sourceLineNo">3040</span> } else {<a name="line.3040"></a> +<span class="sourceLineNo">3041</span> prepareDelete((Delete) mutation);<a name="line.3041"></a> +<span class="sourceLineNo">3042</span> }<a name="line.3042"></a> +<span class="sourceLineNo">3043</span> checkRow(mutation.getRow(), "doMiniBatchMutation");<a name="line.3043"></a> +<span class="sourceLineNo">3044</span> } catch (NoSuchColumnFamilyException nscf) {<a name="line.3044"></a> +<span class="sourceLineNo">3045</span> LOG.warn("No such column family in batch mutation", nscf);<a name="line.3045"></a> +<span class="sourceLineNo">3046</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3046"></a> +<span class="sourceLineNo">3047</span> OperationStatusCode.BAD_FAMILY, nscf.getMessage());<a name="line.3047"></a> +<span class="sourceLineNo">3048</span> lastIndexExclusive++;<a name="line.3048"></a> +<span class="sourceLineNo">3049</span> continue;<a name="line.3049"></a> +<span class="sourceLineNo">3050</span> } catch (FailedSanityCheckException fsce) {<a name="line.3050"></a> +<span class="sourceLineNo">3051</span> LOG.warn("Batch Mutation did not pass sanity check", fsce);<a name="line.3051"></a> +<span class="sourceLineNo">3052</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3052"></a> +<span class="sourceLineNo">3053</span> OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());<a name="line.3053"></a> +<span class="sourceLineNo">3054</span> lastIndexExclusive++;<a name="line.3054"></a> +<span class="sourceLineNo">3055</span> continue;<a name="line.3055"></a> +<span class="sourceLineNo">3056</span> } catch (WrongRegionException we) {<a name="line.3056"></a> +<span class="sourceLineNo">3057</span> LOG.warn("Batch mutation had a row that does not belong to this region", we);<a name="line.3057"></a> +<span class="sourceLineNo">3058</span> batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(<a name="line.3058"></a> +<span class="sourceLineNo">3059</span> OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());<a name="line.3059"></a> +<span class="sourceLineNo">3060</span> lastIndexExclusive++;<a name="line.3060"></a> +<span class="sourceLineNo">3061</span> continue;<a name="line.3061"></a> +<span class="sourceLineNo">3062</span> }<a name="line.3062"></a> +<span class="sourceLineNo">3063</span><a name="line.3063"></a> +<span class="sourceLineNo">3064</span> // If we haven't got any rows in our batch, we should block to<a name="line.3064"></a> +<span class="sourceLineNo">3065</span> // get the next one.<a name="line.3065"></a> +<span class="sourceLineNo">3066</span> RowLock rowLock = null;<a name="line.3066"></a> +<span class="sourceLineNo">3067</span> try {<a name="line.3067"></a> +<span class="sourceLineNo">3068</span> rowLock = getRowLock(mutation.getRow(), true);<a name="line.3068"></a> +<span class="sourceLineNo">3069</span> } catch (IOException ioe) {<a name="line.3069"></a> +<span class="sourceLineNo">3070</span> LOG.warn("Failed getting lock in batch put, row="<a name="line.3070"></a> +<span class="sourceLineNo">3071</span> + Bytes.toStringBinary(mutation.getRow()), ioe);<a name="line.3071"></a> +<span class="sourceLineNo">3072</span> }<a name="line.3072"></a> +<span class="sourceLineNo">3073</span> if (rowLock == null) {<a name="line.3073"></a> +<span class="sourceLineNo">3074</span> // We failed to grab another lock<a name="line.3074"></a> +<span class="sourceLineNo">3075</span> break; // stop acquiring more rows for this batch<a name="line.3075"></a> +<span class="sourceLineNo">3076</span> } else {<a name="line.3076"></a> +<span class="sourceLineNo">3077</span> acquiredRowLocks.add(rowLock);<a name="line.3077"></a> +<span class="sourceLineNo">3078</span> }<a name="line.3078"></a> +<span class="sourceLineNo">3079</span><a name="line.3079"></a> +<span class="sourceLineNo">3080</span> lastIndexExclusive++;<a name="line.3080"></a> +<span class="sourceLineNo">3081</span> numReadyToWrite++;<a name="line.3081"></a> +<span class="sourceLineNo">3082</span><a name="line.3082"></a> +<span class="sourceLineNo">3083</span> if (isPutMutation) {<a name="line.3083"></a> +<span class="sourceLineNo">3084</span> // If Column Families stay consistent through out all of the<a name="line.3084"></a> +<span class="sourceLineNo">3085</span> // individual puts then metrics can be reported as a mutliput across<a name="line.3085"></a> +<span class="sourceLineNo">3086</span> // column families in the first put.<a name="line.3086"></a> +<span class="sourceLineNo">3087</span> if (putsCfSet == null) {<a name="line.3087"></a> +<span class="sourceLineNo">3088</span> putsCfSet = mutation.getFamilyCellMap().keySet();<a name="line.3088"></a> +<span class="sourceLineNo">3089</span> } else {<a name="line.3089"></a> +<span class="sourceLineNo">3090</span> putsCfSetConsistent = putsCfSetConsistent<a name="line.3090"></a> +<span class="sourceLineNo">3091</span> && mutation.getFamilyCellMap().keySet().equals(putsCfSet);<a name="line.3091"></a> +<span class="sourceLineNo">3092</span> }<a name="line.3092"></a> +<span class="sourceLineNo">3093</span> } else {<a name="line.3093"></a> +<span class="sourceLineNo">3094</span> if (deletesCfSet == null) {<a name="line.3094"></a> +<span class="sourceLineNo">3095</span> deletesCfSet = mutation.getFamilyCellMap().keySet();<a name="line.3095"></a> +<span class="sourceLineNo">3096</span> } else {<a name="line.3096"></a> +<span class="sourceLineNo">3097</span> deletesCfSetConsistent = deletesCfSetConsistent<a name="line.3097"></a> +<span class="sourceLineNo">3098</span> && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);<a name="line.3098"></a> +<span class="sourceLineNo">3099</span> }<a name="line.3099"></a> +<span class="sourceLineNo">3100</span> }<a name="line.3100"></a> +<span class="sourceLineNo">3101</span> }<a name="line.3101"></a> +<span class="sourceLineNo">3102</span><a name="line.3102"></a> +<span class="sourceLineNo">3103</span> // we should record the timestamp only after we have acquired the rowLock,<a name="line.3103"></a> +<span class="sourceLineNo">3104</span> // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp<a name="line.3104"></a> +<span class="sourceLineNo">3105</span> now = EnvironmentEdgeManager.currentTime();<a name="line.3105"></a> +<span class="sourceLineNo">3106</span> byte[] byteNow = Bytes.toBytes(now);<a name="line.3106"></a> +<span class="sourceLineNo">3107</span><a name="line.3107"></a> +<span class="sourceLineNo">3108</span> // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?<a name="line.3108"></a> +<span class="sourceLineNo">3109</span> if (numReadyToWrite <= 0) return 0L;<a name="line.3109"></a> +<span class="sourceLineNo">3110</span><a name="line.3110"></a> +<span class="sourceLineNo">3111</span> // We've now grabbed as many mutations off the list as we can<a name="line.3111"></a> +<span class="sourceLineNo">3112</span><a name="line.3112"></a> +<span class="sourceLineNo">3113</span> // ------------------------------------<a name="line.3113"></a> +<span class="sourceLineNo">3114</span> // STEP 2. Update any LATEST_TIMESTAMP timestamps<a name="line.3114"></a> +<span class="sourceLineNo">3115</span> // ----------------------------------<a name="line.3115"></a> +<span class="sourceLineNo">3116</span> for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {<a name="line.3116"></a> +<span class="sourceLineNo">3117</span> // skip invalid<a name="line.3117"></a> +<span class="sourceLineNo">3118</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3118"></a> +<span class="sourceLineNo">3119</span> != OperationStatusCode.NOT_RUN) continue;<a name="line.3119"></a> +<span class="sourceLineNo">3120</span><a name="line.3120"></a> +<span class="sourceLineNo">3121</span> Mutation mutation = batchOp.getMutation(i);<a name="line.3121"></a> +<span class="sourceLineNo">3122</span> if (mutation instanceof Put) {<a name="line.3122"></a> +<span class="sourceLineNo">3123</span> updateCellTimestamps(familyMaps[i].values(), byteNow);<a name="line.3123"></a> +<span class="sourceLineNo">3124</span> noOfPuts++;<a name="line.3124"></a> +<span class="sourceLineNo">3125</span> } else {<a name="line.3125"></a> +<span class="sourceLineNo">3126</span> prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);<a name="line.3126"></a> +<span class="sourceLineNo">3127</span> noOfDeletes++;<a name="line.3127"></a> +<span class="sourceLineNo">3128</span> }<a name="line.3128"></a> +<span class="sourceLineNo">3129</span> rewriteCellTags(familyMaps[i], mutation);<a name="line.3129"></a> +<span class="sourceLineNo">3130</span> }<a name="line.3130"></a> +<span class="sourceLineNo">3131</span><a name="line.3131"></a> +<span class="sourceLineNo">3132</span> lock(this.updatesLock.readLock(), numReadyToWrite);<a name="line.3132"></a> +<span class="sourceLineNo">3133</span> locked = true;<a name="line.3133"></a> +<span class="sourceLineNo">3134</span><a name="line.3134"></a> +<span class="sourceLineNo">3135</span> // calling the pre CP hook for batch mutation<a name="line.3135"></a> +<span class="sourceLineNo">3136</span> if (!isInReplay && coprocessorHost != null) {<a name="line.3136"></a> +<span class="sourceLineNo">3137</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3137"></a> +<span class="sourceLineNo">3138</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3138"></a> +<span class="sourceLineNo">3139</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);<a name="line.3139"></a> +<span class="sourceLineNo">3140</span> if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;<a name="line.3140"></a> +<span class="sourceLineNo">3141</span> }<a name="line.3141"></a> +<span class="sourceLineNo">3142</span><a name="line.3142"></a> +<span class="sourceLineNo">3143</span> // ------------------------------------<a name="line.3143"></a> +<span class="sourceLineNo">3144</span> // STEP 3. Build WAL edit<a name="line.3144"></a> +<span class="sourceLineNo">3145</span> // ----------------------------------<a name="line.3145"></a> +<span class="sourceLineNo">3146</span> Durability durability = Durability.USE_DEFAULT;<a name="line.3146"></a> +<span class="sourceLineNo">3147</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3147"></a> +<span class="sourceLineNo">3148</span> // Skip puts that were determined to be invalid during preprocessing<a name="line.3148"></a> +<span class="sourceLineNo">3149</span> if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {<a name="line.3149"></a> +<span class="sourceLineNo">3150</span> continue;<a name="line.3150"></a> +<span class="sourceLineNo">3151</span> }<a name="line.3151"></a> +<span class="sourceLineNo">3152</span><a name="line.3152"></a> +<span class="sourceLineNo">3153</span> Mutation m = batchOp.getMutation(i);<a name="line.3153"></a> +<span class="sourceLineNo">3154</span> Durability tmpDur = getEffectiveDurability(m.getDurability());<a name="line.3154"></a> +<span class="sourceLineNo">3155</span> if (tmpDur.ordinal() > durability.ordinal()) {<a name="line.3155"></a> +<span class="sourceLineNo">3156</span> durability = tmpDur;<a name="line.3156"></a> +<span class="sourceLineNo">3157</span> }<a name="line.3157"></a> +<span class="sourceLineNo">3158</span> if (tmpDur == Durability.SKIP_WAL) {<a name="line.3158"></a> +<span class="sourceLineNo">3159</span> recordMutationWithoutWal(m.getFamilyCellMap());<a name="line.3159"></a> +<span class="sourceLineNo">3160</span> continue;<a name="line.3160"></a> +<span class="sourceLineNo">3161</span> }<a name="line.3161"></a> +<span class="sourceLineNo">3162</span><a name="line.3162"></a> +<span class="sourceLineNo">3163</span> long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);<a name="line.3163"></a> +<span class="sourceLineNo">3164</span> // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.<a name="line.3164"></a> +<span class="sourceLineNo">3165</span> // Given how nonces are originally written, these should be contiguous.<a name="line.3165"></a> +<span class="sourceLineNo">3166</span> // They don't have to be, it will still work, just write more WALEdits than needed.<a name="line.3166"></a> +<span class="sourceLineNo">3167</span> if (nonceGroup != currentNonceGroup || nonce != currentNonce) {<a name="line.3167"></a> +<span class="sourceLineNo">3168</span> if (walEdit.size() > 0) {<a name="line.3168"></a> +<span class="sourceLineNo">3169</span> assert isInReplay;<a name="line.3169"></a> +<span class="sourceLineNo">3170</span> if (!isInReplay) {<a name="line.3170"></a> +<span class="sourceLineNo">3171</span> throw new IOException("Multiple nonces per batch and not in replay");<a name="line.3171"></a> +<span class="sourceLineNo">3172</span> }<a name="line.3172"></a> +<span class="sourceLineNo">3173</span> // txid should always increase, so having the one from the last call is ok.<a name="line.3173"></a> +<span class="sourceLineNo">3174</span> // we use HLogKey here instead of WALKey directly to support legacy coprocessors.<a name="line.3174"></a> +<span class="sourceLineNo">3175</span> walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3175"></a> +<span class="sourceLineNo">3176</span> this.htableDescriptor.getTableName(), now, m.getClusterIds(),<a name="line.3176"></a> +<span class="sourceLineNo">3177</span> currentNonceGroup, currentNonce, mvcc);<a name="line.3177"></a> +<span class="sourceLineNo">3178</span> txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,<a name="line.3178"></a> +<span class="sourceLineNo">3179</span> walEdit, true);<a name="line.3179"></a> +<span class="sourceLineNo">3180</span> walEdit = new WALEdit(isInReplay);<a name="line.3180"></a> +<span class="sourceLineNo">3181</span> walKey = null;<a name="line.3181"></a> +<span class="sourceLineNo">3182</span> }<a name="line.3182"></a> +<span class="sourceLineNo">3183</span> currentNonceGroup = nonceGroup;<a name="line.3183"></a> +<span class="sourceLineNo">3184</span> currentNonce = nonce;<a name="line.3184"></a> +<span class="sourceLineNo">3185</span> }<a name="line.3185"></a> +<span class="sourceLineNo">3186</span><a name="line.3186"></a> +<span class="sourceLineNo">3187</span> // Add WAL edits by CP<a name="line.3187"></a> +<span class="sourceLineNo">3188</span> WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];<a name="line.3188"></a> +<span class="sourceLineNo">3189</span> if (fromCP != null) {<a name="line.3189"></a> +<span class="sourceLineNo">3190</span> for (Cell cell : fromCP.getCells()) {<a name="line.3190"></a> +<span class="sourceLineNo">3191</span> walEdit.add(cell);<a name="line.3191"></a> +<span class="sourceLineNo">3192</span> }<a name="line.3192"></a> +<span class="sourceLineNo">3193</span> }<a name="line.3193"></a> +<span class="sourceLineNo">3194</span> addFamilyMapToWALEdit(familyMaps[i], walEdit);<a name="line.3194"></a> +<span class="sourceLineNo">3195</span> }<a name="line.3195"></a> +<span class="sourceLineNo">3196</span><a name="line.3196"></a> +<span class="sourceLineNo">3197</span> // -------------------------<a name="line.3197"></a> +<span class="sourceLineNo">3198</span> // STEP 4. Append the final edit to WAL. Do not sync wal.<a name="line.3198"></a> +<span class="sourceLineNo">3199</span> // -------------------------<a name="line.3199"></a> +<span class="sourceLineNo">3200</span> Mutation mutation = batchOp.getMutation(firstIndex);<a name="line.3200"></a> +<span class="sourceLineNo">3201</span> if (isInReplay) {<a name="line.3201"></a> +<span class="sourceLineNo">3202</span> // use wal key from the original<a name="line.3202"></a> +<span class="sourceLineNo">3203</span> walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3203"></a> +<span class="sourceLineNo">3204</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,<a name="line.3204"></a> +<span class="sourceLineNo">3205</span> mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);<a name="line.3205"></a> +<span class="sourceLineNo">3206</span> long replaySeqId = batchOp.getReplaySequenceId();<a name="line.3206"></a> +<span class="sourceLineNo">3207</span> walKey.setOrigLogSeqNum(replaySeqId);<a name="line.3207"></a> +<span class="sourceLineNo">3208</span> }<a name="line.3208"></a> +<span class="sourceLineNo">3209</span> if (walEdit.size() > 0) {<a name="line.3209"></a> +<span class="sourceLineNo">3210</span> if (!isInReplay) {<a name="line.3210"></a> +<span class="sourceLineNo">3211</span> // we use HLogKey here instead of WALKey directly to support legacy coprocessors.<a name="line.3211"></a> +<span class="sourceLineNo">3212</span> walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),<a name="line.3212"></a> +<span class="sourceLineNo">3213</span> this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,<a name="line.3213"></a> +<span class="sourceLineNo">3214</span> mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);<a name="line.3214"></a> +<span class="sourceLineNo">3215</span> }<a name="line.3215"></a> +<span class="sourceLineNo">3216</span> txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);<a name="line.3216"></a> +<span class="sourceLineNo">3217</span> }<a name="line.3217"></a> +<span class="sourceLineNo">3218</span> // ------------------------------------<a name="line.3218"></a> +<span class="sourceLineNo">3219</span> // Acquire the latest mvcc number<a name="line.3219"></a> +<span class="sourceLineNo">3220</span> // ----------------------------------<a name="line.3220"></a> +<span class="sourceLineNo">3221</span> if (walKey == null) {<a name="line.3221"></a> +<span class="sourceLineNo">3222</span> // If this is a skip wal operation just get the read point from mvcc<a name="line.3222"></a> +<span class="sourceLineNo">3223</span> walKey = this.appendEmptyEdit(this.wal);<a name="line.3223"></a> <span class="sourceLineNo">3224</span> }<a name="line.3224"></a> -<span class="sourceLineNo">3225</span> // ------------------------------------<a name="line.3225"></a> -<span class="sourceLineNo">3226</span> // Acquire the latest mvcc number<a name="line.3226"></a> -<span class="sourceLineNo">3227</span> // ----------------------------------<a name="line.3227"></a> -<span class="sourceLineNo">3228</span> if (walKey == null) {<a name="line.3228"></a> -<span class="sourceLineNo">3229</span> // If this is a skip wal operation just get the read point from mvcc<a name="line.3229"></a> -<span class="sourceLineNo">3230</span> walKey = this.appendEmptyEdit(this.wal);<a name="line.3230"></a> -<span class="sourceLineNo">3231</span> }<a name="line.3231"></a> -<span class="sourceLineNo">3232</span> if (!isInReplay) {<a name="line.3232"></a> -<span class="sourceLineNo">3233</span> writeEntry = walKey.getWriteEntry();<a name="line.3233"></a> -<span class="sourceLineNo">3234</span> mvccNum = writeEntry.getWriteNumber();<a name="line.3234"></a> -<span class="sourceLineNo">3235</span> } else {<a name="line.3235"></a> -<span class="sourceLineNo">3236</span> mvccNum = batchOp.getReplaySequenceId();<a name="line.3236"></a> -<span class="sourceLineNo">3237</span> }<a name="line.3237"></a> -<span class="sourceLineNo">3238</span><a name="line.3238"></a> -<span class="sourceLineNo">3239</span> // ------------------------------------<a name="line.3239"></a> -<span class="sourceLineNo">3240</span> // STEP 5. Write back to memstore<a name="line.3240"></a> -<span class="sourceLineNo">3241</span> // Write to memstore. It is ok to write to memstore<a name="line.3241"></a> -<span class="sourceLineNo">3242</span> // first without syncing the WAL because we do not roll<a name="line.3242"></a> -<span class="sourceLineNo">3243</span> // forward the memstore MVCC. The MVCC will be moved up when<a name="line.3243"></a> -<span class="sourceLineNo">3244</span> // the complete operation is done. These changes are not yet<a name="line.3244"></a> -<span class="sourceLineNo">3245</span> // visible to scanners till we update the MVCC. The MVCC is<a name="line.3245"></a> -<span class="sourceLineNo">3246</span> // moved only when the sync is complete.<a name="line.3246"></a> -<span class="sourceLineNo">3247</span> // ----------------------------------<a name="line.3247"></a> -<span class="sourceLineNo">3248</span> long addedSize = 0;<a name="line.3248"></a> -<span class="sourceLineNo">3249</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3249"></a> -<span class="sourceLineNo">3250</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3250"></a> -<span class="sourceLineNo">3251</span> != OperationStatusCode.NOT_RUN) {<a name="line.3251"></a> -<span class="sourceLineNo">3252</span> continue;<a name="line.3252"></a> -<span class="sourceLineNo">3253</span> }<a name="line.3253"></a> -<span class="sourceLineNo">3254</span> doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote<a name="line.3254"></a> -<span class="sourceLineNo">3255</span> addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);<a name="line.3255"></a> -<span class="sourceLineNo">3256</span> }<a name="line.3256"></a> -<span class="sourceLineNo">3257</span><a name="line.3257"></a> -<span class="sourceLineNo">3258</span> // -------------------------------<a name="line.3258"></a> -<span class="sourceLineNo">3259</span> // STEP 6. Release row locks, etc.<a name="line.3259"></a> -<span class="sourceLineNo">3260</span> // -------------------------------<a name="line.3260"></a> -<span class="sourceLineNo">3261</span> if (locked) {<a name="line.3261"></a> -<span class="sourceLineNo">3262</span> this.updatesLock.readLock().unlock();<a name="line.3262"></a> -<span class="sourceLineNo">3263</span> locked = false;<a name="line.3263"></a> -<span class="sourceLineNo">3264</span> }<a name="line.3264"></a> -<span class="sourceLineNo">3265</span> releaseRowLocks(acquiredRowLocks);<a name="line.3265"></a> +<span class="sourceLineNo">3225</span> if (!isInReplay) {<a name="line.3225"></a> +<span class="sourceLineNo">3226</span> writeEntry = walKey.getWriteEntry();<a name="line.3226"></a> +<span class="sourceLineNo">3227</span> mvccNum = writeEntry.getWriteNumber();<a name="line.3227"></a> +<span class="sourceLineNo">3228</span> } else {<a name="line.3228"></a> +<span class="sourceLineNo">3229</span> mvccNum = batchOp.getReplaySequenceId();<a name="line.3229"></a> +<span class="sourceLineNo">3230</span> }<a name="line.3230"></a> +<span class="sourceLineNo">3231</span><a name="line.3231"></a> +<span class="sourceLineNo">3232</span> // ------------------------------------<a name="line.3232"></a> +<span class="sourceLineNo">3233</span> // STEP 5. Write back to memstore<a name="line.3233"></a> +<span class="sourceLineNo">3234</span> // Write to memstore. It is ok to write to memstore<a name="line.3234"></a> +<span class="sourceLineNo">3235</span> // first without syncing the WAL because we do not roll<a name="line.3235"></a> +<span class="sourceLineNo">3236</span> // forward the memstore MVCC. The MVCC will be moved up when<a name="line.3236"></a> +<span class="sourceLineNo">3237</span> // the complete operation is done. These changes are not yet<a name="line.3237"></a> +<span class="sourceLineNo">3238</span> // visible to scanners till we update the MVCC. The MVCC is<a name="line.3238"></a> +<span class="sourceLineNo">3239</span> // moved only when the sync is complete.<a name="line.3239"></a> +<span class="sourceLineNo">3240</span> // ----------------------------------<a name="line.3240"></a> +<span class="sourceLineNo">3241</span> long addedSize = 0;<a name="line.3241"></a> +<span class="sourceLineNo">3242</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3242"></a> +<span class="sourceLineNo">3243</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3243"></a> +<span class="sourceLineNo">3244</span> != OperationStatusCode.NOT_RUN) {<a name="line.3244"></a> +<span class="sourceLineNo">3245</span> continue;<a name="line.3245"></a> +<span class="sourceLineNo">3246</span> }<a name="line.3246"></a> +<span class="sourceLineNo">3247</span> doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote<a name="line.3247"></a> +<span class="sourceLineNo">3248</span> addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);<a name="line.3248"></a> +<span class="sourceLineNo">3249</span> }<a name="line.3249"></a> +<span class="sourceLineNo">3250</span><a name="line.3250"></a> +<span class="sourceLineNo">3251</span> // -------------------------------<a name="line.3251"></a> +<span class="sourceLineNo">3252</span> // STEP 6. Release row locks, etc.<a name="line.3252"></a> +<span class="sourceLineNo">3253</span> // -------------------------------<a name="line.3253"></a> +<span class="sourceLineNo">3254</span> if (locked) {<a name="line.3254"></a> +<span class="sourceLineNo">3255</span> this.updatesLock.readLock().unlock();<a name="line.3255"></a> +<span class="sourceLineNo">3256</span> locked = false;<a name="line.3256"></a> +<span class="sourceLineNo">3257</span> }<a name="line.3257"></a> +<span class="sourceLineNo">3258</span> releaseRowLocks(acquiredRowLocks);<a name="line.3258"></a> +<span class="sourceLineNo">3259</span><a name="line.3259"></a> +<span class="sourceLineNo">3260</span> // -------------------------<a name="line.3260"></a> +<span class="sourceLineNo">3261</span> // STEP 7. Sync wal.<a name="line.3261"></a> +<span class="sourceLineNo">3262</span> // -------------------------<a name="line.3262"></a> +<span class="sourceLineNo">3263</span> if (txid != 0) {<a name="line.3263"></a> +<span class="sourceLineNo">3264</span> syncOrDefer(txid, durability);<a name="line.3264"></a> +<span class="sourceLineNo">3265</span> }<a name="line.3265"></a> <span class="sourceLineNo">3266</span><a name="line.3266"></a> -<span class="sourceLineNo">3267</span> // -------------------------<a name="line.3267"></a> -<span class="sourceLineNo">3268</span> // STEP 7. Sync wal.<a name="line.3268"></a> -<span class="sourceLineNo">3269</span> // -------------------------<a name="line.3269"></a> -<span class="sourceLineNo">3270</span> if (txid != 0) {<a name="line.3270"></a> -<span class="sourceLineNo">3271</span> syncOrDefer(txid, durability);<a name="line.3271"></a> -<span class="sourceLineNo">3272</span> }<a name="line.3272"></a> -<span class="sourceLineNo">3273</span><a name="line.3273"></a> -<span class="sourceLineNo">3274</span> doRollBackMemstore = false;<a name="line.3274"></a> -<span class="sourceLineNo">3275</span> // calling the post CP hook for batch mutation<a name="line.3275"></a> -<span class="sourceLineNo">3276</span> if (!isInReplay && coprocessorHost != null) {<a name="line.3276"></a> -<span class="sourceLineNo">3277</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3277"></a> -<span class="sourceLineNo">3278</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3278"></a> -<span class="sourceLineNo">3279</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);<a name="line.3279"></a> -<span class="sourceLineNo">3280</span> coprocessorHost.postBatchMutate(miniBatchOp);<a name="line.3280"></a> -<span class="sourceLineNo">3281</span> }<a name="line.3281"></a> -<span class="sourceLineNo">3282</span><a name="line.3282"></a> -<span class="sourceLineNo">3283</span> // ------------------------------------------------------------------<a name="line.3283"></a> -<span class="sourceLineNo">3284</span> // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.<a name="line.3284"></a> -<span class="sourceLineNo">3285</span> // ------------------------------------------------------------------<a name="line.3285"></a> -<span class="sourceLineNo">3286</span> if (writeEntry != null) {<a name="line.3286"></a> -<span class="sourceLineNo">3287</span> mvcc.completeAndWait(writeEntry);<a name="line.3287"></a> -<span class="sourceLineNo">3288</span> writeEntry = null;<a name="line.3288"></a> -<span class="sourceLineNo">3289</span> } else if (isInReplay) {<a name="line.3289"></a> -<span class="sourceLineNo">3290</span> // ensure that the sequence id of the region is at least as big as orig log seq id<a name="line.3290"></a> -<span class="sourceLineNo">3291</span> mvcc.advanceTo(mvccNum);<a name="line.3291"></a> -<span class="sourceLineNo">3292</span> }<a name="line.3292"></a> -<span class="sourceLineNo">3293</span><a name="line.3293"></a> -<span class="sourceLineNo">3294</span> for (int i = firstIndex; i < lastIndexExclusive; i ++) {<a name="line.3294"></a> -<span class="sourceLineNo">3295</span> if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {<a name="line.3295"></a> -<span class="sourceLineNo">3296</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.3296"></a> -<span class="sourceLineNo">3297</span> }<a name="line.3297"></a> -<span class="sourceLineNo">3298</span> }<a name="line.3298"></a> -<span class="sourceLineNo">3299</span><a name="line.3299"></a> -<span class="sourceLineNo">3300</span> // ------------------------------------<a name="line.3300"></a> -<span class="sourceLineNo">3301</span> // STEP 9. Run coprocessor post hooks. This should be done after the wal is<a name="line.3301"></a> -<span class="sourceLineNo">3302</span> // synced so that the coprocessor contract is adhered to.<a name="line.3302"></a> -<span class="sourceLineNo">3303</span> // ------------------------------------<a name="line.3303"></a> -<span class="sourceLineNo">3304</span> if (!isInReplay && coprocessorHost != null) {<a name="line.3304"></a> -<span class="sourceLineNo">3305</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3305"></a> -<span class="sourceLineNo">3306</span> // only for successful puts<a name="line.3306"></a> -<span class="sourceLineNo">3307</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3307"></a> -<span class="sourceLineNo">3308</span> != OperationStatusCode.SUCCESS) {<a name="line.3308"></a> -<span class="sourceLineNo">3309</span> continue;<a name="line.3309"></a> -<span class="sourceLineNo">3310</span> }<a name="line.3310"></a> -<span class="sourceLineNo">3311</span> Mutation m = batchOp.getMutation(i);<a name="line.3311"></a> -<span class="sourceLineNo">3312</span> if (m instanceof Put) {<a name="line.3312"></a> -<span class="sourceLineNo">3313</span> coprocessorHost.postPut((Put) m, walEdit, m.getDurability());<a name="line.3313"></a> -<span class="sourceLineNo">3314</span> } else {<a name="line.3314"></a> -<span class="sourceLineNo">3315</span> coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());<a name="line.3315"></a> -<span class="sourceLineNo">3316</span> }<a name="line.3316"></a> -<span class="sourceLineNo">3317</span> }<a name="line.3317"></a> -<span class="sourceLineNo">3318</span> }<a name="line.3318"></a> -<span class="sourceLineNo">3319</span><a name="line.3319"></a> -<span class="sourceLineNo">3320</span> success = true;<a name="line.3320"></a> -<span class="sourceLineNo">3321</span> return addedSize;<a name="line.3321"></a> -<span class="sourceLineNo">3322</span> } finally {<a name="line.3322"></a> -<span class="sourceLineNo">3323</span> // if the wal sync was unsuccessful, remove keys from memstore<a name="line.3323"></a> -<span class="sourceLineNo">3324</span> if (doRollBackMemstore) {<a name="line.3324"></a> -<span class="sourceLineNo">3325</span> for (int j = 0; j < familyMaps.length; j++) {<a name="line.3325"></a> -<span class="sourceLineNo">3326</span> for(List<Cell> cells:familyMaps[j].values()) {<a name="line.3326"></a> -<span class="sourceLineNo">3327</span> rollbackMemstore(cells);<a name="line.3327"></a> -<span class="sourceLineNo">3328</span> }<a name="line.3328"></a> -<span class="sourceLineNo">3329</span> }<a name="line.3329"></a> -<span class="sourceLineNo">3330</span> if (writeEntry != null) mvcc.complete(writeEntry);<a name="line.3330"></a> -<span class="sourceLineNo">3331</span> } else if (writeEntry != null) {<a name="line.3331"></a> -<span class="sourceLineNo">3332</span> mvcc.completeAndWait(writeEntry);<a name="line.3332"></a> -<span class="sourceLineNo">3333</span> }<a name="line.3333"></a> -<span class="sourceLineNo">3334</span><a name="line.3334"></a> -<span class="sourceLineNo">3335</span> if (locked) {<a name="line.3335"></a> -<span class="sourceLineNo">3336</span> this.updatesLock.readLock().unlock();<a name="line.3336"></a> -<span class="sourceLineNo">3337</span> }<a name="line.3337"></a> -<span class="sourceLineNo">3338</span> releaseRowLocks(acquiredRowLocks);<a name="line.3338"></a> -<span class="sourceLineNo">3339</span><a name="line.3339"></a> -<span class="sourceLineNo">3340</span> // See if the column families were consistent through the whole thing.<a name="line.3340"></a> -<span class="sourceLineNo">3341</span> // if they were then keep them. If they were not then pass a null.<a name="line.3341"></a> -<span class="sourceLineNo">3342</span> // null will be treated as unknown.<a name="line.3342"></a> -<span class="sourceLineNo">3343</span> // Total time taken might be involving Puts and Deletes.<a name="line.3343"></a> -<span class="sourceLineNo">3344</span> // Split the time for puts and deletes based on the total number of Puts and Deletes.<a name="line.3344"></a> -<span class="sourceLineNo">3345</span><a name="line.3345"></a> -<span class="sourceLineNo">3346</span> if (noOfPuts > 0) {<a name="line.3346"></a> -<span class="sourceLineNo">3347</span> // There were some Puts in the batch.<a name="line.3347"></a> -<span class="sourceLineNo">3348</span> if (this.metricsRegion != null) {<a name="line.3348"></a> -<span class="sourceLineNo">3349</span> this.metricsRegion.updatePut();<a name="line.3349"></a> -<span class="sourceLineNo">3350</span> }<a name="line.3350"></a> -<span class="sourceLineNo">3351</span> }<a name="line.3351"></a> -<span class="sourceLineNo">3352</span> if (noOfDeletes > 0) {<a name="line.3352"></a> -<span class="sourceLineNo">3353</span> // There were some Deletes in the batch.<a name="line.3353"></a> -<span class="sourceLineNo">3354</span> if (this.metricsRegion != null) {<a name="line.3354"></a> -<span class="sourceLineNo">3355</span> this.metricsRegion.updateDelete();<a name="line.3355"></a> +<span class="sourceLineNo">3267</span> doRollBackMemstore = false;<a name="line.3267"></a> +<span class="sourceLineNo">3268</span> // calling the post CP hook for batch mutation<a name="line.3268"></a> +<span class="sourceLineNo">3269</span> if (!isInReplay && coprocessorHost != null) {<a name="line.3269"></a> +<span class="sourceLineNo">3270</span> MiniBatchOperationInProgress<Mutation> miniBatchOp =<a name="line.3270"></a> +<span class="sourceLineNo">3271</span> new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),<a name="line.3271"></a> +<span class="sourceLineNo">3272</span> batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);<a name="line.3272"></a> +<span class="sourceLineNo">3273</span> coprocessorHost.postBatchMutate(miniBatchOp);<a name="line.3273"></a> +<span class="sourceLineNo">3274</span> }<a name="line.3274"></a> +<span class="sourceLineNo">3275</span><a name="line.3275"></a> +<span class="sourceLineNo">3276</span> // ------------------------------------------------------------------<a name="line.3276"></a> +<span class="sourceLineNo">3277</span> // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.<a name="line.3277"></a> +<span class="sourceLineNo">3278</span> // ------------------------------------------------------------------<a name="line.3278"></a> +<span class="sourceLineNo">3279</span> if (writeEntry != null) {<a name="line.3279"></a> +<span class="sourceLineNo">3280</span> mvcc.completeAndWait(writeEntry);<a name="line.3280"></a> +<span class="sourceLineNo">3281</span> writeEntry = null;<a name="line.3281"></a> +<span class="sourceLineNo">3282</span> } else if (isInReplay) {<a name="line.3282"></a> +<span class="sourceLineNo">3283</span> // ensure that the sequence id of the region is at least as big as orig log seq id<a name="line.3283"></a> +<span class="sourceLineNo">3284</span> mvcc.advanceTo(mvccNum);<a name="line.3284"></a> +<span class="sourceLineNo">3285</span> }<a name="line.3285"></a> +<span class="sourceLineNo">3286</span><a name="line.3286"></a> +<span class="sourceLineNo">3287</span> for (int i = firstIndex; i < lastIndexExclusive; i ++) {<a name="line.3287"></a> +<span class="sourceLineNo">3288</span> if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {<a name="line.3288"></a> +<span class="sourceLineNo">3289</span> batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;<a name="line.3289"></a> +<span class="sourceLineNo">3290</span> }<a name="line.3290"></a> +<span class="sourceLineNo">3291</span> }<a name="line.3291"></a> +<span class="sourceLineNo">3292</span><a name="line.3292"></a> +<span class="sourceLineNo">3293</span> // ------------------------------------<a name="line.3293"></a> +<span class="sourceLineNo">3294</span> // STEP 9. Run coprocessor post hooks. This should be done after the wal is<a name="line.3294"></a> +<span class="sourceLineNo">3295</span> // synced so that the coprocessor contract is adhered to.<a name="line.3295"></a> +<span class="sourceLineNo">3296</span> // ------------------------------------<a name="line.3296"></a> +<span class="sourceLineNo">3297</span> if (!isInReplay && coprocessorHost != null) {<a name="line.3297"></a> +<span class="sourceLineNo">3298</span> for (int i = firstIndex; i < lastIndexExclusive; i++) {<a name="line.3298"></a> +<span class="sourceLineNo">3299</span> // only for successful puts<a name="line.3299"></a> +<span class="sourceLineNo">3300</span> if (batchOp.retCodeDetails[i].getOperationStatusCode()<a name="line.3300"></a> +<span class="sourceLineNo">3301</span> != OperationStatusCode.SUCCESS) {<a name="line.3301"></a> +<span class="sourceLineNo">3302</span> continue;<a name="line.3302"></a> +<span class="sourceLineNo">3303</span> }<a name="line.3303"></a> +<span class="sourceLineNo">3304</span> Mutation m = batchOp.getMutation(i);<a name="line.3304"></a> +<span class="sourceLineNo">3305</span> if (m instanceof Put) {<a name="line.3305"></a> +<span class="sourceLineNo">3306</span
<TRUNCATED>