This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 9c733bd2db2d0eb2d77b9fe80f1656216a324625
Merge: 0a5845e6d6 afe5654613
Author: Keith Turner <[email protected]>
AuthorDate: Tue May 9 16:16:01 2023 -0400

    Merge branch 'main' into elasticity

 .../core/clientImpl/ClientTabletCacheImpl.java     |   6 +-
 .../TabletServerBatchReaderIterator.java           |  10 +-
 .../accumulo/core/metadata/schema/Ample.java       |   4 +-
 .../core/metadata/schema/TabletMetadata.java       |  10 ++
 .../core/metadata/schema/TabletsMetadata.java      |  61 +++++++++---
 .../manager/state/AbstractTabletStateStore.java    |   8 +-
 .../accumulo/server/manager/state/Assignment.java  |  14 ++-
 .../server/manager/state/UnassignedTablet.java     |  84 +++++++++++++++++
 .../metadata/ConditionalTabletsMutatorImpl.java    |   4 +-
 .../accumulo/server/metadata/ServerAmpleImpl.java  |   4 +-
 .../accumulo/server/util/ManagerMetadataUtil.java  |  14 +--
 .../manager/state/RootTabletStateStoreTest.java    |   4 +-
 .../server/util/ManagerMetadataUtilTest.java       | 102 +++++++++++++++++++++
 .../accumulo/coordinator/CompactionFinalizer.java  |   6 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   9 +-
 .../accumulo/manager/TabletGroupWatcher.java       |  51 ++++++-----
 .../manager/tableOps/bulkVer2/BulkInfo.java        |   4 +
 .../tableOps/bulkVer2/CleanUpBulkImport.java       |  16 +++-
 .../manager/tableOps/bulkVer2/PrepBulkImport.java  |  30 +++++-
 .../tableOps/bulkVer2/PrepBulkImportTest.java      |  12 ++-
 .../apache/accumulo/tserver/AssignmentHandler.java |   3 +-
 .../org/apache/accumulo/tserver/ScanServer.java    |   5 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  15 ++-
 .../java/org/apache/accumulo/test/AmpleIT.java     | 100 ++++++++++++++++++++
 .../accumulo/test/functional/SplitRecoveryIT.java  |   2 +-
 .../apache/accumulo/test/manager/MergeStateIT.java |   2 +-
 .../accumulo/test/performance/NullTserver.java     |   2 +-
 27 files changed, 502 insertions(+), 80 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
index e5119e25e3,dafada9132..76c98c0cdf
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
@@@ -33,6 -32,6 +33,7 @@@ import java.util.Iterator
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
++import java.util.Optional;
  import java.util.SortedMap;
  import java.util.TreeMap;
  import java.util.TreeSet;
@@@ -554,94 -500,28 +555,95 @@@ public class ClientTabletCacheImpl exte
        timer = new OpTimer().start();
      }
  
 -    while (true) {
 +    LockCheckerSession lcSession = new LockCheckerSession();
 +    CachedTablet tl = _findTablet(context, row, skipRow, false, true, 
lcSession, locationNeed);
  
 -      LockCheckerSession lcSession = new LockCheckerSession();
 -      TabletLocation tl = _locateTablet(context, row, skipRow, retry, true, 
lcSession);
 +    if (timer != null) {
 +      timer.stop();
 +      log.trace("tid={} Located tablet {} at {} in {}", 
Thread.currentThread().getId(),
 +          (tl == null ? "null" : tl.getExtent()), (tl == null ? "null" : 
tl.getTserverLocation()),
 +          String.format("%.3f secs", timer.scale(SECONDS)));
 +    }
  
 -      if (retry && tl == null) {
 -        sleepUninterruptibly(100, MILLISECONDS);
 -        if (log.isTraceEnabled()) {
 -          log.trace("Failed to locate tablet containing row {} in table {}, 
will retry...",
 -              TextUtil.truncate(row), tableId);
 -        }
 -        continue;
 +    if (tl != null && locationNeed == LocationNeed.REQUIRED && 
tl.getTserverLocation().isEmpty()) {
 +      requestTabletHosting(context, List.of(tl.getExtent()));
 +      return null;
 +    }
 +
 +    return tl;
 +
 +  }
 +
 +  @Override
 +  public long getTabletHostingRequestCount() {
 +    return tabletHostingRequestCount.get();
 +  }
 +
 +  @VisibleForTesting
 +  public void resetTabletHostingRequestCount() {
 +    tabletHostingRequestCount.set(0);
 +  }
 +
 +  @VisibleForTesting
 +  public void enableTabletHostingRequests(boolean enabled) {
 +    HOSTING_ENABLED.set(enabled);
 +  }
 +
 +  private void requestTabletHosting(ClientContext context,
 +      Collection<KeyExtent> extentsWithNoLocation) throws AccumuloException,
 +      AccumuloSecurityException, TableNotFoundException, 
InvalidTabletHostingRequestException {
 +
 +    if (!HOSTING_ENABLED.get()) {
 +      return;
 +    }
 +
 +    // System tables should always be hosted
 +    if (RootTable.ID == tableId || MetadataTable.ID == tableId) {
 +      return;
 +    }
 +
 +    if (extentsWithNoLocation.isEmpty()) {
 +      return;
 +    }
 +
 +    if (context.getTableState(tableId) != TableState.ONLINE) {
 +      log.trace("requestTabletHosting: table {} is not online", tableId);
 +      return;
 +    }
 +
 +    List<KeyExtent> extentsToLookup = new ArrayList<>();
 +    for (var extent : extentsWithNoLocation) {
 +      if (recentOndemandRequest.asMap().putIfAbsent(extent, 
System.currentTimeMillis()) == null) {
 +        extentsToLookup.add(extent);
 +        log.debug("Marking tablet as onDemand: {}", extent);
        }
 +    }
 +
 +    List<TKeyExtent> extentsToBringOnline = new ArrayList<>();
 +
-     try (TabletsMetadata tm = 
context.getAmple().readTablets().forTablets(extentsToLookup)
-         .fetch(HOSTING_REQUESTED, HOSTING_GOAL).build()) {
++    try (TabletsMetadata tm =
++        context.getAmple().readTablets().forTablets(extentsToLookup, 
Optional.empty())
++            .fetch(HOSTING_REQUESTED, HOSTING_GOAL).build()) {
  
 -      if (timer != null) {
 -        timer.stop();
 -        log.trace("tid={} Located tablet {} at {} in {}", 
Thread.currentThread().getId(),
 -            (tl == null ? "null" : tl.getExtent()), (tl == null ? "null" : 
tl.getTserverLocation()),
 -            String.format("%.3f secs", timer.scale(SECONDS)));
 +      for (TabletMetadata tabletMetadata : tm) {
 +        if (tabletMetadata.getHostingGoal() == TabletHostingGoal.ONDEMAND
 +            && !tabletMetadata.getHostingRequested()) {
 +          extentsToBringOnline.add(tabletMetadata.getExtent().toThrift());
 +        }
 +
 +        if (tabletMetadata.getHostingGoal() == TabletHostingGoal.NEVER) {
 +          throw new InvalidTabletHostingRequestException("Extent " + 
tabletMetadata.getExtent()
 +              + " has a tablet hosting goal state " + 
TabletHostingGoal.NEVER);
 +        }
        }
 +    }
  
 -      return tl;
 +    if (!extentsToBringOnline.isEmpty()) {
 +      log.debug("Requesting tablets be hosted: {}", extentsToBringOnline);
 +      ThriftClientTypes.TABLET_MGMT.executeVoid(context,
 +          client -> client.requestTabletHosting(TraceUtil.traceInfo(), 
context.rpcCreds(),
 +              tableId.canonical(), extentsToBringOnline));
 +      tabletHostingRequestCount.addAndGet(extentsToBringOnline.size());
      }
    }
  
diff --cc 
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 1f85991a72,a2b549139b..07a83586a2
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@@ -254,10 -252,10 +257,14 @@@ public class TabletServerBatchReaderIte
  
      int lastFailureSize = Integer.MAX_VALUE;
  
+     Retry retry = Retry.builder().infiniteRetries().retryAfter(100, 
MILLISECONDS)
+         .incrementBy(100, MILLISECONDS).maxWait(10, 
SECONDS).backOffFactor(1.07)
+         .logInterval(1, MINUTES).createFactory().createRetry();
+ 
 +    ScanServerData ssd;
 +
 +    long startTime = System.currentTimeMillis();
 +
      while (true) {
  
        binnedRanges.clear();
@@@ -295,14 -282,8 +302,14 @@@
                failures.size());
          }
  
 +        if (System.currentTimeMillis() - startTime > retryTimeout) {
 +          // TODO exception used for timeout is inconsistent
 +          throw new TimedOutException(
 +              "Failed to find servers to process scans before timeout was 
exceeded.");
 +        }
 +
          try {
-           Thread.sleep(100);
+           retry.waitForNextAttempt(log, "binRanges retry failures");
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
index f1636ea882,0000000000..9e519273f8
mode 100644,000000..100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
@@@ -1,174 -1,0 +1,174 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.manager.state;
 +
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.client.ConditionalWriter;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.TabletLocationState;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
 +import org.apache.accumulo.server.util.ManagerMetadataUtil;
 +import org.apache.hadoop.fs.Path;
 +
 +import com.google.common.base.Preconditions;
 +
 +public abstract class AbstractTabletStateStore implements TabletStateStore {
 +
 +  private final ClientContext context;
 +  private final Ample ample;
 +
 +  protected AbstractTabletStateStore(ClientContext context) {
 +    this.context = context;
 +    this.ample = context.getAmple();
 +  }
 +
 +  @Override
 +  public void setLocations(Collection<Assignment> assignments) throws 
DistributedStoreException {
 +    try (var tabletsMutator = ample.conditionallyMutateTablets()) {
 +      for (Assignment assignment : assignments) {
 +        var conditionalMutator = 
tabletsMutator.mutateTablet(assignment.tablet)
 +            .requireAbsentOperation()
 +            
.requireLocation(TabletMetadata.Location.future(assignment.server))
 +            .putLocation(TabletMetadata.Location.current(assignment.server))
 +            
.deleteLocation(TabletMetadata.Location.future(assignment.server)).deleteSuspension();
 +
-         ManagerMetadataUtil.updateLastForAssignmentMode(context, ample, 
conditionalMutator,
-             assignment.tablet, assignment.server);
++        ManagerMetadataUtil.updateLastForAssignmentMode(context, 
conditionalMutator,
++            assignment.server, assignment.lastLocation);
 +
 +        conditionalMutator.submit(tabletMetadata -> {
 +          
Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet));
 +          // see if we are the current location, if so then the unknown 
mutation actually
 +          // succeeded
 +          return tabletMetadata.getLocation() != null && 
tabletMetadata.getLocation()
 +              .equals(TabletMetadata.Location.current(assignment.server));
 +        });
 +      }
 +
 +      if (tabletsMutator.process().values().stream()
 +          .anyMatch(result -> result.getStatus() != 
ConditionalWriter.Status.ACCEPTED)) {
 +        // TODO should this look at why?
 +        throw new DistributedStoreException(
 +            "failed to set tablet location, conditional mutation failed");
 +      }
 +    } catch (RuntimeException ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +
 +  @Override
 +  public void setFutureLocations(Collection<Assignment> assignments)
 +      throws DistributedStoreException {
 +    try (var tabletsMutator = ample.conditionallyMutateTablets()) {
 +      for (Assignment assignment : assignments) {
 +        
tabletsMutator.mutateTablet(assignment.tablet).requireAbsentOperation()
 +            .requireAbsentLocation().deleteSuspension()
 +            .putLocation(TabletMetadata.Location.future(assignment.server))
 +            .submit(tabletMetadata -> {
 +              
Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet));
 +              // see if we are the future location, if so then the unknown 
mutation actually
 +              // succeeded
 +              return tabletMetadata.getLocation() != null && 
tabletMetadata.getLocation()
 +                  .equals(TabletMetadata.Location.future(assignment.server));
 +            });
 +      }
 +
 +      if (tabletsMutator.process().values().stream()
 +          .anyMatch(result -> result.getStatus() != 
ConditionalWriter.Status.ACCEPTED)) {
 +        // TODO should this look at why?
 +        throw new DistributedStoreException(
 +            "failed to set tablet location, conditional mutation failed");
 +      }
 +
 +    } catch (RuntimeException ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +
 +  @Override
 +  public void unassign(Collection<TabletLocationState> tablets,
 +      Map<TServerInstance,List<Path>> logsForDeadServers) throws 
DistributedStoreException {
 +    unassign(tablets, logsForDeadServers, -1);
 +  }
 +
 +  @Override
 +  public void suspend(Collection<TabletLocationState> tablets,
 +      Map<TServerInstance,List<Path>> logsForDeadServers, long 
suspensionTimestamp)
 +      throws DistributedStoreException {
 +    unassign(tablets, logsForDeadServers, suspensionTimestamp);
 +  }
 +
 +  protected abstract void processSuspension(Ample.ConditionalTabletMutator 
tabletMutator,
 +      TabletLocationState tls, long suspensionTimestamp);
 +
 +  private void unassign(Collection<TabletLocationState> tablets,
 +      Map<TServerInstance,List<Path>> logsForDeadServers, long 
suspensionTimestamp)
 +      throws DistributedStoreException {
 +    try (var tabletsMutator = ample.conditionallyMutateTablets()) {
 +      for (TabletLocationState tls : tablets) {
 +        var tabletMutator = 
tabletsMutator.mutateTablet(tls.extent).requireAbsentOperation();
 +
 +        if (tls.hasCurrent()) {
 +          tabletMutator.requireLocation(tls.current);
 +
-           ManagerMetadataUtil.updateLastForAssignmentMode(context, ample, 
tabletMutator, tls.extent,
-               tls.current.getServerInstance());
++          ManagerMetadataUtil.updateLastForAssignmentMode(context, 
tabletMutator,
++              tls.current.getServerInstance(), tls.last);
 +          tabletMutator.deleteLocation(tls.current);
 +          if (logsForDeadServers != null) {
 +            List<Path> logs = 
logsForDeadServers.get(tls.current.getServerInstance());
 +            if (logs != null) {
 +              for (Path log : logs) {
 +                LogEntry entry = new LogEntry(tls.extent, 0, log.toString());
 +                tabletMutator.putWal(entry);
 +              }
 +            }
 +          }
 +        }
 +
 +        if (tls.hasFuture()) {
 +          tabletMutator.requireLocation(tls.future);
 +          tabletMutator.deleteLocation(tls.future);
 +        }
 +
 +        processSuspension(tabletMutator, tls, suspensionTimestamp);
 +
 +        tabletMutator.submit(tabletMetadata -> {
 +          // The status of the conditional update is unknown, so check and 
see if things are ok
 +          return tabletMetadata.getLocation() == null;
 +        });
 +      }
 +
 +      Map<KeyExtent,Ample.ConditionalResult> results = 
tabletsMutator.process();
 +
 +      if (results.values().stream().anyMatch(conditionalResult -> 
conditionalResult.getStatus()
 +          != ConditionalWriter.Status.ACCEPTED)) {
 +        throw new DistributedStoreException("Some unassignments did not 
satisfy conditions.");
 +      }
 +
 +    } catch (RuntimeException ex) {
 +      throw new DistributedStoreException(ex);
 +    }
 +  }
 +}
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index f8da9b26c3,0000000000..9129c22f1c
mode 100644,000000..100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@@ -1,194 -1,0 +1,196 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.metadata;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
++import java.util.Optional;
 +import java.util.Set;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.ConditionalWriter;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.data.ConditionalMutation;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.hadoop.io.Text;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.base.Suppliers;
 +import com.google.common.collect.Maps;
 +
 +public class ConditionalTabletsMutatorImpl implements 
Ample.ConditionalTabletsMutator {
 +
 +  private final ServerContext context;
 +  private Ample.DataLevel dataLevel = null;
 +
 +  private List<ConditionalMutation> mutations = new ArrayList<>();
 +
 +  private Map<Text,KeyExtent> extents = new HashMap<>();
 +
 +  private boolean active = true;
 +
 +  Map<KeyExtent,Ample.UnknownValidator> unknownValidators = new HashMap<>();
 +
 +  public ConditionalTabletsMutatorImpl(ServerContext context) {
 +    this.context = context;
 +  }
 +
 +  @Override
 +  public Ample.OperationRequirements mutateTablet(KeyExtent extent) {
 +    Preconditions.checkState(active);
 +
 +    var dataLevel = Ample.DataLevel.of(extent.tableId());
 +
 +    if (this.dataLevel == null) {
 +      this.dataLevel = dataLevel;
 +    } else if (!this.dataLevel.equals(dataLevel)) {
 +      throw new IllegalArgumentException(
 +          "Can not mix data levels " + this.dataLevel + " " + dataLevel);
 +    }
 +
 +    Preconditions.checkState(extents.putIfAbsent(extent.toMetaRow(), extent) 
== null,
 +        "Duplicate extents not handled");
 +    return new ConditionalTabletMutatorImpl(this, context, extent, 
mutations::add,
 +        unknownValidators::put);
 +  }
 +
 +  protected ConditionalWriter createConditionalWriter(Ample.DataLevel 
dataLevel)
 +      throws TableNotFoundException {
 +    if (dataLevel == Ample.DataLevel.ROOT) {
 +      return new RootConditionalWriter(context);
 +    } else {
 +      return context.createConditionalWriter(dataLevel.metaTable());
 +    }
 +  }
 +
 +  protected Map<KeyExtent,TabletMetadata> readTablets(List<KeyExtent> 
extents) {
 +    Map<KeyExtent,TabletMetadata> failedTablets = new HashMap<>();
 +
-     try (var tabletsMeta = 
context.getAmple().readTablets().forTablets(extents).build()) {
++    try (var tabletsMeta =
++        context.getAmple().readTablets().forTablets(extents, 
Optional.empty()).build()) {
 +      tabletsMeta
 +          .forEach(tabletMetadata -> 
failedTablets.put(tabletMetadata.getExtent(), tabletMetadata));
 +    }
 +
 +    return failedTablets;
 +  }
 +
 +  private Map<KeyExtent,TabletMetadata>
 +      readFailedTablets(Map<KeyExtent,ConditionalWriter.Result> results) {
 +
 +    var extents = results.entrySet().stream().filter(e -> {
 +      try {
 +        return e.getValue().getStatus() != ConditionalWriter.Status.ACCEPTED;
 +      } catch (AccumuloException | AccumuloSecurityException ex) {
 +        throw new RuntimeException(ex);
 +      }
 +    }).map(Map.Entry::getKey).collect(Collectors.toList());
 +
 +    if (extents.isEmpty()) {
 +      return Map.of();
 +    }
 +
 +    return readTablets(extents);
 +  }
 +
 +  @Override
 +  public Map<KeyExtent,Ample.ConditionalResult> process() {
 +    Preconditions.checkState(active);
 +    if (dataLevel != null) {
 +      try (ConditionalWriter conditionalWriter = 
createConditionalWriter(dataLevel)) {
 +        var results = conditionalWriter.write(mutations.iterator());
 +
 +        var resultsMap = new HashMap<KeyExtent,ConditionalWriter.Result>();
 +
 +        while (results.hasNext()) {
 +          var result = results.next();
 +          var row = new Text(result.getMutation().getRow());
 +          resultsMap.put(extents.get(row), result);
 +        }
 +
 +        if (!resultsMap.keySet().equals(Set.copyOf(extents.values()))) {
 +          throw new AssertionError("Not all extents were seen, this is 
unexpected");
 +        }
 +
 +        // only fetch the metadata for failures when requested and when it is 
requested fetch all
 +        // of the failed extents at once to avoid fetching them one by one.
 +        var failedMetadata = Suppliers.memoize(() -> 
readFailedTablets(resultsMap));
 +
 +        return Maps.transformEntries(resultsMap, (extent, result) -> new 
Ample.ConditionalResult() {
 +
 +          private ConditionalWriter.Status _getStatus() {
 +            try {
 +              return result.getStatus();
 +            } catch (AccumuloException | AccumuloSecurityException e) {
 +              throw new RuntimeException(e);
 +            }
 +          }
 +
 +          @Override
 +          public ConditionalWriter.Status getStatus() {
 +            var status = _getStatus();
 +            if (status == ConditionalWriter.Status.UNKNOWN
 +                && unknownValidators.containsKey(extent)) {
 +              var tabletMetadata = readMetadata();
 +              if (tabletMetadata != null && 
unknownValidators.get(extent).test(tabletMetadata)) {
 +                return ConditionalWriter.Status.ACCEPTED;
 +              }
 +            }
 +
 +            return status;
 +          }
 +
 +          @Override
 +          public KeyExtent getExtent() {
 +            return extent;
 +          }
 +
 +          @Override
 +          public TabletMetadata readMetadata() {
 +            Preconditions.checkState(_getStatus() != 
ConditionalWriter.Status.ACCEPTED);
 +            return failedMetadata.get().get(getExtent());
 +          }
 +        });
 +      } catch (TableNotFoundException e) {
 +        throw new RuntimeException(e);
 +      } finally {
 +        // render inoperable because reuse is not tested
 +        extents.clear();
 +        mutations.clear();
 +        active = false;
 +      }
 +    } else {
 +      // render inoperable because reuse is not tested
 +      extents.clear();
 +      mutations.clear();
 +      active = false;
 +      return Map.of();
 +    }
 +  }
 +
 +  @Override
 +  public void close() {}
 +}
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
index 8d5cf02fb5,3b36bd892e..f578e976a4
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java
@@@ -257,13 -255,15 +259,15 @@@ public class ManagerMetadataUtil 
     * last location if needed and set the new last location
     *
     * @param context The server context
-    * @param ample The metadata persistence layer
     * @param tabletMutator The mutator being built
-    * @param extent The tablet extent
     * @param location The new location
+    * @param lastLocation The previous last location, which may be null
     */
-   public static void updateLastForAssignmentMode(ClientContext context, Ample 
ample,
-       Ample.TabletUpdates<?> tabletMutator, KeyExtent extent, TServerInstance 
location) {
+   public static void updateLastForAssignmentMode(ClientContext context,
 -      Ample.TabletMutator tabletMutator, TServerInstance location, Location 
lastLocation) {
++      Ample.TabletUpdates<?> tabletMutator, TServerInstance location, 
Location lastLocation) {
+     Preconditions.checkArgument(
+         lastLocation == null || lastLocation.getType() == 
TabletMetadata.LocationType.LAST);
+ 
      // if the location mode is assignment, then preserve the current location 
in the last
      // location value
      if 
("assignment".equals(context.getConfiguration().get(Property.TSERV_LAST_LOCATION_MODE)))
 {
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index bdd554c727,42c90ad754..5ce57ad57e
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -1772,10 -1754,11 +1773,12 @@@ public class Manager extends AbstractSe
    }
  
    void getAssignments(SortedMap<TServerInstance,TabletServerStatus> 
currentStatus,
-       Map<KeyExtent,TServerInstance> unassigned, 
Map<KeyExtent,TServerInstance> assignedOut) {
-     AssignmentParamsImpl params =
-         AssignmentParamsImpl.fromThrift(currentStatus, unassigned, 
assignedOut);
+       Map<KeyExtent,UnassignedTablet> unassigned, 
Map<KeyExtent,TServerInstance> assignedOut) {
+     AssignmentParamsImpl params = 
AssignmentParamsImpl.fromThrift(currentStatus,
+         unassigned.entrySet().stream().collect(HashMap::new,
+             (m, e) -> m.put(e.getKey(), e.getValue().getServerInstance()), 
Map::putAll),
+         assignedOut);
      tabletBalancer.getAssignments(params);
    }
 +
  }

Reply via email to