This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 406552e46df4412cf8f4f52962446773a9eabee5 Merge: 32f5b4cc69 d4846d407e Author: Dave Marion <[email protected]> AuthorDate: Mon Jul 10 17:12:36 2023 +0000 Merge branch 'main' into elasticity .../hadoopImpl/mapred/AccumuloRecordWriter.java | 2 +- .../hadoopImpl/mapreduce/AccumuloRecordWriter.java | 2 +- .../manager/state/LoggingTabletStateStore.java | 6 + .../server/manager/state/MetaDataStateStore.java | 15 +- .../server/manager/state/RootTabletStateStore.java | 5 +- .../server/manager/state/TabletStateStore.java | 11 +- .../server/manager/state/ZooTabletStateStore.java | 10 +- .../manager/state/ZooTabletStateStoreTest.java | 3 +- .../accumulo/manager/TabletGroupWatcher.java | 47 ++++++- .../apache/accumulo/manager/state/TableCounts.java | 9 ++ .../apache/accumulo/manager/state/TableStats.java | 6 + .../manager/tserverOps/ShutdownTServer.java | 3 + .../org/apache/accumulo/tserver/TabletServer.java | 1 + .../accumulo/tserver/UnloadTabletHandler.java | 1 + .../accumulo/tserver/session/ScanSession.java | 4 + .../apache/accumulo/tserver/session/Session.java | 6 + .../accumulo/tserver/session/SessionManager.java | 83 ++++++----- .../org/apache/accumulo/tserver/tablet/Tablet.java | 14 +- .../test/functional/ManagerAssignmentIT.java | 153 +++++++++++++++++++++ test/src/main/resources/log4j2-test.properties | 3 + 20 files changed, 335 insertions(+), 49 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java index 0189b9294a,acfb363d2b..9f45e36228 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java @@@ -24,9 -24,9 +24,10 @@@ import java.util.Map import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.logging.TabletLogger; +import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletLocationState; + import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.hadoop.fs.Path; import com.google.common.net.HostAndPort; diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java index 9874928885,8c7bc888eb..dedffefd14 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java @@@ -19,24 -19,33 +19,28 @@@ package org.apache.accumulo.server.manager.state; import java.util.Collection; -import java.util.List; -import java.util.Map; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletLocationState; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; + import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; -import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.server.util.ManagerMetadataUtil; -import org.apache.hadoop.fs.Path; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; -class MetaDataStateStore implements TabletStateStore { +class MetaDataStateStore extends AbstractTabletStateStore implements TabletStateStore { protected final ClientContext context; protected final CurrentState state; private final String targetTableName; private final Ample ample; + private final DataLevel level; - protected MetaDataStateStore(ClientContext context, CurrentState state, String targetTableName) { + protected MetaDataStateStore(DataLevel level, ClientContext context, CurrentState state, + String targetTableName) { + super(context); + this.level = level; this.context = context; this.state = state; this.ample = context.getAmple(); diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java index e88acb1946,98123e1c2f..034940fb0b --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java @@@ -19,8 -19,9 +19,9 @@@ package org.apache.accumulo.server.manager.state; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.TabletLocationState; + import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; class RootTabletStateStore extends MetaDataStateStore { diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java index 6bba19ddac,05072fd1b0..ea058ab2dd --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java @@@ -34,8 -34,13 +34,13 @@@ import org.apache.hadoop.fs.Path /** * Interface for storing information about tablet assignments. There are three implementations: */ -public interface TabletStateStore extends Iterable<TabletLocationState> { +public interface TabletStateStore extends Iterable<TabletManagement> { + /** + * Get the level for this state store + */ + DataLevel getLevel(); + /** * Identifying name for this tablet state store. */ diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java index 7b9008249d,a2b28e1d71..99994bf794 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java @@@ -23,17 -24,20 +23,18 @@@ import java.util.EnumSet import java.util.List; import java.util.Map; -import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletLocationState; -import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException; import org.apache.accumulo.core.metadata.schema.Ample; + import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.Ample.ReadConsistency; -import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; -import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.server.util.ManagerMetadataUtil; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -42,16 -46,22 +43,23 @@@ class ZooTabletStateStore extends Abstr private static final Logger log = LoggerFactory.getLogger(ZooTabletStateStore.class); private final Ample ample; - private final ClientContext context; + private final DataLevel level; + private final ServerContext ctx; - ZooTabletStateStore(ServerContext context) { - ZooTabletStateStore(DataLevel level, ClientContext context) { ++ ZooTabletStateStore(DataLevel level, ServerContext context) { + super(context); + this.ctx = context; + this.level = level; - this.context = context; this.ample = context.getAmple(); } + @Override + public DataLevel getLevel() { + return level; + } + @Override - public ClosableIterator<TabletLocationState> iterator() { + public ClosableIterator<TabletManagement> iterator() { return new ClosableIterator<>() { boolean finished = false; diff --cc server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java index c6023246c2,0000000000..bbae7d3a21 mode 100644,000000..100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java @@@ -1,63 -1,0 +1,64 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.manager.state; + +import static org.easymock.EasyMock.expect; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; ++import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.server.MockServerContext; +import org.apache.accumulo.server.ServerContext; +import org.easymock.EasyMock; +import org.junit.jupiter.api.Test; + +import com.google.common.net.HostAndPort; + +public class ZooTabletStateStoreTest { + + @Test + public void testZooTabletStateStore() throws DistributedStoreException { + ServerContext context = MockServerContext.get(); + Ample ample = EasyMock.createMock(Ample.class); + expect(context.getAmple()).andReturn(ample).anyTimes(); + EasyMock.replay(context, ample); - ZooTabletStateStore tstore = new ZooTabletStateStore(context); ++ ZooTabletStateStore tstore = new ZooTabletStateStore(DataLevel.ROOT, context); + + String sessionId = "this is my unique session data"; + TServerInstance server = + new TServerInstance(HostAndPort.fromParts("127.0.0.1", 10000), sessionId); + + KeyExtent notRoot = new KeyExtent(TableId.of("0"), null, null); + final var assignmentList = List.of(new Assignment(notRoot, server, null)); + assertThrows(IllegalArgumentException.class, () -> tstore.setLocations(assignmentList)); + assertThrows(IllegalArgumentException.class, () -> tstore.setFutureLocations(assignmentList)); + + var nonRootMeta = TabletMetadata.builder(new KeyExtent(TableId.of("notroot"), null, null)) + .putPrevEndRow(null).build(); + + final List<TabletMetadata> assignmentList1 = List.of(nonRootMeta); + assertThrows(IllegalArgumentException.class, () -> tstore.unassign(assignmentList1, null)); + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 5f1f5f3f5a,8f14c20614..40df86f82a --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@@ -66,8 -61,11 +66,9 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletLocationState; -import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample; + import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; @@@ -310,6 -258,13 +311,13 @@@ abstract class TabletGroupWatcher exten if (state == TabletState.ASSIGNED) { goal = TabletGoalState.HOSTED; } + if (Manager.log.isTraceEnabled()) { + Manager.log.trace( + "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {}", + store.name(), manager.serversToShutdown.equals(currentTServers.keySet()), - dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tls.extent, - state, goal); ++ dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), ++ tm.getExtent(), state, goal); + } // if we are shutting down all the tabletservers, we have to do it in order if ((goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED) diff --cc test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java index 4908655e9a,676de69ef1..787cc77f21 --- a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java @@@ -27,57 -25,43 +27,67 @@@ import static org.junit.jupiter.api.Ass import java.time.Duration; import java.util.Collections; +import java.util.List; + import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + import java.util.concurrent.CountDownLatch; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; + import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.Locations; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.ClientTabletCache; +import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.TabletLocationState; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader; +import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.harness.SharedMiniClusterBase; + import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl; -import org.apache.accumulo.server.manager.state.MetaDataTableScanner; +import org.apache.accumulo.server.manager.state.TabletManagementScanner; import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import com.google.common.collect.Iterables; + import com.google.common.net.HostAndPort; -public class ManagerAssignmentIT extends AccumuloClusterHarness { +public class ManagerAssignmentIT extends SharedMiniClusterBase { @Override protected Duration defaultTimeout() { @@@ -322,126 -113,154 +332,269 @@@ } } + @Test + public void testBatchScannerAssignsMultipleOnDemandTablets() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = super.getUniqueNames(1)[0]; + + String tableId = prepTableForScanTest(c, tableName); + + try (BatchScanner s = c.createBatchScanner(tableName)) { + s.setRanges(List.of(new Range("a", "s"))); + assertEquals(19, Iterables.size(s)); + } + + List<TabletStats> stats = getTabletStats(c, tableId); + assertEquals(3, stats.size()); + long hostingRequestCount = ClientTabletCache + .getInstance((ClientContext) c, TableId.of(tableId)).getTabletHostingRequestCount(); + assertTrue(hostingRequestCount > 0); + + // Run another scan, all tablets should be loaded + try (BatchScanner s = c.createBatchScanner(tableName)) { + s.setRanges(List.of(new Range("a", "t"))); + assertEquals(20, Iterables.size(s)); + } + + stats = getTabletStats(c, tableId); + assertEquals(3, stats.size()); + // No more tablets should have been brought online + assertEquals(hostingRequestCount, ClientTabletCache + .getInstance((ClientContext) c, TableId.of(tableId)).getTabletHostingRequestCount()); + + } + } + + @Test + public void testBatchWriterAssignsTablets() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = super.getUniqueNames(1)[0]; + + prepTableForScanTest(c, tableName); + } + } + + @Test + public void testOpidPreventsAssignment() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = super.getUniqueNames(1)[0]; + + var tableId = TableId.of(prepTableForScanTest(c, tableName)); + assertEquals(0, countTabletsWithLocation(c, tableId)); + + assertEquals(Set.of("f", "m", "t"), c.tableOperations().listSplits(tableName).stream() + .map(Text::toString).collect(Collectors.toSet())); + + c.securityOperations().grantTablePermission(getPrincipal(), MetadataTable.NAME, + TablePermission.WRITE); + + try (var writer = c.createBatchWriter(MetadataTable.NAME)) { + var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L); + Mutation m = new Mutation(extent.toMetaRow()); + TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical())); + writer.addMutation(m); + } + + c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS); + + Wait.waitFor(() -> countTabletsWithLocation(c, tableId) >= 3); + + // there are four tablets, but one has an operation id set and should not be assigned + assertEquals(3, countTabletsWithLocation(c, tableId)); + + try (var writer = c.createBatchWriter(MetadataTable.NAME)) { + var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); + Mutation m = new Mutation(extent.toMetaRow()); + TabletsSection.ServerColumnFamily.OPID_COLUMN.putDelete(m); + writer.addMutation(m); + } + + Wait.waitFor(() -> countTabletsWithLocation(c, tableId) >= 4); + + // after the operation id is deleted the tablet should be assigned + assertEquals(4, countTabletsWithLocation(c, tableId)); + } + } + + public static void loadDataForScan(AccumuloClient c, String tableName) + throws MutationsRejectedException, TableNotFoundException { + final byte[] empty = new byte[0]; + try (BatchWriter bw = c.createBatchWriter(tableName)) { + IntStream.range(97, 122).forEach((i) -> { + try { + Mutation m = new Mutation(String.valueOf((char) i)); + m.put(empty, empty, empty); + bw.addMutation(m); + } catch (MutationsRejectedException e) { + fail("Error inserting data", e); + } + }); + } + } + + public static Ample getAmple(AccumuloClient c) { + return ((ClientContext) c).getAmple(); + } + + public static long countTabletsWithLocation(AccumuloClient c, TableId tableId) { + return getAmple(c).readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.LOCATION) + .build().stream().filter(tabletMetadata -> tabletMetadata.getLocation() != null).count(); + } + + public static List<TabletStats> getTabletStats(AccumuloClient c, String tableId) + throws AccumuloException, AccumuloSecurityException { + return ThriftClientTypes.TABLET_SERVER.execute((ClientContext) c, client -> client + .getTabletStats(TraceUtil.traceInfo(), ((ClientContext) c).rpcCreds(), tableId)); + } + + @Test + public void testShutdownOnlyTServerWithUserTable() throws Exception { + + // 2 TabletServers started for this test, shut them down so we only have 1. - getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - ((MiniAccumuloClusterControl) getClusterControl()).start(ServerType.TABLET_SERVER, - Collections.emptyMap(), 1); ++ getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); ++ getCluster().getClusterControl().start(ServerType.TABLET_SERVER, Collections.emptyMap(), 1); + + String tableName = getUniqueNames(1)[0]; + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1); + + client.tableOperations().create(tableName); + + // wait for everything to be hosted and balanced + client.instanceOperations().waitForBalance(); + + try (var writer = client.createBatchWriter(tableName)) { + for (int i = 0; i < 1000000; i++) { + Mutation m = new Mutation(String.format("%08d", i)); + m.put("", "", ""); + writer.addMutation(m); + } + } + client.tableOperations().flush(tableName, null, null, true); + + final CountDownLatch latch = new CountDownLatch(10); + + Runnable task = new Runnable() { + @Override + public void run() { + while (true) { + try (var scanner = new IsolatedScanner(client.createScanner(tableName))) { + // TODO maybe do not close scanner? The following limit was placed on the stream to + // avoid reading all the data possibly leaving a scan session active on the tserver + int count = 0; + for (Entry<Key,Value> e : scanner) { + count++; + // let the test thread know that this thread has read some data + if (count == 1_000) { + latch.countDown(); + } + } + } catch (Exception e) { + e.printStackTrace(); + break; + } + } + } + }; + + ExecutorService service = Executors.newFixedThreadPool(10); + for (int i = 0; i < 10; i++) { + service.execute(task); + } + + // Wait until all threads are reading some data + latch.await(); + + // getClusterControl().stopAllServers(ServerType.TABLET_SERVER) + // could potentially send a kill -9 to the process. Shut the tablet + // servers down in a more graceful way. + + Locations locs = client.tableOperations().locate(tableName, + Collections.singletonList(TabletsSection.getRange())); + locs.groupByTablet().keySet().stream().map(tid -> locs.getTabletLocation(tid)) + .forEach(location -> { + HostAndPort address = HostAndPort.fromString(location); + String addressWithSession = address.toString(); + var zLockPath = ServiceLock.path(getCluster().getServerContext().getZooKeeperRoot() + + Constants.ZTSERVERS + "/" + address.toString()); + long sessionId = + ServiceLock.getSessionId(getCluster().getServerContext().getZooCache(), zLockPath); + if (sessionId != 0) { + addressWithSession = address.toString() + "[" + Long.toHexString(sessionId) + "]"; + } + + final String finalAddress = addressWithSession; + System.out.println("Attempting to shutdown TabletServer at: " + address.toString()); + try { + ThriftClientTypes.MANAGER.executeVoid((ClientContext) client, + c -> c.shutdownTabletServer(TraceUtil.traceInfo(), + getCluster().getServerContext().rpcCreds(), finalAddress, false)); + } catch (AccumuloException | AccumuloSecurityException e) { + fail("Error shutting down TabletServer", e); + } + + }); + + Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 0); + + } + } + + @Test + public void testShutdownOnlyTServerWithoutUserTable() throws Exception { + + // 2 TabletServers started for this test, shut them down so we only have 1. - getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - ((MiniAccumuloClusterControl) getClusterControl()).start(ServerType.TABLET_SERVER, - Collections.emptyMap(), 1); ++ getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); ++ getCluster().getClusterControl().start(ServerType.TABLET_SERVER, Collections.emptyMap(), 1); + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1); + + client.instanceOperations().waitForBalance(); + + // getClusterControl().stopAllServers(ServerType.TABLET_SERVER) + // could potentially send a kill -9 to the process. Shut the tablet + // servers down in a more graceful way. + + Locations locs = client.tableOperations().locate(RootTable.NAME, + Collections.singletonList(TabletsSection.getRange())); + locs.groupByTablet().keySet().stream().map(tid -> locs.getTabletLocation(tid)) + .forEach(location -> { + HostAndPort address = HostAndPort.fromString(location); + String addressWithSession = address.toString(); + var zLockPath = ServiceLock.path(getCluster().getServerContext().getZooKeeperRoot() + + Constants.ZTSERVERS + "/" + address.toString()); + long sessionId = + ServiceLock.getSessionId(getCluster().getServerContext().getZooCache(), zLockPath); + if (sessionId != 0) { + addressWithSession = address.toString() + "[" + Long.toHexString(sessionId) + "]"; + } + + final String finalAddress = addressWithSession; + System.out.println("Attempting to shutdown TabletServer at: " + address.toString()); + try { + ThriftClientTypes.MANAGER.executeVoid((ClientContext) client, + c -> c.shutdownTabletServer(TraceUtil.traceInfo(), + getCluster().getServerContext().rpcCreds(), finalAddress, false)); + } catch (AccumuloException | AccumuloSecurityException e) { + fail("Error shutting down TabletServer", e); + } + + }); + + Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 0); + + } + } + - private TabletLocationState getTabletLocationState(AccumuloClient c, String tableId) { - try (MetaDataTableScanner s = new MetaDataTableScanner((ClientContext) c, - new Range(TabletsSection.encodeRow(TableId.of(tableId), null)), MetadataTable.NAME)) { + public static TabletManagement getManagerTabletInfo(AccumuloClient c, String tableId, + Text endRow) { + try (TabletManagementScanner s = new TabletManagementScanner((ClientContext) c, + new Range(TabletsSection.encodeRow(TableId.of(tableId), endRow)), MetadataTable.NAME)) { return s.next(); } }
