This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 37279e8008bff6acd4aed72c479924ea2ba6b475 Merge: c3d8fdfcc6 a2ca2bc382 Author: Dave Marion <[email protected]> AuthorDate: Wed Jun 11 20:24:52 2025 +0000 Merge branch '2.1' .../main/java/org/apache/accumulo/core/cli/ConfigOpts.java | 2 ++ .../main/java/org/apache/accumulo/core/conf/Property.java | 6 ++++++ .../java/org/apache/accumulo/core/cli/ConfigOptsTest.java | 13 +++++++------ .../java/org/apache/accumulo/server/AbstractServer.java | 11 +++++++++-- .../java/org/apache/accumulo/server/rpc/TServerUtils.java | 5 +++-- .../src/main/java/org/apache/accumulo/monitor/Monitor.java | 2 +- .../apache/accumulo/test/functional/GarbageCollectorIT.java | 3 ++- .../org/apache/accumulo/test/functional/ZombieTServer.java | 4 +++- .../java/org/apache/accumulo/test/metrics/MetricsIT.java | 4 +++- .../org/apache/accumulo/test/performance/NullTserver.java | 3 ++- 10 files changed, 38 insertions(+), 15 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/cli/ConfigOpts.java index 36cce79e9c,94de245b71..c8f7c576b3 --- a/core/src/main/java/org/apache/accumulo/core/cli/ConfigOpts.java +++ b/core/src/main/java/org/apache/accumulo/core/cli/ConfigOpts.java @@@ -39,6 -39,6 +39,8 @@@ public class ConfigOpts extends Help private static final Logger log = LoggerFactory.getLogger(ConfigOpts.class); ++ public static final String BIND_ALL_ADDRESSES = "0.0.0.0"; ++ @Parameter(names = {"-p", "-props", "--props"}, description = "Sets path to accumulo.properties." + "The classpath will be searched if this property is not set") private String propsPath; diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index f03c1a341b,adfd95a66b..1e2220dc3a --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -358,17 -338,10 +361,20 @@@ public enum Property + " was changed and it now can accept multiple class names. The metrics spi was introduced in 2.1.3," + " the deprecated factory is org.apache.accumulo.core.metrics.MeterRegistryFactory.", "2.1.0"), - GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "0", + GENERAL_MICROMETER_USER_TAGS("general.micrometer.user.tags", "", PropertyType.STRING, + "A comma separated list of tags to emit with all metrics from the process. Example:" + + "\"tag1=value1,tag2=value2\".", + "4.0.0"), ++ // TODO: Make sure to backport this to 3.1, then remove here in 4.0 ++ @Deprecated(since = "3.1.0") ++ @ReplacedBy(property = RPC_PROCESS_BIND_ADDRESS) + GENERAL_PROCESS_BIND_ADDRESS("general.process.bind.addr", "0.0.0.0", PropertyType.STRING, + "The local IP address to which this server should bind for sending and receiving network traffic.", + "3.0.0"), + GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL("general.server.lock.verification.interval", "2m", PropertyType.TIMEDURATION, "Interval at which the Manager and TabletServer should verify their server locks. A value of zero" - + " disables this check.", + + " disables this check. The default value change from 0 to 2m in 3.1.0.", "2.1.4"), // properties that are specific to manager server behavior MANAGER_PREFIX("manager.", null, PropertyType.PREFIX, diff --cc core/src/test/java/org/apache/accumulo/core/cli/ConfigOptsTest.java index 5e71cf26c9,e991bc74a3..225db54925 --- a/core/src/test/java/org/apache/accumulo/core/cli/ConfigOptsTest.java +++ b/core/src/test/java/org/apache/accumulo/core/cli/ConfigOptsTest.java @@@ -42,9 -42,9 +42,10 @@@ public class ConfigOptsTest } @Test - public void testGetAddress_NOne() { + public void testGetAddress_None() { opts.parseArgs(ConfigOptsTest.class.getName(), new String[] {}); - assertEquals("0.0.0.0", opts.getSiteConfiguration().get(Property.GENERAL_PROCESS_BIND_ADDRESS)); - assertEquals("", opts.getSiteConfiguration().get(Property.RPC_PROCESS_BIND_ADDRESS)); ++ assertEquals(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue(), ++ opts.getSiteConfiguration().get(Property.RPC_PROCESS_BIND_ADDRESS)); } @Test @@@ -59,10 -59,10 +60,10 @@@ @Test public void testOverrideMultiple() { opts.parseArgs(ConfigOptsTest.class.getName(), - new String[] {"-o", Property.GENERAL_PROCESS_BIND_ADDRESS.getKey() + "=1.2.3.4", "-o", + new String[] {"-o", Property.RPC_PROCESS_BIND_ADDRESS.getKey() + "=1.2.3.4", "-o", - Property.SSERV_CLIENTPORT.getKey() + "=8888"}); + Property.COMPACTOR_GROUP_NAME.getKey() + "=test"}); - assertEquals("1.2.3.4", opts.getSiteConfiguration().get(Property.GENERAL_PROCESS_BIND_ADDRESS)); + assertEquals("1.2.3.4", opts.getSiteConfiguration().get(Property.RPC_PROCESS_BIND_ADDRESS)); - assertEquals("8888", opts.getSiteConfiguration().get(Property.SSERV_CLIENTPORT)); + assertEquals("test", opts.getSiteConfiguration().get(Property.COMPACTOR_GROUP_NAME)); } } diff --cc server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index fb189c61f0,932eb0d078..643bf00fda --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@@ -82,19 -60,38 +82,26 @@@ public abstract class AbstractServe private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); private final AtomicBoolean shutdownComplete = new AtomicBoolean(false); - protected AbstractServer(String appName, ServerOpts opts, String[] args) { - this.log = LoggerFactory.getLogger(getClass().getName()); - this.applicationName = appName; - opts.parseArgs(appName, args); + protected AbstractServer(ServerId.Type serverType, ConfigOpts opts, + Function<SiteConfiguration,ServerContext> serverContextFactory, String[] args) { + this.applicationName = serverType.name(); + opts.parseArgs(applicationName, args); var siteConfig = opts.getSiteConfiguration(); - this.hostname = siteConfig.get(Property.GENERAL_PROCESS_BIND_ADDRESS); - boolean oldBindParameterSpecifiedOnCmdLine = false; - boolean newBindParameterSpecified = false; - for (String arg : args) { - if (arg.equals("-a") || arg.equals("--address")) { - oldBindParameterSpecifiedOnCmdLine = true; - } else if (siteConfig.isPropertySet(Property.RPC_PROCESS_BIND_ADDRESS)) { - newBindParameterSpecified = true; - } - } - if (oldBindParameterSpecifiedOnCmdLine && newBindParameterSpecified) { - throw new IllegalStateException("Argument '-a' cannot be used with property 'rpc.bind.addr'"); - } + final String newBindParameter = siteConfig.get(Property.RPC_PROCESS_BIND_ADDRESS); + // If new bind parameter passed on command line or in file, then use it. - if (newBindParameterSpecified - || !newBindParameter.equals(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue())) { ++ if (newBindParameter != null ++ && !newBindParameter.equals(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue())) { + this.hostname = newBindParameter; - } else if (oldBindParameterSpecifiedOnCmdLine) { - this.hostname = opts.getAddress(); + } else { - this.hostname = ServerOpts.BIND_ALL_ADDRESSES; ++ this.hostname = ConfigOpts.BIND_ALL_ADDRESSES; + } + this.resourceGroup = getResourceGroupPropertyValue(siteConfig); + ClusterConfigParser.validateGroupNames(List.of(resourceGroup)); SecurityUtil.serverLogin(siteConfig); - context = new ServerContext(siteConfig); - final String upgradePrepNode = context.getZooKeeperRoot() + Constants.ZPREPARE_FOR_UPGRADE; + context = serverContextFactory.apply(siteConfig); + log = LoggerFactory.getLogger(getClass()); try { - if (context.getZooReader().exists(upgradePrepNode)) { + if (context.getZooSession().asReader().exists(Constants.ZPREPARE_FOR_UPGRADE)) { throw new IllegalStateException( "Instance has been prepared for upgrade to a minor or major version greater than " + Constants.VERSION + ", no servers can be started." @@@ -293,47 -247,6 +300,47 @@@ return applicationName; } + @Override + public MetricResponse getMetrics(TInfo tinfo, TCredentials credentials) throws TException { + + if (!context.getSecurityOperation().authenticateUser(credentials, credentials)) { + throw new ThriftSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED); + } + + final FlatBufferBuilder builder = new FlatBufferBuilder(1024); + final MetricResponseWrapper response = new MetricResponseWrapper(builder); + - if (getHostname().startsWith(Property.GENERAL_PROCESS_BIND_ADDRESS.getDefaultValue())) { ++ if (getHostname().startsWith(Property.RPC_PROCESS_BIND_ADDRESS.getDefaultValue())) { + log.error("Host is not set, this should have been done after starting the Thrift service."); + return response; + } + + if (metricSource == null) { + // Metrics not reported for Monitor type + return response; + } + + response.setServerType(metricSource); + response.setServer(getHostname()); + response.setResourceGroup(getResourceGroup()); + response.setTimestamp(System.currentTimeMillis()); + + if (context.getMetricsInfo().isMetricsEnabled()) { + Metrics.globalRegistry.getMeters().forEach(m -> { + if (m.getId().getName().startsWith("accumulo.")) { + m.match(response::writeMeter, response::writeMeter, response::writeTimer, + response::writeDistributionSummary, response::writeLongTaskTimer, + response::writeMeter, response::writeMeter, response::writeFunctionTimer, + response::writeMeter); + } + }); + } + + builder.clear(); + return response; + } + /** * Get the ServiceLock for this server process. May return null if called before the lock is * acquired. diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 09f28e2504,7c34b6601d..f454984fc1 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@@ -39,6 -39,6 +39,7 @@@ import java.util.stream.IntStream import javax.net.ssl.SSLServerSocket; ++import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.PropertyType; @@@ -491,7 -499,7 +492,7 @@@ public class TServerUtils // If we can't get a real hostname from the provided host test, use the hostname from DNS for // localhost - if ("0.0.0.0".equals(hostname)) { - if (ServerOpts.BIND_ALL_ADDRESSES.equals(hostname)) { ++ if (ConfigOpts.BIND_ALL_ADDRESSES.equals(hostname)) { hostname = fqdn; } @@@ -655,11 -658,22 +656,11 @@@ "Unable to create server on addresses: " + Arrays.toString(addresses)); } - final TServer finalServer = serverAddress.server; - - Threads.createCriticalThread(threadName, finalServer::serve).start(); - - while (!finalServer.isServing()) { - // Wait for the thread to start and for the TServer to start - // serving events - UtilWaitThread.sleep(10); - Preconditions.checkState(!finalServer.getShouldStop()); - } - // check for the special "bind to everything address" - if (serverAddress.address.getHost().equals("0.0.0.0")) { - if (serverAddress.address.getHost().equals(ServerOpts.BIND_ALL_ADDRESSES)) { ++ if (serverAddress.address.getHost().equals(ConfigOpts.BIND_ALL_ADDRESSES)) { // can't get the address from the bind, so we'll do our best to invent our hostname try { - serverAddress = new ServerAddress(finalServer, HostAndPort + serverAddress = new ServerAddress(serverAddress.server, HostAndPort .fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort())); } catch (UnknownHostException e) { throw new TTransportException(e); diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 760852d7e2,369e725734..eb535b4e66 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@@ -380,8 -474,15 +380,8 @@@ public class Monitor extends AbstractSe log.debug("Monitor started on port {}", livePort); } - try { - getMonitorLock(); - } catch (Exception e) { - log.error("Failed to get Monitor ZooKeeper lock"); - throw new RuntimeException(e); - } - String advertiseHost = getHostname(); - if (advertiseHost.equals("0.0.0.0")) { - if (advertiseHost.equals(ServerOpts.BIND_ALL_ADDRESSES)) { ++ if (advertiseHost.equals(ConfigOpts.BIND_ALL_ADDRESSES)) { try { advertiseHost = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { diff --cc test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java index 79e785fa1f,25e89fbd60..e4d93597b8 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java @@@ -39,6 -40,6 +39,7 @@@ import java.util.stream.Collectors import java.util.stream.Stream; import org.apache.accumulo.core.Constants; ++import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; @@@ -424,15 -424,22 +425,15 @@@ public class GarbageCollectorIT extend if (locks != null && !locks.isEmpty()) { String lockPath = path + "/" + locks.get(0); - String gcLoc = new String(zk.getData(lockPath), UTF_8); + Optional<ServiceLockData> sld = ServiceLockData.parse(zk.getData(lockPath)); - assertTrue(gcLoc.startsWith(Service.GC_CLIENT.name()), - "Found unexpected data in zookeeper for GC location: " + gcLoc); - int loc = gcLoc.indexOf(ServerServices.SEPARATOR_CHAR); - assertNotEquals(-1, loc, "Could not find split point of GC location for: " + gcLoc); - String addr = gcLoc.substring(loc + 1); + assertNotNull(sld.orElseThrow()); + HostAndPort hostAndPort = sld.orElseThrow().getAddress(ThriftService.GC); - int addrSplit = addr.indexOf(':'); - assertNotEquals(-1, addrSplit, "Could not find split of GC host:port for: " + addr); - - String host = addr.substring(0, addrSplit), port = addr.substring(addrSplit + 1); // We shouldn't have the "bindall" address in zk - assertNotEquals("0.0.0.0", hostAndPort.getHost()); - assertNotEquals(ServerOpts.BIND_ALL_ADDRESSES, host); ++ assertNotEquals(ConfigOpts.BIND_ALL_ADDRESSES, hostAndPort.getHost()); // Nor should we have the "random port" in zk - assertNotEquals(0, Integer.parseInt(port)); + assertNotEquals(0, hostAndPort.getPort()); return; } diff --cc test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 71076c8764,3703b717a8..2f156c5882 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@@ -23,20 -24,18 +23,21 @@@ import static org.apache.accumulo.core. import java.util.HashMap; import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; ++import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.clientImpl.thrift.ClientService; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; -import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; +import org.apache.accumulo.core.lock.ServiceLock.LockWatcher; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; @@@ -117,26 -116,26 +118,27 @@@ public class ZombieTServer ThriftProcessorTypes.CLIENT.getTProcessor(ClientService.Processor.class, ClientService.Iface.class, csh, context)); muxProcessor.registerProcessor(ThriftClientTypes.TABLET_SERVER.getServiceName(), - ThriftProcessorTypes.TABLET_SERVER.getTProcessor(TabletClientService.Processor.class, - TabletClientService.Iface.class, tch, context)); - muxProcessor.registerProcessor(ThriftProcessorTypes.TABLET_SERVER_SCAN.getServiceName(), - ThriftProcessorTypes.TABLET_SERVER_SCAN.getTProcessor( - TabletScanClientService.Processor.class, TabletScanClientService.Iface.class, tch, - context)); - - ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), - ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", "walking dead", 2, + ThriftProcessorTypes.TABLET_SERVER.getTProcessor(TabletServerClientService.Processor.class, + TabletServerClientService.Iface.class, tch, context)); + muxProcessor.registerProcessor(ThriftProcessorTypes.TABLET_SCAN.getServiceName(), + ThriftProcessorTypes.TABLET_SCAN.getTProcessor(TabletScanClientService.Processor.class, + TabletScanClientService.Iface.class, tch, context)); + + ServerAddress serverPort = TServerUtils.createThriftServer(context.getConfiguration(), + ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), false, - HostAndPort.fromParts("0.0.0.0", port)); - HostAndPort.fromParts(ServerOpts.BIND_ALL_ADDRESSES, port)); ++ HostAndPort.fromParts(ConfigOpts.BIND_ALL_ADDRESSES, port)); + serverPort.startThriftServer("walking dead"); + String addressString = serverPort.address.toString(); - var zLockPath = - ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + addressString); - ZooReaderWriter zoo = context.getZooReaderWriter(); + + var zLockPath = context.getServerPaths() + .createTabletServerPath(Constants.DEFAULT_RESOURCE_GROUP_NAME, serverPort.address); + ZooReaderWriter zoo = context.getZooSession().asReaderWriter(); zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP); - ServiceLock zlock = new ServiceLock(zoo.getZooKeeper(), zLockPath, UUID.randomUUID()); + ServiceLock zlock = new ServiceLock(context.getZooSession(), zLockPath, UUID.randomUUID()); MetricsInfo metricsInfo = context.getMetricsInfo(); metricsInfo.init(MetricsInfo.serviceTags(context.getInstanceName(), "zombie.server", diff --cc test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index 742f0ccade,b7cefe3c8b..03c035b64b --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@@ -45,8 -31,8 +45,9 @@@ import java.util.Map import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.function.Predicate; +import java.util.concurrent.atomic.AtomicReference; ++import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; @@@ -287,20 -205,10 +288,21 @@@ public class MetricsIT extends Configur statsDMetrics.stream().filter(line -> line.startsWith("accumulo")) .map(TestStatsDSink::parseStatsDMetric).forEach(a -> { var t = a.getTags(); - log.trace("METRICS, name: '{}' num tags: {}, tags: {}", a.getName(), t.size(), t); + log.info("METRICS, received from statsd - name: '{}' num tags: {}, tags: {} = {}", + a.getName(), t.size(), t, a.getValue()); // check hostname is always set and is valid - assertNotEquals("0.0.0.0", a.getTags().get(MetricsInfo.HOST_TAG_KEY)); - assertNotEquals(ServerOpts.BIND_ALL_ADDRESSES, a.getTags().get("host")); - assertNotNull(a.getTags().get("instance.name")); ++ assertNotEquals(ConfigOpts.BIND_ALL_ADDRESSES, ++ a.getTags().get(MetricsInfo.HOST_TAG_KEY)); + assertNotNull(a.getTags().get(MetricsInfo.INSTANCE_NAME_TAG_KEY)); + + assertNotNull(a.getTags().get(MetricsInfo.PROCESS_NAME_TAG_KEY)); + + // check resource.group tag exists + assertNotNull(a.getTags().get(MetricsInfo.RESOURCE_GROUP_TAG_KEY)); + + // check that the user tags are present + assertEquals("value1", a.getTags().get("tag1")); + assertEquals("value2", a.getTags().get("tag2")); // check the length of the tag value is sane final int MAX_EXPECTED_TAG_LEN = 128; diff --cc test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index 7a09fc5e85,d9625eaa21..4b1286625d --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@@ -26,12 -26,11 +26,13 @@@ import java.util.ArrayList import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.UUID; +import org.apache.accumulo.core.Constants; ++import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.cli.Help; import org.apache.accumulo.core.clientImpl.thrift.ClientService; -import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; @@@ -301,90 -340,41 +302,90 @@@ public class NullTserver ThriftProcessorTypes.CLIENT.getTProcessor(ClientService.Processor.class, ClientService.Iface.class, csh, context)); muxProcessor.registerProcessor(ThriftClientTypes.TABLET_SERVER.getServiceName(), - ThriftProcessorTypes.TABLET_SERVER.getTProcessor(TabletClientService.Processor.class, - TabletClientService.Iface.class, tch, context)); - muxProcessor.registerProcessor(ThriftProcessorTypes.TABLET_SERVER_SCAN.getServiceName(), - ThriftProcessorTypes.TABLET_SERVER_SCAN.getTProcessor( - TabletScanClientService.Processor.class, TabletScanClientService.Iface.class, tch, - context)); - - TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, - muxProcessor, "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, - 10 * 1024 * 1024, null, null, -1, context.getConfiguration().getCount(Property.RPC_BACKLOG), - context.getMetricsInfo(), false, - HostAndPort.fromParts(ServerOpts.BIND_ALL_ADDRESSES, opts.port)); - - HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port); - - TableId tableId = context.getTableId(opts.tableName); - - // read the locations for the table - Range tableRange = new KeyExtent(tableId, null, null).toMetaRange(); - List<Assignment> assignments = new ArrayList<>(); - try (var s = new MetaDataTableScanner(context, tableRange, DataLevel.of(tableId))) { - long randomSessionID = opts.port; - TServerInstance instance = new TServerInstance(addr, randomSessionID); - - while (s.hasNext()) { - TabletLocationState next = s.next(); - assignments.add(new Assignment(next.extent, instance, next.last)); + ThriftProcessorTypes.TABLET_SERVER.getTProcessor(TabletServerClientService.Processor.class, + TabletServerClientService.Iface.class, tch, context)); + muxProcessor.registerProcessor(ThriftProcessorTypes.TABLET_SCAN.getServiceName(), + ThriftProcessorTypes.TABLET_SCAN.getTProcessor(TabletScanClientService.Processor.class, + TabletScanClientService.Iface.class, tch, context)); + muxProcessor.registerProcessor(ThriftClientTypes.TABLET_INGEST.getServiceName(), + ThriftProcessorTypes.TABLET_INGEST.getTProcessor(TabletIngestClientService.Processor.class, + TabletIngestClientService.Iface.class, tch, context)); + muxProcessor.registerProcessor(ThriftProcessorTypes.TABLET_MGMT.getServiceName(), + ThriftProcessorTypes.TABLET_MGMT.getTProcessor( + TabletManagementClientService.Processor.class, + TabletManagementClientService.Iface.class, tch, context)); + + ServerAddress sa = TServerUtils.createThriftServer(context.getConfiguration(), + ThriftServerType.CUSTOM_HS_HA, muxProcessor, "NullTServer", 2, + ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, + context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), false, - HostAndPort.fromParts("0.0.0.0", opts.port)); ++ HostAndPort.fromParts(ConfigOpts.BIND_ALL_ADDRESSES, opts.port)); + sa.startThriftServer("null tserver"); + + AccumuloLockWatcher miniLockWatcher = new AccumuloLockWatcher() { + + @Override + public void lostLock(LockLossReason reason) { + LOG.warn("Lost lock: " + reason.toString()); } - } - // point them to this server - TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER, context); - store.setLocations(assignments); - while (true) { - sleepUninterruptibly(10, TimeUnit.SECONDS); + @Override + public void unableToMonitorLockNode(Exception e) { + LOG.warn("Unable to monitor lock: " + e.getMessage()); + } + + @Override + public void acquiredLock() { + LOG.debug("Acquired ZooKeeper lock for NullTserver"); + } + + @Override + public void failedToAcquireLock(Exception e) { + LOG.warn("Failed to acquire ZK lock for NullTserver, msg: " + e.getMessage()); + } + }; + + ServiceLock miniLock = null; + try { + ZooSession zk = context.getZooSession(); + UUID nullTServerUUID = UUID.randomUUID(); + ServiceLockPath slp = context.getServerPaths().createMiniPath(nullTServerUUID.toString()); + try { + zk.asReaderWriter().mkdirs(slp.toString()); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error creating path in ZooKeeper", e); + } + ServiceLockData sld = new ServiceLockData(nullTServerUUID, "localhost", ThriftService.TSERV, + Constants.DEFAULT_RESOURCE_GROUP_NAME); + miniLock = new ServiceLock(zk, slp, UUID.randomUUID()); + miniLock.lock(miniLockWatcher, sld); + context.setServiceLock(miniLock); + HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port); + + // read the locations for the table + List<Assignment> assignments = new ArrayList<>(); + try (var tablets = context.getAmple().readTablets().forLevel(DataLevel.USER).build()) { + long randomSessionID = opts.port; + TServerInstance instance = new TServerInstance(addr, randomSessionID); + var s = tablets.iterator(); + + while (s.hasNext()) { + TabletMetadata next = s.next(); + assignments.add(new Assignment(next.getExtent(), instance, next.getLast())); + } + } + // point them to this server + TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER, context); + store.setLocations(assignments); + + while (true) { + Thread.sleep(SECONDS.toMillis(10)); + } + + } finally { + if (miniLock != null) { + miniLock.unlock(); + } } } }
