This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new b1e67f7  Create max tablets property in new bulk import (#1614)
b1e67f7 is described below

commit b1e67f7ced838037965436fc1d1f255d139a4045
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Fri Jun 5 20:58:50 2020 -0400

    Create max tablets property in new bulk import (#1614)
---
 .../accumulo/core/clientImpl/bulk/BulkImport.java  | 52 +++++++++-----
 .../org/apache/accumulo/core/conf/Property.java    |  3 +
 .../master/tableOps/bulkVer2/PrepBulkImport.java   | 57 ++++++++++++----
 .../tableOps/bulkVer2/PrepBulkImportTest.java      | 31 ++++++++-
 .../apache/accumulo/test/functional/BulkNewIT.java | 79 ++++++++++++++++++++++
 5 files changed, 189 insertions(+), 33 deletions(-)
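
For readers who just want to use the change, here is a minimal, hypothetical usage sketch distilled
from the Property and BulkNewIT changes in the diff below. The client properties path, table name,
directory, and the limit of 2 are illustrative assumptions, not part of the commit.

    import java.util.Map;

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.admin.NewTableConfiguration;
    import org.apache.accumulo.core.conf.Property;

    public class BulkMaxTabletsExample {
      public static void main(String[] args) throws Exception {
        try (AccumuloClient client =
            Accumulo.newClient().from("/path/to/client.properties").build()) {

          // Create a table that caps each bulk import file at 2 tablets, using the
          // new table.bulk.max.tablets property added in this commit.
          NewTableConfiguration ntc = new NewTableConfiguration();
          ntc.setProperties(Map.of(Property.TABLE_BULK_MAX_TABLETS.getKey(), "2"));
          client.tableOperations().create("example_table", ntc);

          // New bulk import API: the load fails if any file in the directory would be
          // imported into more than 2 tablets.
          client.tableOperations().importDirectory("/tmp/bulk-files").to("example_table").load();
        }
      }
    }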

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index ebc3a3d..5120333 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -64,6 +64,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
@@ -133,6 +134,14 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
 
     SortedMap<KeyExtent,Bulk.Files> mappings;
     TableOperationsImpl tableOps = new TableOperationsImpl(context);
+
+    int maxTablets = 0;
+    for (var prop : tableOps.getProperties(tableName)) {
+      if (prop.getKey().equals(Property.TABLE_BULK_MAX_TABLETS.getKey())) {
+        maxTablets = Integer.parseInt(prop.getValue());
+        break;
+      }
+    }
     Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
         .incrementBy(100, MILLISECONDS).maxWait(2, MINUTES).backOffFactor(1.5)
         .logInterval(3, TimeUnit.MINUTES).createRetry();
@@ -141,9 +150,9 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     boolean shouldRetry = true;
     while (shouldRetry) {
       if (plan == null) {
-        mappings = computeMappingFromFiles(fs, tableId, srcPath);
+        mappings = computeMappingFromFiles(fs, tableId, srcPath, maxTablets);
       } else {
-        mappings = computeMappingFromPlan(fs, tableId, srcPath);
+        mappings = computeMappingFromPlan(fs, tableId, srcPath, maxTablets);
       }
 
       if (mappings.isEmpty())
@@ -385,7 +394,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
   }
 
   private SortedMap<KeyExtent,Files> computeMappingFromPlan(FileSystem fs, TableId tableId,
-      Path srcPath)
+      Path srcPath, int maxTablets)
       throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException {
 
     Map<String,List<Destination>> fileDestinations =
@@ -422,7 +431,9 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     for (Entry<String,List<Destination>> entry : fileDestinations.entrySet()) {
       String fileName = entry.getKey();
       List<Destination> destinations = entry.getValue();
-      Set<KeyExtent> extents = mapDesitnationsToExtents(tableId, extentCache, destinations);
+      Set<KeyExtent> extents = mapDestinationsToExtents(tableId, extentCache, destinations);
+      log.debug("The file {} mapped to {} tablets.", fileName, extents.size());
+      checkTabletCount(maxTablets, extents.size(), fileName);
 
       long estSize = (long) (fileLens.get(fileName) / (double) extents.size());
 
@@ -439,7 +450,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     return row == null ? null : new Text(row);
   }
 
-  private Set<KeyExtent> mapDesitnationsToExtents(TableId tableId, KeyExtentCache kec,
+  private Set<KeyExtent> mapDestinationsToExtents(TableId tableId, KeyExtentCache kec,
       List<Destination> destinations)
       throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     Set<KeyExtent> extents = new HashSet<>();
@@ -470,7 +481,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
   }
 
   private SortedMap<KeyExtent,Bulk.Files> computeMappingFromFiles(FileSystem fs, TableId tableId,
-      Path dirPath) throws IOException {
+      Path dirPath, int maxTablets) throws IOException {
 
     Executor executor;
     ExecutorService service = null;
@@ -486,7 +497,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     }
 
     try {
-      return computeFileToTabletMappings(fs, tableId, dirPath, executor, context);
+      return computeFileToTabletMappings(fs, tableId, dirPath, executor, context, maxTablets);
     } finally {
       if (service != null) {
         service.shutdown();
@@ -523,8 +534,8 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     return fileList;
   }
 
-  public static SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs,
-      TableId tableId, Path dirPath, Executor executor, ClientContext context) throws IOException {
+  public SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs, TableId tableId,
+      Path dirPath, Executor executor, ClientContext context, int maxTablets) throws IOException {
 
     KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context);
 
@@ -540,21 +551,22 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     CryptoService cs = CryptoServiceFactory.newDefaultInstance();
 
     for (FileStatus fileStatus : files) {
+      Path filePath = fileStatus.getPath();
      CompletableFuture<Map<KeyExtent,Bulk.FileInfo>> future = CompletableFuture.supplyAsync(() -> {
         try {
           long t1 = System.currentTimeMillis();
-          List<KeyExtent> extents = findOverlappingTablets(context, extentCache,
-              fileStatus.getPath(), fs, fileLensCache, cs);
-          Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(),
-              fileStatus.getPath(), fileStatus.getLen(), extents, fs, fileLensCache, cs);
+          List<KeyExtent> extents =
+              findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs);
+          // make sure file isn't going to too many tablets
+          checkTabletCount(maxTablets, extents.size(), filePath.toString());
+          Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(), filePath,
+              fileStatus.getLen(), extents, fs, fileLensCache, cs);
           Map<KeyExtent,Bulk.FileInfo> pathLocations = new HashMap<>();
           for (KeyExtent ke : extents) {
-            pathLocations.put(ke,
-                new Bulk.FileInfo(fileStatus.getPath(), estSizes.getOrDefault(ke, 0L)));
+            pathLocations.put(ke, new Bulk.FileInfo(filePath, estSizes.getOrDefault(ke, 0L)));
           }
           long t2 = System.currentTimeMillis();
-          log.trace("Mapped {} to {} tablets in {}ms", fileStatus.getPath(), pathLocations.size(),
-              t2 - t1);
+          log.debug("Mapped {} to {} tablets in {}ms", filePath, pathLocations.size(), t2 - t1);
           return pathLocations;
         } catch (Exception e) {
           throw new CompletionException(e);
@@ -611,4 +623,10 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
 
     return mappings;
   }
+
+  private void checkTabletCount(int tabletMaxSize, int tabletCount, String file) {
+    if (tabletMaxSize > 0 && tabletCount > tabletMaxSize)
+      throw new IllegalArgumentException("The file " + file + " attempted to import to "
+          + tabletCount + " tablets. Max tablets allowed set to " + tabletMaxSize);
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fc83376..494541c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -715,6 +715,9 @@ public enum Property {
           + " perform specialized parsing of the key. "),
   TABLE_BLOOM_HASHTYPE("table.bloom.hash.type", "murmur", PropertyType.STRING,
       "The bloom filter hash type"),
+  TABLE_BULK_MAX_TABLETS("table.bulk.max.tablets", "0", PropertyType.COUNT,
+      "The maximum number of tablets allowed for one bulk import file. Value of 0 is Unlimited. "
+          + "This property is only enforced in the new bulk import API"),
   TABLE_DURABILITY("table.durability", "sync", PropertyType.DURABILITY,
       "The durability used to write to the write-ahead log. Legal values are:"
          + " none, which skips the write-ahead log; log, which sends the data to the"
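
Note: the patch only exercises the new property at table creation time (see the BulkNewIT test
below), but it is an ordinary per-table property, so it should presumably also be adjustable on an
existing table through the generic table property API. A hypothetical sketch, reusing the client
from the sketch above and an assumed table name:

    // Hypothetical: allow each bulk import file to span up to 5 tablets on an existing table.
    client.tableOperations().setProperty("example_table",
        Property.TABLE_BULK_MAX_TABLETS.getKey(), "5");
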
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
index d949f21..7bc55af 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
@@ -38,10 +38,12 @@ import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
 import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
 import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.fate.FateTxId;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -59,7 +61,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterators;
 
 /**
 * Prepare bulk import directory. This REPO creates a bulk directory in Accumulo, list all the files
@@ -108,18 +109,21 @@ public class PrepBulkImport extends MasterRepo {
   }
 
   @VisibleForTesting
-  static void checkForMerge(String tableId, Iterator<KeyExtent> lmi,
-      TabletIterFactory tabletIterFactory) throws Exception {
-    KeyExtent currRange = lmi.next();
+  static void checkForMerge(String tableId, LoadMappingIterator lmi,
+      TabletIterFactory tabletIterFactory, int maxNumTablets, long tid) throws Exception {
+    var currRange = lmi.next();
 
-    Text startRow = currRange.getPrevEndRow();
+    Text startRow = currRange.getKey().getPrevEndRow();
 
     Iterator<KeyExtent> tabletIter = tabletIterFactory.newTabletIter(startRow);
 
     KeyExtent currTablet = tabletIter.next();
 
-    if (!tabletIter.hasNext() && equals(KeyExtent::getPrevEndRow, currTablet, currRange)
-        && equals(KeyExtent::getEndRow, currTablet, currRange))
+    var fileCounts = new HashMap<String,Integer>();
+    int count;
+
+    if (!tabletIter.hasNext() && equals(KeyExtent::getPrevEndRow, currTablet, currRange.getKey())
+        && equals(KeyExtent::getEndRow, currTablet, currRange.getKey()))
       currRange = null;
 
     while (tabletIter.hasNext()) {
@@ -131,20 +135,29 @@ public class PrepBulkImport extends MasterRepo {
         currRange = lmi.next();
       }
 
-      while (!equals(KeyExtent::getPrevEndRow, currTablet, currRange) && tabletIter.hasNext()) {
+      while (!equals(KeyExtent::getPrevEndRow, currTablet, currRange.getKey())
+          && tabletIter.hasNext()) {
         currTablet = tabletIter.next();
       }
 
-      boolean matchedPrevRow = equals(KeyExtent::getPrevEndRow, currTablet, currRange);
+      boolean matchedPrevRow = equals(KeyExtent::getPrevEndRow, currTablet, currRange.getKey());
+      count = matchedPrevRow ? 1 : 0;
 
-      while (!equals(KeyExtent::getEndRow, currTablet, currRange) && tabletIter.hasNext()) {
+      while (!equals(KeyExtent::getEndRow, currTablet, currRange.getKey())
+          && tabletIter.hasNext()) {
         currTablet = tabletIter.next();
+        count++;
       }
 
-      if (!matchedPrevRow || !equals(KeyExtent::getEndRow, currTablet, currRange)) {
+      if (!matchedPrevRow || !equals(KeyExtent::getEndRow, currTablet, currRange.getKey())) {
         break;
       }
 
+      if (maxNumTablets > 0) {
+        int fc = count;
+        currRange.getValue()
+            .forEach(fileInfo -> fileCounts.merge(fileInfo.getFileName(), fc, Integer::sum));
+      }
       currRange = null;
     }
 
@@ -153,12 +166,27 @@ public class PrepBulkImport extends MasterRepo {
      throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
          TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened");
     }
+
+    if (maxNumTablets > 0) {
+      fileCounts.values().removeIf(c -> c <= maxNumTablets);
+      if (!fileCounts.isEmpty()) {
+        log.warn("{} Bulk files overlapped too many tablets : {}", FateTxId.formatTid(tid),
+            fileCounts);
+        throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
+            TableOperationExceptionType.OTHER, "Files overlap the configured max (" + maxNumTablets
+                + ") number of tablets: " + fileCounts.keySet());
+      }
+    }
   }
 
-  private void checkForMerge(final Master master) throws Exception {
+  private void checkForMerge(final long tid, final Master master) throws Exception {
 
     VolumeManager fs = master.getVolumeManager();
     final Path bulkDir = new Path(bulkInfo.sourceDir);
+
+    int maxTablets = Integer.parseInt(master.getContext().getTableConfiguration(bulkInfo.tableId)
+        .get(Property.TABLE_BULK_MAX_TABLETS));
+
     try (LoadMappingIterator lmi =
        BulkSerialize.readLoadMapping(bulkDir.toString(), bulkInfo.tableId, fs::open)) {
 
@@ -166,15 +194,14 @@ public class PrepBulkImport extends MasterRepo {
          .forTable(bulkInfo.tableId).overlapping(startRow, null).checkConsistency().fetch(PREV_ROW)
          .build(master.getContext()).stream().map(TabletMetadata::getExtent).iterator();
 
-      checkForMerge(bulkInfo.tableId.canonical(), Iterators.transform(lmi, Map.Entry::getKey),
-          tabletIterFactory);
+      checkForMerge(bulkInfo.tableId.canonical(), lmi, tabletIterFactory, maxTablets, tid);
     }
   }
 
   @Override
   public Repo<Master> call(final long tid, final Master master) throws Exception {
     // now that table lock is acquired check that all splits in load mapping exists in table
-    checkForMerge(master);
+    checkForMerge(tid, master);
 
     bulkInfo.tableState = Tables.getTableState(master.getContext(), bulkInfo.tableId);
 
diff --git a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
index 88a0cc8..4d56526 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -21,6 +21,9 @@ package org.apache.accumulo.master.tableOps.bulkVer2;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,10 +31,15 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.bulk.Bulk;
+import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
+import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.master.tableOps.bulkVer2.PrepBulkImport.TabletIterFactory;
@@ -97,7 +105,28 @@ public class PrepBulkImportTest {
       return tabletRanges.subList(start, tabletRanges.size()).iterator();
     };
 
-    PrepBulkImport.checkForMerge("1", loadRanges.iterator(), tabletIterFactory);
+    try (LoadMappingIterator lmi = createLoadMappingIter(loadRanges)) {
+      PrepBulkImport.checkForMerge("1", lmi, tabletIterFactory, 100, 10001);
+    }
+  }
+
+  private LoadMappingIterator createLoadMappingIter(List<KeyExtent> loadRanges) throws IOException {
+    SortedMap<KeyExtent,Bulk.Files> mapping = new TreeMap<>();
+    Bulk.Files testFiles = new Bulk.Files();
+
+    long c = 0L;
+    for (String f : "f1 f2 f3".split(" ")) {
+      c++;
+      testFiles.add(new Bulk.FileInfo(f, c, c));
+    }
+    for (KeyExtent ke : loadRanges)
+      mapping.put(ke, testFiles);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    BulkSerialize.writeLoadMapping(mapping, "/some/dir", p -> baos);
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    LoadMappingIterator lmi =
+        BulkSerialize.readLoadMapping("/some/dir", TableId.of("1"), p -> bais);
+    return lmi;
   }
 
   static String toRangeStrings(Collection<KeyExtent> extents) {
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index a4b2032..a2f925b 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -22,6 +22,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -41,6 +42,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
@@ -50,6 +52,7 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.crypto.CryptoServiceFactory;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.LoadPlan;
@@ -183,6 +186,32 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testMaxTablets() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      tableName = "testMaxTablets_table1";
+      NewTableConfiguration newTableConf = new NewTableConfiguration();
+      // set logical time type so we can set time on bulk import
+      var props = Map.of(Property.TABLE_BULK_MAX_TABLETS.getKey(), "2");
+      newTableConf.setProperties(props);
+      client.tableOperations().create(tableName, newTableConf);
+
+      // test max tablets hit while inspecting bulk files
+      var thrown = assertThrows(RuntimeException.class, () -> testBulkFileMax(false));
+      var c = thrown.getCause();
+      assertTrue("Wrong exception: " + c, c instanceof ExecutionException);
+      assertTrue("Wrong exception: " + c.getCause(),
+          c.getCause() instanceof IllegalArgumentException);
+      var msg = c.getCause().getMessage();
+      assertTrue("Bad File not in exception: " + msg, msg.contains("bad-file.rf"));
+
+      // test max tablets hit using load plan on the server side
+      c = assertThrows(AccumuloException.class, () -> testBulkFileMax(true));
+      msg = c.getMessage();
+      assertTrue("Bad File not in exception: " + msg, msg.contains("bad-file.rf"));
+    }
+  }
+
   private void testSingleTabletSingleFileNoSplits(AccumuloClient c, boolean offline)
       throws Exception {
     if (offline) {
@@ -313,6 +342,56 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
+  private void testBulkFileMax(boolean usePlan) throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      addSplits(c, tableName, "0333 0666 0999 1333 1666");
+
+      String dir = getDir("/testBulkFileMax-");
+
+      Map<String,Set<String>> hashes = new HashMap<>();
+      for (String endRow : Arrays.asList("0333 0666 0999 1333 1666 null".split(" "))) {
+        hashes.put(endRow, new HashSet<>());
+      }
+
+      // Add a junk file, should be ignored
+      FSDataOutputStream out = fs.create(new Path(dir, "junk"));
+      out.writeChars("ABCDEFG\n");
+      out.close();
+
+      // 1 Tablet 0333-null
+      String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+      hashes.get("0333").add(h1);
+
+      // 3 Tablets 0666-0334, 0999-0667, 1333-1000
+      String h2 = writeData(dir + "/bad-file.", aconf, 334, 1333);
+      hashes.get("0666").add(h2);
+      hashes.get("0999").add(h2);
+      hashes.get("1333").add(h2);
+
+      // 1 Tablet 1666-1334
+      String h3 = writeData(dir + "/f3.", aconf, 1334, 1499);
+      hashes.get("1666").add(h3);
+
+      // 2 Tablets 1666-1334, >1666
+      String h4 = writeData(dir + "/f4.", aconf, 1500, 1999);
+      hashes.get("1666").add(h4);
+      hashes.get("null").add(h4);
+
+      if (usePlan) {
+        LoadPlan loadPlan = LoadPlan.builder().loadFileTo("f1.rf", RangeType.TABLE, null, row(333))
+            .loadFileTo("bad-file.rf", RangeType.TABLE, row(333), row(1333))
+            .loadFileTo("f3.rf", RangeType.FILE, row(1334), row(1499))
+            .loadFileTo("f4.rf", RangeType.FILE, row(1500), row(1999)).build();
+        c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
+      } else {
+        c.tableOperations().importDirectory(dir).to(tableName).load();
+      }
+
+      verifyData(c, tableName, 0, 1999, false);
+      verifyMetadata(c, tableName, hashes);
+    }
+  }
+
   @Test
   public void testBulkFile() throws Exception {
     testBulkFile(false, false);
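
To show how the new limit surfaces to callers, here is a hedged error-handling sketch based on the
assertions in testMaxTablets above. The method and variable names are assumptions, and the imports
are the ones BulkNewIT.java already uses.

    // Sketch only: the two failure paths exercised by testMaxTablets.
    void loadWithLimitHandling(AccumuloClient client, String tableName, String dir) throws Exception {
      try {
        client.tableOperations().importDirectory(dir).to(tableName).load();
      } catch (RuntimeException e) {
        // Client-side mapping path: BulkImport.checkTabletCount throws IllegalArgumentException,
        // which surfaces through an ExecutionException wrapped in a RuntimeException.
        System.err.println("A bulk file overlapped too many tablets: " + e.getCause());
        throw e;
      } catch (AccumuloException e) {
        // Server-side path (load plan supplied): PrepBulkImport rejects the FATE operation and
        // the exception message names the files that exceeded table.bulk.max.tablets.
        System.err.println("Bulk import rejected by server: " + e.getMessage());
        throw e;
      }
    }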
