This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 19c81d8249 User compaction not retried after failure (#3555)
19c81d8249 is described below

commit 19c81d8249969cc6ee11a41330cc3d0a9215932a
Author: Dave Marion <[email protected]>
AuthorDate: Wed Jul 5 13:33:16 2023 -0400

    User compaction not retried after failure (#3555)
    
    Modified CompactableImpl.completeCompaction to always
    call completed so that the input files will become unreserved.
    In the case of an unsuccessful compaction, it will be retried.
    Added internal and external compaction tests.
---
 .../compactions/InternalCompactionExecutor.java    |   1 +
 .../accumulo/tserver/tablet/CompactableImpl.java   |  14 +--
 .../tablet/CompactableImplFileManagerTest.java     |   2 +-
 .../java/org/apache/accumulo/test/TestIngest.java  |  10 ++
 .../test/compaction/ExternalCompaction4_IT.java    | 135 +++++++++++++++++++++
 .../accumulo/test/functional/CompactionIT.java     |  78 ++++++++++++
 .../test/functional/ErrorThrowingIterator.java     | 106 ++++++++++++++++
 .../accumulo/test/functional/ReadWriteIT.java      |   6 +
 8 files changed, 342 insertions(+), 10 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index b7cdac5d4c..1598ef5eee 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -100,6 +100,7 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
       } catch (RuntimeException e) {
         log.warn("Compaction failed for {} on {}", compactable.getExtent(), 
getJob(), e);
         status.compareAndSet(Status.RUNNING, Status.FAILED);
+        completionCallback.accept(compactable);
       } finally {
         status.compareAndSet(Status.RUNNING, Status.COMPLETE);
       }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index df9ec0514d..f020657bed 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -579,14 +579,15 @@ public class CompactableImpl implements Compactable {
      * @param newFile The file produced by a compaction. If the compaction 
failed, this can be null.
      */
     void completed(CompactionJob job, Set<StoredTabletFile> jobFiles,
-        Optional<StoredTabletFile> newFile) {
+        Optional<StoredTabletFile> newFile, boolean successful) {
       Preconditions.checkArgument(!jobFiles.isEmpty());
       Preconditions.checkState(allCompactingFiles.removeAll(jobFiles));
       if (newFile.isPresent()) {
         choppedFiles.add(newFile.orElseThrow());
       }
 
-      if ((job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR)) {
+      if (successful
+          && (job.getKind() == CompactionKind.USER || job.getKind() == 
CompactionKind.SELECTOR)) {
         selectedCompactionCompleted(job, jobFiles, newFile);
       }
     }
@@ -1258,9 +1259,7 @@ public class CompactableImpl implements Compactable {
       Optional<StoredTabletFile> metaFile, boolean successful) {
     synchronized (this) {
       Preconditions.checkState(removeJob(job));
-      if (successful) {
-        fileMgr.completed(job, jobFiles, metaFile);
-      }
+      fileMgr.completed(job, jobFiles, metaFile, successful);
 
       if (!compactionRunning) {
         notifyAll();
@@ -1288,7 +1287,6 @@ public class CompactableImpl implements Compactable {
     CompactionStats stats = new CompactionStats();
 
     boolean successful = false;
-
     try {
       TabletLogger.compacting(getExtent(), job, cInfo.localCompactionCfg);
       tablet.incrementStatusMajor();
@@ -1306,7 +1304,6 @@ public class CompactableImpl implements Compactable {
           compactFiles, allFiles, kind, tmpFileName);
 
       TabletLogger.compacted(getExtent(), job, newFile.orElse(null));
-
       successful = true;
     } catch (CompactionCanceledException cce) {
       log.debug("Compaction canceled {} ", getExtent());
@@ -1388,11 +1385,10 @@ public class CompactableImpl implements Compactable {
 
       ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId);
 
-      boolean successful = false;
-
       if (ecInfo != null) {
         log.debug("Attempting to commit external compaction {}", 
extCompactionId);
         Optional<StoredTabletFile> metaFile = Optional.empty();
+        boolean successful = false;
         try {
           metaFile =
               
tablet.getDatafileManager().bringMajorCompactionOnline(ecInfo.meta.getJobFiles(),
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
index 44bf56f3a2..971ee51e31 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
@@ -428,7 +428,7 @@ public class CompactableImplFileManagerTest {
     }
 
     void completed(TestCompactionJob job, StoredTabletFile newFile) {
-      super.completed(job, job.getSTFiles(), Optional.ofNullable(newFile));
+      super.completed(job, job.getSTFiles(), Optional.ofNullable(newFile), 
true);
     }
 
     @Override
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java 
b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index fd0d795b1a..f20cf31afe 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -79,6 +79,7 @@ public class TestIngest {
     public int stride;
     public String columnFamily = "colf";
     public ColumnVisibility columnVisibility = new ColumnVisibility();
+    public int flushAfterRows = 0;
 
     public IngestParams(Properties props) {
       clientProps = props;
@@ -142,6 +143,10 @@ public class TestIngest {
         description = "place columns in this column family", converter = 
VisibilityConverter.class)
     ColumnVisibility columnVisibility = new ColumnVisibility();
 
+    @Parameter(names = {"-fr", "--flushAfterRows"},
+        description = "flush after N rows, 0 is default and means disabled")
+    int flushAfterRows = 0;
+
     protected void populateIngestPrams(IngestParams params) {
       params.createTable = createTable;
       params.numsplits = numsplits;
@@ -156,6 +161,7 @@ public class TestIngest {
       params.stride = stride;
       params.columnFamily = columnFamily;
       params.columnVisibility = columnVisibility;
+      params.flushAfterRows = flushAfterRows;
     }
 
     public IngestParams getIngestPrams() {
@@ -357,6 +363,10 @@ public class TestIngest {
       }
       if (bw != null) {
         bw.addMutation(m);
+        if ((params.flushAfterRows != 0) && (i % params.flushAfterRows == 0)) {
+          bw.flush();
+          accumuloClient.tableOperations().flush(params.tableName, null, null, 
true);
+        }
       }
     }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java
new file mode 100644
index 0000000000..12b0eb5286
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction4_IT.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.compaction;
+
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
+import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.EnumSet;
+import java.util.NoSuchElementException;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ErrorThrowingIterator;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+public class ExternalCompaction4_IT extends AccumuloClusterHarness {
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+    ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+    cfg.setNumCompactors(2);
+  }
+
+  @Test
+  public void testErrorDuringCompactionNoOutput() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+      getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE1);
+      createTable(client, table1, "cs1");
+      client.tableOperations().setProperty(table1, 
Property.TABLE_MAJC_RATIO.getKey(), "51");
+      TableId tid = 
TableId.of(client.tableOperations().tableIdMap().get(table1));
+
+      ReadWriteIT.ingest(client, 50, 1, 1, 0, "colf", table1, 1);
+      ReadWriteIT.verify(client, 50, 1, 1, 0, table1);
+
+      Ample ample = ((ClientContext) client).getAmple();
+      TabletsMetadata tms = 
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
+      TabletMetadata tm = tms.iterator().next();
+      assertEquals(50, tm.getFiles().size());
+
+      IteratorSetting setting = new IteratorSetting(50, "ageoff", 
AgeOffFilter.class);
+      setting.addOption("ttl", "0");
+      setting.addOption("currentTime", 
Long.toString(System.currentTimeMillis() + 86400));
+      client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+
+      // Since this iterator is on the top, it will throw an error 3 times, 
then allow the
+      // ageoff iterator to do its work.
+      IteratorSetting setting2 = new IteratorSetting(51, "error", 
ErrorThrowingIterator.class);
+      setting2.addOption(ErrorThrowingIterator.TIMES, "3");
+      client.tableOperations().attachIterator(table1, setting2, 
EnumSet.of(IteratorScope.majc));
+      client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+
+      assertThrows(NoSuchElementException.class, () -> 
ample.readTablets().forTable(tid)
+          .fetch(ColumnType.FILES).build().iterator().next());
+      assertEquals(0, client.createScanner(table1).stream().count());
+    } finally {
+      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+      
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+    }
+  }
+
+  @Test
+  public void testErrorDuringUserCompaction() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+      getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE1);
+      createTable(client, table1, "cs1");
+      client.tableOperations().setProperty(table1, 
Property.TABLE_FILE_MAX.getKey(), "1001");
+      client.tableOperations().setProperty(table1, 
Property.TABLE_MAJC_RATIO.getKey(), "1001");
+      TableId tid = 
TableId.of(client.tableOperations().tableIdMap().get(table1));
+
+      ReadWriteIT.ingest(client, 1000, 1, 1, 0, "colf", table1, 1);
+
+      Ample ample = ((ClientContext) client).getAmple();
+      TabletsMetadata tms = 
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
+      TabletMetadata tm = tms.iterator().next();
+      assertEquals(1000, tm.getFiles().size());
+
+      IteratorSetting setting = new IteratorSetting(50, "error", 
ErrorThrowingIterator.class);
+      setting.addOption(ErrorThrowingIterator.TIMES, "3");
+      client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+      client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+
+      tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
+      tm = tms.iterator().next();
+      assertEquals(1, tm.getFiles().size());
+
+      ReadWriteIT.verify(client, 1000, 1, 1, 0, table1);
+
+    } finally {
+      getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+      
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+    }
+
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 42e374f277..efa7d3ffc9 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
@@ -31,6 +32,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -51,20 +53,27 @@ import org.apache.accumulo.core.client.admin.PluginConfig;
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;
 import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.DevNull;
 import org.apache.accumulo.core.iterators.Filter;
 import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
 import org.apache.accumulo.core.iterators.user.GrepIterator;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -185,6 +194,7 @@ public class CompactionIT extends AccumuloClusterHarness {
           m.put("cf", "cq", new Value());
           bw.addMutation(m);
           bw.flush();
+          // flush often to create multiple files to compact
           c.tableOperations().flush(tableName, null, null, true);
         }
       }
@@ -225,6 +235,7 @@ public class CompactionIT extends AccumuloClusterHarness {
           m.put("cf", "cq", new Value());
           bw.addMutation(m);
           bw.flush();
+          // flush often to create multiple files to compact
           client.tableOperations().flush(table1, null, null, true);
         }
       }
@@ -250,6 +261,7 @@ public class CompactionIT extends AccumuloClusterHarness {
           m.put("cf", "cq", new Value());
           bw.addMutation(m);
           bw.flush();
+          // flush often to create multiple files to compact
           client.tableOperations().flush(table1, null, null, true);
         }
       }
@@ -280,6 +292,70 @@ public class CompactionIT extends AccumuloClusterHarness {
     }
   }
 
+  @Test
+  public void testErrorDuringUserCompaction() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      client.tableOperations().setProperty(table1, 
Property.TABLE_FILE_MAX.getKey(), "1001");
+      client.tableOperations().setProperty(table1, 
Property.TABLE_MAJC_RATIO.getKey(), "1001");
+      TableId tid = 
TableId.of(client.tableOperations().tableIdMap().get(table1));
+
+      ReadWriteIT.ingest(client, MAX_DATA, 1, 1, 0, "colf", table1, 1);
+
+      Ample ample = ((ClientContext) client).getAmple();
+      TabletsMetadata tms = 
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
+      TabletMetadata tm = tms.iterator().next();
+      assertEquals(1000, tm.getFiles().size());
+
+      IteratorSetting setting = new IteratorSetting(50, "error", 
ErrorThrowingIterator.class);
+      setting.addOption(ErrorThrowingIterator.TIMES, "3");
+      client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+      client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+
+      tms = ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
+      tm = tms.iterator().next();
+      assertEquals(1, tm.getFiles().size());
+
+      ReadWriteIT.verify(client, MAX_DATA, 1, 1, 0, table1);
+
+    }
+  }
+
+  @Test
+  public void testErrorDuringCompactionNoOutput() throws Exception {
+    final String table1 = this.getUniqueNames(1)[0];
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table1);
+      client.tableOperations().setProperty(table1, 
Property.TABLE_MAJC_RATIO.getKey(), "51");
+      TableId tid = 
TableId.of(client.tableOperations().tableIdMap().get(table1));
+
+      ReadWriteIT.ingest(client, 50, 1, 1, 0, "colf", table1, 1);
+      ReadWriteIT.verify(client, 50, 1, 1, 0, table1);
+
+      Ample ample = ((ClientContext) client).getAmple();
+      TabletsMetadata tms = 
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build();
+      TabletMetadata tm = tms.iterator().next();
+      assertEquals(50, tm.getFiles().size());
+
+      IteratorSetting setting = new IteratorSetting(50, "ageoff", 
AgeOffFilter.class);
+      setting.addOption("ttl", "0");
+      setting.addOption("currentTime", 
Long.toString(System.currentTimeMillis() + 86400));
+      client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+
+      // Since this iterator is on the top, it will throw an error 3 times, 
then allow the
+      // ageoff iterator to do its work.
+      IteratorSetting setting2 = new IteratorSetting(51, "error", 
ErrorThrowingIterator.class);
+      setting2.addOption(ErrorThrowingIterator.TIMES, "3");
+      client.tableOperations().attachIterator(table1, setting2, 
EnumSet.of(IteratorScope.majc));
+      client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+
+      assertThrows(NoSuchElementException.class, () -> 
ample.readTablets().forTable(tid)
+          .fetch(ColumnType.FILES).build().iterator().next());
+      assertEquals(0, client.createScanner(table1).stream().count());
+    }
+  }
+
   @Test
   public void testTableDeletedDuringUserCompaction() throws Exception {
     final String table1 = this.getUniqueNames(1)[0];
@@ -291,6 +367,7 @@ public class CompactionIT extends AccumuloClusterHarness {
           m.put("cf", "cq", new Value());
           bw.addMutation(m);
           bw.flush();
+          // flush often to create multiple files to compact
           client.tableOperations().flush(table1, null, null, true);
         }
       }
@@ -516,6 +593,7 @@ public class CompactionIT extends AccumuloClusterHarness {
           m.put("f1", "q1", "v" + i);
           writer.addMutation(m);
           writer.flush();
+          // flush often to create multiple files to compact
           c.tableOperations().flush(tableName, null, null, true);
         }
       }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
new file mode 100644
index 0000000000..d9cb2c0e8d
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ErrorThrowingIterator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Iterator used in tests. Note that the test class must spawn a new MAC instance for each test,
+ * because the TIMES_THROWN counter is static and is never reset within the same VM.
+ */
+public class ErrorThrowingIterator extends WrappingIterator {
+
+  public static final String TIMES = "error.throwing.iterator.times";
+
+  private static final String MESSAGE = "Exception thrown from 
ErrorThrowingIterator";
+  private static final RuntimeException ERROR = new RuntimeException(MESSAGE);
+  private static final AtomicInteger TIMES_THROWN = new AtomicInteger(0);
+
+  private int threshold = 0;
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    threshold = Integer.parseInt(options.get(TIMES));
+    Preconditions.checkState(TIMES_THROWN.get() <= threshold,
+        "This iterator does not"
+            + " support reuse within the same VM. If using in an IT, then be 
sure to use"
+            + " a different MAC instance between tests.");
+  }
+
+  private void incrementAndThrow(RuntimeException t) {
+    if (TIMES_THROWN.get() < threshold) {
+      TIMES_THROWN.incrementAndGet();
+      throw t;
+    }
+  }
+
+  private void incrementAndThrowIOE() throws IOException {
+    if (TIMES_THROWN.get() < threshold) {
+      TIMES_THROWN.incrementAndGet();
+      throw new IOException(MESSAGE);
+    }
+  }
+
+  @Override
+  public Key getTopKey() {
+    incrementAndThrow(ERROR);
+    return super.getTopKey();
+  }
+
+  @Override
+  public Value getTopValue() {
+    incrementAndThrow(ERROR);
+    return super.getTopValue();
+  }
+
+  @Override
+  public boolean hasTop() {
+    incrementAndThrow(ERROR);
+    return super.hasTop();
+  }
+
+  @Override
+  public void next() throws IOException {
+    incrementAndThrowIOE();
+    super.next();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, 
boolean inclusive)
+      throws IOException {
+    incrementAndThrowIOE();
+    super.seek(range, columnFamilies, inclusive);
+  }
+
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index e2693edf90..44eb5b2374 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -194,12 +194,18 @@ public class ReadWriteIT extends AccumuloClusterHarness {
 
   public static void ingest(AccumuloClient accumuloClient, int rows, int cols, 
int width,
       int offset, String colf, String tableName) throws Exception {
+    ingest(accumuloClient, rows, cols, width, offset, colf, tableName, 0);
+  }
+
+  public static void ingest(AccumuloClient accumuloClient, int rows, int cols, 
int width,
+      int offset, String colf, String tableName, int flushAfterRows) throws 
Exception {
     IngestParams params = new IngestParams(accumuloClient.properties(), 
tableName, rows);
     params.cols = cols;
     params.dataSize = width;
     params.startRow = offset;
     params.columnFamily = colf;
     params.createTable = true;
+    params.flushAfterRows = flushAfterRows;
     TestIngest.ingest(accumuloClient, params);
   }
 

Reply via email to