This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit f2d82e166a8a7cd3f4e6d1feb60249d77d312c4b Merge: d618018611 c5877f846e Author: Keith Turner <[email protected]> AuthorDate: Tue Mar 3 20:06:12 2026 +0000 Merge branch '2.1' .../org/apache/accumulo/core/data/LoadPlan.java | 67 ++++++++++++++++++++-- .../core/client/rfile/RFileClientTest.java | 41 +++++++++++++ .../apache/accumulo/core/crypto/CryptoTest.java | 21 +++++++ .../java/org/apache/accumulo/manager/Manager.java | 25 +++----- .../org/apache/accumulo/manager/ManagerTest.java | 66 +++++++++++++++++++++ 5 files changed, 200 insertions(+), 20 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index a2dabffd95,a874ae952e..538ac77f8c --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@@ -37,7 -38,14 +37,16 @@@ import java.util.stream.Collectors import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.clientImpl.bulk.BulkImport; -import org.apache.accumulo.core.conf.SiteConfiguration; ++import org.apache.accumulo.core.conf.ConfigurationCopy; + import org.apache.accumulo.core.crypto.CryptoFactoryLoader; import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.file.FileOperations; ++import org.apache.accumulo.core.metadata.UnreferencedTabletFile; + import org.apache.accumulo.core.spi.crypto.CryptoEnvironment; + import org.apache.accumulo.core.spi.crypto.CryptoService; ++import org.apache.accumulo.core.util.RowRangeUtil; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@@ -532,24 -526,40 +549,66 @@@ public class LoadPlan } } + /** + * Computes a load plan for a rfile based on the minimum and maximum row present across all + * locality groups. + * + * @param properties used when opening the rfile, see + * {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)} + * + * @return a load plan of type {@link RangeType#FILE} + * @since 2.1.5 + */ + public static LoadPlan compute(URI file, Map<String,String> properties) throws IOException { + var path = new Path(file); + var conf = new Configuration(); + var fs = FileSystem.get(path.toUri(), conf); + CryptoService cs = + CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, properties); - var tableConf = SiteConfiguration.empty().withOverrides(properties).build(); ++ var tableConf = new ConfigurationCopy(properties); ++ var tabletFile = UnreferencedTabletFile.of(fs, path); + try (var reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.toString(), fs, conf, cs).withTableConfiguration(tableConf).build();) { - var firstRow = reader.getFirstKey().getRow(); - var lastRow = reader.getLastKey().getRow(); ++ .forFile(tabletFile, fs, conf, cs).withTableConfiguration(tableConf).build()) { ++ var fileRange = reader.getFileRange(); ++ var rowRange = RowRangeUtil.toRowRange(fileRange.rowRange); ++ Preconditions ++ .checkState(rowRange.isLowerBoundInclusive() && rowRange.isUpperBoundInclusive()); ++ var firstRow = rowRange.getLowerBound(); ++ var lastRow = rowRange.getUpperBound(); + return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE, firstRow, lastRow) + .build(); + } + } + + /** + * Computes a load plan for a rfile based on the minimum and maximum row present across all + * locality groups. + * + * @return a load plan of type {@link RangeType#FILE} + * @since 2.1.5 + */ + public static LoadPlan compute(URI file) throws IOException { + return compute(file, Map.of()); + } ++ + @Override + public int hashCode() { + return Objects.hash(destinations); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + LoadPlan other = (LoadPlan) obj; + return Objects.equals(destinations, other.destinations); + } + } diff --cc core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java index ae9cc8e4a6,a27e99a227..b7833bedeb --- a/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java +++ b/core/src/test/java/org/apache/accumulo/core/crypto/CryptoTest.java @@@ -49,9 -48,11 +49,12 @@@ import java.util.Collection import java.util.HashMap; import java.util.List; import java.util.Map; + import java.util.TreeSet; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.Future; + import java.util.stream.Collectors; + import java.util.stream.Stream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 3316578756,78ba9542d8..245ef6a21a --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -149,14 -156,10 +149,15 @@@ import org.apache.zookeeper.Watcher import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; +import com.google.common.collect.Comparators; import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Maps; ++import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.Uninterruptibles; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.micrometer.core.instrument.MeterRegistry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@@ -1485,26 -1744,33 +1486,18 @@@ public class Manager extends AbstractSe serversToShutdown.retainAll(current.getCurrentServers()); } - private static void cleanListByHostAndPort(Collection<TServerInstance> badServers, + static void cleanListByHostAndPort(Collection<TServerInstance> badServers, Set<TServerInstance> deleted, Set<TServerInstance> added) { - Iterator<TServerInstance> badIter = badServers.iterator(); - while (badIter.hasNext()) { - TServerInstance bad = badIter.next(); - for (TServerInstance add : added) { - if (bad.getHostPort().equals(add.getHostPort())) { - badIter.remove(); - break; - } - } - for (TServerInstance del : deleted) { - if (bad.getHostPort().equals(del.getHostPort())) { - badIter.remove(); - break; - } - } + if (badServers.isEmpty() || (deleted.isEmpty() && added.isEmpty())) { + // nothing to do + return; } + HashSet<HostAndPort> removalSet = new HashSet<>(deleted.size() + added.size()); + deleted.forEach(tsi -> removalSet.add(tsi.getHostAndPort())); + added.forEach(tsi -> removalSet.add(tsi.getHostAndPort())); + badServers.removeIf(badServer -> removalSet.contains(badServer.getHostAndPort())); } - @Override - public void stateChanged(TableId tableId, TableState state) { - nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state); - if (state == TableState.OFFLINE) { - clearMigrations(tableId); - } - } - - @Override - public void initialize() {} - - @Override - public void sessionExpired() {} - - @Override public Set<TableId> onlineTables() { Set<TableId> result = new HashSet<>(); if (getManagerState() != ManagerState.NORMAL) {
