This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new af3d4f1c5f Add filtering to Ample (#3968)
af3d4f1c5f is described below
commit af3d4f1c5fff5e8074498b0469c2e3177d3c79cc
Author: Dom G <[email protected]>
AuthorDate: Fri Jan 5 10:41:07 2024 -0500
Add filtering to Ample (#3968)
---------
Co-authored-by: Keith Turner <[email protected]>
---
.../core/iterators/user/HasCurrentFilter.java | 44 +++++
.../user/HasExternalCompactionsFilter.java | 45 +++++
.../core/iterators/user/HasWalsFilter.java | 45 +++++
.../core/iterators/user/TabletMetadataFilter.java | 43 ++++
.../core/metadata/schema/TabletsMetadata.java | 53 ++++-
.../accumulo/gc/GarbageCollectWriteAheadLogs.java | 7 +-
.../coordinator/CompactionCoordinator.java | 7 +-
.../coordinator/DeadCompactionDetector.java | 6 +-
.../monitor/rest/tables/TablesResource.java | 15 +-
.../test/functional/AmpleConditionalWriterIT.java | 218 +++++++++++++++++++++
.../test/functional/TestTabletMetadataFilter.java | 54 +++++
11 files changed, 520 insertions(+), 17 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasCurrentFilter.java
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasCurrentFilter.java
new file mode 100644
index 0000000000..ca58922306
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasCurrentFilter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+import com.google.common.collect.Sets;
+
+public class HasCurrentFilter extends TabletMetadataFilter {
+
+ public static final Set<TabletMetadata.ColumnType> COLUMNS =
+ Sets.immutableEnumSet(TabletMetadata.ColumnType.LOCATION);
+
+ private final static Predicate<TabletMetadata> HAS_CURRENT =
TabletMetadata::hasCurrent;
+
+ @Override
+ public Set<TabletMetadata.ColumnType> getColumns() {
+ return COLUMNS;
+ }
+
+ @Override
+ protected Predicate<TabletMetadata> acceptTablet() {
+ return HAS_CURRENT;
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasExternalCompactionsFilter.java
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasExternalCompactionsFilter.java
new file mode 100644
index 0000000000..fd3b5ae5d6
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasExternalCompactionsFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+import com.google.common.collect.Sets;
+
+public class HasExternalCompactionsFilter extends TabletMetadataFilter {
+
+ public static final Set<TabletMetadata.ColumnType> COLUMNS =
+ Sets.immutableEnumSet(TabletMetadata.ColumnType.ECOMP);
+
+ private final static Predicate<TabletMetadata> HAS_EXT_COMPS =
+ tabletMetadata -> !tabletMetadata.getExternalCompactions().isEmpty();
+
+ @Override
+ public Set<TabletMetadata.ColumnType> getColumns() {
+ return COLUMNS;
+ }
+
+ @Override
+ protected Predicate<TabletMetadata> acceptTablet() {
+ return HAS_EXT_COMPS;
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java
new file mode 100644
index 0000000000..0563a7c3dc
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+import com.google.common.collect.Sets;
+
+public class HasWalsFilter extends TabletMetadataFilter {
+
+ private static final Set<TabletMetadata.ColumnType> COLUMNS =
+ Sets.immutableEnumSet(TabletMetadata.ColumnType.LOGS);
+
+ private final static Predicate<TabletMetadata> HAS_WALS =
+ tabletMetadata -> !tabletMetadata.getLogs().isEmpty();
+
+ @Override
+ public Set<TabletMetadata.ColumnType> getColumns() {
+ return COLUMNS;
+ }
+
+ @Override
+ protected Predicate<TabletMetadata> acceptTablet() {
+ return HAS_WALS;
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java
b/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java
new file mode 100644
index 0000000000..13163aebea
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorAdapter;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+public abstract class TabletMetadataFilter extends RowFilter {
+
+ @Override
+ public boolean acceptRow(SortedKeyValueIterator<Key,Value> rowIterator) {
+ TabletMetadata tm = TabletMetadata.convertRow(new
IteratorAdapter(rowIterator),
+ EnumSet.copyOf(getColumns()), true, false);
+ return acceptTablet().test(tm);
+ }
+
+ public abstract Set<TabletMetadata.ColumnType> getColumns();
+
+ protected abstract Predicate<TabletMetadata> acceptTablet();
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 4e28e46e9d..46b077e626 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -63,6 +63,7 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.iterators.user.TabletMetadataFilter;
import org.apache.accumulo.core.iterators.user.WholeRowIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
@@ -114,6 +115,7 @@ public class TabletsMetadata implements
Iterable<TabletMetadata>, AutoCloseable
private TableId tableId;
private ReadConsistency readConsistency = ReadConsistency.IMMEDIATE;
private final AccumuloClient _client;
+ private final List<TabletMetadataFilter> tabletMetadataFilters = new
ArrayList<>();
Builder(AccumuloClient client) {
this._client = client;
@@ -128,6 +130,22 @@ public class TabletsMetadata implements
Iterable<TabletMetadata>, AutoCloseable
return buildExtents(_client);
}
+ if (!tabletMetadataFilters.isEmpty()) {
+ checkState(!checkConsistency, "Can not check tablet consistency and
filter tablets");
+ if (!fetchedCols.isEmpty()) {
+ for (var filter : tabletMetadataFilters) {
+ // This defends against the case where the columns needed by the
filter were not
+ // fetched. For example, the following code only fetches the file
column and then
+ // configures the WAL filter which also needs the column for write
ahead logs.
+ //
ample.readTablets().forLevel(DataLevel.USER).fetch(ColumnType.FILES).filter(new
+ // HasWalsFilter()).build();
+ checkState(fetchedCols.containsAll(filter.getColumns()),
+ "%s needs cols %s however only %s were fetched",
filter.getClass().getSimpleName(),
+ filter.getColumns(), fetchedCols);
+ }
+ }
+ }
+
checkState((level == null) != (table == null),
"scanTable() cannot be used in conjunction with forLevel(),
forTable() or forTablet() %s %s",
level, table);
@@ -168,7 +186,16 @@ public class TabletsMetadata implements
Iterable<TabletMetadata>, AutoCloseable
scanner.setRanges(ranges);
configureColumns(scanner);
- IteratorSetting iterSetting = new IteratorSetting(100,
WholeRowIterator.class);
+ int iteratorPriority = 100;
+
+ for (TabletMetadataFilter tmf : tabletMetadataFilters) {
+ IteratorSetting iterSetting = new
IteratorSetting(iteratorPriority, tmf.getClass());
+ scanner.addScanIterator(iterSetting);
+ iteratorPriority++;
+ }
+
+ IteratorSetting iterSetting =
+ new IteratorSetting(iteratorPriority, WholeRowIterator.class);
scanner.addScanIterator(iterSetting);
Iterable<TabletMetadata> tmi = () ->
Iterators.transform(scanner.iterator(), entry -> {
@@ -239,6 +266,15 @@ public class TabletsMetadata implements
Iterable<TabletMetadata>, AutoCloseable
configureColumns(scanner);
Range range1 = scanner.getRange();
+ if (!tabletMetadataFilters.isEmpty()) {
+ int iteratorPriority = 100;
+ for (TabletMetadataFilter tmf : tabletMetadataFilters) {
+ iteratorPriority++;
+ IteratorSetting iterSetting = new
IteratorSetting(iteratorPriority, tmf.getClass());
+ scanner.addScanIterator(iterSetting);
+ }
+ }
+
Function<Range,Iterator<TabletMetadata>> iterFactory = r -> {
synchronized (scanner) {
scanner.setRange(r);
@@ -445,6 +481,12 @@ public class TabletsMetadata implements
Iterable<TabletMetadata>, AutoCloseable
return this;
}
+ @Override
+ public Options filter(TabletMetadataFilter filter) {
+ this.tabletMetadataFilters.add(filter);
+ return this;
+ }
+
@Override
public Options readConsistency(ReadConsistency readConsistency) {
this.readConsistency = Objects.requireNonNull(readConsistency);
@@ -475,6 +517,15 @@ public class TabletsMetadata implements
Iterable<TabletMetadata>, AutoCloseable
* {@link ReadConsistency#IMMEDIATE}
*/
Options readConsistency(ReadConsistency readConsistency);
+
+ /**
+ * Adds a filter to be applied while fetching the data. Filters are
applied in the order they
+ * are added. This method can be called multiple times to chain multiple
filters together. The
+ * first filter added has the highest priority and each subsequent filter
is applied with a
+ * sequentially lower priority. If columns needed by a filter are not
fetched then a runtime
+ * exception is thrown.
+ */
+ Options filter(TabletMetadataFilter filter);
}
public interface RangeOptions extends Options {
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 7e55a5274c..3c93187239 100644
---
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -37,6 +37,7 @@ import java.util.UUID;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.iterators.user.HasWalsFilter;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletState;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -83,11 +84,11 @@ public class GarbageCollectWriteAheadLogs {
this.liveServers = liveServers;
this.walMarker = new WalStateManager(context);
this.store = () -> Iterators.concat(
- context.getAmple().readTablets().forLevel(DataLevel.ROOT)
+ context.getAmple().readTablets().forLevel(DataLevel.ROOT).filter(new
HasWalsFilter())
.fetch(LOCATION, LAST, LOGS, PREV_ROW,
SUSPEND).checkConsistency().build().iterator(),
- context.getAmple().readTablets().forLevel(DataLevel.METADATA)
+
context.getAmple().readTablets().forLevel(DataLevel.METADATA).filter(new
HasWalsFilter())
.fetch(LOCATION, LAST, LOGS, PREV_ROW,
SUSPEND).checkConsistency().build().iterator(),
- context.getAmple().readTablets().forLevel(DataLevel.USER)
+ context.getAmple().readTablets().forLevel(DataLevel.USER).filter(new
HasWalsFilter())
.fetch(LOCATION, LAST, LOGS, PREV_ROW,
SUSPEND).checkConsistency().build().iterator());
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index c03cb3241f..faea953da6 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -78,6 +78,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
import org.apache.accumulo.core.metadata.AbstractTabletFile;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
@@ -1226,9 +1227,9 @@ public class CompactionCoordinator
}
protected Set<ExternalCompactionId> readExternalCompactionIds() {
- return
this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(ECOMP).build()
- .stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream())
- .collect(Collectors.toSet());
+ return this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
+ .filter(new
HasExternalCompactionsFilter()).fetch(ECOMP).build().stream()
+ .flatMap(tm ->
tm.getExternalCompactions().keySet().stream()).collect(Collectors.toSet());
}
/**
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index 2c653158ad..479be63ae7 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
@@ -75,10 +76,11 @@ public class DeadCompactionDetector {
log.debug("Starting to look for dead compactions");
Map<ExternalCompactionId,KeyExtent> tabletCompactions = new HashMap<>();
-
+ //
// find what external compactions tablets think are running
context.getAmple().readTablets().forLevel(DataLevel.USER)
- .fetch(ColumnType.ECOMP, ColumnType.PREV_ROW).build().forEach(tm -> {
+ .filter(new HasExternalCompactionsFilter()).fetch(ColumnType.ECOMP,
ColumnType.PREV_ROW)
+ .build().forEach(tm -> {
tm.getExternalCompactions().keySet().forEach(ecid -> {
tabletCompactions.put(ecid, tm.getExtent());
});
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java
index ccc5eb7ca5..89879cfae5 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java
@@ -37,6 +37,7 @@ import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.iterators.user.HasCurrentFilter;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
import org.apache.accumulo.core.manager.thrift.TableInfo;
@@ -143,16 +144,14 @@ public class TablesResource {
locs.add(rootTabletLocation);
} else {
var level = Ample.DataLevel.of(tableId);
- try (TabletsMetadata tablets =
-
monitor.getContext().getAmple().readTablets().forLevel(level).build()) {
+ try (TabletsMetadata tablets =
monitor.getContext().getAmple().readTablets().forLevel(level)
+ .filter(new HasCurrentFilter()).build()) {
for (TabletMetadata tm : tablets) {
- if (tm.hasCurrent()) {
- try {
- locs.add(tm.getLocation().getHostPort());
- } catch (Exception ex) {
- return tabletServers;
- }
+ try {
+ locs.add(tm.getLocation().getHostPort());
+ } catch (Exception ex) {
+ return tabletServers;
}
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index ebc86f97c8..53e34bd203 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -36,6 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
@@ -66,6 +67,9 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.iterators.user.HasCurrentFilter;
+import org.apache.accumulo.core.iterators.user.HasWalsFilter;
+import org.apache.accumulo.core.iterators.user.TabletMetadataFilter;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -81,14 +85,17 @@ import
org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.metadata.schema.TabletMetadataBuilder;
import org.apache.accumulo.core.metadata.schema.TabletOperationId;
import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Sets;
@@ -151,6 +158,13 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
assertEquals(Location.future(ts1),
context.getAmple().readTablet(e1).getLocation());
+ try (TabletsMetadata tablets =
+ context.getAmple().readTablets().forTable(tid).filter(new
HasCurrentFilter()).build()) {
+ List<KeyExtent> actual =
+
tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toList());
+ assertEquals(List.of(), actual);
+ }
+
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
@@ -160,6 +174,13 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
assertEquals(Location.current(ts1),
context.getAmple().readTablet(e1).getLocation());
+ try (TabletsMetadata tablets =
+ context.getAmple().readTablets().forTable(tid).filter(new
HasCurrentFilter()).build()) {
+ List<KeyExtent> actual =
+
tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toList());
+ assertEquals(List.of(e1), actual);
+ }
+
ctmi = new ConditionalTabletsMutatorImpl(context);
ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1))
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1))
@@ -851,6 +872,203 @@ public class AmpleConditionalWriterIT extends
AccumuloClusterHarness {
}
}
+ @Nested
+ public class TestFilter {
+
+ /**
+ * @param filters set of filters to apply to the readTablets operation
+ * @param expectedTablets set of tablets expected to be returned with the
filters applied
+ */
+ private void testFilterApplied(ServerContext context,
Set<TabletMetadataFilter> filters,
+ Set<KeyExtent> expectedTablets, String message) {
+ // test with just the needed columns fetched and then with all columns
fetched. both should
+ // yield the same result
+ addFiltersFetchAndAssert(context, filters, true, expectedTablets,
message);
+ addFiltersFetchAndAssert(context, filters, false, expectedTablets,
message);
+ }
+
+ private void addFiltersFetchAndAssert(ServerContext context,
Set<TabletMetadataFilter> filters,
+ boolean shouldFetchNeededCols, Set<KeyExtent> expectedTablets, String
message) {
+ TabletsMetadata.TableRangeOptions options =
context.getAmple().readTablets().forTable(tid);
+ // add the filter(s) to the operation before building
+ for (TabletMetadataFilter filter : filters) {
+ options.filter(filter);
+ // test fetching just the needed columns
+ if (shouldFetchNeededCols) {
+ for (TabletMetadata.ColumnType columnType : filter.getColumns()) {
+ options.fetch(columnType);
+ }
+ }
+ }
+ if (shouldFetchNeededCols) {
+ // if some columns were fetched, also need to fetch PREV_ROW in order
to use getExtent()
+ options.fetch(PREV_ROW);
+ }
+ try (TabletsMetadata tablets = options.build()) {
+ Set<KeyExtent> actual =
+
tablets.stream().map(TabletMetadata::getExtent).collect(Collectors.toSet());
+ assertEquals(expectedTablets, actual,
+ message + (shouldFetchNeededCols
+ ? ". Only needed columns were fetched in the readTablets
operation."
+ : ". All columns were fetched in the readTablets operation."));
+ }
+ }
+
+ @Test
+ public void multipleFilters() {
+ ServerContext context = cluster.getServerContext();
+ ConditionalTabletsMutatorImpl ctmi;
+
+ // make sure we read all tablets on table initially with no filters
+ testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
+ "Initially, all tablets should be present");
+
+ String server = "server1+8555";
+
+ String walFilePath = java.nio.file.Path.of(server,
UUID.randomUUID().toString()).toString();
+ LogEntry wal = LogEntry.fromPath(walFilePath);
+
+ // add wal compact and flush ID to these tablets
+ final Set<KeyExtent> tabletsWithWalCompactFlush = Set.of(e1, e2, e3);
+ for (KeyExtent ke : tabletsWithWalCompactFlush) {
+ ctmi = new ConditionalTabletsMutatorImpl(context);
+ ctmi.mutateTablet(ke).requireAbsentOperation().putCompacted(34L)
+ .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).putWal(wal)
+ .submit(tabletMetadata -> false);
+ var results = ctmi.process();
+ assertEquals(Status.ACCEPTED, results.get(ke).getStatus());
+ }
+ // check that applying a combination of filters returns only tablets
that meet the criteria
+ testFilterApplied(context, Set.of(new TestTabletMetadataFilter(), new
HasWalsFilter()),
+ tabletsWithWalCompactFlush, "Combination of filters did not return
the expected tablets");
+
+ TServerInstance serverInstance = new TServerInstance(server, 1L);
+
+ // on a subset of the tablets, put a location
+ final Set<KeyExtent> tabletsWithLocation = Set.of(e2, e3, e4);
+ for (KeyExtent ke : tabletsWithLocation) {
+ ctmi = new ConditionalTabletsMutatorImpl(context);
+ ctmi.mutateTablet(ke).requireAbsentOperation().requireAbsentLocation()
+
.putLocation(Location.current(serverInstance)).submit(tabletMetadata -> false);
+ var results = ctmi.process();
+ assertEquals(Status.ACCEPTED, results.get(ke).getStatus());
+ assertEquals(Location.current(serverInstance),
+ context.getAmple().readTablet(ke).getLocation(),
+ "Did not see expected location after adding it");
+ }
+
+ // test that the new subset is returned with all 3 filters applied
+ Set<KeyExtent> expected = Sets.intersection(tabletsWithWalCompactFlush,
tabletsWithLocation);
+ assertFalse(expected.isEmpty());
+ testFilterApplied(context,
+ Set.of(new HasCurrentFilter(), new HasWalsFilter(), new
TestTabletMetadataFilter()),
+ expected, "Combination of filters did not return the expected
tablets");
+ }
+
+ @Test
+ public void testCompactedAndFlushIdFilter() {
+ ServerContext context = cluster.getServerContext();
+ ConditionalTabletsMutatorImpl ctmi = new
ConditionalTabletsMutatorImpl(context);
+ Set<TabletMetadataFilter> filter = Set.of(new
TestTabletMetadataFilter());
+
+ // make sure we read all tablets on table initially with no filters
+ testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
+ "Initially, all tablets should be present");
+
+ // Set compacted on e2 but with no flush ID
+ ctmi.mutateTablet(e2).requireAbsentOperation().putCompacted(34L)
+ .submit(tabletMetadata -> false);
+ var results = ctmi.process();
+ assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+ testFilterApplied(context, filter, Set.of(),
+ "Compacted but no flush ID should return no tablets");
+
+ // Set incorrect flush ID on e2
+ ctmi = new ConditionalTabletsMutatorImpl(context);
+ ctmi.mutateTablet(e2).requireAbsentOperation().putFlushId(45L)
+ .submit(tabletMetadata -> false);
+ results = ctmi.process();
+ assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+ testFilterApplied(context, filter, Set.of(),
+ "Compacted with incorrect flush ID should return no tablets");
+
+ // Set correct flush ID on e2
+ ctmi = new ConditionalTabletsMutatorImpl(context);
+ ctmi.mutateTablet(e2).requireAbsentOperation()
+
.putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata ->
false);
+ results = ctmi.process();
+ assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+ testFilterApplied(context, filter, Set.of(e2),
+ "Compacted with correct flush ID should return e2");
+
+ // Set compacted and correct flush ID on e3
+ ctmi = new ConditionalTabletsMutatorImpl(context);
+ ctmi.mutateTablet(e3).requireAbsentOperation().putCompacted(987L)
+
.putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata ->
false);
+ results = ctmi.process();
+ assertEquals(Status.ACCEPTED, results.get(e3).getStatus());
+ testFilterApplied(context, filter, Set.of(e2, e3),
+ "Compacted with correct flush ID should return e2 and e3");
+ }
+
+ @Test
+ public void walFilter() {
+ ServerContext context = cluster.getServerContext();
+ ConditionalTabletsMutatorImpl ctmi = new
ConditionalTabletsMutatorImpl(context);
+ Set<TabletMetadataFilter> filter = Set.of(new HasWalsFilter());
+
+ // make sure we read all tablets on table initially with no filters
+ testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
+ "Initially, all tablets should be present");
+
+ // add a wal to e2
+ String walFilePath =
+ java.nio.file.Path.of("tserver+8080",
UUID.randomUUID().toString()).toString();
+ LogEntry wal = LogEntry.fromPath(walFilePath);
+
ctmi.mutateTablet(e2).requireAbsentOperation().putWal(wal).submit(tabletMetadata
-> false);
+ var results = ctmi.process();
+ assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+
+ // test that the filter works
+ testFilterApplied(context, filter, Set.of(e2), "Only tablets with wals
should be returned");
+
+ // add wal to tablet e4
+ ctmi = new ConditionalTabletsMutatorImpl(context);
+ walFilePath = java.nio.file.Path.of("tserver+8080",
UUID.randomUUID().toString()).toString();
+ wal = LogEntry.fromPath(walFilePath);
+
ctmi.mutateTablet(e4).requireAbsentOperation().putWal(wal).submit(tabletMetadata
-> false);
+ results = ctmi.process();
+ assertEquals(Status.ACCEPTED, results.get(e4).getStatus());
+
+ // now, when using the wal filter, should see both e2 and e4
+ testFilterApplied(context, filter, Set.of(e2, e4),
+ "Only tablets with wals should be returned");
+
+ // remove the wal from e4
+ ctmi = new ConditionalTabletsMutatorImpl(context);
+
ctmi.mutateTablet(e4).requireAbsentOperation().deleteWal(wal).submit(tabletMetadata
-> false);
+ results = ctmi.process();
+ assertEquals(Status.ACCEPTED, results.get(e4).getStatus());
+
+ // test that now only the tablet with a wal is returned when using
filter()
+ testFilterApplied(context, filter, Set.of(e2), "Only tablets with wals
should be returned");
+ }
+
+ @Test
+ public void partialFetch() {
+ ServerContext context = cluster.getServerContext();
+ TestTabletMetadataFilter filter = new TestTabletMetadataFilter();
+ // if we only fetch some columns needed by the filter, we should get an
exception
+ TabletsMetadata.Options options =
+
context.getAmple().readTablets().forTable(tid).fetch(FLUSH_ID).filter(filter);
+ var ise = assertThrows(IllegalStateException.class, options::build);
+ String expectedMsg = String.format("%s needs cols %s however only %s
were fetched",
+ TestTabletMetadataFilter.class.getSimpleName(), filter.getColumns(),
Set.of(FLUSH_ID));
+ assertTrue(ise.getMessage().contains(expectedMsg));
+ }
+
+ }
+
@Test
public void testFlushId() {
try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/TestTabletMetadataFilter.java
b/test/src/main/java/org/apache/accumulo/test/functional/TestTabletMetadataFilter.java
new file mode 100644
index 0000000000..1847f7d941
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/functional/TestTabletMetadataFilter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.iterators.user.TabletMetadataFilter;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+
+import com.google.common.collect.Sets;
+
+/**
+ * A filter constructed to test filtering in ample. This filter only allows
tablets with compacted
+ * and with a flush ID.
+ */
+public class TestTabletMetadataFilter extends TabletMetadataFilter {
+
+ public static final long VALID_FLUSH_ID = 44L;
+
+ public static final Set<TabletMetadata.ColumnType> COLUMNS = Sets
+ .immutableEnumSet(TabletMetadata.ColumnType.COMPACTED,
TabletMetadata.ColumnType.FLUSH_ID);
+
+ private final static Predicate<TabletMetadata> TEST =
+ tabletMetadata -> !tabletMetadata.getCompacted().isEmpty()
+ &&
tabletMetadata.getFlushId().equals(OptionalLong.of(VALID_FLUSH_ID));
+
+ @Override
+ public Set<TabletMetadata.ColumnType> getColumns() {
+ return COLUMNS;
+ }
+
+ @Override
+ protected Predicate<TabletMetadata> acceptTablet() {
+ return TEST;
+ }
+}