http://git-wip-us.apache.org/repos/asf/hbase-site/blob/d917c66a/xref/org/apache/hadoop/hbase/ipc/RpcServer.html ---------------------------------------------------------------------- diff --git a/xref/org/apache/hadoop/hbase/ipc/RpcServer.html b/xref/org/apache/hadoop/hbase/ipc/RpcServer.html index eeab98f..99d4beb 100644 --- a/xref/org/apache/hadoop/hbase/ipc/RpcServer.html +++ b/xref/org/apache/hadoop/hbase/ipc/RpcServer.html @@ -329,2322 +329,2333 @@ <a class="jxr_linenumber" name="319" href="#319">319</a> <strong class="jxr_keyword">private</strong> <a href="../../../../../org/apache/hadoop/hbase/ipc/RpcCallback.html">RpcCallback</a> callback; <a class="jxr_linenumber" name="320" href="#320">320</a> <a class="jxr_linenumber" name="321" href="#321">321</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">long</strong> responseCellSize = 0; -<a class="jxr_linenumber" name="322" href="#322">322</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">boolean</strong> retryImmediatelySupported; -<a class="jxr_linenumber" name="323" href="#323">323</a> -<a class="jxr_linenumber" name="324" href="#324">324</a> <a href="../../../../../org/apache/hadoop/hbase/ipc/RpcServer.html">Call</a>(<strong class="jxr_keyword">int</strong> id, <strong class="jxr_keyword">final</strong> BlockingService service, <strong class="jxr_keyword">final</strong> MethodDescriptor md, RequestHeader header, -<a class="jxr_linenumber" name="325" href="#325">325</a> Message param, <a href="../../../../../org/apache/hadoop/hbase/CellScanner.html">CellScanner</a> cellScanner, <a href="../../../../../org/apache/hadoop/hbase/client/Connection.html">Connection</a> connection, <a href="../../../../../org/apache/hadoop/hbase/ipc/RpcServer.html">Responder</a> responder, -<a class="jxr_linenumber" name="326" href="#326">326</a> <strong class="jxr_keyword">long</strong> size, TraceInfo tinfo, <strong class="jxr_keyword">final</strong> InetAddress remoteAddress) { -<a class="jxr_linenumber" name="327" href="#327">327</a> <strong class="jxr_keyword">this</strong>.id = id; -<a class="jxr_linenumber" name="328" href="#328">328</a> <strong class="jxr_keyword">this</strong>.service = service; -<a class="jxr_linenumber" name="329" href="#329">329</a> <strong class="jxr_keyword">this</strong>.md = md; -<a class="jxr_linenumber" name="330" href="#330">330</a> <strong class="jxr_keyword">this</strong>.header = header; -<a class="jxr_linenumber" name="331" href="#331">331</a> <strong class="jxr_keyword">this</strong>.param = param; -<a class="jxr_linenumber" name="332" href="#332">332</a> <strong class="jxr_keyword">this</strong>.cellScanner = cellScanner; -<a class="jxr_linenumber" name="333" href="#333">333</a> <strong class="jxr_keyword">this</strong>.connection = connection; -<a class="jxr_linenumber" name="334" href="#334">334</a> <strong class="jxr_keyword">this</strong>.timestamp = System.currentTimeMillis(); -<a class="jxr_linenumber" name="335" href="#335">335</a> <strong class="jxr_keyword">this</strong>.response = <strong class="jxr_keyword">null</strong>; -<a class="jxr_linenumber" name="336" href="#336">336</a> <strong class="jxr_keyword">this</strong>.delayResponse = false; -<a class="jxr_linenumber" name="337" href="#337">337</a> <strong class="jxr_keyword">this</strong>.responder = responder; -<a class="jxr_linenumber" name="338" href="#338">338</a> <strong class="jxr_keyword">this</strong>.isError = false; -<a class="jxr_linenumber" name="339" href="#339">339</a> <strong class="jxr_keyword">this</strong>.size = size; -<a class="jxr_linenumber" name="340" href="#340">340</a> <strong class="jxr_keyword">this</strong>.tinfo = tinfo; -<a class="jxr_linenumber" name="341" href="#341">341</a> <strong class="jxr_keyword">this</strong>.user = connection.user; -<a class="jxr_linenumber" name="342" href="#342">342</a> <strong class="jxr_keyword">this</strong>.remoteAddress = remoteAddress; -<a class="jxr_linenumber" name="343" href="#343">343</a> <strong class="jxr_keyword">this</strong>.retryImmediatelySupported = connection.retryImmediatelySupported; -<a class="jxr_linenumber" name="344" href="#344">344</a> } -<a class="jxr_linenumber" name="345" href="#345">345</a> -<a class="jxr_linenumber" name="346" href="#346">346</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="347" href="#347">347</a> <em class="jxr_javadoccomment"> * Call is done. Execution happened and we returned results to client. It is now safe to</em> -<a class="jxr_linenumber" name="348" href="#348">348</a> <em class="jxr_javadoccomment"> * cleanup.</em> -<a class="jxr_linenumber" name="349" href="#349">349</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="350" href="#350">350</a> <strong class="jxr_keyword">void</strong> done() { -<a class="jxr_linenumber" name="351" href="#351">351</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.cellBlock != <strong class="jxr_keyword">null</strong>) { -<a class="jxr_linenumber" name="352" href="#352">352</a> <em class="jxr_comment">// Return buffer to reservoir now we are done with it.</em> -<a class="jxr_linenumber" name="353" href="#353">353</a> reservoir.putBuffer(<strong class="jxr_keyword">this</strong>.cellBlock); -<a class="jxr_linenumber" name="354" href="#354">354</a> <strong class="jxr_keyword">this</strong>.cellBlock = <strong class="jxr_keyword">null</strong>; -<a class="jxr_linenumber" name="355" href="#355">355</a> } -<a class="jxr_linenumber" name="356" href="#356">356</a> <strong class="jxr_keyword">this</strong>.connection.decRpcCount(); <em class="jxr_comment">// Say that we're done with this call.</em> -<a class="jxr_linenumber" name="357" href="#357">357</a> } -<a class="jxr_linenumber" name="358" href="#358">358</a> -<a class="jxr_linenumber" name="359" href="#359">359</a> @Override -<a class="jxr_linenumber" name="360" href="#360">360</a> <strong class="jxr_keyword">public</strong> String toString() { -<a class="jxr_linenumber" name="361" href="#361">361</a> <strong class="jxr_keyword">return</strong> toShortString() + <span class="jxr_string">" param: "</span> + -<a class="jxr_linenumber" name="362" href="#362">362</a> (<strong class="jxr_keyword">this</strong>.param != <strong class="jxr_keyword">null</strong>? ProtobufUtil.getShortTextFormat(<strong class="jxr_keyword">this</strong>.param): <span class="jxr_string">""</span>) + -<a class="jxr_linenumber" name="363" href="#363">363</a> <span class="jxr_string">" connection: "</span> + connection.toString(); -<a class="jxr_linenumber" name="364" href="#364">364</a> } -<a class="jxr_linenumber" name="365" href="#365">365</a> -<a class="jxr_linenumber" name="366" href="#366">366</a> <strong class="jxr_keyword">protected</strong> RequestHeader getHeader() { -<a class="jxr_linenumber" name="367" href="#367">367</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.header; -<a class="jxr_linenumber" name="368" href="#368">368</a> } -<a class="jxr_linenumber" name="369" href="#369">369</a> -<a class="jxr_linenumber" name="370" href="#370">370</a> <em class="jxr_comment">/*</em> -<a class="jxr_linenumber" name="371" href="#371">371</a> <em class="jxr_comment"> * Short string representation without param info because param itself could be huge depends on</em> -<a class="jxr_linenumber" name="372" href="#372">372</a> <em class="jxr_comment"> * the payload of a command</em> -<a class="jxr_linenumber" name="373" href="#373">373</a> <em class="jxr_comment"> */</em> -<a class="jxr_linenumber" name="374" href="#374">374</a> String toShortString() { -<a class="jxr_linenumber" name="375" href="#375">375</a> String serviceName = <strong class="jxr_keyword">this</strong>.connection.service != <strong class="jxr_keyword">null</strong> ? -<a class="jxr_linenumber" name="376" href="#376">376</a> <strong class="jxr_keyword">this</strong>.connection.service.getDescriptorForType().getName() : <span class="jxr_string">"null"</span>; -<a class="jxr_linenumber" name="377" href="#377">377</a> <strong class="jxr_keyword">return</strong> <span class="jxr_string">"callId: "</span> + <strong class="jxr_keyword">this</strong>.id + <span class="jxr_string">" service: "</span> + serviceName + -<a class="jxr_linenumber" name="378" href="#378">378</a> <span class="jxr_string">" methodName: "</span> + ((<strong class="jxr_keyword">this</strong>.md != <strong class="jxr_keyword">null</strong>) ? <strong class="jxr_keyword">this</strong>.md.getName() : <span class="jxr_string">"n/a"</span>) + -<a class="jxr_linenumber" name="379" href="#379">379</a> <span class="jxr_string">" size: "</span> + StringUtils.TraditionalBinaryPrefix.<strong class="jxr_keyword">long</strong>2String(<strong class="jxr_keyword">this</strong>.size, <span class="jxr_string">""</span>, 1) + -<a class="jxr_linenumber" name="380" href="#380">380</a> <span class="jxr_string">" connection: "</span> + connection.toString(); -<a class="jxr_linenumber" name="381" href="#381">381</a> } -<a class="jxr_linenumber" name="382" href="#382">382</a> -<a class="jxr_linenumber" name="383" href="#383">383</a> String toTraceString() { -<a class="jxr_linenumber" name="384" href="#384">384</a> String serviceName = <strong class="jxr_keyword">this</strong>.connection.service != <strong class="jxr_keyword">null</strong> ? -<a class="jxr_linenumber" name="385" href="#385">385</a> <strong class="jxr_keyword">this</strong>.connection.service.getDescriptorForType().getName() : <span class="jxr_string">""</span>; -<a class="jxr_linenumber" name="386" href="#386">386</a> String methodName = (<strong class="jxr_keyword">this</strong>.md != <strong class="jxr_keyword">null</strong>) ? <strong class="jxr_keyword">this</strong>.md.getName() : <span class="jxr_string">""</span>; -<a class="jxr_linenumber" name="387" href="#387">387</a> <strong class="jxr_keyword">return</strong> serviceName + <span class="jxr_string">"."</span> + methodName; -<a class="jxr_linenumber" name="388" href="#388">388</a> } -<a class="jxr_linenumber" name="389" href="#389">389</a> -<a class="jxr_linenumber" name="390" href="#390">390</a> <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> setSaslTokenResponse(ByteBuffer response) { -<a class="jxr_linenumber" name="391" href="#391">391</a> <strong class="jxr_keyword">this</strong>.response = <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a>(response); -<a class="jxr_linenumber" name="392" href="#392">392</a> } -<a class="jxr_linenumber" name="393" href="#393">393</a> -<a class="jxr_linenumber" name="394" href="#394">394</a> <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> setResponse(Object m, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/CellScanner.html">CellScanner</a> cells, -<a class="jxr_linenumber" name="395" href="#395">395</a> Throwable t, String errorMsg) { -<a class="jxr_linenumber" name="396" href="#396">396</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.isError) <strong class="jxr_keyword">return</strong>; -<a class="jxr_linenumber" name="397" href="#397">397</a> <strong class="jxr_keyword">if</strong> (t != <strong class="jxr_keyword">null</strong>) <strong class="jxr_keyword">this</strong>.isError = <strong class="jxr_keyword">true</strong>; -<a class="jxr_linenumber" name="398" href="#398">398</a> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a> bc = <strong class="jxr_keyword">null</strong>; -<a class="jxr_linenumber" name="399" href="#399">399</a> <strong class="jxr_keyword">try</strong> { -<a class="jxr_linenumber" name="400" href="#400">400</a> ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); -<a class="jxr_linenumber" name="401" href="#401">401</a> <em class="jxr_comment">// Presume it a pb Message. Could be null.</em> -<a class="jxr_linenumber" name="402" href="#402">402</a> Message result = (Message)m; -<a class="jxr_linenumber" name="403" href="#403">403</a> <em class="jxr_comment">// Call id.</em> -<a class="jxr_linenumber" name="404" href="#404">404</a> headerBuilder.setCallId(<strong class="jxr_keyword">this</strong>.id); -<a class="jxr_linenumber" name="405" href="#405">405</a> <strong class="jxr_keyword">if</strong> (t != <strong class="jxr_keyword">null</strong>) { -<a class="jxr_linenumber" name="406" href="#406">406</a> ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); -<a class="jxr_linenumber" name="407" href="#407">407</a> exceptionBuilder.setExceptionClassName(t.getClass().getName()); -<a class="jxr_linenumber" name="408" href="#408">408</a> exceptionBuilder.setStackTrace(errorMsg); -<a class="jxr_linenumber" name="409" href="#409">409</a> exceptionBuilder.setDoNotRetry(t instanceof <a href="../../../../../org/apache/hadoop/hbase/DoNotRetryIOException.html">DoNotRetryIOException</a>); -<a class="jxr_linenumber" name="410" href="#410">410</a> <strong class="jxr_keyword">if</strong> (t instanceof RegionMovedException) { -<a class="jxr_linenumber" name="411" href="#411">411</a> <em class="jxr_comment">// Special casing for this exception. This is only one carrying a payload.</em> -<a class="jxr_linenumber" name="412" href="#412">412</a> <em class="jxr_comment">// Do this instead of build a generic system for allowing exceptions carry</em> -<a class="jxr_linenumber" name="413" href="#413">413</a> <em class="jxr_comment">// any kind of payload.</em> -<a class="jxr_linenumber" name="414" href="#414">414</a> <a href="../../../../../org/apache/hadoop/hbase/exceptions/RegionMovedException.html">RegionMovedException</a> rme = (RegionMovedException)t; -<a class="jxr_linenumber" name="415" href="#415">415</a> exceptionBuilder.setHostname(rme.getHostname()); -<a class="jxr_linenumber" name="416" href="#416">416</a> exceptionBuilder.setPort(rme.getPort()); -<a class="jxr_linenumber" name="417" href="#417">417</a> } -<a class="jxr_linenumber" name="418" href="#418">418</a> <em class="jxr_comment">// Set the exception as the result of the method invocation.</em> -<a class="jxr_linenumber" name="419" href="#419">419</a> headerBuilder.setException(exceptionBuilder.build()); -<a class="jxr_linenumber" name="420" href="#420">420</a> } -<a class="jxr_linenumber" name="421" href="#421">421</a> <em class="jxr_comment">// Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the</em> -<a class="jxr_linenumber" name="422" href="#422">422</a> <em class="jxr_comment">// reservoir when finished. This is hacky and the hack is not contained but benefits are</em> -<a class="jxr_linenumber" name="423" href="#423">423</a> <em class="jxr_comment">// high when we can avoid a big buffer allocation on each rpc.</em> -<a class="jxr_linenumber" name="424" href="#424">424</a> <strong class="jxr_keyword">this</strong>.cellBlock = ipcUtil.buildCellBlock(<strong class="jxr_keyword">this</strong>.connection.codec, -<a class="jxr_linenumber" name="425" href="#425">425</a> <strong class="jxr_keyword">this</strong>.connection.compressionCodec, cells, reservoir); -<a class="jxr_linenumber" name="426" href="#426">426</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.cellBlock != <strong class="jxr_keyword">null</strong>) { -<a class="jxr_linenumber" name="427" href="#427">427</a> CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); -<a class="jxr_linenumber" name="428" href="#428">428</a> <em class="jxr_comment">// Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.</em> -<a class="jxr_linenumber" name="429" href="#429">429</a> cellBlockBuilder.setLength(<strong class="jxr_keyword">this</strong>.cellBlock.limit()); -<a class="jxr_linenumber" name="430" href="#430">430</a> headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); -<a class="jxr_linenumber" name="431" href="#431">431</a> } -<a class="jxr_linenumber" name="432" href="#432">432</a> Message header = headerBuilder.build(); -<a class="jxr_linenumber" name="433" href="#433">433</a> -<a class="jxr_linenumber" name="434" href="#434">434</a> <em class="jxr_comment">// Organize the response as a set of bytebuffers rather than collect it all together inside</em> -<a class="jxr_linenumber" name="435" href="#435">435</a> <em class="jxr_comment">// one big byte array; save on allocations.</em> -<a class="jxr_linenumber" name="436" href="#436">436</a> ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header); -<a class="jxr_linenumber" name="437" href="#437">437</a> ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result); -<a class="jxr_linenumber" name="438" href="#438">438</a> <strong class="jxr_keyword">int</strong> totalSize = bbHeader.capacity() + (bbResult == <strong class="jxr_keyword">null</strong>? 0: bbResult.limit()) + -<a class="jxr_linenumber" name="439" href="#439">439</a> (<strong class="jxr_keyword">this</strong>.cellBlock == <strong class="jxr_keyword">null</strong>? 0: <strong class="jxr_keyword">this</strong>.cellBlock.limit()); -<a class="jxr_linenumber" name="440" href="#440">440</a> ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize)); -<a class="jxr_linenumber" name="441" href="#441">441</a> bc = <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a>(bbTotalSize, bbHeader, bbResult, <strong class="jxr_keyword">this</strong>.cellBlock); -<a class="jxr_linenumber" name="442" href="#442">442</a> <strong class="jxr_keyword">if</strong> (connection.useWrap) { -<a class="jxr_linenumber" name="443" href="#443">443</a> bc = wrapWithSasl(bc); -<a class="jxr_linenumber" name="444" href="#444">444</a> } -<a class="jxr_linenumber" name="445" href="#445">445</a> } <strong class="jxr_keyword">catch</strong> (IOException e) { -<a class="jxr_linenumber" name="446" href="#446">446</a> LOG.warn(<span class="jxr_string">"Exception while creating response "</span> + e); -<a class="jxr_linenumber" name="447" href="#447">447</a> } -<a class="jxr_linenumber" name="448" href="#448">448</a> <strong class="jxr_keyword">this</strong>.response = bc; -<a class="jxr_linenumber" name="449" href="#449">449</a> <em class="jxr_comment">// Once a response message is created and set to this.response, this Call can be treated as</em> -<a class="jxr_linenumber" name="450" href="#450">450</a> <em class="jxr_comment">// done. The Responder thread will do the n/w write of this message back to client.</em> -<a class="jxr_linenumber" name="451" href="#451">451</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.callback != <strong class="jxr_keyword">null</strong>) { -<a class="jxr_linenumber" name="452" href="#452">452</a> <strong class="jxr_keyword">try</strong> { -<a class="jxr_linenumber" name="453" href="#453">453</a> <strong class="jxr_keyword">this</strong>.callback.run(); -<a class="jxr_linenumber" name="454" href="#454">454</a> } <strong class="jxr_keyword">catch</strong> (Exception e) { -<a class="jxr_linenumber" name="455" href="#455">455</a> <em class="jxr_comment">// Don't allow any exception here to kill this handler thread.</em> -<a class="jxr_linenumber" name="456" href="#456">456</a> LOG.warn(<span class="jxr_string">"Exception while running the Rpc Callback."</span>, e); -<a class="jxr_linenumber" name="457" href="#457">457</a> } -<a class="jxr_linenumber" name="458" href="#458">458</a> } -<a class="jxr_linenumber" name="459" href="#459">459</a> } -<a class="jxr_linenumber" name="460" href="#460">460</a> -<a class="jxr_linenumber" name="461" href="#461">461</a> <strong class="jxr_keyword">private</strong> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a> wrapWithSasl(<a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a> bc) -<a class="jxr_linenumber" name="462" href="#462">462</a> <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="463" href="#463">463</a> <strong class="jxr_keyword">if</strong> (!<strong class="jxr_keyword">this</strong>.connection.useSasl) <strong class="jxr_keyword">return</strong> bc; -<a class="jxr_linenumber" name="464" href="#464">464</a> <em class="jxr_comment">// Looks like no way around this; saslserver wants a byte array. I have to make it one.</em> -<a class="jxr_linenumber" name="465" href="#465">465</a> <em class="jxr_comment">// THIS IS A BIG UGLY COPY.</em> -<a class="jxr_linenumber" name="466" href="#466">466</a> byte [] responseBytes = bc.getBytes(); -<a class="jxr_linenumber" name="467" href="#467">467</a> byte [] token; -<a class="jxr_linenumber" name="468" href="#468">468</a> <em class="jxr_comment">// synchronization may be needed since there can be multiple Handler</em> -<a class="jxr_linenumber" name="469" href="#469">469</a> <em class="jxr_comment">// threads using saslServer to wrap responses.</em> -<a class="jxr_linenumber" name="470" href="#470">470</a> <strong class="jxr_keyword">synchronized</strong> (connection.saslServer) { -<a class="jxr_linenumber" name="471" href="#471">471</a> token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); -<a class="jxr_linenumber" name="472" href="#472">472</a> } -<a class="jxr_linenumber" name="473" href="#473">473</a> <strong class="jxr_keyword">if</strong> (LOG.isTraceEnabled()) { -<a class="jxr_linenumber" name="474" href="#474">474</a> LOG.trace(<span class="jxr_string">"Adding saslServer wrapped token of size "</span> + token.length -<a class="jxr_linenumber" name="475" href="#475">475</a> + <span class="jxr_string">" as call response."</span>); -<a class="jxr_linenumber" name="476" href="#476">476</a> } -<a class="jxr_linenumber" name="477" href="#477">477</a> -<a class="jxr_linenumber" name="478" href="#478">478</a> ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length)); -<a class="jxr_linenumber" name="479" href="#479">479</a> ByteBuffer bbTokenBytes = ByteBuffer.wrap(token); -<a class="jxr_linenumber" name="480" href="#480">480</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a>(bbTokenLength, bbTokenBytes); -<a class="jxr_linenumber" name="481" href="#481">481</a> } -<a class="jxr_linenumber" name="482" href="#482">482</a> -<a class="jxr_linenumber" name="483" href="#483">483</a> @Override -<a class="jxr_linenumber" name="484" href="#484">484</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> endDelay(Object result) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="485" href="#485">485</a> assert <strong class="jxr_keyword">this</strong>.delayResponse; -<a class="jxr_linenumber" name="486" href="#486">486</a> assert <strong class="jxr_keyword">this</strong>.delayReturnValue || result == <strong class="jxr_keyword">null</strong>; -<a class="jxr_linenumber" name="487" href="#487">487</a> <strong class="jxr_keyword">this</strong>.delayResponse = false; -<a class="jxr_linenumber" name="488" href="#488">488</a> delayedCalls.decrementAndGet(); -<a class="jxr_linenumber" name="489" href="#489">489</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.delayReturnValue) { -<a class="jxr_linenumber" name="490" href="#490">490</a> <strong class="jxr_keyword">this</strong>.setResponse(result, <strong class="jxr_keyword">null</strong>, <strong class="jxr_keyword">null</strong>, <strong class="jxr_keyword">null</strong>); -<a class="jxr_linenumber" name="491" href="#491">491</a> } -<a class="jxr_linenumber" name="492" href="#492">492</a> <strong class="jxr_keyword">this</strong>.responder.doRespond(<strong class="jxr_keyword">this</strong>); -<a class="jxr_linenumber" name="493" href="#493">493</a> } -<a class="jxr_linenumber" name="494" href="#494">494</a> -<a class="jxr_linenumber" name="495" href="#495">495</a> @Override -<a class="jxr_linenumber" name="496" href="#496">496</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> endDelay() <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="497" href="#497">497</a> <strong class="jxr_keyword">this</strong>.endDelay(<strong class="jxr_keyword">null</strong>); -<a class="jxr_linenumber" name="498" href="#498">498</a> } -<a class="jxr_linenumber" name="499" href="#499">499</a> -<a class="jxr_linenumber" name="500" href="#500">500</a> @Override -<a class="jxr_linenumber" name="501" href="#501">501</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> startDelay(<strong class="jxr_keyword">boolean</strong> delayReturnValue) { -<a class="jxr_linenumber" name="502" href="#502">502</a> assert !<strong class="jxr_keyword">this</strong>.delayResponse; -<a class="jxr_linenumber" name="503" href="#503">503</a> <strong class="jxr_keyword">this</strong>.delayResponse = <strong class="jxr_keyword">true</strong>; -<a class="jxr_linenumber" name="504" href="#504">504</a> <strong class="jxr_keyword">this</strong>.delayReturnValue = delayReturnValue; -<a class="jxr_linenumber" name="505" href="#505">505</a> <strong class="jxr_keyword">int</strong> numDelayed = delayedCalls.incrementAndGet(); -<a class="jxr_linenumber" name="506" href="#506">506</a> <strong class="jxr_keyword">if</strong> (numDelayed > warnDelayedCalls) { -<a class="jxr_linenumber" name="507" href="#507">507</a> LOG.warn(<span class="jxr_string">"Too many delayed calls: limit "</span> + warnDelayedCalls + <span class="jxr_string">" current "</span> + numDelayed); -<a class="jxr_linenumber" name="508" href="#508">508</a> } -<a class="jxr_linenumber" name="509" href="#509">509</a> } -<a class="jxr_linenumber" name="510" href="#510">510</a> -<a class="jxr_linenumber" name="511" href="#511">511</a> @Override -<a class="jxr_linenumber" name="512" href="#512">512</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> endDelayThrowing(Throwable t) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="513" href="#513">513</a> <strong class="jxr_keyword">this</strong>.setResponse(<strong class="jxr_keyword">null</strong>, <strong class="jxr_keyword">null</strong>, t, StringUtils.stringifyException(t)); -<a class="jxr_linenumber" name="514" href="#514">514</a> <strong class="jxr_keyword">this</strong>.delayResponse = false; -<a class="jxr_linenumber" name="515" href="#515">515</a> <strong class="jxr_keyword">this</strong>.sendResponseIfReady(); -<a class="jxr_linenumber" name="516" href="#516">516</a> } -<a class="jxr_linenumber" name="517" href="#517">517</a> -<a class="jxr_linenumber" name="518" href="#518">518</a> @Override -<a class="jxr_linenumber" name="519" href="#519">519</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">boolean</strong> isDelayed() { -<a class="jxr_linenumber" name="520" href="#520">520</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.delayResponse; -<a class="jxr_linenumber" name="521" href="#521">521</a> } -<a class="jxr_linenumber" name="522" href="#522">522</a> -<a class="jxr_linenumber" name="523" href="#523">523</a> @Override -<a class="jxr_linenumber" name="524" href="#524">524</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">boolean</strong> isReturnValueDelayed() { -<a class="jxr_linenumber" name="525" href="#525">525</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.delayReturnValue; -<a class="jxr_linenumber" name="526" href="#526">526</a> } -<a class="jxr_linenumber" name="527" href="#527">527</a> -<a class="jxr_linenumber" name="528" href="#528">528</a> @Override -<a class="jxr_linenumber" name="529" href="#529">529</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">boolean</strong> isClientCellBlockSupported() { -<a class="jxr_linenumber" name="530" href="#530">530</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.connection != <strong class="jxr_keyword">null</strong> && <strong class="jxr_keyword">this</strong>.connection.codec != <strong class="jxr_keyword">null</strong>; -<a class="jxr_linenumber" name="531" href="#531">531</a> } -<a class="jxr_linenumber" name="532" href="#532">532</a> -<a class="jxr_linenumber" name="533" href="#533">533</a> @Override -<a class="jxr_linenumber" name="534" href="#534">534</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">long</strong> disconnectSince() { -<a class="jxr_linenumber" name="535" href="#535">535</a> <strong class="jxr_keyword">if</strong> (!connection.channel.isOpen()) { -<a class="jxr_linenumber" name="536" href="#536">536</a> <strong class="jxr_keyword">return</strong> System.currentTimeMillis() - timestamp; -<a class="jxr_linenumber" name="537" href="#537">537</a> } <strong class="jxr_keyword">else</strong> { -<a class="jxr_linenumber" name="538" href="#538">538</a> <strong class="jxr_keyword">return</strong> -1L; -<a class="jxr_linenumber" name="539" href="#539">539</a> } -<a class="jxr_linenumber" name="540" href="#540">540</a> } -<a class="jxr_linenumber" name="541" href="#541">541</a> -<a class="jxr_linenumber" name="542" href="#542">542</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">long</strong> getSize() { -<a class="jxr_linenumber" name="543" href="#543">543</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.size; -<a class="jxr_linenumber" name="544" href="#544">544</a> } -<a class="jxr_linenumber" name="545" href="#545">545</a> -<a class="jxr_linenumber" name="546" href="#546">546</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">long</strong> getResponseCellSize() { -<a class="jxr_linenumber" name="547" href="#547">547</a> <strong class="jxr_keyword">return</strong> responseCellSize; -<a class="jxr_linenumber" name="548" href="#548">548</a> } -<a class="jxr_linenumber" name="549" href="#549">549</a> -<a class="jxr_linenumber" name="550" href="#550">550</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> incrementResponseCellSize(<strong class="jxr_keyword">long</strong> cellSize) { -<a class="jxr_linenumber" name="551" href="#551">551</a> responseCellSize += cellSize; -<a class="jxr_linenumber" name="552" href="#552">552</a> } -<a class="jxr_linenumber" name="553" href="#553">553</a> -<a class="jxr_linenumber" name="554" href="#554">554</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="555" href="#555">555</a> <em class="jxr_javadoccomment"> * If we have a response, and delay is not set, then respond</em> -<a class="jxr_linenumber" name="556" href="#556">556</a> <em class="jxr_javadoccomment"> * immediately. Otherwise, do not respond to client. This is</em> -<a class="jxr_linenumber" name="557" href="#557">557</a> <em class="jxr_javadoccomment"> * called by the RPC code in the context of the Handler thread.</em> -<a class="jxr_linenumber" name="558" href="#558">558</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="559" href="#559">559</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> sendResponseIfReady() <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="560" href="#560">560</a> <strong class="jxr_keyword">if</strong> (!<strong class="jxr_keyword">this</strong>.delayResponse) { -<a class="jxr_linenumber" name="561" href="#561">561</a> <strong class="jxr_keyword">this</strong>.responder.doRespond(<strong class="jxr_keyword">this</strong>); -<a class="jxr_linenumber" name="562" href="#562">562</a> } +<a class="jxr_linenumber" name="322" href="#322">322</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">long</strong> responseBlockSize = 0; +<a class="jxr_linenumber" name="323" href="#323">323</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">boolean</strong> retryImmediatelySupported; +<a class="jxr_linenumber" name="324" href="#324">324</a> +<a class="jxr_linenumber" name="325" href="#325">325</a> <a href="../../../../../org/apache/hadoop/hbase/ipc/RpcServer.html">Call</a>(<strong class="jxr_keyword">int</strong> id, <strong class="jxr_keyword">final</strong> BlockingService service, <strong class="jxr_keyword">final</strong> MethodDescriptor md, RequestHeader header, +<a class="jxr_linenumber" name="326" href="#326">326</a> Message param, <a href="../../../../../org/apache/hadoop/hbase/CellScanner.html">CellScanner</a> cellScanner, <a href="../../../../../org/apache/hadoop/hbase/client/Connection.html">Connection</a> connection, <a href="../../../../../org/apache/hadoop/hbase/ipc/RpcServer.html">Responder</a> responder, +<a class="jxr_linenumber" name="327" href="#327">327</a> <strong class="jxr_keyword">long</strong> size, TraceInfo tinfo, <strong class="jxr_keyword">final</strong> InetAddress remoteAddress) { +<a class="jxr_linenumber" name="328" href="#328">328</a> <strong class="jxr_keyword">this</strong>.id = id; +<a class="jxr_linenumber" name="329" href="#329">329</a> <strong class="jxr_keyword">this</strong>.service = service; +<a class="jxr_linenumber" name="330" href="#330">330</a> <strong class="jxr_keyword">this</strong>.md = md; +<a class="jxr_linenumber" name="331" href="#331">331</a> <strong class="jxr_keyword">this</strong>.header = header; +<a class="jxr_linenumber" name="332" href="#332">332</a> <strong class="jxr_keyword">this</strong>.param = param; +<a class="jxr_linenumber" name="333" href="#333">333</a> <strong class="jxr_keyword">this</strong>.cellScanner = cellScanner; +<a class="jxr_linenumber" name="334" href="#334">334</a> <strong class="jxr_keyword">this</strong>.connection = connection; +<a class="jxr_linenumber" name="335" href="#335">335</a> <strong class="jxr_keyword">this</strong>.timestamp = System.currentTimeMillis(); +<a class="jxr_linenumber" name="336" href="#336">336</a> <strong class="jxr_keyword">this</strong>.response = <strong class="jxr_keyword">null</strong>; +<a class="jxr_linenumber" name="337" href="#337">337</a> <strong class="jxr_keyword">this</strong>.delayResponse = false; +<a class="jxr_linenumber" name="338" href="#338">338</a> <strong class="jxr_keyword">this</strong>.responder = responder; +<a class="jxr_linenumber" name="339" href="#339">339</a> <strong class="jxr_keyword">this</strong>.isError = false; +<a class="jxr_linenumber" name="340" href="#340">340</a> <strong class="jxr_keyword">this</strong>.size = size; +<a class="jxr_linenumber" name="341" href="#341">341</a> <strong class="jxr_keyword">this</strong>.tinfo = tinfo; +<a class="jxr_linenumber" name="342" href="#342">342</a> <strong class="jxr_keyword">this</strong>.user = connection.user; +<a class="jxr_linenumber" name="343" href="#343">343</a> <strong class="jxr_keyword">this</strong>.remoteAddress = remoteAddress; +<a class="jxr_linenumber" name="344" href="#344">344</a> <strong class="jxr_keyword">this</strong>.retryImmediatelySupported = connection.retryImmediatelySupported; +<a class="jxr_linenumber" name="345" href="#345">345</a> } +<a class="jxr_linenumber" name="346" href="#346">346</a> +<a class="jxr_linenumber" name="347" href="#347">347</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="348" href="#348">348</a> <em class="jxr_javadoccomment"> * Call is done. Execution happened and we returned results to client. It is now safe to</em> +<a class="jxr_linenumber" name="349" href="#349">349</a> <em class="jxr_javadoccomment"> * cleanup.</em> +<a class="jxr_linenumber" name="350" href="#350">350</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="351" href="#351">351</a> <strong class="jxr_keyword">void</strong> done() { +<a class="jxr_linenumber" name="352" href="#352">352</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.cellBlock != <strong class="jxr_keyword">null</strong>) { +<a class="jxr_linenumber" name="353" href="#353">353</a> <em class="jxr_comment">// Return buffer to reservoir now we are done with it.</em> +<a class="jxr_linenumber" name="354" href="#354">354</a> reservoir.putBuffer(<strong class="jxr_keyword">this</strong>.cellBlock); +<a class="jxr_linenumber" name="355" href="#355">355</a> <strong class="jxr_keyword">this</strong>.cellBlock = <strong class="jxr_keyword">null</strong>; +<a class="jxr_linenumber" name="356" href="#356">356</a> } +<a class="jxr_linenumber" name="357" href="#357">357</a> <strong class="jxr_keyword">this</strong>.connection.decRpcCount(); <em class="jxr_comment">// Say that we're done with this call.</em> +<a class="jxr_linenumber" name="358" href="#358">358</a> } +<a class="jxr_linenumber" name="359" href="#359">359</a> +<a class="jxr_linenumber" name="360" href="#360">360</a> @Override +<a class="jxr_linenumber" name="361" href="#361">361</a> <strong class="jxr_keyword">public</strong> String toString() { +<a class="jxr_linenumber" name="362" href="#362">362</a> <strong class="jxr_keyword">return</strong> toShortString() + <span class="jxr_string">" param: "</span> + +<a class="jxr_linenumber" name="363" href="#363">363</a> (<strong class="jxr_keyword">this</strong>.param != <strong class="jxr_keyword">null</strong>? ProtobufUtil.getShortTextFormat(<strong class="jxr_keyword">this</strong>.param): <span class="jxr_string">""</span>) + +<a class="jxr_linenumber" name="364" href="#364">364</a> <span class="jxr_string">" connection: "</span> + connection.toString(); +<a class="jxr_linenumber" name="365" href="#365">365</a> } +<a class="jxr_linenumber" name="366" href="#366">366</a> +<a class="jxr_linenumber" name="367" href="#367">367</a> <strong class="jxr_keyword">protected</strong> RequestHeader getHeader() { +<a class="jxr_linenumber" name="368" href="#368">368</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.header; +<a class="jxr_linenumber" name="369" href="#369">369</a> } +<a class="jxr_linenumber" name="370" href="#370">370</a> +<a class="jxr_linenumber" name="371" href="#371">371</a> <em class="jxr_comment">/*</em> +<a class="jxr_linenumber" name="372" href="#372">372</a> <em class="jxr_comment"> * Short string representation without param info because param itself could be huge depends on</em> +<a class="jxr_linenumber" name="373" href="#373">373</a> <em class="jxr_comment"> * the payload of a command</em> +<a class="jxr_linenumber" name="374" href="#374">374</a> <em class="jxr_comment"> */</em> +<a class="jxr_linenumber" name="375" href="#375">375</a> String toShortString() { +<a class="jxr_linenumber" name="376" href="#376">376</a> String serviceName = <strong class="jxr_keyword">this</strong>.connection.service != <strong class="jxr_keyword">null</strong> ? +<a class="jxr_linenumber" name="377" href="#377">377</a> <strong class="jxr_keyword">this</strong>.connection.service.getDescriptorForType().getName() : <span class="jxr_string">"null"</span>; +<a class="jxr_linenumber" name="378" href="#378">378</a> <strong class="jxr_keyword">return</strong> <span class="jxr_string">"callId: "</span> + <strong class="jxr_keyword">this</strong>.id + <span class="jxr_string">" service: "</span> + serviceName + +<a class="jxr_linenumber" name="379" href="#379">379</a> <span class="jxr_string">" methodName: "</span> + ((<strong class="jxr_keyword">this</strong>.md != <strong class="jxr_keyword">null</strong>) ? <strong class="jxr_keyword">this</strong>.md.getName() : <span class="jxr_string">"n/a"</span>) + +<a class="jxr_linenumber" name="380" href="#380">380</a> <span class="jxr_string">" size: "</span> + StringUtils.TraditionalBinaryPrefix.<strong class="jxr_keyword">long</strong>2String(<strong class="jxr_keyword">this</strong>.size, <span class="jxr_string">""</span>, 1) + +<a class="jxr_linenumber" name="381" href="#381">381</a> <span class="jxr_string">" connection: "</span> + connection.toString(); +<a class="jxr_linenumber" name="382" href="#382">382</a> } +<a class="jxr_linenumber" name="383" href="#383">383</a> +<a class="jxr_linenumber" name="384" href="#384">384</a> String toTraceString() { +<a class="jxr_linenumber" name="385" href="#385">385</a> String serviceName = <strong class="jxr_keyword">this</strong>.connection.service != <strong class="jxr_keyword">null</strong> ? +<a class="jxr_linenumber" name="386" href="#386">386</a> <strong class="jxr_keyword">this</strong>.connection.service.getDescriptorForType().getName() : <span class="jxr_string">""</span>; +<a class="jxr_linenumber" name="387" href="#387">387</a> String methodName = (<strong class="jxr_keyword">this</strong>.md != <strong class="jxr_keyword">null</strong>) ? <strong class="jxr_keyword">this</strong>.md.getName() : <span class="jxr_string">""</span>; +<a class="jxr_linenumber" name="388" href="#388">388</a> <strong class="jxr_keyword">return</strong> serviceName + <span class="jxr_string">"."</span> + methodName; +<a class="jxr_linenumber" name="389" href="#389">389</a> } +<a class="jxr_linenumber" name="390" href="#390">390</a> +<a class="jxr_linenumber" name="391" href="#391">391</a> <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> setSaslTokenResponse(ByteBuffer response) { +<a class="jxr_linenumber" name="392" href="#392">392</a> <strong class="jxr_keyword">this</strong>.response = <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a>(response); +<a class="jxr_linenumber" name="393" href="#393">393</a> } +<a class="jxr_linenumber" name="394" href="#394">394</a> +<a class="jxr_linenumber" name="395" href="#395">395</a> <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> setResponse(Object m, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/CellScanner.html">CellScanner</a> cells, +<a class="jxr_linenumber" name="396" href="#396">396</a> Throwable t, String errorMsg) { +<a class="jxr_linenumber" name="397" href="#397">397</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.isError) <strong class="jxr_keyword">return</strong>; +<a class="jxr_linenumber" name="398" href="#398">398</a> <strong class="jxr_keyword">if</strong> (t != <strong class="jxr_keyword">null</strong>) <strong class="jxr_keyword">this</strong>.isError = <strong class="jxr_keyword">true</strong>; +<a class="jxr_linenumber" name="399" href="#399">399</a> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a> bc = <strong class="jxr_keyword">null</strong>; +<a class="jxr_linenumber" name="400" href="#400">400</a> <strong class="jxr_keyword">try</strong> { +<a class="jxr_linenumber" name="401" href="#401">401</a> ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder(); +<a class="jxr_linenumber" name="402" href="#402">402</a> <em class="jxr_comment">// Presume it a pb Message. Could be null.</em> +<a class="jxr_linenumber" name="403" href="#403">403</a> Message result = (Message)m; +<a class="jxr_linenumber" name="404" href="#404">404</a> <em class="jxr_comment">// Call id.</em> +<a class="jxr_linenumber" name="405" href="#405">405</a> headerBuilder.setCallId(<strong class="jxr_keyword">this</strong>.id); +<a class="jxr_linenumber" name="406" href="#406">406</a> <strong class="jxr_keyword">if</strong> (t != <strong class="jxr_keyword">null</strong>) { +<a class="jxr_linenumber" name="407" href="#407">407</a> ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder(); +<a class="jxr_linenumber" name="408" href="#408">408</a> exceptionBuilder.setExceptionClassName(t.getClass().getName()); +<a class="jxr_linenumber" name="409" href="#409">409</a> exceptionBuilder.setStackTrace(errorMsg); +<a class="jxr_linenumber" name="410" href="#410">410</a> exceptionBuilder.setDoNotRetry(t instanceof <a href="../../../../../org/apache/hadoop/hbase/DoNotRetryIOException.html">DoNotRetryIOException</a>); +<a class="jxr_linenumber" name="411" href="#411">411</a> <strong class="jxr_keyword">if</strong> (t instanceof RegionMovedException) { +<a class="jxr_linenumber" name="412" href="#412">412</a> <em class="jxr_comment">// Special casing for this exception. This is only one carrying a payload.</em> +<a class="jxr_linenumber" name="413" href="#413">413</a> <em class="jxr_comment">// Do this instead of build a generic system for allowing exceptions carry</em> +<a class="jxr_linenumber" name="414" href="#414">414</a> <em class="jxr_comment">// any kind of payload.</em> +<a class="jxr_linenumber" name="415" href="#415">415</a> <a href="../../../../../org/apache/hadoop/hbase/exceptions/RegionMovedException.html">RegionMovedException</a> rme = (RegionMovedException)t; +<a class="jxr_linenumber" name="416" href="#416">416</a> exceptionBuilder.setHostname(rme.getHostname()); +<a class="jxr_linenumber" name="417" href="#417">417</a> exceptionBuilder.setPort(rme.getPort()); +<a class="jxr_linenumber" name="418" href="#418">418</a> } +<a class="jxr_linenumber" name="419" href="#419">419</a> <em class="jxr_comment">// Set the exception as the result of the method invocation.</em> +<a class="jxr_linenumber" name="420" href="#420">420</a> headerBuilder.setException(exceptionBuilder.build()); +<a class="jxr_linenumber" name="421" href="#421">421</a> } +<a class="jxr_linenumber" name="422" href="#422">422</a> <em class="jxr_comment">// Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the</em> +<a class="jxr_linenumber" name="423" href="#423">423</a> <em class="jxr_comment">// reservoir when finished. This is hacky and the hack is not contained but benefits are</em> +<a class="jxr_linenumber" name="424" href="#424">424</a> <em class="jxr_comment">// high when we can avoid a big buffer allocation on each rpc.</em> +<a class="jxr_linenumber" name="425" href="#425">425</a> <strong class="jxr_keyword">this</strong>.cellBlock = ipcUtil.buildCellBlock(<strong class="jxr_keyword">this</strong>.connection.codec, +<a class="jxr_linenumber" name="426" href="#426">426</a> <strong class="jxr_keyword">this</strong>.connection.compressionCodec, cells, reservoir); +<a class="jxr_linenumber" name="427" href="#427">427</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.cellBlock != <strong class="jxr_keyword">null</strong>) { +<a class="jxr_linenumber" name="428" href="#428">428</a> CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); +<a class="jxr_linenumber" name="429" href="#429">429</a> <em class="jxr_comment">// Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.</em> +<a class="jxr_linenumber" name="430" href="#430">430</a> cellBlockBuilder.setLength(<strong class="jxr_keyword">this</strong>.cellBlock.limit()); +<a class="jxr_linenumber" name="431" href="#431">431</a> headerBuilder.setCellBlockMeta(cellBlockBuilder.build()); +<a class="jxr_linenumber" name="432" href="#432">432</a> } +<a class="jxr_linenumber" name="433" href="#433">433</a> Message header = headerBuilder.build(); +<a class="jxr_linenumber" name="434" href="#434">434</a> +<a class="jxr_linenumber" name="435" href="#435">435</a> <em class="jxr_comment">// Organize the response as a set of bytebuffers rather than collect it all together inside</em> +<a class="jxr_linenumber" name="436" href="#436">436</a> <em class="jxr_comment">// one big byte array; save on allocations.</em> +<a class="jxr_linenumber" name="437" href="#437">437</a> ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header); +<a class="jxr_linenumber" name="438" href="#438">438</a> ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result); +<a class="jxr_linenumber" name="439" href="#439">439</a> <strong class="jxr_keyword">int</strong> totalSize = bbHeader.capacity() + (bbResult == <strong class="jxr_keyword">null</strong>? 0: bbResult.limit()) + +<a class="jxr_linenumber" name="440" href="#440">440</a> (<strong class="jxr_keyword">this</strong>.cellBlock == <strong class="jxr_keyword">null</strong>? 0: <strong class="jxr_keyword">this</strong>.cellBlock.limit()); +<a class="jxr_linenumber" name="441" href="#441">441</a> ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize)); +<a class="jxr_linenumber" name="442" href="#442">442</a> bc = <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a>(bbTotalSize, bbHeader, bbResult, <strong class="jxr_keyword">this</strong>.cellBlock); +<a class="jxr_linenumber" name="443" href="#443">443</a> <strong class="jxr_keyword">if</strong> (connection.useWrap) { +<a class="jxr_linenumber" name="444" href="#444">444</a> bc = wrapWithSasl(bc); +<a class="jxr_linenumber" name="445" href="#445">445</a> } +<a class="jxr_linenumber" name="446" href="#446">446</a> } <strong class="jxr_keyword">catch</strong> (IOException e) { +<a class="jxr_linenumber" name="447" href="#447">447</a> LOG.warn(<span class="jxr_string">"Exception while creating response "</span> + e); +<a class="jxr_linenumber" name="448" href="#448">448</a> } +<a class="jxr_linenumber" name="449" href="#449">449</a> <strong class="jxr_keyword">this</strong>.response = bc; +<a class="jxr_linenumber" name="450" href="#450">450</a> <em class="jxr_comment">// Once a response message is created and set to this.response, this Call can be treated as</em> +<a class="jxr_linenumber" name="451" href="#451">451</a> <em class="jxr_comment">// done. The Responder thread will do the n/w write of this message back to client.</em> +<a class="jxr_linenumber" name="452" href="#452">452</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.callback != <strong class="jxr_keyword">null</strong>) { +<a class="jxr_linenumber" name="453" href="#453">453</a> <strong class="jxr_keyword">try</strong> { +<a class="jxr_linenumber" name="454" href="#454">454</a> <strong class="jxr_keyword">this</strong>.callback.run(); +<a class="jxr_linenumber" name="455" href="#455">455</a> } <strong class="jxr_keyword">catch</strong> (Exception e) { +<a class="jxr_linenumber" name="456" href="#456">456</a> <em class="jxr_comment">// Don't allow any exception here to kill this handler thread.</em> +<a class="jxr_linenumber" name="457" href="#457">457</a> LOG.warn(<span class="jxr_string">"Exception while running the Rpc Callback."</span>, e); +<a class="jxr_linenumber" name="458" href="#458">458</a> } +<a class="jxr_linenumber" name="459" href="#459">459</a> } +<a class="jxr_linenumber" name="460" href="#460">460</a> } +<a class="jxr_linenumber" name="461" href="#461">461</a> +<a class="jxr_linenumber" name="462" href="#462">462</a> <strong class="jxr_keyword">private</strong> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a> wrapWithSasl(<a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a> bc) +<a class="jxr_linenumber" name="463" href="#463">463</a> <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="464" href="#464">464</a> <strong class="jxr_keyword">if</strong> (!<strong class="jxr_keyword">this</strong>.connection.useSasl) <strong class="jxr_keyword">return</strong> bc; +<a class="jxr_linenumber" name="465" href="#465">465</a> <em class="jxr_comment">// Looks like no way around this; saslserver wants a byte array. I have to make it one.</em> +<a class="jxr_linenumber" name="466" href="#466">466</a> <em class="jxr_comment">// THIS IS A BIG UGLY COPY.</em> +<a class="jxr_linenumber" name="467" href="#467">467</a> byte [] responseBytes = bc.getBytes(); +<a class="jxr_linenumber" name="468" href="#468">468</a> byte [] token; +<a class="jxr_linenumber" name="469" href="#469">469</a> <em class="jxr_comment">// synchronization may be needed since there can be multiple Handler</em> +<a class="jxr_linenumber" name="470" href="#470">470</a> <em class="jxr_comment">// threads using saslServer to wrap responses.</em> +<a class="jxr_linenumber" name="471" href="#471">471</a> <strong class="jxr_keyword">synchronized</strong> (connection.saslServer) { +<a class="jxr_linenumber" name="472" href="#472">472</a> token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length); +<a class="jxr_linenumber" name="473" href="#473">473</a> } +<a class="jxr_linenumber" name="474" href="#474">474</a> <strong class="jxr_keyword">if</strong> (LOG.isTraceEnabled()) { +<a class="jxr_linenumber" name="475" href="#475">475</a> LOG.trace(<span class="jxr_string">"Adding saslServer wrapped token of size "</span> + token.length +<a class="jxr_linenumber" name="476" href="#476">476</a> + <span class="jxr_string">" as call response."</span>); +<a class="jxr_linenumber" name="477" href="#477">477</a> } +<a class="jxr_linenumber" name="478" href="#478">478</a> +<a class="jxr_linenumber" name="479" href="#479">479</a> ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length)); +<a class="jxr_linenumber" name="480" href="#480">480</a> ByteBuffer bbTokenBytes = ByteBuffer.wrap(token); +<a class="jxr_linenumber" name="481" href="#481">481</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/ipc/BufferChain.html">BufferChain</a>(bbTokenLength, bbTokenBytes); +<a class="jxr_linenumber" name="482" href="#482">482</a> } +<a class="jxr_linenumber" name="483" href="#483">483</a> +<a class="jxr_linenumber" name="484" href="#484">484</a> @Override +<a class="jxr_linenumber" name="485" href="#485">485</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> endDelay(Object result) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="486" href="#486">486</a> assert <strong class="jxr_keyword">this</strong>.delayResponse; +<a class="jxr_linenumber" name="487" href="#487">487</a> assert <strong class="jxr_keyword">this</strong>.delayReturnValue || result == <strong class="jxr_keyword">null</strong>; +<a class="jxr_linenumber" name="488" href="#488">488</a> <strong class="jxr_keyword">this</strong>.delayResponse = false; +<a class="jxr_linenumber" name="489" href="#489">489</a> delayedCalls.decrementAndGet(); +<a class="jxr_linenumber" name="490" href="#490">490</a> <strong class="jxr_keyword">if</strong> (<strong class="jxr_keyword">this</strong>.delayReturnValue) { +<a class="jxr_linenumber" name="491" href="#491">491</a> <strong class="jxr_keyword">this</strong>.setResponse(result, <strong class="jxr_keyword">null</strong>, <strong class="jxr_keyword">null</strong>, <strong class="jxr_keyword">null</strong>); +<a class="jxr_linenumber" name="492" href="#492">492</a> } +<a class="jxr_linenumber" name="493" href="#493">493</a> <strong class="jxr_keyword">this</strong>.responder.doRespond(<strong class="jxr_keyword">this</strong>); +<a class="jxr_linenumber" name="494" href="#494">494</a> } +<a class="jxr_linenumber" name="495" href="#495">495</a> +<a class="jxr_linenumber" name="496" href="#496">496</a> @Override +<a class="jxr_linenumber" name="497" href="#497">497</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> endDelay() <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="498" href="#498">498</a> <strong class="jxr_keyword">this</strong>.endDelay(<strong class="jxr_keyword">null</strong>); +<a class="jxr_linenumber" name="499" href="#499">499</a> } +<a class="jxr_linenumber" name="500" href="#500">500</a> +<a class="jxr_linenumber" name="501" href="#501">501</a> @Override +<a class="jxr_linenumber" name="502" href="#502">502</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> startDelay(<strong class="jxr_keyword">boolean</strong> delayReturnValue) { +<a class="jxr_linenumber" name="503" href="#503">503</a> assert !<strong class="jxr_keyword">this</strong>.delayResponse; +<a class="jxr_linenumber" name="504" href="#504">504</a> <strong class="jxr_keyword">this</strong>.delayResponse = <strong class="jxr_keyword">true</strong>; +<a class="jxr_linenumber" name="505" href="#505">505</a> <strong class="jxr_keyword">this</strong>.delayReturnValue = delayReturnValue; +<a class="jxr_linenumber" name="506" href="#506">506</a> <strong class="jxr_keyword">int</strong> numDelayed = delayedCalls.incrementAndGet(); +<a class="jxr_linenumber" name="507" href="#507">507</a> <strong class="jxr_keyword">if</strong> (numDelayed > warnDelayedCalls) { +<a class="jxr_linenumber" name="508" href="#508">508</a> LOG.warn(<span class="jxr_string">"Too many delayed calls: limit "</span> + warnDelayedCalls + <span class="jxr_string">" current "</span> + numDelayed); +<a class="jxr_linenumber" name="509" href="#509">509</a> } +<a class="jxr_linenumber" name="510" href="#510">510</a> } +<a class="jxr_linenumber" name="511" href="#511">511</a> +<a class="jxr_linenumber" name="512" href="#512">512</a> @Override +<a class="jxr_linenumber" name="513" href="#513">513</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> endDelayThrowing(Throwable t) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="514" href="#514">514</a> <strong class="jxr_keyword">this</strong>.setResponse(<strong class="jxr_keyword">null</strong>, <strong class="jxr_keyword">null</strong>, t, StringUtils.stringifyException(t)); +<a class="jxr_linenumber" name="515" href="#515">515</a> <strong class="jxr_keyword">this</strong>.delayResponse = false; +<a class="jxr_linenumber" name="516" href="#516">516</a> <strong class="jxr_keyword">this</strong>.sendResponseIfReady(); +<a class="jxr_linenumber" name="517" href="#517">517</a> } +<a class="jxr_linenumber" name="518" href="#518">518</a> +<a class="jxr_linenumber" name="519" href="#519">519</a> @Override +<a class="jxr_linenumber" name="520" href="#520">520</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">boolean</strong> isDelayed() { +<a class="jxr_linenumber" name="521" href="#521">521</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.delayResponse; +<a class="jxr_linenumber" name="522" href="#522">522</a> } +<a class="jxr_linenumber" name="523" href="#523">523</a> +<a class="jxr_linenumber" name="524" href="#524">524</a> @Override +<a class="jxr_linenumber" name="525" href="#525">525</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">boolean</strong> isReturnValueDelayed() { +<a class="jxr_linenumber" name="526" href="#526">526</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.delayReturnValue; +<a class="jxr_linenumber" name="527" href="#527">527</a> } +<a class="jxr_linenumber" name="528" href="#528">528</a> +<a class="jxr_linenumber" name="529" href="#529">529</a> @Override +<a class="jxr_linenumber" name="530" href="#530">530</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">boolean</strong> isClientCellBlockSupported() { +<a class="jxr_linenumber" name="531" href="#531">531</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.connection != <strong class="jxr_keyword">null</strong> && <strong class="jxr_keyword">this</strong>.connection.codec != <strong class="jxr_keyword">null</strong>; +<a class="jxr_linenumber" name="532" href="#532">532</a> } +<a class="jxr_linenumber" name="533" href="#533">533</a> +<a class="jxr_linenumber" name="534" href="#534">534</a> @Override +<a class="jxr_linenumber" name="535" href="#535">535</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">long</strong> disconnectSince() { +<a class="jxr_linenumber" name="536" href="#536">536</a> <strong class="jxr_keyword">if</strong> (!connection.channel.isOpen()) { +<a class="jxr_linenumber" name="537" href="#537">537</a> <strong class="jxr_keyword">return</strong> System.currentTimeMillis() - timestamp; +<a class="jxr_linenumber" name="538" href="#538">538</a> } <strong class="jxr_keyword">else</strong> { +<a class="jxr_linenumber" name="539" href="#539">539</a> <strong class="jxr_keyword">return</strong> -1L; +<a class="jxr_linenumber" name="540" href="#540">540</a> } +<a class="jxr_linenumber" name="541" href="#541">541</a> } +<a class="jxr_linenumber" name="542" href="#542">542</a> +<a class="jxr_linenumber" name="543" href="#543">543</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">long</strong> getSize() { +<a class="jxr_linenumber" name="544" href="#544">544</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">this</strong>.size; +<a class="jxr_linenumber" name="545" href="#545">545</a> } +<a class="jxr_linenumber" name="546" href="#546">546</a> +<a class="jxr_linenumber" name="547" href="#547">547</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">long</strong> getResponseCellSize() { +<a class="jxr_linenumber" name="548" href="#548">548</a> <strong class="jxr_keyword">return</strong> responseCellSize; +<a class="jxr_linenumber" name="549" href="#549">549</a> } +<a class="jxr_linenumber" name="550" href="#550">550</a> +<a class="jxr_linenumber" name="551" href="#551">551</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> incrementResponseCellSize(<strong class="jxr_keyword">long</strong> cellSize) { +<a class="jxr_linenumber" name="552" href="#552">552</a> responseCellSize += cellSize; +<a class="jxr_linenumber" name="553" href="#553">553</a> } +<a class="jxr_linenumber" name="554" href="#554">554</a> +<a class="jxr_linenumber" name="555" href="#555">555</a> @Override +<a class="jxr_linenumber" name="556" href="#556">556</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">long</strong> getResponseBlockSize() { +<a class="jxr_linenumber" name="557" href="#557">557</a> <strong class="jxr_keyword">return</strong> responseBlockSize; +<a class="jxr_linenumber" name="558" href="#558">558</a> } +<a class="jxr_linenumber" name="559" href="#559">559</a> +<a class="jxr_linenumber" name="560" href="#560">560</a> @Override +<a class="jxr_linenumber" name="561" href="#561">561</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> incrementResponseBlockSize(<strong class="jxr_keyword">long</strong> blockSize) { +<a class="jxr_linenumber" name="562" href="#562">562</a> responseBlockSize += blockSize; <a class="jxr_linenumber" name="563" href="#563">563</a> } <a class="jxr_linenumber" name="564" href="#564">564</a> -<a class="jxr_linenumber" name="565" href="#565">565</a> <strong class="jxr_keyword">public</strong> UserGroupInformation getRemoteUser() { -<a class="jxr_linenumber" name="566" href="#566">566</a> <strong class="jxr_keyword">return</strong> connection.ugi; -<a class="jxr_linenumber" name="567" href="#567">567</a> } -<a class="jxr_linenumber" name="568" href="#568">568</a> -<a class="jxr_linenumber" name="569" href="#569">569</a> @Override -<a class="jxr_linenumber" name="570" href="#570">570</a> <strong class="jxr_keyword">public</strong> <a href="../../../../../org/apache/hadoop/hbase/security/User.html">User</a> getRequestUser() { -<a class="jxr_linenumber" name="571" href="#571">571</a> <strong class="jxr_keyword">return</strong> user; -<a class="jxr_linenumber" name="572" href="#572">572</a> } -<a class="jxr_linenumber" name="573" href="#573">573</a> -<a class="jxr_linenumber" name="574" href="#574">574</a> @Override -<a class="jxr_linenumber" name="575" href="#575">575</a> <strong class="jxr_keyword">public</strong> String getRequestUserName() { -<a class="jxr_linenumber" name="576" href="#576">576</a> <a href="../../../../../org/apache/hadoop/hbase/security/User.html">User</a> user = getRequestUser(); -<a class="jxr_linenumber" name="577" href="#577">577</a> <strong class="jxr_keyword">return</strong> user == <strong class="jxr_keyword">null</strong>? <strong class="jxr_keyword">null</strong>: user.getShortName(); +<a class="jxr_linenumber" name="565" href="#565">565</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="566" href="#566">566</a> <em class="jxr_javadoccomment"> * If we have a response, and delay is not set, then respond</em> +<a class="jxr_linenumber" name="567" href="#567">567</a> <em class="jxr_javadoccomment"> * immediately. Otherwise, do not respond to client. This is</em> +<a class="jxr_linenumber" name="568" href="#568">568</a> <em class="jxr_javadoccomment"> * called by the RPC code in the context of the Handler thread.</em> +<a class="jxr_linenumber" name="569" href="#569">569</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="570" href="#570">570</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">synchronized</strong> <strong class="jxr_keyword">void</strong> sendResponseIfReady() <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="571" href="#571">571</a> <strong class="jxr_keyword">if</strong> (!<strong class="jxr_keyword">this</strong>.delayResponse) { +<a class="jxr_linenumber" name="572" href="#572">572</a> <strong class="jxr_keyword">this</strong>.responder.doRespond(<strong class="jxr_keyword">this</strong>); +<a class="jxr_linenumber" name="573" href="#573">573</a> } +<a class="jxr_linenumber" name="574" href="#574">574</a> } +<a class="jxr_linenumber" name="575" href="#575">575</a> +<a class="jxr_linenumber" name="576" href="#576">576</a> <strong class="jxr_keyword">public</strong> UserGroupInformation getRemoteUser() { +<a class="jxr_linenumber" name="577" href="#577">577</a> <strong class="jxr_keyword">return</strong> connection.ugi; <a class="jxr_linenumber" name="578" href="#578">578</a> } <a class="jxr_linenumber" name="579" href="#579">579</a> <a class="jxr_linenumber" name="580" href="#580">580</a> @Override -<a class="jxr_linenumber" name="581" href="#581">581</a> <strong class="jxr_keyword">public</strong> InetAddress getRemoteAddress() { -<a class="jxr_linenumber" name="582" href="#582">582</a> <strong class="jxr_keyword">return</strong> remoteAddress; +<a class="jxr_linenumber" name="581" href="#581">581</a> <strong class="jxr_keyword">public</strong> <a href="../../../../../org/apache/hadoop/hbase/security/User.html">User</a> getRequestUser() { +<a class="jxr_linenumber" name="582" href="#582">582</a> <strong class="jxr_keyword">return</strong> user; <a class="jxr_linenumber" name="583" href="#583">583</a> } <a class="jxr_linenumber" name="584" href="#584">584</a> <a class="jxr_linenumber" name="585" href="#585">585</a> @Override -<a class="jxr_linenumber" name="586" href="#586">586</a> <strong class="jxr_keyword">public</strong> <a href="../../../../../org/apache/hadoop/hbase/util/VersionInfo.html">VersionInfo</a> getClientVersionInfo() { -<a class="jxr_linenumber" name="587" href="#587">587</a> <strong class="jxr_keyword">return</strong> connection.getVersionInfo(); -<a class="jxr_linenumber" name="588" href="#588">588</a> } -<a class="jxr_linenumber" name="589" href="#589">589</a> -<a class="jxr_linenumber" name="590" href="#590">590</a> @Override -<a class="jxr_linenumber" name="591" href="#591">591</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> setCallBack(<a href="../../../../../org/apache/hadoop/hbase/ipc/RpcCallback.html">RpcCallback</a> callback) { -<a class="jxr_linenumber" name="592" href="#592">592</a> <strong class="jxr_keyword">this</strong>.callback = callback; -<a class="jxr_linenumber" name="593" href="#593">593</a> } -<a class="jxr_linenumber" name="594" href="#594">594</a> -<a class="jxr_linenumber" name="595" href="#595">595</a> @Override -<a class="jxr_linenumber" name="596" href="#596">596</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">boolean</strong> isRetryImmediatelySupported() { -<a class="jxr_linenumber" name="597" href="#597">597</a> <strong class="jxr_keyword">return</strong> retryImmediatelySupported; -<a class="jxr_linenumber" name="598" href="#598">598</a> } -<a class="jxr_linenumber" name="599" href="#599">599</a> } +<a class="jxr_linenumber" name="586" href="#586">586</a> <strong class="jxr_keyword">public</strong> String getRequestUserName() { +<a class="jxr_linenumber" name="587" href="#587">587</a> <a href="../../../../../org/apache/hadoop/hbase/security/User.html">User</a> user = getRequestUser(); +<a class="jxr_linenumber" name="588" href="#588">588</a> <strong class="jxr_keyword">return</strong> user == <strong class="jxr_keyword">null</strong>? <strong class="jxr_keyword">null</strong>: user.getShortName(); +<a class="jxr_linenumber" name="589" href="#589">589</a> } +<a class="jxr_linenumber" name="590" href="#590">590</a> +<a class="jxr_linenumber" name="591" href="#591">591</a> @Override +<a class="jxr_linenumber" name="592" href="#592">592</a> <strong class="jxr_keyword">public</strong> InetAddress getRemoteAddress() { +<a class="jxr_linenumber" name="593" href="#593">593</a> <strong class="jxr_keyword">return</strong> remoteAddress; +<a class="jxr_linenumber" name="594" href="#594">594</a> } +<a class="jxr_linenumber" name="595" href="#595">595</a> +<a class="jxr_linenumber" name="596" href="#596">596</a> @Override +<a class="jxr_linenumber" name="597" href="#597">597</a> <strong class="jxr_keyword">public</strong> <a href="../../../../../org/apache/hadoop/hbase/util/VersionInfo.html">VersionInfo</a> getClientVersionInfo() { +<a class="jxr_linenumber" name="598" href="#598">598</a> <strong class="jxr_keyword">return</strong> connection.getVersionInfo(); +<a class="jxr_linenumber" name="599" href="#599">599</a> } <a class="jxr_linenumber" name="600" href="#600">600</a> -<a class="jxr_linenumber" name="601" href="#601">601</a> <em class="jxr_javadoccomment">/**</em><em class="jxr_javadoccomment"> Listens on the socket. Creates jobs for the handler threads*/</em> -<a class="jxr_linenumber" name="602" href="#602">602</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">class</strong> <a href="../../../../../org/apache/hadoop/hbase/client/ClusterStatusListener.html">Listener</a> <strong class="jxr_keyword">extends</strong> Thread { -<a class="jxr_linenumber" name="603" href="#603">603</a> -<a class="jxr_linenumber" name="604" href="#604">604</a> <strong class="jxr_keyword">private</strong> ServerSocketChannel acceptChannel = <strong class="jxr_keyword">null</strong>; <em class="jxr_comment">//the accept channel</em> -<a class="jxr_linenumber" name="605" href="#605">605</a> <strong class="jxr_keyword">private</strong> Selector selector = <strong class="jxr_keyword">null</strong>; <em class="jxr_comment">//the selector that we use for the server</em> -<a class="jxr_linenumber" name="606" href="#606">606</a> <strong class="jxr_keyword">private</strong> <a href="../../../../../org/apache/hadoop/hbase/regionserver/StoreFile.html">Reader</a>[] readers = <strong class="jxr_keyword">null</strong>; -<a class="jxr_linenumber" name="607" href="#607">607</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">int</strong> currentReader = 0; -<a class="jxr_linenumber" name="608" href="#608">608</a> <strong class="jxr_keyword">private</strong> Random rand = <strong class="jxr_keyword">new</strong> Random(); -<a class="jxr_linenumber" name="609" href="#609">609</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">long</strong> lastCleanupRunTime = 0; <em class="jxr_comment">//the last time when a cleanup connec-</em> -<a class="jxr_linenumber" name="610" href="#610">610</a> <em class="jxr_comment">//-tion (for idle connections) ran</em> -<a class="jxr_linenumber" name="611" href="#611">611</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">long</strong> cleanupInterval = 10000; <em class="jxr_comment">//the minimum interval between</em> -<a class="jxr_linenumber" name="612" href="#612">612</a> <em class="jxr_comment">//two cleanup runs</em> -<a class="jxr_linenumber" name="613" href="#613">613</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">int</strong> backlogLength; +<a class="jxr_linenumber" name="601" href="#601">601</a> @Override +<a class="jxr_linenumber" name="602" href="#602">602</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> setCallBack(<a href="../../../../../org/apache/hadoop/hbase/ipc/RpcCallback.html">RpcCallback</a> callback) { +<a class="jxr_linenumber" name="603" href="#603">603</a> <strong class="jxr_keyword">this</strong>.callback = callback; +<a class="jxr_linenumber" name="604" href="#604">604</a> } +<a class="jxr_linenumber" name="605" href="#605">605</a> +<a class="jxr_linenumber" name="606" href="#606">606</a> @Override +<a class="jxr_linenumber" name="607" href="#607">607</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">boolean</strong> isRetryImmediatelySupported() { +<a class="jxr_linenumber" name="608" href="#608">608</a> <strong class="jxr_keyword">return</strong> retryImmediatelySupported; +<a class="jxr_linenumber" name="609" href="#609">609</a> } +<a class="jxr_linenumber" name="610" href="#610">610</a> } +<a class="jxr_linenumber" name="611" href="#611">611</a> +<a class="jxr_linenumber" name="612" href="#612">612</a> <em class="jxr_javadoccomment">/**</em><em class="jxr_javadoccomment"> Listens on the socket. Creates jobs for the handler threads*/</em> +<a class="jxr_linenumber" name="613" href="#613">613</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">class</strong> <a href="../../../../../org/apache/hadoop/hbase/client/ClusterStatusListener.html">Listener</a> <strong class="jxr_keyword">extends</strong> Thread { <a class="jxr_linenumber" name="614" href="#614">614</a> -<a class="jxr_linenumber" name="615" href="#615">615</a> <strong class="jxr_keyword">private</strong> ExecutorService readPool; -<a class="jxr_linenumber" name="616" href="#616">616</a> -<a class="jxr_linenumber" name="617" href="#617">617</a> <strong class="jxr_keyword">public</strong> <a href="../../../../../org/apache/hadoop/hbase/client/ClusterStatusListener.html">Listener</a>(<strong class="jxr_keyword">final</strong> String name) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="618" href="#618">618</a> <strong class="jxr_keyword">super</strong>(name); -<a class="jxr_linenumber" name="619" href="#619">619</a> backlogLength = conf.getInt(<span class="jxr_string">"hbase.ipc.server.listen.queue.size"</span>, 128); -<a class="jxr_linenumber" name="620" href="#620">620</a> <em class="jxr_comment">// Create a new server socket and set to non blocking mode</em> -<a class="jxr_linenumber" name="621" href="#621">621</a> acceptChannel = ServerSocketChannel.open(); -<a class="jxr_linenumber" name="622" href="#622">622</a> acceptChannel.configureBlocking(false); -<a class="jxr_linenumber" name="623" href="#623">623</a> -<a class="jxr_linenumber" name="624" href="#624">624</a> <em class="jxr_comment">// Bind the server socket to the binding addrees (can be different from the default interface)</em> -<a class="jxr_linenumber" name="625" href="#625">625</a> bind(acceptChannel.socket(), bindAddress, backlogLength); -<a class="jxr_linenumber" name="626" href="#626">626</a> port = acceptChannel.socket().getLocalPort(); <em class="jxr_comment">//Could be an ephemeral port</em> -<a class="jxr_linenumber" name="627" href="#627">627</a> address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); -<a class="jxr_linenumber" name="628" href="#628">628</a> <em class="jxr_comment">// create a selector;</em> -<a class="jxr_linenumber" name="629" href="#629">629</a> selector= Selector.open(); -<a class="jxr_linenumber" name="630" href="#630">630</a> -<a class="jxr_linenumber" name="631" href="#631">631</a> readers = <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/regionserver/StoreFile.html">Reader</a>[readThreads]; -<a class="jxr_linenumber" name="632" href="#632">632</a> readPool = Executors.newFixedThreadPool(readThreads, -<a class="jxr_linenumber" name="633" href="#633">633</a> <strong class="jxr_keyword">new</strong> ThreadFactoryBuilder().setNameFormat( -<a class="jxr_linenumber" name="634" href="#634">634</a> <span class="jxr_string">"RpcServer.reader=%d,bindAddress="</span> + bindAddress.getHostName() + -<a class="jxr_linenumber" name="635" href="#635">635</a> <span class="jxr_string">",port="</span> + port).setDaemon(<strong class="jxr_keyword">true</strong>).build()); -<a class="jxr_linenumber" name="636" href="#636">636</a> <strong class="jxr_keyword">for</strong> (<strong class="jxr_keyword">int</strong> i = 0; i < readThreads; ++i) { -<a class="jxr_linenumber" name="637" href="#637">637</a> <a href="../../../../../org/apache/hadoop/hbase/regionserver/StoreFile.html">Reader</a> reader = <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/regionserver/StoreFile.html">Reader</a>(); -<a class="jxr_linenumber" name="638" href="#638">638</a> readers[i] = reader; -<a class="jxr_linenumber" name="639" href="#639">639</a> readPool.execute(reader); -<a class="jxr_linenumber" name="640" href="#640">640</a> } -<a class="jxr_linenumber" name="641" href="#641">641</a> LOG.info(getName() + <span class="jxr_string">": started "</span> + readThreads + <span class="jxr_string">" reader(s) listening on port="</span> + port); -<a class="jxr_linenumber" name="642" href="#642">642</a> -<a class="jxr_linenumber" name="643" href="#643">643</a> <em class="jxr_comment">// Register accepts on the server socket with the selector.</em> -<a class="jxr_linenumber" name="644" href="#644">644</a> acceptChannel.register(selector, SelectionKey.OP_ACCEPT); -<a class="jxr_linenumber" name="645" href="#645">645</a> <strong class="jxr_keyword">this</strong>.setName(<span class="jxr_string">"RpcServer.listener,port="</span> + port); -<a class="jxr_linenumber" name="646" href="#646">646</a> <strong class="jxr_keyword">this</strong>.setDaemon(<strong class="jxr_keyword">true</strong>); -<a class="jxr_linenumber" name="647" href="#647">647</a> } -<a class="jxr_linenumber" name="648" href="#648">648</a> -<a class="jxr_linenumber" name="649" href="#649">649</a> -<a class="jxr_linenumber" name="650" href="#650">650</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">class</strong> <a href="../../../../../org/apache/hadoop/hbase/regionserver/StoreFile.html">Reader</a> <strong class="jxr_keyword">implements</strong> Runnable { -<a class="jxr_linenumber" name="651" href="#651">651</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">volatile</strong> <strong class="jxr_keyword">boolean</strong> adding = false; -<a class="jxr_linenumber" name="652" href="#652">652</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">final</strong> Selector readSelector; +<a class="jxr_linenumber" name="615" href="#615">615</a> <strong class="jxr_keyword">private</strong> ServerSocketChannel acceptChannel = <strong class="jxr_keyword">null</strong>; <em class="jxr_comment">//the accept channel</em> +<a class="jxr_linenumber" name="616" href="#616">616</a> <strong class="jxr_keyword">private</strong> Selector selector = <strong class="jxr_keyword">null</strong>; <em class="jxr_comment">//the selector that we use for the server</em> +<a class="jxr_linenumber" name="617" href="#617">617</a> <strong class="jxr_keyword">private</strong> <a href="../../../../../org/apache/hadoop/hbase/regionserver/StoreFile.html">Reader</a>[] readers = <strong class="jxr_keyword">null</strong>; +<a class="jxr_linenumber" name="618" href="#618">618</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">int</strong> currentReader = 0; +<a class="jxr_linenumber" name="619" href="#619">619</a> <strong class="jxr_keyword">private</strong> Random rand = <strong class="jxr_keyword">new</strong> Random(); +<a class="jxr_linenumber" name="620" href="#620">620</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">long</strong> lastCleanupRunTime = 0; <em class="jxr_comment">//the last time when a cleanup connec-</em> +<a class="jxr_linenumber" name="621" href="#621">621</a> <em class="jxr_comment">//-tion (for idle connections) ran</em> +<a class="jxr_linenumber" name="622" href="#622">622</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">long</strong> cleanupInterval = 10000; <em class="jxr_comment">//the minimum interval between</em> +<a class="jxr_linenumber" name="623" href="#623">623</a> <em class="jxr_comment">//two cleanup runs</em> +<a class="jxr_linenumber" name="624" href="#624">624</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">int</strong> backlogLength; +<a class="jxr_linenumber" name="625" href="#625">625</a> +<a class="jxr_linenumber" name="626" href="#626">626</a> <strong class="jxr_keyword">private</strong> ExecutorService readPool; +<a class="jxr_linenumber" name="627" href="#627">627</a> +<a class="jxr_linenumber" name="628" href="#628">628</a> <strong class="jxr_keyword">public</strong> <a href="../../../../../org/apache/hadoop/hbase/client/ClusterStatusListener.html">Listener</a>(<strong class="jxr_keyword">final</strong> String name) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="629" href="#629">629</a> <strong class="jxr_keyword">super</strong>(name); +<a class="jxr_linenumber" name="630" href="#630">630</a> backlogLength = conf.getInt(<span class="jxr_string">"hbase.ipc.server.listen.queue.size"</span>, 128); +<a class="jxr_linenumber" name="631" href="#631">631</a> <em class="jxr_comment">// Create a new server socket and set to non blocking mode</em> +<a class="jxr_linenumber" name="632" href="#632">632</a> acceptChannel = ServerSocketChannel.open(); +<a class="jxr_linenumber" name="633" href="#633">633</a> acceptChannel.configureBlocking(false); +<a class="jxr_linenumber" name="634" href="#634">634</a> +<a class="jxr_linenumber" name="635" href="#635">635</a> <em class="jxr_comment">// Bind the server socket to the binding addrees (can be different from the default interface)</em> +<a class="jxr_linenumber" name="636" href="#636">636</a> bind(acceptChannel.socket(), bindAddress, backlogLength); +<a class="jxr_linenumber" name="637" href="#637">637</a> port = acceptChannel.socket().getLocalPort(); <em class="jxr_comment">//Could be an ephemeral port</em> +<a class="jxr_linenumber" name="638" href="#638">638</a> address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); +<a class="jxr_linenumber" name="639" href="#639">639</a> <em class="jxr_comment">// create a selector;</em> +<a class="jxr_linenumber" name="640" href="#640">640</a> selector= Selector.open(); +<a class="jxr_linenumber" name="641" href="#641">641</a> +<a class="jxr_linenumber" name="642" href="#642">642</a> readers = <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/regionserver/StoreFile.html">Reader</a>[readThreads]; +<a class="jxr_linenumber" name="643" href="#643">643</a> readPool = Executors.newFixedThreadPool(readThreads, +<a class="jxr_linenumber" name="644" href="#644">644</a> <strong class="jxr_keyword">new</strong> ThreadFactoryBuilder().setNameFormat( +<a class="jxr_linenumber" name="645" href="#645">645</a> <span class="jxr_string">"RpcServer.reader=%d,bindAddress="</span> + bindAddress.getHostName() + +<a class="jxr_linenumber" name="646" href="#646">646</a> <span class="jxr_string">",port="</span> + port).setDaemon(<strong class="jxr_keyword">true</strong>).build()); +<a class="jxr_linenumber" name="647" href="#647">647</a> <strong class="jxr_keyword">for</strong> (<strong class="jxr_keyword">int</strong> i = 0; i < readThreads; ++i) { +<a class="jxr_linenumber" name="648" href="#648">648</a> <a href="../../../.
<TRUNCATED>