http://git-wip-us.apache.org/repos/asf/hbase-site/blob/f17356a7/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html ---------------------------------------------------------------------- diff --git a/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html b/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html index bd34b19..cae8355 100644 --- a/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html +++ b/xref/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html @@ -372,12 +372,12 @@ <a class="jxr_linenumber" name="362" href="#362">362</a> <em class="jxr_javadoccomment"> * @param regionLocator region locator</em> <a class="jxr_linenumber" name="363" href="#363">363</a> <em class="jxr_javadoccomment"> * @param silence true to ignore unmatched column families</em> <a class="jxr_linenumber" name="364" href="#364">364</a> <em class="jxr_javadoccomment"> * @param copyFile always copy hfiles if true</em> -<a class="jxr_linenumber" name="365" href="#365">365</a> <em class="jxr_javadoccomment"> * @return List of filenames which were not found</em> +<a class="jxr_linenumber" name="365" href="#365">365</a> <em class="jxr_javadoccomment"> * @return Map of LoadQueueItem to region</em> <a class="jxr_linenumber" name="366" href="#366">366</a> <em class="jxr_javadoccomment"> * @throws TableNotFoundException if table does not yet exist</em> <a class="jxr_linenumber" name="367" href="#367">367</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="368" href="#368">368</a> <strong class="jxr_keyword">public</strong> List<String> doBulkLoad(Map<byte[], List<Path>> map, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Admin.html">Admin</a> admin, <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, -<a class="jxr_linenumber" name="369" href="#369">369</a> <a href="../../../../../org/apache/hadoop/hbase/client/RegionLocator.html">RegionLocator</a> regionLocator, <strong class="jxr_keyword">boolean</strong> silence, <strong class="jxr_keyword">boolean</strong> copyFile) -<a class="jxr_linenumber" name="370" href="#370">370</a> <strong class="jxr_keyword">throws</strong> TableNotFoundException, IOException { +<a class="jxr_linenumber" name="368" href="#368">368</a> <strong class="jxr_keyword">public</strong> Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Admin.html">Admin</a> admin, +<a class="jxr_linenumber" name="369" href="#369">369</a> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, <a href="../../../../../org/apache/hadoop/hbase/client/RegionLocator.html">RegionLocator</a> regionLocator, <strong class="jxr_keyword">boolean</strong> silence, <strong class="jxr_keyword">boolean</strong> copyFile) +<a class="jxr_linenumber" name="370" href="#370">370</a> <strong class="jxr_keyword">throws</strong> TableNotFoundException, IOException { <a class="jxr_linenumber" name="371" href="#371">371</a> <strong class="jxr_keyword">if</strong> (!admin.isTableAvailable(regionLocator.getName())) { <a class="jxr_linenumber" name="372" href="#372">372</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> <a href="../../../../../org/apache/hadoop/hbase/TableNotFoundException.html">TableNotFoundException</a>(<span class="jxr_string">"Table "</span> + table.getName() + <span class="jxr_string">" is not currently available."</span>); <a class="jxr_linenumber" name="373" href="#373">373</a> } @@ -459,8 +459,8 @@ <a class="jxr_linenumber" name="449" href="#449">449</a> } <a class="jxr_linenumber" name="450" href="#450">450</a> } <a class="jxr_linenumber" name="451" href="#451">451</a> -<a class="jxr_linenumber" name="452" href="#452">452</a> List<String> performBulkLoad(<strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Admin.html">Admin</a> admin, <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, <a href="../../../../../org/apache/hadoop/hbase/client/RegionLocator.html">RegionLocator</a> regionLocator, -<a class="jxr_linenumber" name="453" href="#453">453</a> Deque<LoadQueueItem> queue, ExecutorService pool, +<a class="jxr_linenumber" name="452" href="#452">452</a> Map<LoadQueueItem, ByteBuffer> performBulkLoad(<strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Admin.html">Admin</a> admin, <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, +<a class="jxr_linenumber" name="453" href="#453">453</a> <a href="../../../../../org/apache/hadoop/hbase/client/RegionLocator.html">RegionLocator</a> regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool, <a class="jxr_linenumber" name="454" href="#454">454</a> <a href="../../../../../org/apache/hadoop/hbase/client/SecureBulkLoadClient.html">SecureBulkLoadClient</a> secureClient, <strong class="jxr_keyword">boolean</strong> copyFile) <strong class="jxr_keyword">throws</strong> IOException { <a class="jxr_linenumber" name="455" href="#455">455</a> <strong class="jxr_keyword">int</strong> count = 0; <a class="jxr_linenumber" name="456" href="#456">456</a> @@ -474,802 +474,815 @@ <a class="jxr_linenumber" name="464" href="#464">464</a> <em class="jxr_comment">// fs is the source filesystem</em> <a class="jxr_linenumber" name="465" href="#465">465</a> fsDelegationToken.acquireDelegationToken(fs); <a class="jxr_linenumber" name="466" href="#466">466</a> bulkToken = secureClient.prepareBulkLoad(admin.getConnection()); -<a class="jxr_linenumber" name="467" href="#467">467</a> Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = <strong class="jxr_keyword">null</strong>; +<a class="jxr_linenumber" name="467" href="#467">467</a> Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = <strong class="jxr_keyword">null</strong>; <a class="jxr_linenumber" name="468" href="#468">468</a> -<a class="jxr_linenumber" name="469" href="#469">469</a> <em class="jxr_comment">// Assumes that region splits can happen while this occurs.</em> -<a class="jxr_linenumber" name="470" href="#470">470</a> <strong class="jxr_keyword">while</strong> (!queue.isEmpty()) { -<a class="jxr_linenumber" name="471" href="#471">471</a> <em class="jxr_comment">// need to reload split keys each iteration.</em> -<a class="jxr_linenumber" name="472" href="#472">472</a> <strong class="jxr_keyword">final</strong> Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys(); -<a class="jxr_linenumber" name="473" href="#473">473</a> <strong class="jxr_keyword">if</strong> (count != 0) { -<a class="jxr_linenumber" name="474" href="#474">474</a> LOG.info(<span class="jxr_string">"Split occured while grouping HFiles, retry attempt "</span> + -<a class="jxr_linenumber" name="475" href="#475">475</a> + count + <span class="jxr_string">" with "</span> + queue.size() + <span class="jxr_string">" files remaining to group or split"</span>); -<a class="jxr_linenumber" name="476" href="#476">476</a> } -<a class="jxr_linenumber" name="477" href="#477">477</a> -<a class="jxr_linenumber" name="478" href="#478">478</a> <strong class="jxr_keyword">int</strong> maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); -<a class="jxr_linenumber" name="479" href="#479">479</a> maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1); -<a class="jxr_linenumber" name="480" href="#480">480</a> <strong class="jxr_keyword">if</strong> (maxRetries != 0 && count >= maxRetries) { -<a class="jxr_linenumber" name="481" href="#481">481</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IOException(<span class="jxr_string">"Retry attempted "</span> + count + -<a class="jxr_linenumber" name="482" href="#482">482</a> <span class="jxr_string">" times without completing, bailing out"</span>); -<a class="jxr_linenumber" name="483" href="#483">483</a> } -<a class="jxr_linenumber" name="484" href="#484">484</a> count++; -<a class="jxr_linenumber" name="485" href="#485">485</a> -<a class="jxr_linenumber" name="486" href="#486">486</a> <em class="jxr_comment">// Using ByteBuffer for byte[] equality semantics</em> -<a class="jxr_linenumber" name="487" href="#487">487</a> pair = groupOrSplitPhase(table, pool, queue, startEndKeys); -<a class="jxr_linenumber" name="488" href="#488">488</a> Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst(); -<a class="jxr_linenumber" name="489" href="#489">489</a> -<a class="jxr_linenumber" name="490" href="#490">490</a> <strong class="jxr_keyword">if</strong> (!checkHFilesCountPerRegionPerFamily(regionGroups)) { -<a class="jxr_linenumber" name="491" href="#491">491</a> <em class="jxr_comment">// Error is logged inside checkHFilesCountPerRegionPerFamily.</em> -<a class="jxr_linenumber" name="492" href="#492">492</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IOException(<span class="jxr_string">"Trying to load more than "</span> + maxFilesPerRegionPerFamily -<a class="jxr_linenumber" name="493" href="#493">493</a> + <span class="jxr_string">" hfiles to one family of one region"</span>); -<a class="jxr_linenumber" name="494" href="#494">494</a> } -<a class="jxr_linenumber" name="495" href="#495">495</a> -<a class="jxr_linenumber" name="496" href="#496">496</a> bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile); -<a class="jxr_linenumber" name="497" href="#497">497</a> -<a class="jxr_linenumber" name="498" href="#498">498</a> <em class="jxr_comment">// NOTE: The next iteration's split / group could happen in parallel to</em> -<a class="jxr_linenumber" name="499" href="#499">499</a> <em class="jxr_comment">// atomic bulkloads assuming that there are splits and no merges, and</em> -<a class="jxr_linenumber" name="500" href="#500">500</a> <em class="jxr_comment">// that we can atomically pull out the groups we want to retry.</em> -<a class="jxr_linenumber" name="501" href="#501">501</a> } -<a class="jxr_linenumber" name="502" href="#502">502</a> -<a class="jxr_linenumber" name="503" href="#503">503</a> <strong class="jxr_keyword">if</strong> (!queue.isEmpty()) { -<a class="jxr_linenumber" name="504" href="#504">504</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(<span class="jxr_string">"Bulk load aborted with some files not yet loaded."</span> -<a class="jxr_linenumber" name="505" href="#505">505</a> + <span class="jxr_string">"Please check log for more details."</span>); -<a class="jxr_linenumber" name="506" href="#506">506</a> } -<a class="jxr_linenumber" name="507" href="#507">507</a> <strong class="jxr_keyword">if</strong> (pair == <strong class="jxr_keyword">null</strong>) <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">null</strong>; -<a class="jxr_linenumber" name="508" href="#508">508</a> <strong class="jxr_keyword">return</strong> pair.getSecond(); -<a class="jxr_linenumber" name="509" href="#509">509</a> } -<a class="jxr_linenumber" name="510" href="#510">510</a> -<a class="jxr_linenumber" name="511" href="#511">511</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="512" href="#512">512</a> <em class="jxr_javadoccomment"> * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the</em> -<a class="jxr_linenumber" name="513" href="#513">513</a> <em class="jxr_javadoccomment"> * passed directory and validates whether the prepared queue has all the valid table column</em> -<a class="jxr_linenumber" name="514" href="#514">514</a> <em class="jxr_javadoccomment"> * families in it.</em> -<a class="jxr_linenumber" name="515" href="#515">515</a> <em class="jxr_javadoccomment"> * @param hfilesDir directory containing list of hfiles to be loaded into the table</em> -<a class="jxr_linenumber" name="516" href="#516">516</a> <em class="jxr_javadoccomment"> * @param table table to which hfiles should be loaded</em> -<a class="jxr_linenumber" name="517" href="#517">517</a> <em class="jxr_javadoccomment"> * @param queue queue which needs to be loaded into the table</em> -<a class="jxr_linenumber" name="518" href="#518">518</a> <em class="jxr_javadoccomment"> * @param validateHFile if true hfiles will be validated for its format</em> -<a class="jxr_linenumber" name="519" href="#519">519</a> <em class="jxr_javadoccomment"> * @throws IOException If any I/O or network error occurred</em> -<a class="jxr_linenumber" name="520" href="#520">520</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="521" href="#521">521</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> prepareHFileQueue(Path hfilesDir, <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, Deque<LoadQueueItem> queue, -<a class="jxr_linenumber" name="522" href="#522">522</a> <strong class="jxr_keyword">boolean</strong> validateHFile) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="523" href="#523">523</a> prepareHFileQueue(hfilesDir, table, queue, validateHFile, false); -<a class="jxr_linenumber" name="524" href="#524">524</a> } -<a class="jxr_linenumber" name="525" href="#525">525</a> -<a class="jxr_linenumber" name="526" href="#526">526</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="527" href="#527">527</a> <em class="jxr_javadoccomment"> * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the</em> -<a class="jxr_linenumber" name="528" href="#528">528</a> <em class="jxr_javadoccomment"> * passed directory and validates whether the prepared queue has all the valid table column</em> -<a class="jxr_linenumber" name="529" href="#529">529</a> <em class="jxr_javadoccomment"> * families in it.</em> -<a class="jxr_linenumber" name="530" href="#530">530</a> <em class="jxr_javadoccomment"> * @param hfilesDir directory containing list of hfiles to be loaded into the table</em> -<a class="jxr_linenumber" name="531" href="#531">531</a> <em class="jxr_javadoccomment"> * @param table table to which hfiles should be loaded</em> -<a class="jxr_linenumber" name="532" href="#532">532</a> <em class="jxr_javadoccomment"> * @param queue queue which needs to be loaded into the table</em> -<a class="jxr_linenumber" name="533" href="#533">533</a> <em class="jxr_javadoccomment"> * @param validateHFile if true hfiles will be validated for its format</em> -<a class="jxr_linenumber" name="534" href="#534">534</a> <em class="jxr_javadoccomment"> * @param silence true to ignore unmatched column families</em> -<a class="jxr_linenumber" name="535" href="#535">535</a> <em class="jxr_javadoccomment"> * @throws IOException If any I/O or network error occurred</em> -<a class="jxr_linenumber" name="536" href="#536">536</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="537" href="#537">537</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> prepareHFileQueue(Path hfilesDir, <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, -<a class="jxr_linenumber" name="538" href="#538">538</a> Deque<LoadQueueItem> queue, <strong class="jxr_keyword">boolean</strong> validateHFile, <strong class="jxr_keyword">boolean</strong> silence) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="539" href="#539">539</a> discoverLoadQueue(queue, hfilesDir, validateHFile); -<a class="jxr_linenumber" name="540" href="#540">540</a> validateFamiliesInHFiles(table, queue, silence); -<a class="jxr_linenumber" name="541" href="#541">541</a> } -<a class="jxr_linenumber" name="542" href="#542">542</a> -<a class="jxr_linenumber" name="543" href="#543">543</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="544" href="#544">544</a> <em class="jxr_javadoccomment"> * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the</em> -<a class="jxr_linenumber" name="545" href="#545">545</a> <em class="jxr_javadoccomment"> * passed directory and validates whether the prepared queue has all the valid table column</em> -<a class="jxr_linenumber" name="546" href="#546">546</a> <em class="jxr_javadoccomment"> * families in it.</em> -<a class="jxr_linenumber" name="547" href="#547">547</a> <em class="jxr_javadoccomment"> * @param map map of family to List of hfiles</em> -<a class="jxr_linenumber" name="548" href="#548">548</a> <em class="jxr_javadoccomment"> * @param table table to which hfiles should be loaded</em> -<a class="jxr_linenumber" name="549" href="#549">549</a> <em class="jxr_javadoccomment"> * @param queue queue which needs to be loaded into the table</em> -<a class="jxr_linenumber" name="550" href="#550">550</a> <em class="jxr_javadoccomment"> * @param silence true to ignore unmatched column families</em> -<a class="jxr_linenumber" name="551" href="#551">551</a> <em class="jxr_javadoccomment"> * @throws IOException If any I/O or network error occurred</em> -<a class="jxr_linenumber" name="552" href="#552">552</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="553" href="#553">553</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> prepareHFileQueue(Map<byte[], List<Path>> map, <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, -<a class="jxr_linenumber" name="554" href="#554">554</a> Deque<LoadQueueItem> queue, <strong class="jxr_keyword">boolean</strong> silence) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="555" href="#555">555</a> populateLoadQueue(queue, map); -<a class="jxr_linenumber" name="556" href="#556">556</a> validateFamiliesInHFiles(table, queue, silence); -<a class="jxr_linenumber" name="557" href="#557">557</a> } -<a class="jxr_linenumber" name="558" href="#558">558</a> -<a class="jxr_linenumber" name="559" href="#559">559</a> <em class="jxr_comment">// Initialize a thread pool</em> -<a class="jxr_linenumber" name="560" href="#560">560</a> <strong class="jxr_keyword">private</strong> ExecutorService createExecutorService() { -<a class="jxr_linenumber" name="561" href="#561">561</a> ThreadFactoryBuilder builder = <strong class="jxr_keyword">new</strong> ThreadFactoryBuilder(); -<a class="jxr_linenumber" name="562" href="#562">562</a> builder.setNameFormat(<span class="jxr_string">"LoadIncrementalHFiles-%1$d"</span>); -<a class="jxr_linenumber" name="563" href="#563">563</a> ExecutorService pool = <strong class="jxr_keyword">new</strong> ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS, -<a class="jxr_linenumber" name="564" href="#564">564</a> <strong class="jxr_keyword">new</strong> LinkedBlockingQueue<Runnable>(), builder.build()); -<a class="jxr_linenumber" name="565" href="#565">565</a> ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(<strong class="jxr_keyword">true</strong>); -<a class="jxr_linenumber" name="566" href="#566">566</a> <strong class="jxr_keyword">return</strong> pool; -<a class="jxr_linenumber" name="567" href="#567">567</a> } -<a class="jxr_linenumber" name="568" href="#568">568</a> -<a class="jxr_linenumber" name="569" href="#569">569</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="570" href="#570">570</a> <em class="jxr_javadoccomment"> * Checks whether there is any invalid family name in HFiles to be bulk loaded.</em> -<a class="jxr_linenumber" name="571" href="#571">571</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="572" href="#572">572</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> validateFamiliesInHFiles(<a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, Deque<LoadQueueItem> queue, <strong class="jxr_keyword">boolean</strong> silence) -<a class="jxr_linenumber" name="573" href="#573">573</a> <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="574" href="#574">574</a> Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies(); -<a class="jxr_linenumber" name="575" href="#575">575</a> List<String> familyNames = <strong class="jxr_keyword">new</strong> ArrayList<>(families.size()); -<a class="jxr_linenumber" name="576" href="#576">576</a> <strong class="jxr_keyword">for</strong> (HColumnDescriptor family : families) { -<a class="jxr_linenumber" name="577" href="#577">577</a> familyNames.add(family.getNameAsString()); -<a class="jxr_linenumber" name="578" href="#578">578</a> } -<a class="jxr_linenumber" name="579" href="#579">579</a> Iterator<LoadQueueItem> queueIter = queue.iterator(); -<a class="jxr_linenumber" name="580" href="#580">580</a> <strong class="jxr_keyword">while</strong> (queueIter.hasNext()) { -<a class="jxr_linenumber" name="581" href="#581">581</a> <a href="../../../../../org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html">LoadQueueItem</a> lqi = queueIter.next(); -<a class="jxr_linenumber" name="582" href="#582">582</a> String familyNameInHFile = Bytes.toString(lqi.family); -<a class="jxr_linenumber" name="583" href="#583">583</a> <strong class="jxr_keyword">if</strong> (!familyNames.contains(familyNameInHFile)) { -<a class="jxr_linenumber" name="584" href="#584">584</a> unmatchedFamilies.add(familyNameInHFile); -<a class="jxr_linenumber" name="585" href="#585">585</a> } -<a class="jxr_linenumber" name="586" href="#586">586</a> } -<a class="jxr_linenumber" name="587" href="#587">587</a> <strong class="jxr_keyword">if</strong> (unmatchedFamilies.size() > 0) { -<a class="jxr_linenumber" name="588" href="#588">588</a> String msg = -<a class="jxr_linenumber" name="589" href="#589">589</a> <span class="jxr_string">"Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "</span> -<a class="jxr_linenumber" name="590" href="#590">590</a> + unmatchedFamilies + <span class="jxr_string">"; valid family names of table "</span> + table.getName() + <span class="jxr_string">" are: "</span> -<a class="jxr_linenumber" name="591" href="#591">591</a> + familyNames; -<a class="jxr_linenumber" name="592" href="#592">592</a> LOG.error(msg); -<a class="jxr_linenumber" name="593" href="#593">593</a> <strong class="jxr_keyword">if</strong> (!silence) <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IOException(msg); -<a class="jxr_linenumber" name="594" href="#594">594</a> } -<a class="jxr_linenumber" name="595" href="#595">595</a> } -<a class="jxr_linenumber" name="596" href="#596">596</a> -<a class="jxr_linenumber" name="597" href="#597">597</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="598" href="#598">598</a> <em class="jxr_javadoccomment"> * Used by the replication sink to load the hfiles from the source cluster. It does the following,</em> -<a class="jxr_linenumber" name="599" href="#599">599</a> <em class="jxr_javadoccomment"> * <ol></em> -<a class="jxr_linenumber" name="600" href="#600">600</a> <em class="jxr_javadoccomment"> * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li></em> -<a class="jxr_linenumber" name="601" href="#601">601</a> <em class="jxr_javadoccomment"> * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)</em> -<a class="jxr_linenumber" name="602" href="#602">602</a> <em class="jxr_javadoccomment"> * </li></em> -<a class="jxr_linenumber" name="603" href="#603">603</a> <em class="jxr_javadoccomment"> * </ol></em> -<a class="jxr_linenumber" name="604" href="#604">604</a> <em class="jxr_javadoccomment"> * @param table Table to which these hfiles should be loaded to</em> -<a class="jxr_linenumber" name="605" href="#605">605</a> <em class="jxr_javadoccomment"> * @param conn Connection to use</em> -<a class="jxr_linenumber" name="606" href="#606">606</a> <em class="jxr_javadoccomment"> * @param queue {@link LoadQueueItem} has hfiles yet to be loaded</em> -<a class="jxr_linenumber" name="607" href="#607">607</a> <em class="jxr_javadoccomment"> * @param startEndKeys starting and ending row keys of the region</em> -<a class="jxr_linenumber" name="608" href="#608">608</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="609" href="#609">609</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> loadHFileQueue(<strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Connection.html">Connection</a> conn, Deque<LoadQueueItem> queue, -<a class="jxr_linenumber" name="610" href="#610">610</a> Pair<byte[][], byte[][]> startEndKeys) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="611" href="#611">611</a> loadHFileQueue(table, conn, queue, startEndKeys, false); -<a class="jxr_linenumber" name="612" href="#612">612</a> } -<a class="jxr_linenumber" name="613" href="#613">613</a> -<a class="jxr_linenumber" name="614" href="#614">614</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="615" href="#615">615</a> <em class="jxr_javadoccomment"> * Used by the replication sink to load the hfiles from the source cluster. It does the following,</em> -<a class="jxr_linenumber" name="616" href="#616">616</a> <em class="jxr_javadoccomment"> * <ol></em> -<a class="jxr_linenumber" name="617" href="#617">617</a> <em class="jxr_javadoccomment"> * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li></em> -<a class="jxr_linenumber" name="618" href="#618">618</a> <em class="jxr_javadoccomment"> * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)</em> -<a class="jxr_linenumber" name="619" href="#619">619</a> <em class="jxr_javadoccomment"> * </li></em> -<a class="jxr_linenumber" name="620" href="#620">620</a> <em class="jxr_javadoccomment"> * </ol></em> -<a class="jxr_linenumber" name="621" href="#621">621</a> <em class="jxr_javadoccomment"> * @param table Table to which these hfiles should be loaded to</em> -<a class="jxr_linenumber" name="622" href="#622">622</a> <em class="jxr_javadoccomment"> * @param conn Connection to use</em> -<a class="jxr_linenumber" name="623" href="#623">623</a> <em class="jxr_javadoccomment"> * @param queue {@link LoadQueueItem} has hfiles yet to be loaded</em> -<a class="jxr_linenumber" name="624" href="#624">624</a> <em class="jxr_javadoccomment"> * @param startEndKeys starting and ending row keys of the region</em> -<a class="jxr_linenumber" name="625" href="#625">625</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="626" href="#626">626</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> loadHFileQueue(<strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Connection.html">Connection</a> conn, Deque<LoadQueueItem> queue, -<a class="jxr_linenumber" name="627" href="#627">627</a> Pair<byte[][], byte[][]> startEndKeys, <strong class="jxr_keyword">boolean</strong> copyFile) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="628" href="#628">628</a> ExecutorService pool = <strong class="jxr_keyword">null</strong>; -<a class="jxr_linenumber" name="629" href="#629">629</a> <strong class="jxr_keyword">try</strong> { -<a class="jxr_linenumber" name="630" href="#630">630</a> pool = createExecutorService(); -<a class="jxr_linenumber" name="631" href="#631">631</a> Multimap<ByteBuffer, LoadQueueItem> regionGroups = -<a class="jxr_linenumber" name="632" href="#632">632</a> groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst(); -<a class="jxr_linenumber" name="633" href="#633">633</a> bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile); -<a class="jxr_linenumber" name="634" href="#634">634</a> } <strong class="jxr_keyword">finally</strong> { -<a class="jxr_linenumber" name="635" href="#635">635</a> <strong class="jxr_keyword">if</strong> (pool != <strong class="jxr_keyword">null</strong>) { -<a class="jxr_linenumber" name="636" href="#636">636</a> pool.shutdown(); -<a class="jxr_linenumber" name="637" href="#637">637</a> } -<a class="jxr_linenumber" name="638" href="#638">638</a> } -<a class="jxr_linenumber" name="639" href="#639">639</a> } -<a class="jxr_linenumber" name="640" href="#640">640</a> -<a class="jxr_linenumber" name="641" href="#641">641</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="642" href="#642">642</a> <em class="jxr_javadoccomment"> * This takes the LQI's grouped by likely regions and attempts to bulk load</em> -<a class="jxr_linenumber" name="643" href="#643">643</a> <em class="jxr_javadoccomment"> * them. Any failures are re-queued for another pass with the</em> -<a class="jxr_linenumber" name="644" href="#644">644</a> <em class="jxr_javadoccomment"> * groupOrSplitPhase.</em> -<a class="jxr_linenumber" name="645" href="#645">645</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="646" href="#646">646</a> <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">void</strong> bulkLoadPhase(<strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Connection.html">Connection</a> conn, -<a class="jxr_linenumber" name="647" href="#647">647</a> ExecutorService pool, Deque<LoadQueueItem> queue, -<a class="jxr_linenumber" name="648" href="#648">648</a> <strong class="jxr_keyword">final</strong> Multimap<ByteBuffer, LoadQueueItem> regionGroups, <strong class="jxr_keyword">boolean</strong> copyFile) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="649" href="#649">649</a> <em class="jxr_comment">// atomically bulk load the groups.</em> -<a class="jxr_linenumber" name="650" href="#650">650</a> Set<Future<List<LoadQueueItem>>> loadingFutures = <strong class="jxr_keyword">new</strong> HashSet<>(); -<a class="jxr_linenumber" name="651" href="#651">651</a> <strong class="jxr_keyword">for</strong> (Entry<ByteBuffer, ? <strong class="jxr_keyword">extends</strong> Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){ -<a class="jxr_linenumber" name="652" href="#652">652</a> <strong class="jxr_keyword">final</strong> byte[] first = e.getKey().array(); -<a class="jxr_linenumber" name="653" href="#653">653</a> <strong class="jxr_keyword">final</strong> Collection<LoadQueueItem> lqis = e.getValue(); -<a class="jxr_linenumber" name="654" href="#654">654</a> -<a class="jxr_linenumber" name="655" href="#655">655</a> <strong class="jxr_keyword">final</strong> Callable<List<LoadQueueItem>> call = <strong class="jxr_keyword">new</strong> Callable<List<LoadQueueItem>>() { -<a class="jxr_linenumber" name="656" href="#656">656</a> @Override -<a class="jxr_linenumber" name="657" href="#657">657</a> <strong class="jxr_keyword">public</strong> List<LoadQueueItem> call() <strong class="jxr_keyword">throws</strong> Exception { -<a class="jxr_linenumber" name="658" href="#658">658</a> List<LoadQueueItem> toRetry = -<a class="jxr_linenumber" name="659" href="#659">659</a> tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile); -<a class="jxr_linenumber" name="660" href="#660">660</a> <strong class="jxr_keyword">return</strong> toRetry; -<a class="jxr_linenumber" name="661" href="#661">661</a> } -<a class="jxr_linenumber" name="662" href="#662">662</a> }; -<a class="jxr_linenumber" name="663" href="#663">663</a> loadingFutures.add(pool.submit(call)); -<a class="jxr_linenumber" name="664" href="#664">664</a> } -<a class="jxr_linenumber" name="665" href="#665">665</a> -<a class="jxr_linenumber" name="666" href="#666">666</a> <em class="jxr_comment">// get all the results.</em> -<a class="jxr_linenumber" name="667" href="#667">667</a> <strong class="jxr_keyword">for</strong> (Future<List<LoadQueueItem>> future : loadingFutures) { -<a class="jxr_linenumber" name="668" href="#668">668</a> <strong class="jxr_keyword">try</strong> { -<a class="jxr_linenumber" name="669" href="#669">669</a> List<LoadQueueItem> toRetry = future.get(); -<a class="jxr_linenumber" name="670" href="#670">670</a> -<a class="jxr_linenumber" name="671" href="#671">671</a> <em class="jxr_comment">// LQIs that are requeued to be regrouped.</em> -<a class="jxr_linenumber" name="672" href="#672">672</a> queue.addAll(toRetry); -<a class="jxr_linenumber" name="673" href="#673">673</a> -<a class="jxr_linenumber" name="674" href="#674">674</a> } <strong class="jxr_keyword">catch</strong> (ExecutionException e1) { -<a class="jxr_linenumber" name="675" href="#675">675</a> Throwable t = e1.getCause(); -<a class="jxr_linenumber" name="676" href="#676">676</a> <strong class="jxr_keyword">if</strong> (t instanceof IOException) { -<a class="jxr_linenumber" name="677" href="#677">677</a> <em class="jxr_comment">// At this point something unrecoverable has happened.</em> -<a class="jxr_linenumber" name="678" href="#678">678</a> <em class="jxr_comment">// TODO Implement bulk load recovery</em> -<a class="jxr_linenumber" name="679" href="#679">679</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IOException(<span class="jxr_string">"BulkLoad encountered an unrecoverable problem"</span>, t); -<a class="jxr_linenumber" name="680" href="#680">680</a> } -<a class="jxr_linenumber" name="681" href="#681">681</a> LOG.error(<span class="jxr_string">"Unexpected execution exception during bulk load"</span>, e1); -<a class="jxr_linenumber" name="682" href="#682">682</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(t); -<a class="jxr_linenumber" name="683" href="#683">683</a> } <strong class="jxr_keyword">catch</strong> (InterruptedException e1) { -<a class="jxr_linenumber" name="684" href="#684">684</a> LOG.error(<span class="jxr_string">"Unexpected interrupted exception during bulk load"</span>, e1); -<a class="jxr_linenumber" name="685" href="#685">685</a> <strong class="jxr_keyword">throw</strong> (InterruptedIOException)<strong class="jxr_keyword">new</strong> InterruptedIOException().initCause(e1); -<a class="jxr_linenumber" name="686" href="#686">686</a> } -<a class="jxr_linenumber" name="687" href="#687">687</a> } -<a class="jxr_linenumber" name="688" href="#688">688</a> } -<a class="jxr_linenumber" name="689" href="#689">689</a> -<a class="jxr_linenumber" name="690" href="#690">690</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">boolean</strong> checkHFilesCountPerRegionPerFamily( -<a class="jxr_linenumber" name="691" href="#691">691</a> <strong class="jxr_keyword">final</strong> Multimap<ByteBuffer, LoadQueueItem> regionGroups) { -<a class="jxr_linenumber" name="692" href="#692">692</a> <strong class="jxr_keyword">for</strong> (Entry<ByteBuffer, -<a class="jxr_linenumber" name="693" href="#693">693</a> ? <strong class="jxr_keyword">extends</strong> Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) { -<a class="jxr_linenumber" name="694" href="#694">694</a> <strong class="jxr_keyword">final</strong> Collection<LoadQueueItem> lqis = e.getValue(); -<a class="jxr_linenumber" name="695" href="#695">695</a> HashMap<byte[], MutableInt> filesMap = <strong class="jxr_keyword">new</strong> HashMap<>(); -<a class="jxr_linenumber" name="696" href="#696">696</a> <strong class="jxr_keyword">for</strong> (LoadQueueItem lqi: lqis) { -<a class="jxr_linenumber" name="697" href="#697">697</a> MutableInt count = filesMap.get(lqi.family); -<a class="jxr_linenumber" name="698" href="#698">698</a> <strong class="jxr_keyword">if</strong> (count == <strong class="jxr_keyword">null</strong>) { -<a class="jxr_linenumber" name="699" href="#699">699</a> count = <strong class="jxr_keyword">new</strong> MutableInt(); -<a class="jxr_linenumber" name="700" href="#700">700</a> filesMap.put(lqi.family, count); -<a class="jxr_linenumber" name="701" href="#701">701</a> } -<a class="jxr_linenumber" name="702" href="#702">702</a> count.increment(); -<a class="jxr_linenumber" name="703" href="#703">703</a> <strong class="jxr_keyword">if</strong> (count.intValue() > maxFilesPerRegionPerFamily) { -<a class="jxr_linenumber" name="704" href="#704">704</a> LOG.error(<span class="jxr_string">"Trying to load more than "</span> + maxFilesPerRegionPerFamily -<a class="jxr_linenumber" name="705" href="#705">705</a> + <span class="jxr_string">" hfiles to family "</span> + Bytes.toStringBinary(lqi.family) -<a class="jxr_linenumber" name="706" href="#706">706</a> + <span class="jxr_string">" of region with start key "</span> -<a class="jxr_linenumber" name="707" href="#707">707</a> + Bytes.toStringBinary(e.getKey())); -<a class="jxr_linenumber" name="708" href="#708">708</a> <strong class="jxr_keyword">return</strong> false; -<a class="jxr_linenumber" name="709" href="#709">709</a> } -<a class="jxr_linenumber" name="710" href="#710">710</a> } -<a class="jxr_linenumber" name="711" href="#711">711</a> } -<a class="jxr_linenumber" name="712" href="#712">712</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">true</strong>; -<a class="jxr_linenumber" name="713" href="#713">713</a> } -<a class="jxr_linenumber" name="714" href="#714">714</a> -<a class="jxr_linenumber" name="715" href="#715">715</a> <em class="jxr_javadoccomment">/**</em> -<a class="jxr_linenumber" name="716" href="#716">716</a> <em class="jxr_javadoccomment"> * @param table the table to load into</em> -<a class="jxr_linenumber" name="717" href="#717">717</a> <em class="jxr_javadoccomment"> * @param pool the ExecutorService</em> -<a class="jxr_linenumber" name="718" href="#718">718</a> <em class="jxr_javadoccomment"> * @param queue the queue for LoadQueueItem</em> -<a class="jxr_linenumber" name="719" href="#719">719</a> <em class="jxr_javadoccomment"> * @param startEndKeys start and end keys</em> -<a class="jxr_linenumber" name="720" href="#720">720</a> <em class="jxr_javadoccomment"> * @return A map that groups LQI by likely bulk load region targets and List of missing hfiles.</em> -<a class="jxr_linenumber" name="721" href="#721">721</a> <em class="jxr_javadoccomment"> */</em> -<a class="jxr_linenumber" name="722" href="#722">722</a> <strong class="jxr_keyword">private</strong> Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> groupOrSplitPhase( -<a class="jxr_linenumber" name="723" href="#723">723</a> <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, ExecutorService pool, Deque<LoadQueueItem> queue, -<a class="jxr_linenumber" name="724" href="#724">724</a> <strong class="jxr_keyword">final</strong> Pair<byte[][], byte[][]> startEndKeys) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="725" href="#725">725</a> <em class="jxr_comment">// <region start key, LQI> need synchronized only within this scope of this</em> -<a class="jxr_linenumber" name="726" href="#726">726</a> <em class="jxr_comment">// phase because of the puts that happen in futures.</em> -<a class="jxr_linenumber" name="727" href="#727">727</a> Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); -<a class="jxr_linenumber" name="728" href="#728">728</a> <strong class="jxr_keyword">final</strong> Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); -<a class="jxr_linenumber" name="729" href="#729">729</a> List<String> missingHFiles = <strong class="jxr_keyword">new</strong> ArrayList<>(); -<a class="jxr_linenumber" name="730" href="#730">730</a> Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = <strong class="jxr_keyword">new</strong> Pair<>(regionGroups, -<a class="jxr_linenumber" name="731" href="#731">731</a> missingHFiles); -<a class="jxr_linenumber" name="732" href="#732">732</a> -<a class="jxr_linenumber" name="733" href="#733">733</a> <em class="jxr_comment">// drain LQIs and figure out bulk load groups</em> -<a class="jxr_linenumber" name="734" href="#734">734</a> Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = <strong class="jxr_keyword">new</strong> HashSet<>(); -<a class="jxr_linenumber" name="735" href="#735">735</a> <strong class="jxr_keyword">while</strong> (!queue.isEmpty()) { -<a class="jxr_linenumber" name="736" href="#736">736</a> <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html">LoadQueueItem</a> item = queue.remove(); -<a class="jxr_linenumber" name="737" href="#737">737</a> -<a class="jxr_linenumber" name="738" href="#738">738</a> <strong class="jxr_keyword">final</strong> Callable<Pair<List<LoadQueueItem>, String>> call = -<a class="jxr_linenumber" name="739" href="#739">739</a> <strong class="jxr_keyword">new</strong> Callable<Pair<List<LoadQueueItem>, String>>() { -<a class="jxr_linenumber" name="740" href="#740">740</a> @Override -<a class="jxr_linenumber" name="741" href="#741">741</a> <strong class="jxr_keyword">public</strong> Pair<List<LoadQueueItem>, String> call() <strong class="jxr_keyword">throws</strong> Exception { -<a class="jxr_linenumber" name="742" href="#742">742</a> Pair<List<LoadQueueItem>, String> splits = groupOrSplit(regionGroups, item, table, -<a class="jxr_linenumber" name="743" href="#743">743</a> startEndKeys); -<a class="jxr_linenumber" name="744" href="#744">744</a> <strong class="jxr_keyword">return</strong> splits; -<a class="jxr_linenumber" name="745" href="#745">745</a> } -<a class="jxr_linenumber" name="746" href="#746">746</a> }; -<a class="jxr_linenumber" name="747" href="#747">747</a> splittingFutures.add(pool.submit(call)); -<a class="jxr_linenumber" name="748" href="#748">748</a> } -<a class="jxr_linenumber" name="749" href="#749">749</a> <em class="jxr_comment">// get all the results. All grouping and splitting must finish before</em> -<a class="jxr_linenumber" name="750" href="#750">750</a> <em class="jxr_comment">// we can attempt the atomic loads.</em> -<a class="jxr_linenumber" name="751" href="#751">751</a> <strong class="jxr_keyword">for</strong> (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) { -<a class="jxr_linenumber" name="752" href="#752">752</a> <strong class="jxr_keyword">try</strong> { -<a class="jxr_linenumber" name="753" href="#753">753</a> Pair<List<LoadQueueItem>, String> splits = lqis.get(); -<a class="jxr_linenumber" name="754" href="#754">754</a> <strong class="jxr_keyword">if</strong> (splits != <strong class="jxr_keyword">null</strong>) { -<a class="jxr_linenumber" name="755" href="#755">755</a> <strong class="jxr_keyword">if</strong> (splits.getFirst() != <strong class="jxr_keyword">null</strong>) { -<a class="jxr_linenumber" name="756" href="#756">756</a> queue.addAll(splits.getFirst()); -<a class="jxr_linenumber" name="757" href="#757">757</a> } <strong class="jxr_keyword">else</strong> { -<a class="jxr_linenumber" name="758" href="#758">758</a> missingHFiles.add(splits.getSecond()); -<a class="jxr_linenumber" name="759" href="#759">759</a> } -<a class="jxr_linenumber" name="760" href="#760">760</a> } -<a class="jxr_linenumber" name="761" href="#761">761</a> } <strong class="jxr_keyword">catch</strong> (ExecutionException e1) { -<a class="jxr_linenumber" name="762" href="#762">762</a> Throwable t = e1.getCause(); -<a class="jxr_linenumber" name="763" href="#763">763</a> <strong class="jxr_keyword">if</strong> (t instanceof IOException) { -<a class="jxr_linenumber" name="764" href="#764">764</a> LOG.error(<span class="jxr_string">"IOException during splitting"</span>, e1); -<a class="jxr_linenumber" name="765" href="#765">765</a> <strong class="jxr_keyword">throw</strong> (IOException)t; <em class="jxr_comment">// would have been thrown if not parallelized,</em> -<a class="jxr_linenumber" name="766" href="#766">766</a> } -<a class="jxr_linenumber" name="767" href="#767">767</a> LOG.error(<span class="jxr_string">"Unexpected execution exception during splitting"</span>, e1); -<a class="jxr_linenumber" name="768" href="#768">768</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(t); -<a class="jxr_linenumber" name="769" href="#769">769</a> } <strong class="jxr_keyword">catch</strong> (InterruptedException e1) { -<a class="jxr_linenumber" name="770" href="#770">770</a> LOG.error(<span class="jxr_string">"Unexpected interrupted exception during splitting"</span>, e1); -<a class="jxr_linenumber" name="771" href="#771">771</a> <strong class="jxr_keyword">throw</strong> (InterruptedIOException)<strong class="jxr_keyword">new</strong> InterruptedIOException().initCause(e1); -<a class="jxr_linenumber" name="772" href="#772">772</a> } -<a class="jxr_linenumber" name="773" href="#773">773</a> } -<a class="jxr_linenumber" name="774" href="#774">774</a> <strong class="jxr_keyword">return</strong> pair; -<a class="jxr_linenumber" name="775" href="#775">775</a> } -<a class="jxr_linenumber" name="776" href="#776">776</a> -<a class="jxr_linenumber" name="777" href="#777">777</a> <em class="jxr_comment">// unique file name for the table</em> -<a class="jxr_linenumber" name="778" href="#778">778</a> <strong class="jxr_keyword">private</strong> String getUniqueName() { -<a class="jxr_linenumber" name="779" href="#779">779</a> <strong class="jxr_keyword">return</strong> UUID.randomUUID().toString().replaceAll(<span class="jxr_string">"-"</span>, <span class="jxr_string">""</span>); -<a class="jxr_linenumber" name="780" href="#780">780</a> } -<a class="jxr_linenumber" name="781" href="#781">781</a> -<a class="jxr_linenumber" name="782" href="#782">782</a> <strong class="jxr_keyword">protected</strong> List<LoadQueueItem> splitStoreFile(<strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html">LoadQueueItem</a> item, -<a class="jxr_linenumber" name="783" href="#783">783</a> <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, byte[] startKey, -<a class="jxr_linenumber" name="784" href="#784">784</a> byte[] splitKey) <strong class="jxr_keyword">throws</strong> IOException { -<a class="jxr_linenumber" name="785" href="#785">785</a> <strong class="jxr_keyword">final</strong> Path hfilePath = item.hfilePath; -<a class="jxr_linenumber" name="786" href="#786">786</a> -<a class="jxr_linenumber" name="787" href="#787">787</a> Path tmpDir = item.hfilePath.getParent(); -<a class="jxr_linenumber" name="788" href="#788">788</a> <strong class="jxr_keyword">if</strong> (!tmpDir.getName().equals(TMP_DIR)) { -<a class="jxr_linenumber" name="789" href="#789">789</a> tmpDir = <strong class="jxr_keyword">new</strong> Path(tmpDir, TMP_DIR); -<a class="jxr_linenumber" name="790" href="#790">790</a> } -<a class="jxr_linenumber" name="791" href="#791">791</a> -<a class="jxr_linenumber" name="792" href="#792">792</a> LOG.info(<span class="jxr_string">"HFile at "</span> + hfilePath + <span class="jxr_string">" no longer fits inside a single "</span> + -<a class="jxr_linenumber" name="793" href="#793">793</a> <span class="jxr_string">"region. Splitting..."</span>); -<a class="jxr_linenumber" name="794" href="#794">794</a> -<a class="jxr_linenumber" name="795" href="#795">795</a> String uniqueName = getUniqueName(); -<a class="jxr_linenumber" name="796" href="#796">796</a> <a href="../../../../../org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</a> familyDesc = table.getTableDescriptor().getFamily(item.family); -<a class="jxr_linenumber" name="797" href="#797">797</a> -<a class="jxr_linenumber" name="798" href="#798">798</a> Path botOut = <strong class="jxr_keyword">new</strong> Path(tmpDir, uniqueName + <span class="jxr_string">".bottom"</span>); -<a class="jxr_linenumber" name="799" href="#799">799</a> Path topOut = <strong class="jxr_keyword">new</strong> Path(tmpDir, uniqueName + <span class="jxr_string">".top"</span>); -<a class="jxr_linenumber" name="800" href="#800">800</a> splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut); -<a class="jxr_linenumber" name="801" href="#801">801</a> -<a class="jxr_linenumber" name="802" href="#802">802</a> FileSystem fs = tmpDir.getFileSystem(getConf()); -<a class="jxr_linenumber" name="803" href="#803">803</a> fs.setPermission(tmpDir, FsPermission.valueOf(<span class="jxr_string">"-rwxrwxrwx"</span>)); -<a class="jxr_linenumber" name="804" href="#804">804</a> fs.setPermission(botOut, FsPermission.valueOf(<span class="jxr_string">"-rwxrwxrwx"</span>)); -<a class="jxr_linenumber" name="805" href="#805">805</a> fs.setPermission(topOut, FsPermission.valueOf(<span class="jxr_string">"-rwxrwxrwx"</span>)); +<a class="jxr_linenumber" name="469" href="#469">469</a> Map<LoadQueueItem, ByteBuffer> item2RegionMap = <strong class="jxr_keyword">new</strong> HashMap<>(); +<a class="jxr_linenumber" name="470" href="#470">470</a> <em class="jxr_comment">// Assumes that region splits can happen while this occurs.</em> +<a class="jxr_linenumber" name="471" href="#471">471</a> <strong class="jxr_keyword">while</strong> (!queue.isEmpty()) { +<a class="jxr_linenumber" name="472" href="#472">472</a> <em class="jxr_comment">// need to reload split keys each iteration.</em> +<a class="jxr_linenumber" name="473" href="#473">473</a> <strong class="jxr_keyword">final</strong> Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys(); +<a class="jxr_linenumber" name="474" href="#474">474</a> <strong class="jxr_keyword">if</strong> (count != 0) { +<a class="jxr_linenumber" name="475" href="#475">475</a> LOG.info(<span class="jxr_string">"Split occured while grouping HFiles, retry attempt "</span> + +<a class="jxr_linenumber" name="476" href="#476">476</a> + count + <span class="jxr_string">" with "</span> + queue.size() + <span class="jxr_string">" files remaining to group or split"</span>); +<a class="jxr_linenumber" name="477" href="#477">477</a> } +<a class="jxr_linenumber" name="478" href="#478">478</a> +<a class="jxr_linenumber" name="479" href="#479">479</a> <strong class="jxr_keyword">int</strong> maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); +<a class="jxr_linenumber" name="480" href="#480">480</a> maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1); +<a class="jxr_linenumber" name="481" href="#481">481</a> <strong class="jxr_keyword">if</strong> (maxRetries != 0 && count >= maxRetries) { +<a class="jxr_linenumber" name="482" href="#482">482</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IOException(<span class="jxr_string">"Retry attempted "</span> + count + +<a class="jxr_linenumber" name="483" href="#483">483</a> <span class="jxr_string">" times without completing, bailing out"</span>); +<a class="jxr_linenumber" name="484" href="#484">484</a> } +<a class="jxr_linenumber" name="485" href="#485">485</a> count++; +<a class="jxr_linenumber" name="486" href="#486">486</a> +<a class="jxr_linenumber" name="487" href="#487">487</a> <em class="jxr_comment">// Using ByteBuffer for byte[] equality semantics</em> +<a class="jxr_linenumber" name="488" href="#488">488</a> pair = groupOrSplitPhase(table, pool, queue, startEndKeys); +<a class="jxr_linenumber" name="489" href="#489">489</a> Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst(); +<a class="jxr_linenumber" name="490" href="#490">490</a> +<a class="jxr_linenumber" name="491" href="#491">491</a> <strong class="jxr_keyword">if</strong> (!checkHFilesCountPerRegionPerFamily(regionGroups)) { +<a class="jxr_linenumber" name="492" href="#492">492</a> <em class="jxr_comment">// Error is logged inside checkHFilesCountPerRegionPerFamily.</em> +<a class="jxr_linenumber" name="493" href="#493">493</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IOException(<span class="jxr_string">"Trying to load more than "</span> + maxFilesPerRegionPerFamily +<a class="jxr_linenumber" name="494" href="#494">494</a> + <span class="jxr_string">" hfiles to one family of one region"</span>); +<a class="jxr_linenumber" name="495" href="#495">495</a> } +<a class="jxr_linenumber" name="496" href="#496">496</a> +<a class="jxr_linenumber" name="497" href="#497">497</a> bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile, +<a class="jxr_linenumber" name="498" href="#498">498</a> item2RegionMap); +<a class="jxr_linenumber" name="499" href="#499">499</a> +<a class="jxr_linenumber" name="500" href="#500">500</a> <em class="jxr_comment">// NOTE: The next iteration's split / group could happen in parallel to</em> +<a class="jxr_linenumber" name="501" href="#501">501</a> <em class="jxr_comment">// atomic bulkloads assuming that there are splits and no merges, and</em> +<a class="jxr_linenumber" name="502" href="#502">502</a> <em class="jxr_comment">// that we can atomically pull out the groups we want to retry.</em> +<a class="jxr_linenumber" name="503" href="#503">503</a> } +<a class="jxr_linenumber" name="504" href="#504">504</a> +<a class="jxr_linenumber" name="505" href="#505">505</a> <strong class="jxr_keyword">if</strong> (!queue.isEmpty()) { +<a class="jxr_linenumber" name="506" href="#506">506</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> RuntimeException(<span class="jxr_string">"Bulk load aborted with some files not yet loaded."</span> +<a class="jxr_linenumber" name="507" href="#507">507</a> + <span class="jxr_string">"Please check log for more details."</span>); +<a class="jxr_linenumber" name="508" href="#508">508</a> } +<a class="jxr_linenumber" name="509" href="#509">509</a> <strong class="jxr_keyword">return</strong> item2RegionMap; +<a class="jxr_linenumber" name="510" href="#510">510</a> } +<a class="jxr_linenumber" name="511" href="#511">511</a> +<a class="jxr_linenumber" name="512" href="#512">512</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="513" href="#513">513</a> <em class="jxr_javadoccomment"> * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the</em> +<a class="jxr_linenumber" name="514" href="#514">514</a> <em class="jxr_javadoccomment"> * passed directory and validates whether the prepared queue has all the valid table column</em> +<a class="jxr_linenumber" name="515" href="#515">515</a> <em class="jxr_javadoccomment"> * families in it.</em> +<a class="jxr_linenumber" name="516" href="#516">516</a> <em class="jxr_javadoccomment"> * @param hfilesDir directory containing list of hfiles to be loaded into the table</em> +<a class="jxr_linenumber" name="517" href="#517">517</a> <em class="jxr_javadoccomment"> * @param table table to which hfiles should be loaded</em> +<a class="jxr_linenumber" name="518" href="#518">518</a> <em class="jxr_javadoccomment"> * @param queue queue which needs to be loaded into the table</em> +<a class="jxr_linenumber" name="519" href="#519">519</a> <em class="jxr_javadoccomment"> * @param validateHFile if true hfiles will be validated for its format</em> +<a class="jxr_linenumber" name="520" href="#520">520</a> <em class="jxr_javadoccomment"> * @throws IOException If any I/O or network error occurred</em> +<a class="jxr_linenumber" name="521" href="#521">521</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="522" href="#522">522</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> prepareHFileQueue(Path hfilesDir, <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, Deque<LoadQueueItem> queue, +<a class="jxr_linenumber" name="523" href="#523">523</a> <strong class="jxr_keyword">boolean</strong> validateHFile) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="524" href="#524">524</a> prepareHFileQueue(hfilesDir, table, queue, validateHFile, false); +<a class="jxr_linenumber" name="525" href="#525">525</a> } +<a class="jxr_linenumber" name="526" href="#526">526</a> +<a class="jxr_linenumber" name="527" href="#527">527</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="528" href="#528">528</a> <em class="jxr_javadoccomment"> * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the</em> +<a class="jxr_linenumber" name="529" href="#529">529</a> <em class="jxr_javadoccomment"> * passed directory and validates whether the prepared queue has all the valid table column</em> +<a class="jxr_linenumber" name="530" href="#530">530</a> <em class="jxr_javadoccomment"> * families in it.</em> +<a class="jxr_linenumber" name="531" href="#531">531</a> <em class="jxr_javadoccomment"> * @param hfilesDir directory containing list of hfiles to be loaded into the table</em> +<a class="jxr_linenumber" name="532" href="#532">532</a> <em class="jxr_javadoccomment"> * @param table table to which hfiles should be loaded</em> +<a class="jxr_linenumber" name="533" href="#533">533</a> <em class="jxr_javadoccomment"> * @param queue queue which needs to be loaded into the table</em> +<a class="jxr_linenumber" name="534" href="#534">534</a> <em class="jxr_javadoccomment"> * @param validateHFile if true hfiles will be validated for its format</em> +<a class="jxr_linenumber" name="535" href="#535">535</a> <em class="jxr_javadoccomment"> * @param silence true to ignore unmatched column families</em> +<a class="jxr_linenumber" name="536" href="#536">536</a> <em class="jxr_javadoccomment"> * @throws IOException If any I/O or network error occurred</em> +<a class="jxr_linenumber" name="537" href="#537">537</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="538" href="#538">538</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> prepareHFileQueue(Path hfilesDir, <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, +<a class="jxr_linenumber" name="539" href="#539">539</a> Deque<LoadQueueItem> queue, <strong class="jxr_keyword">boolean</strong> validateHFile, <strong class="jxr_keyword">boolean</strong> silence) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="540" href="#540">540</a> discoverLoadQueue(queue, hfilesDir, validateHFile); +<a class="jxr_linenumber" name="541" href="#541">541</a> validateFamiliesInHFiles(table, queue, silence); +<a class="jxr_linenumber" name="542" href="#542">542</a> } +<a class="jxr_linenumber" name="543" href="#543">543</a> +<a class="jxr_linenumber" name="544" href="#544">544</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="545" href="#545">545</a> <em class="jxr_javadoccomment"> * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the</em> +<a class="jxr_linenumber" name="546" href="#546">546</a> <em class="jxr_javadoccomment"> * passed directory and validates whether the prepared queue has all the valid table column</em> +<a class="jxr_linenumber" name="547" href="#547">547</a> <em class="jxr_javadoccomment"> * families in it.</em> +<a class="jxr_linenumber" name="548" href="#548">548</a> <em class="jxr_javadoccomment"> * @param map map of family to List of hfiles</em> +<a class="jxr_linenumber" name="549" href="#549">549</a> <em class="jxr_javadoccomment"> * @param table table to which hfiles should be loaded</em> +<a class="jxr_linenumber" name="550" href="#550">550</a> <em class="jxr_javadoccomment"> * @param queue queue which needs to be loaded into the table</em> +<a class="jxr_linenumber" name="551" href="#551">551</a> <em class="jxr_javadoccomment"> * @param silence true to ignore unmatched column families</em> +<a class="jxr_linenumber" name="552" href="#552">552</a> <em class="jxr_javadoccomment"> * @throws IOException If any I/O or network error occurred</em> +<a class="jxr_linenumber" name="553" href="#553">553</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="554" href="#554">554</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> prepareHFileQueue(Map<byte[], List<Path>> map, <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, +<a class="jxr_linenumber" name="555" href="#555">555</a> Deque<LoadQueueItem> queue, <strong class="jxr_keyword">boolean</strong> silence) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="556" href="#556">556</a> populateLoadQueue(queue, map); +<a class="jxr_linenumber" name="557" href="#557">557</a> validateFamiliesInHFiles(table, queue, silence); +<a class="jxr_linenumber" name="558" href="#558">558</a> } +<a class="jxr_linenumber" name="559" href="#559">559</a> +<a class="jxr_linenumber" name="560" href="#560">560</a> <em class="jxr_comment">// Initialize a thread pool</em> +<a class="jxr_linenumber" name="561" href="#561">561</a> <strong class="jxr_keyword">private</strong> ExecutorService createExecutorService() { +<a class="jxr_linenumber" name="562" href="#562">562</a> ThreadFactoryBuilder builder = <strong class="jxr_keyword">new</strong> ThreadFactoryBuilder(); +<a class="jxr_linenumber" name="563" href="#563">563</a> builder.setNameFormat(<span class="jxr_string">"LoadIncrementalHFiles-%1$d"</span>); +<a class="jxr_linenumber" name="564" href="#564">564</a> ExecutorService pool = <strong class="jxr_keyword">new</strong> ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS, +<a class="jxr_linenumber" name="565" href="#565">565</a> <strong class="jxr_keyword">new</strong> LinkedBlockingQueue<Runnable>(), builder.build()); +<a class="jxr_linenumber" name="566" href="#566">566</a> ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(<strong class="jxr_keyword">true</strong>); +<a class="jxr_linenumber" name="567" href="#567">567</a> <strong class="jxr_keyword">return</strong> pool; +<a class="jxr_linenumber" name="568" href="#568">568</a> } +<a class="jxr_linenumber" name="569" href="#569">569</a> +<a class="jxr_linenumber" name="570" href="#570">570</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="571" href="#571">571</a> <em class="jxr_javadoccomment"> * Checks whether there is any invalid family name in HFiles to be bulk loaded.</em> +<a class="jxr_linenumber" name="572" href="#572">572</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="573" href="#573">573</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">void</strong> validateFamiliesInHFiles(<a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, Deque<LoadQueueItem> queue, <strong class="jxr_keyword">boolean</strong> silence) +<a class="jxr_linenumber" name="574" href="#574">574</a> <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="575" href="#575">575</a> Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies(); +<a class="jxr_linenumber" name="576" href="#576">576</a> List<String> familyNames = <strong class="jxr_keyword">new</strong> ArrayList<>(families.size()); +<a class="jxr_linenumber" name="577" href="#577">577</a> <strong class="jxr_keyword">for</strong> (HColumnDescriptor family : families) { +<a class="jxr_linenumber" name="578" href="#578">578</a> familyNames.add(family.getNameAsString()); +<a class="jxr_linenumber" name="579" href="#579">579</a> } +<a class="jxr_linenumber" name="580" href="#580">580</a> Iterator<LoadQueueItem> queueIter = queue.iterator(); +<a class="jxr_linenumber" name="581" href="#581">581</a> <strong class="jxr_keyword">while</strong> (queueIter.hasNext()) { +<a class="jxr_linenumber" name="582" href="#582">582</a> <a href="../../../../../org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.html">LoadQueueItem</a> lqi = queueIter.next(); +<a class="jxr_linenumber" name="583" href="#583">583</a> String familyNameInHFile = Bytes.toString(lqi.family); +<a class="jxr_linenumber" name="584" href="#584">584</a> <strong class="jxr_keyword">if</strong> (!familyNames.contains(familyNameInHFile)) { +<a class="jxr_linenumber" name="585" href="#585">585</a> unmatchedFamilies.add(familyNameInHFile); +<a class="jxr_linenumber" name="586" href="#586">586</a> } +<a class="jxr_linenumber" name="587" href="#587">587</a> } +<a class="jxr_linenumber" name="588" href="#588">588</a> <strong class="jxr_keyword">if</strong> (unmatchedFamilies.size() > 0) { +<a class="jxr_linenumber" name="589" href="#589">589</a> String msg = +<a class="jxr_linenumber" name="590" href="#590">590</a> <span class="jxr_string">"Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "</span> +<a class="jxr_linenumber" name="591" href="#591">591</a> + unmatchedFamilies + <span class="jxr_string">"; valid family names of table "</span> + table.getName() + <span class="jxr_string">" are: "</span> +<a class="jxr_linenumber" name="592" href="#592">592</a> + familyNames; +<a class="jxr_linenumber" name="593" href="#593">593</a> LOG.error(msg); +<a class="jxr_linenumber" name="594" href="#594">594</a> <strong class="jxr_keyword">if</strong> (!silence) <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IOException(msg); +<a class="jxr_linenumber" name="595" href="#595">595</a> } +<a class="jxr_linenumber" name="596" href="#596">596</a> } +<a class="jxr_linenumber" name="597" href="#597">597</a> +<a class="jxr_linenumber" name="598" href="#598">598</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="599" href="#599">599</a> <em class="jxr_javadoccomment"> * Used by the replication sink to load the hfiles from the source cluster. It does the following,</em> +<a class="jxr_linenumber" name="600" href="#600">600</a> <em class="jxr_javadoccomment"> * <ol></em> +<a class="jxr_linenumber" name="601" href="#601">601</a> <em class="jxr_javadoccomment"> * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li></em> +<a class="jxr_linenumber" name="602" href="#602">602</a> <em class="jxr_javadoccomment"> * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)</em> +<a class="jxr_linenumber" name="603" href="#603">603</a> <em class="jxr_javadoccomment"> * </li></em> +<a class="jxr_linenumber" name="604" href="#604">604</a> <em class="jxr_javadoccomment"> * </ol></em> +<a class="jxr_linenumber" name="605" href="#605">605</a> <em class="jxr_javadoccomment"> * @param table Table to which these hfiles should be loaded to</em> +<a class="jxr_linenumber" name="606" href="#606">606</a> <em class="jxr_javadoccomment"> * @param conn Connection to use</em> +<a class="jxr_linenumber" name="607" href="#607">607</a> <em class="jxr_javadoccomment"> * @param queue {@link LoadQueueItem} has hfiles yet to be loaded</em> +<a class="jxr_linenumber" name="608" href="#608">608</a> <em class="jxr_javadoccomment"> * @param startEndKeys starting and ending row keys of the region</em> +<a class="jxr_linenumber" name="609" href="#609">609</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="610" href="#610">610</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> loadHFileQueue(<strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Connection.html">Connection</a> conn, Deque<LoadQueueItem> queue, +<a class="jxr_linenumber" name="611" href="#611">611</a> Pair<byte[][], byte[][]> startEndKeys) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="612" href="#612">612</a> loadHFileQueue(table, conn, queue, startEndKeys, false); +<a class="jxr_linenumber" name="613" href="#613">613</a> } +<a class="jxr_linenumber" name="614" href="#614">614</a> +<a class="jxr_linenumber" name="615" href="#615">615</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="616" href="#616">616</a> <em class="jxr_javadoccomment"> * Used by the replication sink to load the hfiles from the source cluster. It does the following,</em> +<a class="jxr_linenumber" name="617" href="#617">617</a> <em class="jxr_javadoccomment"> * <ol></em> +<a class="jxr_linenumber" name="618" href="#618">618</a> <em class="jxr_javadoccomment"> * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li></em> +<a class="jxr_linenumber" name="619" href="#619">619</a> <em class="jxr_javadoccomment"> * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)</em> +<a class="jxr_linenumber" name="620" href="#620">620</a> <em class="jxr_javadoccomment"> * </li></em> +<a class="jxr_linenumber" name="621" href="#621">621</a> <em class="jxr_javadoccomment"> * </ol></em> +<a class="jxr_linenumber" name="622" href="#622">622</a> <em class="jxr_javadoccomment"> * @param table Table to which these hfiles should be loaded to</em> +<a class="jxr_linenumber" name="623" href="#623">623</a> <em class="jxr_javadoccomment"> * @param conn Connection to use</em> +<a class="jxr_linenumber" name="624" href="#624">624</a> <em class="jxr_javadoccomment"> * @param queue {@link LoadQueueItem} has hfiles yet to be loaded</em> +<a class="jxr_linenumber" name="625" href="#625">625</a> <em class="jxr_javadoccomment"> * @param startEndKeys starting and ending row keys of the region</em> +<a class="jxr_linenumber" name="626" href="#626">626</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="627" href="#627">627</a> <strong class="jxr_keyword">public</strong> <strong class="jxr_keyword">void</strong> loadHFileQueue(<strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Connection.html">Connection</a> conn, Deque<LoadQueueItem> queue, +<a class="jxr_linenumber" name="628" href="#628">628</a> Pair<byte[][], byte[][]> startEndKeys, <strong class="jxr_keyword">boolean</strong> copyFile) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="629" href="#629">629</a> ExecutorService pool = <strong class="jxr_keyword">null</strong>; +<a class="jxr_linenumber" name="630" href="#630">630</a> <strong class="jxr_keyword">try</strong> { +<a class="jxr_linenumber" name="631" href="#631">631</a> pool = createExecutorService(); +<a class="jxr_linenumber" name="632" href="#632">632</a> Multimap<ByteBuffer, LoadQueueItem> regionGroups = +<a class="jxr_linenumber" name="633" href="#633">633</a> groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst(); +<a class="jxr_linenumber" name="634" href="#634">634</a> bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, <strong class="jxr_keyword">null</strong>); +<a class="jxr_linenumber" name="635" href="#635">635</a> } <strong class="jxr_keyword">finally</strong> { +<a class="jxr_linenumber" name="636" href="#636">636</a> <strong class="jxr_keyword">if</strong> (pool != <strong class="jxr_keyword">null</strong>) { +<a class="jxr_linenumber" name="637" href="#637">637</a> pool.shutdown(); +<a class="jxr_linenumber" name="638" href="#638">638</a> } +<a class="jxr_linenumber" name="639" href="#639">639</a> } +<a class="jxr_linenumber" name="640" href="#640">640</a> } +<a class="jxr_linenumber" name="641" href="#641">641</a> +<a class="jxr_linenumber" name="642" href="#642">642</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="643" href="#643">643</a> <em class="jxr_javadoccomment"> * This takes the LQI's grouped by likely regions and attempts to bulk load</em> +<a class="jxr_linenumber" name="644" href="#644">644</a> <em class="jxr_javadoccomment"> * them. Any failures are re-queued for another pass with the</em> +<a class="jxr_linenumber" name="645" href="#645">645</a> <em class="jxr_javadoccomment"> * groupOrSplitPhase.</em> +<a class="jxr_linenumber" name="646" href="#646">646</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="647" href="#647">647</a> <strong class="jxr_keyword">protected</strong> <strong class="jxr_keyword">void</strong> bulkLoadPhase(<strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Connection.html">Connection</a> conn, +<a class="jxr_linenumber" name="648" href="#648">648</a> ExecutorService pool, Deque<LoadQueueItem> queue, +<a class="jxr_linenumber" name="649" href="#649">649</a> <strong class="jxr_keyword">final</strong> Multimap<ByteBuffer, LoadQueueItem> regionGroups, <strong class="jxr_keyword">boolean</strong> copyFile, +<a class="jxr_linenumber" name="650" href="#650">650</a> Map<LoadQueueItem, ByteBuffer> item2RegionMap) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="651" href="#651">651</a> <em class="jxr_comment">// atomically bulk load the groups.</em> +<a class="jxr_linenumber" name="652" href="#652">652</a> Set<Future<List<LoadQueueItem>>> loadingFutures = <strong class="jxr_keyword">new</strong> HashSet<>(); +<a class="jxr_linenumber" name="653" href="#653">653</a> <strong class="jxr_keyword">for</strong> (Entry<ByteBuffer, ? <strong class="jxr_keyword">extends</strong> Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){ +<a class="jxr_linenumber" name="654" href="#654">654</a> <strong class="jxr_keyword">final</strong> byte[] first = e.getKey().array(); +<a class="jxr_linenumber" name="655" href="#655">655</a> <strong class="jxr_keyword">final</strong> Collection<LoadQueueItem> lqis = e.getValue(); +<a class="jxr_linenumber" name="656" href="#656">656</a> +<a class="jxr_linenumber" name="657" href="#657">657</a> <strong class="jxr_keyword">final</strong> Callable<List<LoadQueueItem>> call = <strong class="jxr_keyword">new</strong> Callable<List<LoadQueueItem>>() { +<a class="jxr_linenumber" name="658" href="#658">658</a> @Override +<a class="jxr_linenumber" name="659" href="#659">659</a> <strong class="jxr_keyword">public</strong> List<LoadQueueItem> call() <strong class="jxr_keyword">throws</strong> Exception { +<a class="jxr_linenumber" name="660" href="#660">660</a> List<LoadQueueItem> toRetry = +<a class="jxr_linenumber" name="661" href="#661">661</a> tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile); +<a class="jxr_linenumber" name="662" href="#662">662</a> <strong class="jxr_keyword">return</strong> toRetry; +<a class="jxr_linenumber" name="663" href="#663">663</a> } +<a class="jxr_linenumber" name="664" href="#664">664</a> }; +<a class="jxr_linenumber" name="665" href="#665">665</a> <strong class="jxr_keyword">if</strong> (item2RegionMap != <strong class="jxr_keyword">null</strong>) { +<a class="jxr_linenumber" name="666" href="#666">666</a> <strong class="jxr_keyword">for</strong> (LoadQueueItem lqi : lqis) { +<a class="jxr_linenumber" name="667" href="#667">667</a> item2RegionMap.put(lqi, e.getKey()); +<a class="jxr_linenumber" name="668" href="#668">668</a> } +<a class="jxr_linenumber" name="669" href="#669">669</a> } +<a class="jxr_linenumber" name="670" href="#670">670</a> loadingFutures.add(pool.submit(call)); +<a class="jxr_linenumber" name="671" href="#671">671</a> } +<a class="jxr_linenumber" name="672" href="#672">672</a> +<a class="jxr_linenumber" name="673" href="#673">673</a> <em class="jxr_comment">// get all the results.</em> +<a class="jxr_linenumber" name="674" href="#674">674</a> <strong class="jxr_keyword">for</strong> (Future<List<LoadQueueItem>> future : loadingFutures) { +<a class="jxr_linenumber" name="675" href="#675">675</a> <strong class="jxr_keyword">try</strong> { +<a class="jxr_linenumber" name="676" href="#676">676</a> List<LoadQueueItem> toRetry = future.get(); +<a class="jxr_linenumber" name="677" href="#677">677</a> +<a class="jxr_linenumber" name="678" href="#678">678</a> <strong class="jxr_keyword">if</strong> (item2RegionMap != <strong class="jxr_keyword">null</strong>) { +<a class="jxr_linenumber" name="679" href="#679">679</a> <strong class="jxr_keyword">for</strong> (LoadQueueItem lqi : toRetry) { +<a class="jxr_linenumber" name="680" href="#680">680</a> item2RegionMap.remove(lqi); +<a class="jxr_linenumber" name="681" href="#681">681</a> } +<a class="jxr_linenumber" name="682" href="#682">682</a> } +<a class="jxr_linenumber" name="683" href="#683">683</a> <em class="jxr_comment">// LQIs that are requeued to be regrouped.</em> +<a class="jxr_linenumber" name="684" href="#684">684</a> queue.addAll(toRetry); +<a class="jxr_linenumber" name="685" href="#685">685</a> +<a class="jxr_linenumber" name="686" href="#686">686</a> } <strong class="jxr_keyword">catch</strong> (ExecutionException e1) { +<a class="jxr_linenumber" name="687" href="#687">687</a> Throwable t = e1.getCause(); +<a class="jxr_linenumber" name="688" href="#688">688</a> <strong class="jxr_keyword">if</strong> (t instanceof IOException) { +<a class="jxr_linenumber" name="689" href="#689">689</a> <em class="jxr_comment">// At this point something unrecoverable has happened.</em> +<a class="jxr_linenumber" name="690" href="#690">690</a> <em class="jxr_comment">// TODO Implement bulk load recovery</em> +<a class="jxr_linenumber" name="691" href="#691">691</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IOException(<span class="jxr_string">"BulkLoad encountered an unrecoverable problem"</span>, t); +<a class="jxr_linenumber" name="692" href="#692">692</a> } +<a class="jxr_linenumber" name="693" href="#693">693</a> LOG.error(<span class="jxr_string">"Unexpected execution exception during bulk load"</span>, e1); +<a class="jxr_linenumber" name="694" href="#694">694</a> <strong class="jxr_keyword">throw</strong> <strong class="jxr_keyword">new</strong> IllegalStateException(t); +<a class="jxr_linenumber" name="695" href="#695">695</a> } <strong class="jxr_keyword">catch</strong> (InterruptedException e1) { +<a class="jxr_linenumber" name="696" href="#696">696</a> LOG.error(<span class="jxr_string">"Unexpected interrupted exception during bulk load"</span>, e1); +<a class="jxr_linenumber" name="697" href="#697">697</a> <strong class="jxr_keyword">throw</strong> (InterruptedIOException)<strong class="jxr_keyword">new</strong> InterruptedIOException().initCause(e1); +<a class="jxr_linenumber" name="698" href="#698">698</a> } +<a class="jxr_linenumber" name="699" href="#699">699</a> } +<a class="jxr_linenumber" name="700" href="#700">700</a> } +<a class="jxr_linenumber" name="701" href="#701">701</a> +<a class="jxr_linenumber" name="702" href="#702">702</a> <strong class="jxr_keyword">private</strong> <strong class="jxr_keyword">boolean</strong> checkHFilesCountPerRegionPerFamily( +<a class="jxr_linenumber" name="703" href="#703">703</a> <strong class="jxr_keyword">final</strong> Multimap<ByteBuffer, LoadQueueItem> regionGroups) { +<a class="jxr_linenumber" name="704" href="#704">704</a> <strong class="jxr_keyword">for</strong> (Entry<ByteBuffer, +<a class="jxr_linenumber" name="705" href="#705">705</a> ? <strong class="jxr_keyword">extends</strong> Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) { +<a class="jxr_linenumber" name="706" href="#706">706</a> <strong class="jxr_keyword">final</strong> Collection<LoadQueueItem> lqis = e.getValue(); +<a class="jxr_linenumber" name="707" href="#707">707</a> HashMap<byte[], MutableInt> filesMap = <strong class="jxr_keyword">new</strong> HashMap<>(); +<a class="jxr_linenumber" name="708" href="#708">708</a> <strong class="jxr_keyword">for</strong> (LoadQueueItem lqi: lqis) { +<a class="jxr_linenumber" name="709" href="#709">709</a> MutableInt count = filesMap.get(lqi.family); +<a class="jxr_linenumber" name="710" href="#710">710</a> <strong class="jxr_keyword">if</strong> (count == <strong class="jxr_keyword">null</strong>) { +<a class="jxr_linenumber" name="711" href="#711">711</a> count = <strong class="jxr_keyword">new</strong> MutableInt(); +<a class="jxr_linenumber" name="712" href="#712">712</a> filesMap.put(lqi.family, count); +<a class="jxr_linenumber" name="713" href="#713">713</a> } +<a class="jxr_linenumber" name="714" href="#714">714</a> count.increment(); +<a class="jxr_linenumber" name="715" href="#715">715</a> <strong class="jxr_keyword">if</strong> (count.intValue() > maxFilesPerRegionPerFamily) { +<a class="jxr_linenumber" name="716" href="#716">716</a> LOG.error(<span class="jxr_string">"Trying to load more than "</span> + maxFilesPerRegionPerFamily +<a class="jxr_linenumber" name="717" href="#717">717</a> + <span class="jxr_string">" hfiles to family "</span> + Bytes.toStringBinary(lqi.family) +<a class="jxr_linenumber" name="718" href="#718">718</a> + <span class="jxr_string">" of region with start key "</span> +<a class="jxr_linenumber" name="719" href="#719">719</a> + Bytes.toStringBinary(e.getKey())); +<a class="jxr_linenumber" name="720" href="#720">720</a> <strong class="jxr_keyword">return</strong> false; +<a class="jxr_linenumber" name="721" href="#721">721</a> } +<a class="jxr_linenumber" name="722" href="#722">722</a> } +<a class="jxr_linenumber" name="723" href="#723">723</a> } +<a class="jxr_linenumber" name="724" href="#724">724</a> <strong class="jxr_keyword">return</strong> <strong class="jxr_keyword">true</strong>; +<a class="jxr_linenumber" name="725" href="#725">725</a> } +<a class="jxr_linenumber" name="726" href="#726">726</a> +<a class="jxr_linenumber" name="727" href="#727">727</a> <em class="jxr_javadoccomment">/**</em> +<a class="jxr_linenumber" name="728" href="#728">728</a> <em class="jxr_javadoccomment"> * @param table the table to load into</em> +<a class="jxr_linenumber" name="729" href="#729">729</a> <em class="jxr_javadoccomment"> * @param pool the ExecutorService</em> +<a class="jxr_linenumber" name="730" href="#730">730</a> <em class="jxr_javadoccomment"> * @param queue the queue for LoadQueueItem</em> +<a class="jxr_linenumber" name="731" href="#731">731</a> <em class="jxr_javadoccomment"> * @param startEndKeys start and end keys</em> +<a class="jxr_linenumber" name="732" href="#732">732</a> <em class="jxr_javadoccomment"> * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.</em> +<a class="jxr_linenumber" name="733" href="#733">733</a> <em class="jxr_javadoccomment"> */</em> +<a class="jxr_linenumber" name="734" href="#734">734</a> <strong class="jxr_keyword">private</strong> Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase( +<a class="jxr_linenumber" name="735" href="#735">735</a> <strong class="jxr_keyword">final</strong> <a href="../../../../../org/apache/hadoop/hbase/client/Table.html">Table</a> table, ExecutorService pool, Deque<LoadQueueItem> queue, +<a class="jxr_linenumber" name="736" href="#736">736</a> <strong class="jxr_keyword">final</strong> Pair<byte[][], byte[][]> startEndKeys) <strong class="jxr_keyword">throws</strong> IOException { +<a class="jxr_linenumber" name="737" href="#737">737</a> <em class="jxr_comment">// <region start key, LQI> need synchronized only within this scope of this</em> +<a class="jxr_linenumber" name="738" href="#738">738</a> <em class="jxr_comment">// phase because of the puts that happen in futures.</em> +<a class="jxr_linenumber" name="739" href="#739">739</a> Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); +<a class="jxr_linenumber" name="740" href="#740">740</a> <strong class="jxr_keyword">final</strong> Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); +<a class="jxr_linenumber" name="741" href="#741">741</a> Set<String> missingHFiles = <strong class="jxr_keyword">new</strong> HashSet<>(); +<a class="jxr_linenumber" name="742" href="#742">742</a> Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = <strong class="jxr_keyword">new</strong> Pair<>(regionGroups, +<a class="jxr_linenumber" name="743" href="#743">743</a> missingHFiles); +<a class="jxr_linenumber" name="744" href="#744">744</a> +<a class="jxr_linenumber" name="745" href="#745">745</a> <em class="jxr_comment">// drain LQIs and figure out bulk load groups</em> +<a class="jxr_linenumber"
<TRUNCATED>
