This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new bb9a967f56 Cleans up non-existent migrating metadata tablets in
manager (#4750)
bb9a967f56 is described below
commit bb9a967f5608986ef92d1299426c437750dfb7aa
Author: Keith Turner <[email protected]>
AuthorDate: Wed Jul 24 12:06:11 2024 -0700
Cleans up non-existent migrating metadata tablets in manager (#4750)
Fixes #4475
---
.../java/org/apache/accumulo/manager/Manager.java | 82 ++++++++++++----------
1 file changed, 45 insertions(+), 37 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index d61c43a491..514c0271d4 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -32,6 +32,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -54,18 +55,14 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloClient;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.InstanceId;
-import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.AgeOffStore;
import org.apache.accumulo.core.fate.Fate;
@@ -96,12 +93,10 @@ import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.metadata.TabletState;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
-import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.spi.balancer.BalancerEnvironment;
import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer;
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
@@ -712,21 +707,34 @@ public class Manager extends AbstractServer
* migration will refer to a non-existing tablet, so it can never
complete. Periodically scan
* the metadata table and remove any migrating tablets that no longer
exist.
*/
- private void cleanupNonexistentMigrations(final AccumuloClient
accumuloClient)
- throws TableNotFoundException {
- Scanner scanner = accumuloClient.createScanner(MetadataTable.NAME,
Authorizations.EMPTY);
- TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
- scanner.setRange(MetadataSchema.TabletsSection.getRange());
- Set<KeyExtent> notSeen;
+ private void cleanupNonexistentMigrations(final ClientContext
clientContext) {
+
+ Map<DataLevel,Set<KeyExtent>> notSeen;
+
synchronized (migrations) {
- notSeen = new HashSet<>(migrations.keySet());
+ notSeen = partitionMigrations(migrations.keySet());
}
- for (Entry<Key,Value> entry : scanner) {
- KeyExtent extent = KeyExtent.fromMetaPrevRow(entry);
- notSeen.remove(extent);
+
+ // for each level find the set of migrating tablets that do not exist
in metadata store
+ for (DataLevel dataLevel : DataLevel.values()) {
+ var notSeenForLevel = notSeen.getOrDefault(dataLevel, Set.of());
+ if (notSeenForLevel.isEmpty() || dataLevel == DataLevel.ROOT) {
+ // No need to scan this level if there are no migrations. The root
tablet is always
+ // expected to exist, so no need to read its metadata.
+ continue;
+ }
+
+ try (var tablets =
clientContext.getAmple().readTablets().forLevel(dataLevel)
+ .fetch(TabletMetadata.ColumnType.PREV_ROW).build()) {
+ // A goal of this code is to avoid reading all extents in the
metadata table into memory
+ // when finding extents that exist in the migrating set and not in
the metadata table.
+ tablets.forEach(tabletMeta ->
notSeenForLevel.remove(tabletMeta.getExtent()));
+ }
+
+ // remove any tablets that previously existed in migrations for this
level but were not seen
+ // in the metadata table for the level
+ migrations.keySet().removeAll(notSeenForLevel);
}
- // remove tablets that used to be in migrations and were not seen in the
metadata table
- migrations.keySet().removeAll(notSeen);
}
/**
@@ -787,6 +795,23 @@ public class Manager extends AbstractServer
}
+ /**
+ * balanceTablets() balances tables by DataLevel. Return the current set of
migrations partitioned
+ * by DataLevel
+ */
+ private static Map<DataLevel,Set<KeyExtent>>
+ partitionMigrations(final Set<KeyExtent> migrations) {
+ final Map<DataLevel,Set<KeyExtent>> partitionedMigrations = new
EnumMap<>(DataLevel.class);
+ // populate to prevent NPE
+ for (DataLevel dl : DataLevel.values()) {
+ partitionedMigrations.put(dl, new HashSet<>());
+ }
+ migrations.forEach(ke -> {
+ partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
+ });
+ return partitionedMigrations;
+ }
+
private class StatusThread implements Runnable {
private boolean goodStats() {
@@ -957,23 +982,6 @@ public class Manager extends AbstractServer
}
}
- /**
- * balanceTablets() balances tables by DataLevel. Return the current set
of migrations
- * partitioned by DataLevel
- */
- private Map<DataLevel,Set<KeyExtent>> partitionMigrations(final
Set<KeyExtent> migrations) {
- final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
- new HashMap<>(DataLevel.values().length);
- // populate to prevent NPE
- for (DataLevel dl : DataLevel.values()) {
- partitionedMigrations.put(dl, new HashSet<>());
- }
- migrations.forEach(ke -> {
- partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
- });
- return partitionedMigrations;
- }
-
/**
* Given the current tserverStatus map and a DataLevel, return a view of
the tserverStatus map
* that only contains entries for tables in the DataLevel