ACCUMULO-4028 use known information to pick a random server, merge to 1.7

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/78e2c65e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/78e2c65e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/78e2c65e

Branch: refs/heads/1.7
Commit: 78e2c65e282abbf36fdd502688a8eabf96f6a9a2
Parents: 91b161a 0212f2f
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Wed Oct 14 13:57:12 2015 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Wed Oct 14 13:57:12 2015 -0400

----------------------------------------------------------------------
 .../accumulo/master/tableOps/BulkImport.java    |  1 +
 .../accumulo/master/tableOps/LoadFiles.java     | 24 ++++++++++++--------
 2 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
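
The substance of this merge is the LoadFiles change in the second hunk below:
instead of asking ServerClient.getConnection(...) for a (possibly cached)
connection, the master now uses information it already holds, the set of online
tablet servers, and picks one at random before opening a fresh client. A
minimal standalone sketch of that selection idiom follows; RandomServerPicker
and ServerAddress are hypothetical stand-ins, not the actual Accumulo types:

import java.util.List;
import java.util.Random;

final class RandomServerPicker {
  // Hypothetical stand-in for Accumulo's TServerInstance.
  static final class ServerAddress {
    final String hostPort;
    ServerAddress(String hostPort) { this.hostPort = hostPort; }
    @Override public String toString() { return hostPort; }
  }

  private final Random random = new Random();

  // Pick uniformly from the servers the master already knows to be online,
  // rather than preferring whatever cached connection happens to exist.
  ServerAddress pick(List<ServerAddress> online) {
    if (online.isEmpty())
      throw new IllegalStateException("no tablet servers online");
    return online.get(random.nextInt(online.size()));
  }
}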


http://git-wip-us.apache.org/repos/asf/accumulo/blob/78e2c65e/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index ad20473,5320aae..7001fdd
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -266,3 -286,337 +266,4 @@@ public class BulkImport extends MasterR
      ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
    }
  }
+ 
 -class CleanUpBulkImport extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String error;
 -
 -  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.error = error;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(long tid, Master master) throws Exception {
 -    log.debug("removing the bulk processing flag file in " + bulk);
 -    Path bulkDir = new Path(bulk);
 -    MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
 -    MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
 -    log.debug("removing the metadata table markers for loaded files");
 -    Connector conn = master.getConnector();
 -    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
 -    log.debug("releasing HDFS reservations for " + source + " and " + error);
 -    Utils.unreserveHdfsDirectory(source, tid);
 -    Utils.unreserveHdfsDirectory(error, tid);
 -    Utils.getReadLock(tableId, tid).unlock();
 -    log.debug("completing bulk import transaction " + tid);
 -    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
 -    return null;
 -  }
 -}
 -
 -class CompleteBulkImport extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String error;
 -
 -  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.error = error;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(long tid, Master master) throws Exception {
 -    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
 -    return new CopyFailed(tableId, source, bulk, error);
 -  }
 -}
 -
 -class CopyFailed extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String error;
 -
 -  public CopyFailed(String tableId, String source, String bulk, String error) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.error = error;
 -  }
 -
 -  @Override
 -  public long isReady(long tid, Master master) throws Exception {
 -    Set<TServerInstance> finished = new HashSet<TServerInstance>();
 -    Set<TServerInstance> running = master.onlineTabletServers();
 -    for (TServerInstance server : running) {
 -      try {
 -        TServerConnection client = master.getConnection(server);
 -        if (client != null && !client.isActive(tid))
 -          finished.add(server);
 -      } catch (TException ex) {
 -        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
 -      }
 -    }
 -    if (finished.containsAll(running))
 -      return 0;
 -    return 500;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(long tid, Master master) throws Exception {
 -    // This needs to execute after the arbiter is stopped
 -
 -    VolumeManager fs = master.getFileSystem();
 -
 -    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
 -      return new CleanUpBulkImport(tableId, source, bulk, error);
 -
 -    HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
 -    HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
 -
 -    FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
 -    BufferedReader in = new BufferedReader(new InputStreamReader(failFile, UTF_8));
 -    try {
 -      String line = null;
 -      while ((line = in.readLine()) != null) {
 -        Path path = new Path(line);
 -        if (!fs.exists(new Path(error, path.getName())))
 -          failures.put(new FileRef(line, path), line);
 -      }
 -    } finally {
 -      failFile.close();
 -    }
 -
 -    /*
 -     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
 -     * have no loaded markers.
 -     */
 -
 -    // determine which failed files were loaded
 -    Connector conn = master.getConnector();
 -    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
 -    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
 -    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
 -
 -    for (Entry<Key,Value> entry : mscanner) {
 -      if (Long.parseLong(entry.getValue().toString()) == tid) {
 -        FileRef loadedFile = new FileRef(fs, entry.getKey());
 -        String absPath = failures.remove(loadedFile);
 -        if (absPath != null) {
 -          loadedFailures.put(loadedFile, absPath);
 -        }
 -      }
 -    }
 -
 -    // move failed files that were not loaded
 -    for (String failure : failures.values()) {
 -      Path orig = new Path(failure);
 -      Path dest = new Path(error, orig.getName());
 -      fs.rename(orig, dest);
 -      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
 -    }
 -
 -    if (loadedFailures.size() > 0) {
 -      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
 -          + Constants.ZBULK_FAILED_COPYQ);
 -
 -      HashSet<String> workIds = new HashSet<String>();
 -
 -      for (String failure : loadedFailures.values()) {
 -        Path orig = new Path(failure);
 -        Path dest = new Path(error, orig.getName());
 -
 -        if (fs.exists(dest))
 -          continue;
 -
 -        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
 -        workIds.add(orig.getName());
 -        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
 -      }
 -
 -      bifCopyQueue.waitUntilDone(workIds);
 -    }
 -
 -    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
 -    return new CleanUpBulkImport(tableId, source, bulk, error);
 -  }
 -
 -}
 -
 -class LoadFiles extends MasterRepo {
 -
 -  private static final long serialVersionUID = 1L;
 -
 -  private static ExecutorService threadPool = null;
 -  private static final Logger log = Logger.getLogger(BulkImport.class);
 -
 -  private String tableId;
 -  private String source;
 -  private String bulk;
 -  private String errorDir;
 -  private boolean setTime;
 -
 -  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
 -    this.tableId = tableId;
 -    this.source = source;
 -    this.bulk = bulk;
 -    this.errorDir = errorDir;
 -    this.setTime = setTime;
 -  }
 -
 -  @Override
 -  public long isReady(long tid, Master master) throws Exception {
 -    if (master.onlineTabletServers().size() == 0)
 -      return 500;
 -    return 0;
 -  }
 -
 -  private static synchronized ExecutorService getThreadPool(Master master) {
 -    if (threadPool == null) {
 -      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
 -      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
 -      pool.allowCoreThreadTimeOut(true);
 -      threadPool = new TraceExecutorService(pool);
 -    }
 -    return threadPool;
 -  }
 -
 -  @Override
 -  public Repo<Master> call(final long tid, final Master master) throws Exception {
 -    ExecutorService executor = getThreadPool(master);
 -    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
 -    VolumeManager fs = master.getFileSystem();
 -    List<FileStatus> files = new ArrayList<FileStatus>();
 -    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
 -      files.add(entry);
 -    }
 -    log.debug("tid " + tid + " importing " + files.size() + " files");
 -
 -    Path writable = new Path(this.errorDir, ".iswritable");
 -    if (!fs.createNewFile(writable)) {
 -      // Maybe this is a re-try... clear the flag and try again
 -      fs.delete(writable);
 -      if (!fs.createNewFile(writable))
 -        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 -            "Unable to write to " + this.errorDir);
 -    }
 -    fs.delete(writable);
 -
 -    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
 -    for (FileStatus f : files)
 -      filesToLoad.add(f.getPath().toString());
 -
 -    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
 -    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
 -      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
 -
 -      if (master.onlineTabletServers().size() == 0)
 -        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
 -
 -      while (master.onlineTabletServers().size() == 0) {
 -        UtilWaitThread.sleep(500);
 -      }
 -
 -      // Use the threadpool to assign files one-at-a-time to the server
 -      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
 -      final Random random = new Random();
 -      for (final String file : filesToLoad) {
 -        results.add(executor.submit(new Callable<List<String>>() {
 -          @Override
 -          public List<String> call() {
 -            List<String> failures = new ArrayList<String>();
 -            Client client = null;
 -            String server = null;
 -            try {
 -              // get a connection to a random tablet server, do not prefer cached connections because
 -              // this is running on the master and there are lots of connections to tablet servers
 -              // serving the metadata tablets
 -              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
 -              TServerInstance servers[] = master.onlineTabletServers().toArray(new TServerInstance[0]);
 -              server = servers[random.nextInt(servers.length)].getLocation().toString();
 -              client = ThriftUtil.getTServerClient(server, master.getConfiguration().getConfiguration(), timeInMillis);
 -              List<String> attempt = Collections.singletonList(file);
 -              log.debug("Asking " + server + " to bulk import " + file);
 -              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
 -                  errorDir, setTime);
 -              if (fail.isEmpty()) {
 -                loaded.add(file);
 -              } else {
 -                failures.addAll(fail);
 -              }
 -            } catch (Exception ex) {
 -              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
 -            } finally {
 -              if (client != null) {
 -                ThriftUtil.returnClient(client);
 -              }
 -            }
 -            return failures;
 -          }
 -        }));
 -      }
 -      Set<String> failures = new HashSet<String>();
 -      for (Future<List<String>> f : results)
 -        failures.addAll(f.get());
 -      filesToLoad.removeAll(loaded);
 -      if (filesToLoad.size() > 0) {
 -        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
 -        UtilWaitThread.sleep(100);
 -      }
 -    }
 -
 -    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
 -    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
 -    try {
 -      for (String f : filesToLoad) {
 -        out.write(f);
 -        out.write("\n");
 -      }
 -    } finally {
 -      out.close();
 -    }
 -
 -    // return the next step, which will perform cleanup
 -    return new CompleteBulkImport(tableId, source, bulk, errorDir);
 -  }
 -
 -  static String sampleList(Collection<?> potentiallyLongList, int max) {
 -    StringBuffer result = new StringBuffer();
 -    result.append("[");
 -    int i = 0;
 -    for (Object obj : potentiallyLongList) {
 -      result.append(obj);
 -      if (i >= max) {
 -        result.append("...");
 -        break;
 -      } else {
 -        result.append(", ");
 -      }
 -      i++;
 -    }
 -    if (i < max)
 -      result.delete(result.length() - 2, result.length());
 -    result.append("]");
 -    return result.toString();
 -  }
 -
 -}
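
The classes removed above (CleanUpBulkImport, CompleteBulkImport, CopyFailed,
LoadFiles) evidently live in their own source files on the 1.7 branch, which is
why the merge drops them here; only LoadFiles changes in this commit. The
removed CopyFailed.isReady() also shows the fate Repo readiness convention:
return 0 when the step may run now, otherwise a delay in milliseconds before
polling again. A self-contained sketch of that convention, with the thrift
call stubbed out as the hypothetical isActive():

import java.util.HashSet;
import java.util.Set;

final class ReadinessPoll {
  // Return 0 when every online server reports the transaction inactive,
  // otherwise ask the framework to poll again in 500 ms.
  static long isReady(long tid, Set<String> onlineServers) {
    Set<String> finished = new HashSet<String>();
    for (String server : onlineServers) {
      if (!isActive(server, tid))
        finished.add(server);
    }
    return finished.containsAll(onlineServers) ? 0 : 500;
  }

  private static boolean isActive(String server, long tid) {
    return false; // stub; the real code calls TServerConnection.isActive(tid)
  }
}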

http://git-wip-us.apache.org/repos/asf/accumulo/blob/78e2c65e/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index 4a56c6f,0000000..a80e1ff
mode 100644,000000..100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@@ -1,209 -1,0 +1,213 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.master.tableOps;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.io.BufferedWriter;
 +import java.io.OutputStreamWriter;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
++import java.util.Random;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ThreadPoolExecutor;
 +
- import org.apache.accumulo.core.client.impl.ServerClient;
 +import org.apache.accumulo.core.client.impl.thrift.ClientService;
- import org.apache.accumulo.core.client.impl.thrift.ClientService.Client;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
++import org.apache.accumulo.core.rpc.ThriftUtil;
 +import org.apache.accumulo.core.trace.Tracer;
- import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.SimpleThreadPool;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.fate.Repo;
 +import org.apache.accumulo.master.Master;
 +import org.apache.accumulo.server.fs.VolumeManager;
++import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.htrace.wrappers.TraceExecutorService;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import com.google.common.net.HostAndPort;
++
 +class LoadFiles extends MasterRepo {
 +
 +  private static final long serialVersionUID = 1L;
 +
 +  private static ExecutorService threadPool = null;
 +  private static final Logger log = LoggerFactory.getLogger(LoadFiles.class);
 +
 +  private String tableId;
 +  private String source;
 +  private String bulk;
 +  private String errorDir;
 +  private boolean setTime;
 +
 +  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
 +    this.tableId = tableId;
 +    this.source = source;
 +    this.bulk = bulk;
 +    this.errorDir = errorDir;
 +    this.setTime = setTime;
 +  }
 +
 +  @Override
 +  public long isReady(long tid, Master master) throws Exception {
 +    if (master.onlineTabletServers().size() == 0)
 +      return 500;
 +    return 0;
 +  }
 +
 +  private static synchronized ExecutorService getThreadPool(Master master) {
 +    if (threadPool == null) {
 +      int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
 +      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
 +      pool.allowCoreThreadTimeOut(true);
 +      threadPool = new TraceExecutorService(pool);
 +    }
 +    return threadPool;
 +  }
 +
 +  @Override
 +  public Repo<Master> call(final long tid, final Master master) throws Exception {
 +    ExecutorService executor = getThreadPool(master);
 +    final AccumuloConfiguration conf = master.getConfiguration();
 +    VolumeManager fs = master.getFileSystem();
 +    List<FileStatus> files = new ArrayList<FileStatus>();
 +    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
 +      files.add(entry);
 +    }
 +    log.debug("tid " + tid + " importing " + files.size() + " files");
 +
 +    Path writable = new Path(this.errorDir, ".iswritable");
 +    if (!fs.createNewFile(writable)) {
 +      // Maybe this is a re-try... clear the flag and try again
 +      fs.delete(writable);
 +      if (!fs.createNewFile(writable))
 +        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
 +            "Unable to write to " + this.errorDir);
 +    }
 +    fs.delete(writable);
 +
 +    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
 +    for (FileStatus f : files)
 +      filesToLoad.add(f.getPath().toString());
 +
 +    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
 +    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
 +      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
 +
 +      if (master.onlineTabletServers().size() == 0)
 +        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
 +
 +      while (master.onlineTabletServers().size() == 0) {
 +        UtilWaitThread.sleep(500);
 +      }
 +
 +      // Use the threadpool to assign files one-at-a-time to the server
 +      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
++      final Random random = new Random();
 +      for (final String file : filesToLoad) {
 +        results.add(executor.submit(new Callable<List<String>>() {
 +          @Override
 +          public List<String> call() {
 +            List<String> failures = new ArrayList<String>();
 +            ClientService.Client client = null;
-             String server = null;
++            HostAndPort server = null;
 +            try {
 +              // get a connection to a random tablet server, do not prefer cached connections because
 +              // this is running on the master and there are lots of connections to tablet servers
 +              // serving the metadata tablets
 +              long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
 -               Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
 -               client = pair.getSecond();
 -               server = pair.getFirst();
++              // Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
++              TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
++              server = servers[random.nextInt(servers.length)].getLocation();
++              client = ThriftUtil.getTServerClient(server, master, timeInMillis);
 +              List<String> attempt = Collections.singletonList(file);
 -               log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
++              log.debug("Asking " + server + " to bulk import " + file);
 +              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
 +              if (fail.isEmpty()) {
 +                loaded.add(file);
 +              } else {
 +                failures.addAll(fail);
 +              }
 +            } catch (Exception ex) {
 +              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
 +            } finally {
-               ServerClient.close(client);
++              ThriftUtil.returnClient(client);
 +            }
 +            return failures;
 +          }
 +        }));
 +      }
 +      Set<String> failures = new HashSet<String>();
 +      for (Future<List<String>> f : results)
 +        failures.addAll(f.get());
 +      filesToLoad.removeAll(loaded);
 +      if (filesToLoad.size() > 0) {
 +        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
 +        UtilWaitThread.sleep(100);
 +      }
 +    }
 +
 +    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
 +    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
 +    try {
 +      for (String f : filesToLoad) {
 +        out.write(f);
 +        out.write("\n");
 +      }
 +    } finally {
 +      out.close();
 +    }
 +
 +    // return the next step, which will perform cleanup
 +    return new CompleteBulkImport(tableId, source, bulk, errorDir);
 +  }
 +
 +  static String sampleList(Collection<?> potentiallyLongList, int max) {
 +    StringBuffer result = new StringBuffer();
 +    result.append("[");
 +    int i = 0;
 +    for (Object obj : potentiallyLongList) {
 +      result.append(obj);
 +      if (i >= max) {
 +        result.append("...");
 +        break;
 +      } else {
 +        result.append(", ");
 +      }
 +      i++;
 +    }
 +    if (i < max)
 +      result.delete(result.length() - 2, result.length());
 +    result.append("]");
 +    return result.toString();
 +  }
 +
- }
++}
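
Unchanged by this commit but central to how LoadFiles works: call() drives a
bounded retry loop that keeps a synchronized set of pending files, removes
whatever each pass loads, and stops after MASTER_BULK_RETRIES attempts, writing
the leftovers to failures.txt for CopyFailed to handle. A compact sketch of
that loop, with the per-file bulkImportFiles RPC stubbed out as the
hypothetical tryLoad():

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class BoundedRetryLoop {
  // Returns the files that never loaded; the real code writes these to
  // failures.txt for the next step to handle.
  static Set<String> loadWithRetries(Set<String> files, int retries) throws InterruptedException {
    Set<String> pending = Collections.synchronizedSet(new HashSet<String>(files));
    for (int attempt = 0; attempt < retries && !pending.isEmpty(); attempt++) {
      Set<String> loaded = new HashSet<String>();
      for (String file : pending) {
        if (tryLoad(file)) // stub for the RPC to a randomly chosen server
          loaded.add(file);
      }
      pending.removeAll(loaded);
      if (!pending.isEmpty())
        Thread.sleep(100); // brief pause between attempts, as in the diff
    }
    return pending;
  }

  private static boolean tryLoad(String file) {
    return true; // hypothetical stand-in for client.bulkImportFiles(...)
  }
}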
