Author: kturner
Date: Tue Jul  3 21:13:15 2012
New Revision: 1356949

URL: http://svn.apache.org/viewvc?rev=1356949&view=rev
Log:
ACCUMULO-409 Make tservers copy failed bulk import files instead of master. 
(merged from 1.4)

Added:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
      - copied unchanged from r1356900, accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
      - copied, changed from r1356900, accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
Modified:
    accumulo/trunk/   (props changed)
    accumulo/trunk/core/   (props changed)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
    accumulo/trunk/server/   (props changed)
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/trunk/src/   (props changed)
    accumulo/trunk/test/system/test4/bulk_import_test.sh

Propchange: accumulo/trunk/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src:r1341000,1342373,1351691,1356400
  Merged /accumulo/branches/1.4/src:r1339309-1342420,1343897-1343898,1343943-1349971,1349973-1351424,1351426-1354669,1354673-1356900
  Merged /accumulo/branches/1.3:r1354669
  Merged /accumulo/branches/1.3/src:r1354669
  Merged /accumulo/branches/1.4:r1343943-1349971,1349973-1351424,1351426-1354668,1354673-1356923

Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src/core:r1341000,1342373,1351691,1356400
  Merged /accumulo/branches/1.3/src/core:r1354669
  Merged /accumulo/branches/1.4/core:r1343943-1349971,1349973-1351424,1351426-1354668,1354673-1356923
  Merged /accumulo/branches/1.4/src/core:r1339309-1342420,1343897-1343898,1343943-1349971,1349973-1351424,1351426-1354669,1354673-1356900

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java Tue Jul  3 21:13:15 2012
@@ -74,6 +74,8 @@ public class Constants {
   
   public static final String ZNEXT_FILE = "/next_file";
   
+  public static final String ZBULK_FAILED_COPYQ = "/bulk_failed_copyq";
+
   public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations";
   public static final String ZRECOVERY = "/recovery";
   

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Tue Jul  3 21:13:15 2012
@@ -151,7 +151,9 @@ public enum Property {
   TSERV_RECOVERY_MAX_CONCURRENT("tserver.recovery.concurrent.max", "2", PropertyType.COUNT, "The maximum number of threads to use to sort logs during recovery"),
   TSERV_SORT_BUFFER_SIZE("tserver.sort.buffer.size", "200M", PropertyType.MEMORY, "The amount of memory to use when sorting logs during recovery."),
   TSERV_ARCHIVE_WALOGS("tserver.archive.walogs", "false", PropertyType.BOOLEAN, "Keep copies of the WALOGs for debugging purposes"),
-  
+  TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT,
+      "The number of threads for the distributed workq.  These threads are used for copying failed bulk files."),
+
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),
   LOGGER_DIR("logger.dir.walog", "walogs", PropertyType.PATH,
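
The new tserver.workq.threads property defaults to 2 copy threads per tablet server. Like any tserver property, it should be overridable in accumulo-site.xml; a hypothetical entry (the value 4 is illustrative):

    <property>
      <name>tserver.workq.threads</name>
      <value>4</value>
    </property>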

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java Tue Jul  3 21:13:15 2012
@@ -124,3 +124,4 @@ public class FirstEntryInRowTest {
     assertFalse(fei.hasTop());
   }
 }
+

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java Tue Jul  3 21:13:15 2012
@@ -48,6 +48,8 @@ public interface IZooReaderWriter extend
   
   public abstract String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
   
+  public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException;
+
   public abstract void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
   
   public abstract void delete(String path, int version) throws InterruptedException, KeeperException;

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java Tue Jul  3 21:13:15 2012
@@ -93,6 +93,11 @@ public class ZooReaderWriter extends Zoo
   }
   
   @Override
+  public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return ZooUtil.putEphemeralData(getZooKeeper(), zPath, data);
+  }
+  
+  @Override
   public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
     return ZooUtil.putEphemeralSequential(getZooKeeper(), zPath, data);
   }

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java Tue Jul  3 21:13:15 2012
@@ -211,6 +211,10 @@ public class ZooUtil {
     return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
   }
   
+  public static String putEphemeralData(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
+  }
+
   public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
     return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
   }
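
The new helper differs from putEphemeralSequential only in its CreateMode: an EPHEMERAL node keeps exactly the name the caller supplies (no sequence suffix is appended) and ZooKeeper deletes it automatically when the creating session ends. A minimal usage sketch against the new interface method, with an illustrative path and payload (not from this commit):

    import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
    import org.apache.zookeeper.KeeperException;

    class EphemeralExample {
      // Advertise a liveness marker that vanishes if this process's ZooKeeper session dies.
      static void advertise(IZooReaderWriter zoo) throws KeeperException, InterruptedException {
        zoo.putEphemeralData("/example/alive", new byte[0]);
      }
    }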

Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.3/src/server:r1354669
  Merged /accumulo/branches/1.4/server:r1343943-1349971,1349973-1351424,1351426-1354668,1354673-1356923
  Merged /accumulo/trunk/src/server:r1341000,1342373,1351691,1356400
  Merged /accumulo/branches/1.4/src/server:r1339309-1342420,1343897-1343898,1343943-1349971,1349973-1351424,1351426-1354669,1354673-1356900

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Tue Jul  3 21:13:15 2012
@@ -64,8 +64,6 @@ import org.apache.accumulo.core.util.Uti
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TServiceClient;
@@ -299,48 +297,16 @@ public class BulkImporter {
     if (completeFailures.size() == 0)
       return Collections.emptySet();
     
-    log.error("The following map files failed completely, saving this info to 
: " + new Path(failureDir, "failures.seq"));
+    log.debug("The following map files failed ");
     
     for (Entry<Path,List<KeyExtent>> entry : es) {
       List<KeyExtent> extents = entry.getValue();
       
       for (KeyExtent keyExtent : extents)
-        log.error("\t" + entry.getKey() + " -> " + keyExtent);
+        log.debug("\t" + entry.getKey() + " -> " + keyExtent);
     }
-    
-    try {
-      
-      Writer outSeq = SequenceFile.createWriter(fs, conf, new Path(failureDir, "failures.seq"), Text.class, KeyExtent.class);
-      
-      for (Entry<Path,List<KeyExtent>> entry : es) {
-        List<KeyExtent> extents = entry.getValue();
-        
-        for (KeyExtent keyExtent : extents)
-          outSeq.append(new Text(entry.getKey().toString()), keyExtent);
-      }
-      
-      outSeq.close();
-    } catch (IOException ioe) {
-      log.error("Failed to create " + new Path(failureDir, "failures.seq") + " 
: " + ioe.getMessage());
-    }
-    
-    // we should make copying multi-threaded
-    Set<Path> failedCopies = new HashSet<Path>();
-    
-    for (Entry<Path,List<KeyExtent>> entry : es) {
-      Path dest = new Path(failureDir, entry.getKey().getName());
-      
-      log.debug("Copying " + entry.getKey() + " to " + dest);
-      
-      try {
-        org.apache.hadoop.fs.FileUtil.copy(fs, entry.getKey(), fs, dest, false, conf);
-      } catch (IOException ioe) {
-        log.error("Failed to copy " + entry.getKey() + " : " + 
ioe.getMessage());
-        failedCopies.add(entry.getKey());
-      }
-    }
-    
-    return failedCopies;
+
+    return Collections.emptySet();
   }
   
   private class AssignmentInfo {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Jul  3 21:13:15 2012
@@ -16,13 +16,19 @@
  */
 package org.apache.accumulo.server.master.tableOps;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -33,6 +39,8 @@ import org.apache.accumulo.cloudtrace.in
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.ServerClient;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
@@ -42,10 +50,12 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -58,14 +68,16 @@ import org.apache.accumulo.server.master
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
@@ -133,9 +145,8 @@ public class BulkImport extends MasterRe
     Utils.getReadLock(tableId, tid).lock();
     
     // check that the error directory exists and is empty
-    FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
-        ServerConfiguration.getSiteConfiguration()));
-    ;
+    FileSystem fs = master.getFileSystem();
+
     Path errorPath = new Path(errorDir);
     FileStatus errorStatus = fs.getFileStatus(errorPath);
     if (errorStatus == null)
@@ -273,24 +284,6 @@ class CleanUpBulkImport extends MasterRe
   }
   
   @Override
-  public long isReady(long tid, Master master) throws Exception {
-    Set<TServerInstance> finished = new HashSet<TServerInstance>();
-    Set<TServerInstance> running = master.onlineTabletServers();
-    for (TServerInstance server : running) {
-      try {
-        TServerConnection client = master.getConnection(server);
-        if (client != null && !client.isActive(tid))
-          finished.add(server);
-      } catch (TException ex) {
-        log.info("Ignoring error trying to check on tid " + tid + " from 
server " + server + ": " + ex);
-      }
-    }
-    if (finished.containsAll(running))
-      return 0;
-    return 1000;
-  }
-  
-  @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     log.debug("removing the bulk processing flag file in " + bulk);
     Path bulkDir = new Path(bulk);
@@ -327,8 +320,124 @@ class CompleteBulkImport extends MasterR
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
+    return new CopyFailed(tableId, source, bulk, error);
+  }
+}
+
+class CopyFailed extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String error;
+  
+  public CopyFailed(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+  
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    Set<TServerInstance> finished = new HashSet<TServerInstance>();
+    Set<TServerInstance> running = master.onlineTabletServers();
+    for (TServerInstance server : running) {
+      try {
+        TServerConnection client = master.getConnection(server);
+        if (client != null && !client.isActive(tid))
+          finished.add(server);
+      } catch (TException ex) {
+        log.info("Ignoring error trying to check on tid " + tid + " from 
server " + server + ": " + ex);
+      }
+    }
+    if (finished.containsAll(running))
+      return 0;
+    return 500;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+       //This needs to execute after the arbiter is stopped  
+         
+    FileSystem fs = environment.getFileSystem();
+         
+    if (!fs.exists(new Path(error, "failures.txt")))
+      return new CleanUpBulkImport(tableId, source, bulk, error);
+    
+    HashMap<String,String> failures = new HashMap<String,String>();
+    HashMap<String,String> loadedFailures = new HashMap<String,String>();
+    
+    FSDataInputStream failFile = fs.open(new Path(error, "failures.txt"));
+    BufferedReader in = new BufferedReader(new InputStreamReader(failFile));
+    try {
+      String line = null;
+      while ((line = in.readLine()) != null) {
+        Path path = new Path(line);
+        if (!fs.exists(new Path(error, path.getName())))
+          failures.put("/" + path.getParent().getName() + "/" + path.getName(), line);
+      }
+    } finally {
+      failFile.close();
+    }
+    
+    /*
+     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
+     * have no loaded markers.
+     */
+
+    // determine which failed files were loaded
+    AuthInfo creds = SecurityConstants.getSystemCredentials();
+    Connector conn = HdfsZooInstance.getInstance().getConnector(creds.user, creds.password);
+    Scanner mscanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
+    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+    mscanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
+    
+    for (Entry<Key,Value> entry : mscanner) {
+      if (Long.parseLong(entry.getValue().toString()) == tid) {
+        String loadedFile = entry.getKey().getColumnQualifierData().toString();
+        String absPath = failures.remove(loadedFile);
+        if (absPath != null) {
+          loadedFailures.put(loadedFile, absPath);
+        }
+      }
+    }
+    
+    // move failed files that were not loaded
+    for (String failure : failures.values()) {
+      Path orig = new Path(failure);
+      Path dest = new Path(error, orig.getName());
+      fs.rename(orig, dest);
+      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": 
failed");
+    }
+    
+    if (loadedFailures.size() > 0) {
+      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+          + Constants.ZBULK_FAILED_COPYQ);
+      
+      HashSet<String> workIds = new HashSet<String>();
+      
+      for (String failure : loadedFailures.values()) {
+        Path orig = new Path(failure);
+        Path dest = new Path(error, orig.getName());
+        
+        if (fs.exists(dest))
+          continue;
+        
+        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
+        workIds.add(orig.getName());
+        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + 
": failed");
+      }
+      
+      bifCopyQueue.waitUntilDone(workIds);
+    }
+
+    fs.delete(new Path(error, "failures.txt"), true);
     return new CleanUpBulkImport(tableId, source, bulk, error);
   }
+  
 }
 
 class LoadFiles extends MasterRepo {
@@ -375,8 +484,7 @@ class LoadFiles extends MasterRepo {
   public Repo<Master> call(final long tid, final Master master) throws Exception {
     initializeThreadPool(master);
     final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
-    FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
-        ServerConfiguration.getSiteConfiguration()));
+    FileSystem fs = master.getFileSystem();
     List<FileStatus> files = new ArrayList<FileStatus>();
     for (FileStatus entry : fs.listStatus(new Path(bulk))) {
       files.add(entry);
@@ -448,23 +556,18 @@ class LoadFiles extends MasterRepo {
         UtilWaitThread.sleep(100);
       }
     }
-    // Copy/Create failed file markers
-    for (String f : filesToLoad) {
-      Path orig = new Path(f);
-      Path dest = new Path(errorDir, orig.getName());
-      try {
-        FileUtil.copy(fs, orig, fs, dest, false, true, CachedConfiguration.getInstance());
-        log.debug("tid " + tid + " copied " + orig + " to " + dest + ": 
failed");
-      } catch (IOException ex) {
-        try {
-          fs.create(dest).close();
-          log.debug("tid " + tid + " marked " + dest + " failed");
-        } catch (IOException e) {
-          log.error("Unable to create failure flag file " + dest, e);
-        }
+    
+    FSDataOutputStream failFile = fs.create(new Path(errorDir, "failures.txt"), true);
+    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile));
+    try {
+      for (String f : filesToLoad) {
+        out.write(f);
+        out.write("\n");
       }
+    } finally {
+      out.close();
     }
-    
+
     // return the next step, which will perform cleanup
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }
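
To recap the new flow: LoadFiles now records the names of files that failed to load in failures.txt; CopyFailed renames unloaded failures itself and queues files with loaded markers on the distributed work queue, where each tablet server's BulkFailedCopyProcessor (copied unchanged from the 1.4 branch, so not shown in this diff) performs the copy. The payload of each work item is the "<source>,<destination>" string built above. A hedged sketch of the consumer-side decode-and-copy step, assuming only that payload format (the real processor may differ):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    class FailedCopySketch {
      // Decodes "<source>,<destination>" and copies the failed bulk file,
      // using the same FileUtil.copy helper the old master-side code used.
      static void copy(FileSystem fs, Configuration conf, byte[] workData) throws IOException {
        String[] paths = new String(workData).split(",");
        FileUtil.copy(fs, new Path(paths[0]), fs, new Path(paths[1]), false, conf);
      }
    }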

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Tue Jul  3 21:13:15 2012
@@ -51,8 +51,10 @@ import java.util.concurrent.ArrayBlockin
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -124,6 +126,7 @@ import org.apache.accumulo.core.util.Cac
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.Stat;
@@ -186,6 +189,7 @@ import org.apache.accumulo.server.util.T
 import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 import org.apache.accumulo.server.util.time.RelativeTime;
 import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -2569,6 +2573,8 @@ public class TabletServer extends Abstra
   
   private TServer server;
   
+  private DistributedWorkQueue bulkFailedCopyQ;
+  
   private static final String METRICS_PREFIX = "tserver";
   
   private static ObjectName OBJECT_NAME = null;
@@ -2715,6 +2721,16 @@ public class TabletServer extends Abstra
       throw new RuntimeException(ex);
     }
     
+    ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
+        new NamingThreadFactory("distributed work queue"));
+
+    bulkFailedCopyQ = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ);
+    try {
+      bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
+    } catch (Exception e1) {
+      throw new RuntimeException("Failed to start distributed work queue for 
copying ", e1);
+    }
+
     try {
       OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
       // Do this because interface not in same package.

Copied: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (from r1356900, accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java)
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java?p2=accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java&p1=accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java&r1=1356900&r2=1356949&rev=1356949&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java Tue Jul  3 21:13:15 2012
@@ -24,8 +24,8 @@ import java.util.TimerTask;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
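
For reference, the producer-side use of DistributedWorkQueue in CopyFailed above reduces to the pattern below; a condensed sketch using only the calls visible in this commit, with an illustrative path and payload and error handling elided:

    import java.util.HashSet;
    import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;

    class CopyQProducerSketch {
      static void enqueueAndWait(String queuePath) throws Exception {
        DistributedWorkQueue q = new DistributedWorkQueue(queuePath);
        HashSet<String> workIds = new HashSet<String>();
        // work id is the file name; payload is "<source>,<destination>"
        q.addWork("mf01.rf", "hdfs://nn/bulk/mf01.rf,hdfs://nn/errors/mf01.rf".getBytes());
        workIds.add("mf01.rf");
        q.waitUntilDone(workIds); // blocks until tservers have processed every id
      }
    }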

Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
  Merged /accumulo/trunk:r1341000,1344302,1344358,1356400
  Merged /accumulo/branches/1.4/src:r1343943-1349971,1349973-1351424,1351426-1354668,1354673-1356923
  Merged /accumulo/branches/1.3/src:r1354669
  Merged /accumulo/branches/1.4/src/src:r1339309-1342420,1343897-1343898,1343943-1349971,1349973-1351424,1351426-1354669,1354673-1356900

Modified: accumulo/trunk/test/system/test4/bulk_import_test.sh
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/system/test4/bulk_import_test.sh?rev=1356949&r1=1356948&r2=1356949&view=diff
==============================================================================
--- accumulo/trunk/test/system/test4/bulk_import_test.sh (original)
+++ accumulo/trunk/test/system/test4/bulk_import_test.sh Tue Jul  3 21:13:15 2012
@@ -19,11 +19,11 @@ hadoop dfs -rmr /testmf
 
 echo "creating first set of map files"
 
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf01 -timestamp 1 -size 50 -random 56 1000000 0 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf02 -timestamp 1 -size 50 -random 56 1000000 1000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf03 -timestamp 1 -size 50 -random 56 1000000 2000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf04 -timestamp 1 -size 50 -random 56 1000000 3000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf05 -timestamp 1 -size 50 -random 56 1000000 4000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf01 -timestamp 1 -size 50 -random 56 1000000 0 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf02 -timestamp 1 -size 50 -random 56 1000000 1000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf03 -timestamp 1 -size 50 -random 56 1000000 2000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf04 -timestamp 1 -size 50 -random 56 1000000 3000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf05 -timestamp 1 -size 50 -random 56 1000000 4000000 1 &
 
 wait
 
@@ -46,11 +46,11 @@ hadoop dfs -rmr /testmf
 
 echo "creating second set of map files"
 
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf01 -timestamp 2 -size 50 -random 57 1000000 0 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf02 -timestamp 2 -size 50 -random 57 1000000 1000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf03 -timestamp 2 -size 50 -random 57 1000000 2000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf04 -timestamp 2 -size 50 -random 57 1000000 3000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf05 -timestamp 2 -size 50 -random 57 1000000 4000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf01 -timestamp 2 -size 50 -random 57 1000000 0 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf02 -timestamp 2 -size 50 -random 57 1000000 1000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf03 -timestamp 2 -size 50 -random 57 1000000 2000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf04 -timestamp 2 -size 50 -random 57 1000000 3000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf05 -timestamp 2 -size 50 -random 57 1000000 4000000 1 &
 
 wait
 

