This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 406552e46df4412cf8f4f52962446773a9eabee5
Merge: 32f5b4cc69 d4846d407e
Author: Dave Marion <[email protected]>
AuthorDate: Mon Jul 10 17:12:36 2023 +0000

    Merge branch 'main' into elasticity

 .../hadoopImpl/mapred/AccumuloRecordWriter.java    |   2 +-
 .../hadoopImpl/mapreduce/AccumuloRecordWriter.java |   2 +-
 .../manager/state/LoggingTabletStateStore.java     |   6 +
 .../server/manager/state/MetaDataStateStore.java   |  15 +-
 .../server/manager/state/RootTabletStateStore.java |   5 +-
 .../server/manager/state/TabletStateStore.java     |  11 +-
 .../server/manager/state/ZooTabletStateStore.java  |  10 +-
 .../manager/state/ZooTabletStateStoreTest.java     |   3 +-
 .../accumulo/manager/TabletGroupWatcher.java       |  47 ++++++-
 .../apache/accumulo/manager/state/TableCounts.java |   9 ++
 .../apache/accumulo/manager/state/TableStats.java  |   6 +
 .../manager/tserverOps/ShutdownTServer.java        |   3 +
 .../org/apache/accumulo/tserver/TabletServer.java  |   1 +
 .../accumulo/tserver/UnloadTabletHandler.java      |   1 +
 .../accumulo/tserver/session/ScanSession.java      |   4 +
 .../apache/accumulo/tserver/session/Session.java   |   6 +
 .../accumulo/tserver/session/SessionManager.java   |  83 ++++++-----
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  14 +-
 .../test/functional/ManagerAssignmentIT.java       | 153 +++++++++++++++++++++
 test/src/main/resources/log4j2-test.properties     |   3 +
 20 files changed, 335 insertions(+), 49 deletions(-)

diff --cc 
server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
index 0189b9294a,acfb363d2b..9f45e36228
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java
@@@ -24,9 -24,9 +24,10 @@@ import java.util.Map
  import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.logging.TabletLogger;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
  import org.apache.accumulo.core.metadata.TServerInstance;
 -import org.apache.accumulo.core.metadata.TabletLocationState;
+ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
  import org.apache.hadoop.fs.Path;
  
  import com.google.common.net.HostAndPort;
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index 9874928885,8c7bc888eb..dedffefd14
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@@ -19,24 -19,33 +19,28 @@@
  package org.apache.accumulo.server.manager.state;
  
  import java.util.Collection;
 -import java.util.List;
 -import java.util.Map;
  
  import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
  import org.apache.accumulo.core.metadata.MetadataTable;
 -import org.apache.accumulo.core.metadata.TServerInstance;
 -import org.apache.accumulo.core.metadata.TabletLocationState;
  import org.apache.accumulo.core.metadata.schema.Ample;
 +import 
org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
+ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 -import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 -import org.apache.accumulo.core.tabletserver.log.LogEntry;
 -import org.apache.accumulo.server.util.ManagerMetadataUtil;
 -import org.apache.hadoop.fs.Path;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
  
 -class MetaDataStateStore implements TabletStateStore {
 +class MetaDataStateStore extends AbstractTabletStateStore implements 
TabletStateStore {
  
    protected final ClientContext context;
    protected final CurrentState state;
    private final String targetTableName;
    private final Ample ample;
+   private final DataLevel level;
  
-   protected MetaDataStateStore(ClientContext context, CurrentState state, 
String targetTableName) {
+   protected MetaDataStateStore(DataLevel level, ClientContext context, 
CurrentState state,
+       String targetTableName) {
 +    super(context);
+     this.level = level;
      this.context = context;
      this.state = state;
      this.ample = context.getAmple();
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
index e88acb1946,98123e1c2f..034940fb0b
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
@@@ -19,8 -19,9 +19,9 @@@
  package org.apache.accumulo.server.manager.state;
  
  import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
  import org.apache.accumulo.core.metadata.RootTable;
 -import org.apache.accumulo.core.metadata.TabletLocationState;
+ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
  
  class RootTabletStateStore extends MetaDataStateStore {
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
index 6bba19ddac,05072fd1b0..ea058ab2dd
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java
@@@ -34,8 -34,13 +34,13 @@@ import org.apache.hadoop.fs.Path
  /**
   * Interface for storing information about tablet assignments. There are 
three implementations:
   */
 -public interface TabletStateStore extends Iterable<TabletLocationState> {
 +public interface TabletStateStore extends Iterable<TabletManagement> {
  
+   /**
+    * Get the level for this state store
+    */
+   DataLevel getLevel();
+ 
    /**
     * Identifying name for this tablet state store.
     */
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
index 7b9008249d,a2b28e1d71..99994bf794
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
@@@ -23,17 -24,20 +23,18 @@@ import java.util.EnumSet
  import java.util.List;
  import java.util.Map;
  
 -import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
 +import 
org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
  import org.apache.accumulo.core.metadata.RootTable;
  import org.apache.accumulo.core.metadata.TServerInstance;
 -import org.apache.accumulo.core.metadata.TabletLocationState;
 -import 
org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
  import org.apache.accumulo.core.metadata.schema.Ample;
+ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
  import org.apache.accumulo.core.metadata.schema.Ample.ReadConsistency;
 -import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
  import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 -import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 -import org.apache.accumulo.core.tabletserver.log.LogEntry;
 -import org.apache.accumulo.server.util.ManagerMetadataUtil;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.server.ServiceEnvironmentImpl;
 +import org.apache.accumulo.server.compaction.CompactionJobGenerator;
  import org.apache.hadoop.fs.Path;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -42,16 -46,22 +43,23 @@@ class ZooTabletStateStore extends Abstr
  
    private static final Logger log = 
LoggerFactory.getLogger(ZooTabletStateStore.class);
    private final Ample ample;
 -  private final ClientContext context;
+   private final DataLevel level;
 +  private final ServerContext ctx;
  
-   ZooTabletStateStore(ServerContext context) {
 -  ZooTabletStateStore(DataLevel level, ClientContext context) {
++  ZooTabletStateStore(DataLevel level, ServerContext context) {
 +    super(context);
 +    this.ctx = context;
+     this.level = level;
 -    this.context = context;
      this.ample = context.getAmple();
    }
  
+   @Override
+   public DataLevel getLevel() {
+     return level;
+   }
+ 
    @Override
 -  public ClosableIterator<TabletLocationState> iterator() {
 +  public ClosableIterator<TabletManagement> iterator() {
  
      return new ClosableIterator<>() {
        boolean finished = false;
diff --cc 
server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java
index c6023246c2,0000000000..bbae7d3a21
mode 100644,000000..100644
--- 
a/server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java
@@@ -1,63 -1,0 +1,64 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.manager.state;
 +
 +import static org.easymock.EasyMock.expect;
 +import static org.junit.jupiter.api.Assertions.assertThrows;
 +
 +import java.util.List;
 +
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.Ample;
++import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.server.MockServerContext;
 +import org.apache.accumulo.server.ServerContext;
 +import org.easymock.EasyMock;
 +import org.junit.jupiter.api.Test;
 +
 +import com.google.common.net.HostAndPort;
 +
 +public class ZooTabletStateStoreTest {
 +
 +  @Test
 +  public void testZooTabletStateStore() throws DistributedStoreException {
 +    ServerContext context = MockServerContext.get();
 +    Ample ample = EasyMock.createMock(Ample.class);
 +    expect(context.getAmple()).andReturn(ample).anyTimes();
 +    EasyMock.replay(context, ample);
-     ZooTabletStateStore tstore = new ZooTabletStateStore(context);
++    ZooTabletStateStore tstore = new ZooTabletStateStore(DataLevel.ROOT, 
context);
 +
 +    String sessionId = "this is my unique session data";
 +    TServerInstance server =
 +        new TServerInstance(HostAndPort.fromParts("127.0.0.1", 10000), 
sessionId);
 +
 +    KeyExtent notRoot = new KeyExtent(TableId.of("0"), null, null);
 +    final var assignmentList = List.of(new Assignment(notRoot, server, null));
 +    assertThrows(IllegalArgumentException.class, () -> 
tstore.setLocations(assignmentList));
 +    assertThrows(IllegalArgumentException.class, () -> 
tstore.setFutureLocations(assignmentList));
 +
 +    var nonRootMeta = TabletMetadata.builder(new 
KeyExtent(TableId.of("notroot"), null, null))
 +        .putPrevEndRow(null).build();
 +
 +    final List<TabletMetadata> assignmentList1 = List.of(nonRootMeta);
 +    assertThrows(IllegalArgumentException.class, () -> 
tstore.unassign(assignmentList1, null));
 +  }
 +}
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 5f1f5f3f5a,8f14c20614..40df86f82a
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@@ -66,8 -61,11 +66,9 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.metadata.RootTable;
  import org.apache.accumulo.core.metadata.StoredTabletFile;
  import org.apache.accumulo.core.metadata.TServerInstance;
 -import org.apache.accumulo.core.metadata.TabletLocationState;
 -import 
org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
  import org.apache.accumulo.core.metadata.TabletState;
  import org.apache.accumulo.core.metadata.schema.Ample;
+ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
  import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
  import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
@@@ -310,6 -258,13 +311,13 @@@ abstract class TabletGroupWatcher exten
            if (state == TabletState.ASSIGNED) {
              goal = TabletGoalState.HOSTED;
            }
+           if (Manager.log.isTraceEnabled()) {
+             Manager.log.trace(
+                 "[{}] Shutting down all Tservers: {}, dependentCount: {} 
Extent: {}, state: {}, goal: {}",
+                 store.name(), 
manager.serversToShutdown.equals(currentTServers.keySet()),
 -                dependentWatcher == null ? "null" : 
dependentWatcher.assignedOrHosted(), tls.extent,
 -                state, goal);
++                dependentWatcher == null ? "null" : 
dependentWatcher.assignedOrHosted(),
++                tm.getExtent(), state, goal);
+           }
  
            // if we are shutting down all the tabletservers, we have to do it 
in order
            if ((goal == TabletGoalState.SUSPENDED && state == 
TabletState.HOSTED)
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
index 4908655e9a,676de69ef1..787cc77f21
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java
@@@ -27,57 -25,43 +27,67 @@@ import static org.junit.jupiter.api.Ass
  
  import java.time.Duration;
  import java.util.Collections;
 +import java.util.List;
+ import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.TreeSet;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
 +import java.util.function.Predicate;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
  
+ import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
  import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchScanner;
  import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
  import org.apache.accumulo.core.client.admin.Locations;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.admin.TabletHostingGoal;
  import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.clientImpl.ClientTabletCache;
 +import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Key;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.Range;
  import org.apache.accumulo.core.data.TableId;
  import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
+ import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
  import org.apache.accumulo.core.metadata.MetadataTable;
  import org.apache.accumulo.core.metadata.RootTable;
 -import org.apache.accumulo.core.metadata.TabletLocationState;
 +import org.apache.accumulo.core.metadata.schema.Ample;
  import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationType;
  import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader;
 +import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
  import org.apache.accumulo.core.trace.TraceUtil;
 -import org.apache.accumulo.core.util.UtilWaitThread;
 -import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.harness.SharedMiniClusterBase;
+ import org.apache.accumulo.minicluster.ServerType;
 -import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl;
 -import org.apache.accumulo.server.manager.state.MetaDataTableScanner;
 +import org.apache.accumulo.server.manager.state.TabletManagementScanner;
  import org.apache.accumulo.test.util.Wait;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.BeforeAll;
  import org.junit.jupiter.api.Test;
  
 +import com.google.common.collect.Iterables;
+ import com.google.common.net.HostAndPort;
  
 -public class ManagerAssignmentIT extends AccumuloClusterHarness {
 +public class ManagerAssignmentIT extends SharedMiniClusterBase {
  
    @Override
    protected Duration defaultTimeout() {
@@@ -322,126 -113,154 +332,269 @@@
      }
    }
  
 +  @Test
 +  public void testBatchScannerAssignsMultipleOnDemandTablets() throws 
Exception {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      String tableName = super.getUniqueNames(1)[0];
 +
 +      String tableId = prepTableForScanTest(c, tableName);
 +
 +      try (BatchScanner s = c.createBatchScanner(tableName)) {
 +        s.setRanges(List.of(new Range("a", "s")));
 +        assertEquals(19, Iterables.size(s));
 +      }
 +
 +      List<TabletStats> stats = getTabletStats(c, tableId);
 +      assertEquals(3, stats.size());
 +      long hostingRequestCount = ClientTabletCache
 +          .getInstance((ClientContext) c, 
TableId.of(tableId)).getTabletHostingRequestCount();
 +      assertTrue(hostingRequestCount > 0);
 +
 +      // Run another scan, all tablets should be loaded
 +      try (BatchScanner s = c.createBatchScanner(tableName)) {
 +        s.setRanges(List.of(new Range("a", "t")));
 +        assertEquals(20, Iterables.size(s));
 +      }
 +
 +      stats = getTabletStats(c, tableId);
 +      assertEquals(3, stats.size());
 +      // No more tablets should have been brought online
 +      assertEquals(hostingRequestCount, ClientTabletCache
 +          .getInstance((ClientContext) c, 
TableId.of(tableId)).getTabletHostingRequestCount());
 +
 +    }
 +  }
 +
 +  @Test
 +  public void testBatchWriterAssignsTablets() throws Exception {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      String tableName = super.getUniqueNames(1)[0];
 +
 +      prepTableForScanTest(c, tableName);
 +    }
 +  }
 +
 +  @Test
 +  public void testOpidPreventsAssignment() throws Exception {
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      String tableName = super.getUniqueNames(1)[0];
 +
 +      var tableId = TableId.of(prepTableForScanTest(c, tableName));
 +      assertEquals(0, countTabletsWithLocation(c, tableId));
 +
 +      assertEquals(Set.of("f", "m", "t"), 
c.tableOperations().listSplits(tableName).stream()
 +          .map(Text::toString).collect(Collectors.toSet()));
 +
 +      c.securityOperations().grantTablePermission(getPrincipal(), 
MetadataTable.NAME,
 +          TablePermission.WRITE);
 +
 +      try (var writer = c.createBatchWriter(MetadataTable.NAME)) {
 +        var extent = new KeyExtent(tableId, new Text("m"), new Text("f"));
 +        var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L);
 +        Mutation m = new Mutation(extent.toMetaRow());
 +        TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new 
Value(opid.canonical()));
 +        writer.addMutation(m);
 +      }
 +
 +      c.tableOperations().setTabletHostingGoal(tableName, new Range(), 
TabletHostingGoal.ALWAYS);
 +
 +      Wait.waitFor(() -> countTabletsWithLocation(c, tableId) >= 3);
 +
 +      // there are four tablets, but one has an operation id set and should 
not be assigned
 +      assertEquals(3, countTabletsWithLocation(c, tableId));
 +
 +      try (var writer = c.createBatchWriter(MetadataTable.NAME)) {
 +        var extent = new KeyExtent(tableId, new Text("m"), new Text("f"));
 +        Mutation m = new Mutation(extent.toMetaRow());
 +        TabletsSection.ServerColumnFamily.OPID_COLUMN.putDelete(m);
 +        writer.addMutation(m);
 +      }
 +
 +      Wait.waitFor(() -> countTabletsWithLocation(c, tableId) >= 4);
 +
 +      // after the operation id is deleted the tablet should be assigned
 +      assertEquals(4, countTabletsWithLocation(c, tableId));
 +    }
 +  }
 +
 +  public static void loadDataForScan(AccumuloClient c, String tableName)
 +      throws MutationsRejectedException, TableNotFoundException {
 +    final byte[] empty = new byte[0];
 +    try (BatchWriter bw = c.createBatchWriter(tableName)) {
 +      IntStream.range(97, 122).forEach((i) -> {
 +        try {
 +          Mutation m = new Mutation(String.valueOf((char) i));
 +          m.put(empty, empty, empty);
 +          bw.addMutation(m);
 +        } catch (MutationsRejectedException e) {
 +          fail("Error inserting data", e);
 +        }
 +      });
 +    }
 +  }
 +
 +  public static Ample getAmple(AccumuloClient c) {
 +    return ((ClientContext) c).getAmple();
 +  }
 +
 +  public static long countTabletsWithLocation(AccumuloClient c, TableId 
tableId) {
 +    return 
getAmple(c).readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.LOCATION)
 +        .build().stream().filter(tabletMetadata -> 
tabletMetadata.getLocation() != null).count();
 +  }
 +
 +  public static List<TabletStats> getTabletStats(AccumuloClient c, String 
tableId)
 +      throws AccumuloException, AccumuloSecurityException {
 +    return ThriftClientTypes.TABLET_SERVER.execute((ClientContext) c, client 
-> client
 +        .getTabletStats(TraceUtil.traceInfo(), ((ClientContext) 
c).rpcCreds(), tableId));
 +  }
 +
+   @Test
+   public void testShutdownOnlyTServerWithUserTable() throws Exception {
+ 
+     // 2 TabletServers started for this test, shut them down so we only have 
1.
 -    getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
 -    ((MiniAccumuloClusterControl) 
getClusterControl()).start(ServerType.TABLET_SERVER,
 -        Collections.emptyMap(), 1);
++    getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
++    getCluster().getClusterControl().start(ServerType.TABLET_SERVER, 
Collections.emptyMap(), 1);
+ 
+     String tableName = getUniqueNames(1)[0];
+ 
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+ 
+       Wait.waitFor(() -> 
client.instanceOperations().getTabletServers().size() == 1);
+ 
+       client.tableOperations().create(tableName);
+ 
+       // wait for everything to be hosted and balanced
+       client.instanceOperations().waitForBalance();
+ 
+       try (var writer = client.createBatchWriter(tableName)) {
+         for (int i = 0; i < 1000000; i++) {
+           Mutation m = new Mutation(String.format("%08d", i));
+           m.put("", "", "");
+           writer.addMutation(m);
+         }
+       }
+       client.tableOperations().flush(tableName, null, null, true);
+ 
+       final CountDownLatch latch = new CountDownLatch(10);
+ 
+       Runnable task = new Runnable() {
+         @Override
+         public void run() {
+           while (true) {
+             try (var scanner = new 
IsolatedScanner(client.createScanner(tableName))) {
+               // TODO maybe do not close scanner? The following limit was 
placed on the stream to
+               // avoid reading all the data possibly leaving a scan session 
active on the tserver
+               int count = 0;
+               for (Entry<Key,Value> e : scanner) {
+                 count++;
+                 // let the test thread know that this thread has read some 
data
+                 if (count == 1_000) {
+                   latch.countDown();
+                 }
+               }
+             } catch (Exception e) {
+               e.printStackTrace();
+               break;
+             }
+           }
+         }
+       };
+ 
+       ExecutorService service = Executors.newFixedThreadPool(10);
+       for (int i = 0; i < 10; i++) {
+         service.execute(task);
+       }
+ 
+       // Wait until all threads are reading some data
+       latch.await();
+ 
+       // getClusterControl().stopAllServers(ServerType.TABLET_SERVER)
+       // could potentially send a kill -9 to the process. Shut the tablet
+       // servers down in a more graceful way.
+ 
+       Locations locs = client.tableOperations().locate(tableName,
+           Collections.singletonList(TabletsSection.getRange()));
+       locs.groupByTablet().keySet().stream().map(tid -> 
locs.getTabletLocation(tid))
+           .forEach(location -> {
+             HostAndPort address = HostAndPort.fromString(location);
+             String addressWithSession = address.toString();
+             var zLockPath = 
ServiceLock.path(getCluster().getServerContext().getZooKeeperRoot()
+                 + Constants.ZTSERVERS + "/" + address.toString());
+             long sessionId =
+                 
ServiceLock.getSessionId(getCluster().getServerContext().getZooCache(), 
zLockPath);
+             if (sessionId != 0) {
+               addressWithSession = address.toString() + "[" + 
Long.toHexString(sessionId) + "]";
+             }
+ 
+             final String finalAddress = addressWithSession;
+             System.out.println("Attempting to shutdown TabletServer at: " + 
address.toString());
+             try {
+               ThriftClientTypes.MANAGER.executeVoid((ClientContext) client,
+                   c -> c.shutdownTabletServer(TraceUtil.traceInfo(),
+                       getCluster().getServerContext().rpcCreds(), 
finalAddress, false));
+             } catch (AccumuloException | AccumuloSecurityException e) {
+               fail("Error shutting down TabletServer", e);
+             }
+ 
+           });
+ 
+       Wait.waitFor(() -> 
client.instanceOperations().getTabletServers().size() == 0);
+ 
+     }
+   }
+ 
+   @Test
+   public void testShutdownOnlyTServerWithoutUserTable() throws Exception {
+ 
+     // 2 TabletServers started for this test, shut them down so we only have 
1.
 -    getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
 -    ((MiniAccumuloClusterControl) 
getClusterControl()).start(ServerType.TABLET_SERVER,
 -        Collections.emptyMap(), 1);
++    getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
++    getCluster().getClusterControl().start(ServerType.TABLET_SERVER, 
Collections.emptyMap(), 1);
+ 
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+ 
+       Wait.waitFor(() -> 
client.instanceOperations().getTabletServers().size() == 1);
+ 
+       client.instanceOperations().waitForBalance();
+ 
+       // getClusterControl().stopAllServers(ServerType.TABLET_SERVER)
+       // could potentially send a kill -9 to the process. Shut the tablet
+       // servers down in a more graceful way.
+ 
+       Locations locs = client.tableOperations().locate(RootTable.NAME,
+           Collections.singletonList(TabletsSection.getRange()));
+       locs.groupByTablet().keySet().stream().map(tid -> 
locs.getTabletLocation(tid))
+           .forEach(location -> {
+             HostAndPort address = HostAndPort.fromString(location);
+             String addressWithSession = address.toString();
+             var zLockPath = 
ServiceLock.path(getCluster().getServerContext().getZooKeeperRoot()
+                 + Constants.ZTSERVERS + "/" + address.toString());
+             long sessionId =
+                 
ServiceLock.getSessionId(getCluster().getServerContext().getZooCache(), 
zLockPath);
+             if (sessionId != 0) {
+               addressWithSession = address.toString() + "[" + 
Long.toHexString(sessionId) + "]";
+             }
+ 
+             final String finalAddress = addressWithSession;
+             System.out.println("Attempting to shutdown TabletServer at: " + 
address.toString());
+             try {
+               ThriftClientTypes.MANAGER.executeVoid((ClientContext) client,
+                   c -> c.shutdownTabletServer(TraceUtil.traceInfo(),
+                       getCluster().getServerContext().rpcCreds(), 
finalAddress, false));
+             } catch (AccumuloException | AccumuloSecurityException e) {
+               fail("Error shutting down TabletServer", e);
+             }
+ 
+           });
+ 
+       Wait.waitFor(() -> 
client.instanceOperations().getTabletServers().size() == 0);
+ 
+     }
+   }
+ 
 -  private TabletLocationState getTabletLocationState(AccumuloClient c, String 
tableId) {
 -    try (MetaDataTableScanner s = new MetaDataTableScanner((ClientContext) c,
 -        new Range(TabletsSection.encodeRow(TableId.of(tableId), null)), 
MetadataTable.NAME)) {
 +  public static TabletManagement getManagerTabletInfo(AccumuloClient c, 
String tableId,
 +      Text endRow) {
 +    try (TabletManagementScanner s = new 
TabletManagementScanner((ClientContext) c,
 +        new Range(TabletsSection.encodeRow(TableId.of(tableId), endRow)), 
MetadataTable.NAME)) {
        return s.next();
      }
    }

Reply via email to