This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to
refs/heads/1451-external-compactions-feature by this push:
new b7283aa more IT tweaks
b7283aa is described below
commit b7283aa5853aa62fc30e082d170536c2c810f8c0
Author: Keith Turner <[email protected]>
AuthorDate: Tue Apr 27 20:32:44 2021 -0400
more IT tweaks
---
.../apache/accumulo/test/ExternalCompactionIT.java | 78 ++++++++++++----------
1 file changed, 42 insertions(+), 36 deletions(-)
diff --git
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 00556f4..fc2ef28 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -33,12 +33,16 @@ import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.accumulo.compactor.CompactionEnvironment.CompactorIterEnv;
@@ -192,18 +196,15 @@ public class ExternalCompactionIT extends
ConfigurableMacBase {
compact(client, table1, 2, "DCQ1", false);
// Wait for the compaction to start by waiting for 1 external compaction
column
- List<TabletMetadata> md = new ArrayList<>();
- TabletsMetadata tm = null;
+ Set<ExternalCompactionId> ecids = new HashSet<>();
do {
- if (null != tm) {
- tm.close();
+ UtilWaitThread.sleep(50);
+ try (TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets()
+ .forTable(tid).fetch(ColumnType.ECOMP).build()) {
+ tm.stream().flatMap(t ->
t.getExternalCompactions().keySet().stream())
+ .forEach(ecids::add);
}
- tm =
getCluster().getServerContext().getAmple().readTablets().forTable(tid)
- .fetch(ColumnType.ECOMP).build();
- tm.forEach(t -> md.add(t));
- } while (md.size() == 0);
- tm.close();
- md.clear();
+ } while (ecids.isEmpty());
// ExternalDoNothingCompactor will not compact, it will wait, split the
table.
SortedSet<Text> splits = new TreeSet<>();
@@ -229,6 +230,13 @@ public class ExternalCompactionIT extends
ConfigurableMacBase {
assertEquals(0, metrics.getCompleted());
assertEquals(1, metrics.getFailed());
+ // ensure compaction ids were deleted by split operation from metadata
table
+ try (TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets()
+ .forTable(tid).fetch(ColumnType.ECOMP).build()) {
+ Set<ExternalCompactionId> ecids2 = tm.stream()
+ .flatMap(t ->
t.getExternalCompactions().keySet().stream()).collect(Collectors.toSet());
+ assertTrue(Collections.disjoint(ecids, ecids2));
+ }
}
}
@@ -258,44 +266,34 @@ public class ExternalCompactionIT extends
ConfigurableMacBase {
cluster.exec(TestCompactionCoordinator.class);
// Wait for the compaction to start by waiting for 1 external compaction
column
- List<TabletMetadata> md = new ArrayList<>();
- TabletsMetadata tm = null;
+ Set<ExternalCompactionId> ecids = new HashSet<>();
do {
- if (null != tm) {
- tm.close();
- }
UtilWaitThread.sleep(50);
- tm =
getCluster().getServerContext().getAmple().readTablets().forTable(tid)
- .fetch(ColumnType.ECOMP).build();
- tm.forEach(t -> md.add(t));
- } while (md.size() == 0);
- tm.close();
+ try (TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets()
+ .forTable(tid).fetch(ColumnType.ECOMP).build()) {
+ tm.stream().flatMap(t ->
t.getExternalCompactions().keySet().stream())
+ .forEach(ecids::add);
+ }
+ } while (ecids.isEmpty());
+ ;
- md.clear();
- tm =
getCluster().getServerContext().getAmple().readTablets().forTable(tid)
- .fetch(ColumnType.PREV_ROW).build();
- tm.forEach(t -> md.add(t));
- assertEquals(2, md.size());
- Text start = md.get(0).getPrevEndRow();
- Text end = md.get(1).getEndRow();
+ var md = new ArrayList<TabletMetadata>();
+ try (TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets()
+ .forTable(tid).fetch(ColumnType.PREV_ROW).build()) {
+ tm.forEach(t -> md.add(t));
+ assertEquals(2, md.size());
+ }
assertEquals(0, getCoordinatorMetrics().getFailed());
// Merge - blocking operation
+ Text start = md.get(0).getPrevEndRow();
+ Text end = md.get(1).getEndRow();
client.tableOperations().merge(table1, start, end);
- // Confirm that there are no external compaction markers or final states
in the metadata table
- md.clear();
- tm =
getCluster().getServerContext().getAmple().readTablets().forTable(tid)
- .fetch(ColumnType.ECOMP).build();
- tm.forEach(t -> md.add(t));
- assertEquals(0, md.size());
assertEquals(0,
getCluster().getServerContext().getAmple().getExternalCompactionFinalStates().count());
- // Wait for the table to merge by waiting for only 1 tablet to show up
in the metadata table
- tm.close();
-
// wait for failure or test timeout
ExternalCompactionMetrics metrics = getCoordinatorMetrics();
while (metrics.getFailed() == 0) {
@@ -309,6 +307,14 @@ public class ExternalCompactionIT extends
ConfigurableMacBase {
assertEquals(0, metrics.getCompleted());
assertTrue(metrics.getFailed() > 0);
+ // ensure compaction ids were deleted by merge operation from metadata
table
+ try (TabletsMetadata tm =
getCluster().getServerContext().getAmple().readTablets()
+ .forTable(tid).fetch(ColumnType.ECOMP).build()) {
+ Set<ExternalCompactionId> ecids2 = tm.stream()
+ .flatMap(t ->
t.getExternalCompactions().keySet().stream()).collect(Collectors.toSet());
+ assertTrue(Collections.disjoint(ecids, ecids2));
+ }
+
}
}