Merge branch '1.6' into 1.7

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/91b161a9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/91b161a9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/91b161a9

Branch: refs/heads/1.7
Commit: 91b161a932307e2d16845a0c0f6304f123a140b9
Parents: 62821a0 4deaf73
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Tue Oct 13 16:50:18 2015 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Tue Oct 13 16:50:18 2015 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/accumulo/master/tableOps/BulkImport.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/91b161a9/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 031a80c,e661968..ad20473
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -263,5 -284,337 +263,6 @@@ public class BulkImport extends MasterR
      Utils.unreserveHdfsDirectory(sourceDir, tid);
      Utils.unreserveHdfsDirectory(errorDir, tid);
      Utils.getReadLock(tableId, tid).unlock();
+     ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
    }
  }
 -
 -class CleanUpBulkImport extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String error;
 -
 -  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.error = error;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(long tid, Master master) throws Exception {
 -    log.debug("removing the bulk processing flag file in " + bulk);
 -    Path bulkDir = new Path(bulk);
 -    MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
 -    MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
 -    log.debug("removing the metadata table markers for loaded files");
 -    Connector conn = master.getConnector();
 -    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
 -    log.debug("releasing HDFS reservations for " + source + " and " + error);
 -    Utils.unreserveHdfsDirectory(source, tid);
 -    Utils.unreserveHdfsDirectory(error, tid);
 -    Utils.getReadLock(tableId, tid).unlock();
 -    log.debug("completing bulk import transaction " + tid);
 -    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
 -    return null;
 -  }
 -}
 -
 -class CompleteBulkImport extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String error;
 -
 -  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.error = error;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(long tid, Master master) throws Exception {
 -    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
 -    return new CopyFailed(tableId, source, bulk, error);
 -  }
 -}
 -
 -class CopyFailed extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String error;
 -
 -  public CopyFailed(String tableId, String source, String bulk, String error) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.error = error;
 -  }
 -
 -  @Override
 -  public long isReady(long tid, Master master) throws Exception {
 -    Set<TServerInstance> finished = new HashSet<TServerInstance>();
 -    Set<TServerInstance> running = master.onlineTabletServers();
 -    for (TServerInstance server : running) {
 -      try {
 -        TServerConnection client = master.getConnection(server);
 -        if (client != null && !client.isActive(tid))
 -          finished.add(server);
 -      } catch (TException ex) {
 -        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
 -      }
 -    }
 -    if (finished.containsAll(running))
 -      return 0;
 -    return 500;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(long tid, Master master) throws Exception {
 -    // This needs to execute after the arbiter is stopped
 -
 -    VolumeManager fs = master.getFileSystem();
 -
 -    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
 -      return new CleanUpBulkImport(tableId, source, bulk, error);
 -
 -    HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
 -    HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
 -
 -    FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
 -    BufferedReader in = new BufferedReader(new InputStreamReader(failFile, UTF_8));
 -    try {
 -      String line = null;
 -      while ((line = in.readLine()) != null) {
 -        Path path = new Path(line);
 -        if (!fs.exists(new Path(error, path.getName())))
 -          failures.put(new FileRef(line, path), line);
 -      }
 -    } finally {
 -      failFile.close();
 -    }
 -
 -    /*
 -     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
 -     * have no loaded markers.
 -     */
 -
 -    // determine which failed files were loaded
 -    Connector conn = master.getConnector();
 -    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
 -    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
 -    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
 -
 -    for (Entry<Key,Value> entry : mscanner) {
 -      if (Long.parseLong(entry.getValue().toString()) == tid) {
 -        FileRef loadedFile = new FileRef(fs, entry.getKey());
 -        String absPath = failures.remove(loadedFile);
 -        if (absPath != null) {
 -          loadedFailures.put(loadedFile, absPath);
 -        }
 -      }
 -    }
 -
 -    // move failed files that were not loaded
 -    for (String failure : failures.values()) {
 -      Path orig = new Path(failure);
 -      Path dest = new Path(error, orig.getName());
 -      fs.rename(orig, dest);
 -      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
 -    }
 -
 -    if (loadedFailures.size() > 0) {
 -      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
 -          + Constants.ZBULK_FAILED_COPYQ);
 -
 -      HashSet<String> workIds = new HashSet<String>();
 -
 -      for (String failure : loadedFailures.values()) {
 -        Path orig = new Path(failure);
 -        Path dest = new Path(error, orig.getName());
 -
 -        if (fs.exists(dest))
 -          continue;
 -
 -        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
 -        workIds.add(orig.getName());
 -        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
 -      }
 -
 -      bifCopyQueue.waitUntilDone(workIds);
 -    }
 -
 -    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
 -    return new CleanUpBulkImport(tableId, source, bulk, error);
 -  }
 -
 -}
 -
 -class LoadFiles extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private static ExecutorService threadPool = null;
 -  private static final Logger log = Logger.getLogger(BulkImport.class);
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String errorDir;
 -  private boolean setTime;
 -
 -  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.errorDir = errorDir;
 -    this.setTime = setTime;
 -  }
 -
 -  @Override
 -  public long isReady(long tid, Master master) throws Exception {
 -    if (master.onlineTabletServers().size() == 0)
 -      return 500;
 -    return 0;
 -  }
 -
 -  private static synchronized ExecutorService getThreadPool(Master master) {
 -    if (threadPool == null) {
 -      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
 -      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
 -      pool.allowCoreThreadTimeOut(true);
 -      threadPool = new TraceExecutorService(pool);
 -    }
 -    return threadPool;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(final long tid, final Master master) throws Exception {
 -    ExecutorService executor = getThreadPool(master);
 -    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
 -    VolumeManager fs = master.getFileSystem();
 -    List<FileStatus> files = new ArrayList<FileStatus>();
 -    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
 -      files.add(entry);
 -    }
 -    log.debug("tid " + tid + " importing " + files.size() + " files");
 -
 -    Path writable = new Path(this.errorDir, ".iswritable");
 -    if (!fs.createNewFile(writable)) {
 -      // Maybe this is a re-try... clear the flag and try again
 -      fs.delete(writable);
 -      if (!fs.createNewFile(writable))
 -        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 -            "Unable to write to " + this.errorDir);
 -    }
 -    fs.delete(writable);
 -
 -    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
 -    for (FileStatus f : files)
 -      filesToLoad.add(f.getPath().toString());
 -
 -    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
 -    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
 -      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
 -
 -      if (master.onlineTabletServers().size() == 0)
 -        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
 -
 -      while (master.onlineTabletServers().size() == 0) {
 -        UtilWaitThread.sleep(500);
 -      }
 -
 -      // Use the threadpool to assign files one-at-a-time to the server
 -      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
 -      for (final String file : filesToLoad) {
 -        results.add(executor.submit(new Callable<List<String>>() {
 -          @Override
 -          public List<String> call() {
 -            List<String> failures = new ArrayList<String>();
 -            ClientService.Client client = null;
 -            String server = null;
 -            try {
 -              // get a connection to a random tablet server, do not prefer cached connections because
 -              // this is running on the master and there are lots of connections to tablet servers
 -              // serving the metadata tablets
 -              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
 -              Pair<String,Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
 -              client = pair.getSecond();
 -              server = pair.getFirst();
 -              List<String> attempt = Collections.singletonList(file);
 -              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
 -              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
 -                  errorDir, setTime);
 -              if (fail.isEmpty()) {
 -                loaded.add(file);
 -              } else {
 -                failures.addAll(fail);
 -              }
 -            } catch (Exception ex) {
 -              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
 -            } finally {
 -              ServerClient.close(client);
 -            }
 -            return failures;
 -          }
 -        }));
 -      }
 -      Set<String> failures = new HashSet<String>();
 -      for (Future<List<String>> f : results)
 -        failures.addAll(f.get());
 -      filesToLoad.removeAll(loaded);
 -      if (filesToLoad.size() > 0) {
 -        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
 -        UtilWaitThread.sleep(100);
 -      }
 -    }
 -
 -    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
 -    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
 -    try {
 -      for (String f : filesToLoad) {
 -        out.write(f);
 -        out.write("\n");
 -      }
 -    } finally {
 -      out.close();
 -    }
 -
 -    // return the next step, which will perform cleanup
 -    return new CompleteBulkImport(tableId, source, bulk, errorDir);
 -  }
 -
 -  static String sampleList(Collection<?> potentiallyLongList, int max) {
 -    StringBuffer result = new StringBuffer();
 -    result.append("[");
 -    int i = 0;
 -    for (Object obj : potentiallyLongList) {
 -      result.append(obj);
 -      if (i >= max) {
 -        result.append("...");
 -        break;
 -      } else {
 -        result.append(", ");
 -      }
 -      i++;
 -    }
 -    if (i < max)
 -      result.delete(result.length() - 2, result.length());
 -    result.append("]");
 -    return result.toString();
 -  }
 -
 -}
