(accumulo) 01/01: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 4575c2f6557519eff8dcae67177fcfee62e16a72 Merge: 79337889f7 ca80d7cb46 Author: Dave Marion AuthorDate: Thu May 30 20:29:54 2024 + Merge branch 'main' into elasticity .../org/apache/accumulo/tserver/ScanServer.java| 2 +- .../apache/accumulo/test/ScanServerShutdownIT.java | 139 + .../accumulo/test/SelfStoppingScanServer.java | 59 + 3 files changed, 199 insertions(+), 1 deletion(-) diff --cc test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java index 00,6c2d7aac6b..247742ebd3 mode 00,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java @@@ -1,0 -1,138 +1,139 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.accumulo.test; + + import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertNotNull; + import static org.junit.jupiter.api.Assertions.assertTrue; + + import java.util.Collections; + import java.util.Iterator; + import java.util.Map.Entry; + + import org.apache.accumulo.core.Constants; + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.BatchScanner; + import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; + import org.apache.accumulo.core.clientImpl.ClientContext; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.Key; + import org.apache.accumulo.core.data.Range; + import org.apache.accumulo.core.data.Value; + import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; + import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.spi.scan.ScanServerSelector; + import org.apache.accumulo.harness.MiniClusterConfigurationCallback; + import org.apache.accumulo.harness.SharedMiniClusterBase; + import org.apache.accumulo.minicluster.ServerType; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.accumulo.server.ServerContext; + import org.apache.accumulo.test.util.Wait; + import org.junit.jupiter.api.AfterAll; + import org.junit.jupiter.api.BeforeAll; + import org.junit.jupiter.api.Test; + + public class ScanServerShutdownIT extends SharedMiniClusterBase { + + private static class ScanServerShutdownITConfiguration + implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, + org.apache.hadoop.conf.Configuration coreSite) { - cfg.setNumScanServers(1); ++ ++ cfg.getClusterServerConfiguration().setNumDefaultScanServers(0); + + // Timeout scan sessions after being idle for 3 seconds + 
cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s"); + + // Configure the scan server to only have 1 scan executor thread. This means + // that the scan server will run scans serially, not concurrently. + cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1"); + } + } + + @BeforeAll + public static void start() throws Exception { + ScanServerShutdownITConfiguration c = new ScanServerShutdownITConfiguration(); + SharedMiniClusterBase.startMiniClusterWithConfig(c); + } + + @AfterAll + public static void stop() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + @Test + public void testRefRemovalOnShutdown() throws Exception { + + ServerContext ctx = getCluster().getServerContext(); + String zooRoot = ctx.getZooKeeperRoot(); + ZooReaderWriter zrw = ctx.getZooReaderWriter(); + String scanServerRoot = zooRoot + Constants.ZSSERVERS; + + Wait.waitFor(() -> zrw.getChildren(scanServerRoot).size() == 0); + + // Stop normal ScanServers so that we can start our custom implementation + // that shuts
(accumulo) branch elasticity updated (79337889f7 -> 4575c2f655)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 79337889f7 ensure only single location is set in conditional location check (#4620) add 21f04a6778 Added ScanServerShutdownIT to confirm refs deleted on shutdown (#4615) add ca80d7cb46 Merge branch '2.1' new 4575c2f655 Merge branch 'main' into elasticity The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/accumulo/tserver/ScanServer.java| 2 +- .../apache/accumulo/test/ScanServerShutdownIT.java | 139 + .../accumulo/test/SelfStoppingScanServer.java | 59 + 3 files changed, 199 insertions(+), 1 deletion(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java create mode 100644 test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit ca80d7cb4688796f1a15c307cbcf51a385bc80e9 Merge: 627ed8 21f04a6778 Author: Dave Marion AuthorDate: Thu May 30 20:04:16 2024 + Merge branch '2.1' .../apache/accumulo/cluster/ClusterControl.java| 11 ++ .../standalone/StandaloneClusterControl.java | 8 ++ .../MiniAccumuloClusterControl.java| 14 +++ .../org/apache/accumulo/tserver/ScanServer.java| 2 +- .../apache/accumulo/test/ScanServerShutdownIT.java | 138 + .../accumulo/test/SelfStoppingScanServer.java | 59 + 6 files changed, 231 insertions(+), 1 deletion(-) diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index ff9e6d3223,69f6f64297..9e395bb47c --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@@ -181,6 -178,18 +181,20 @@@ public class MiniAccumuloClusterContro } } + @Override + public synchronized void startScanServer(Class scanServer, int limit, + String groupName) throws IOException { + synchronized (scanServerProcesses) { + int count = + Math.min(limit, cluster.getConfig().getNumScanServers() - scanServerProcesses.size()); + for (int i = 0; i < count; i++) { -scanServerProcesses.add(cluster.exec(scanServer, "-g", groupName).getProcess()); ++scanServerProcesses.add( ++cluster.exec(scanServer, "-o", Property.SSERV_GROUP_NAME.getKey() + "=" + groupName) ++.getProcess()); + } + } + } + @Override public synchronized void startAllServers(ServerType server) throws IOException { start(server, null); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 201bfada4f,2216b55489..cd02951c63 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -198,8 -200,9 +198,8 @@@ public class ScanServer extends Abstrac private final SessionManager sessionManager; private final TabletServerResourceManager resourceManager; HostAndPort clientAddress; - private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); - private volatile boolean serverStopRequested = false; + protected volatile boolean serverStopRequested = false; private ServiceLock scanServerLock; protected TabletServerScanMetrics scanMetrics; private ScanServerMetrics scanServerMetrics; diff --cc test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java index 00,8391190984..73284dc8c5 mode 00,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java +++ b/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java @@@ -1,0 -1,58 +1,59 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.accumulo.test; + + import java.util.concurrent.atomic.AtomicInteger; + ++import org.apache.accumulo.core.cli.ConfigOpts; ++import org.apache.accumulo.core.clientImpl.thrift.TInfo; ++import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService; + import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; -import org.apache.accumulo.core.trace.thrift.TInfo; + import org.apache.accumulo.tserver.ScanServer; + import org.apache.accumulo.tserver.TabletHostingServer; + import org.apache.thrift.TException; + + /** + * ScanServer implementation that will stop itself after the the 3rd scan batch scan + * + */ + public class SelfStoppingScanServer extends ScanServer + implements TabletScanClientService.Iface, TabletHostingServer { + + private final AtomicInteger scanCount = new AtomicInteger(0); + - publ
(accumulo) branch main updated (627ed85555 -> ca80d7cb46)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from 627ed85555 Use NanoTime object for rate limiting in FileCompactor stats update add 21f04a6778 Added ScanServerShutdownIT to confirm refs deleted on shutdown (#4615) new ca80d7cb46 Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/accumulo/cluster/ClusterControl.java| 11 ++ .../standalone/StandaloneClusterControl.java | 8 ++ .../MiniAccumuloClusterControl.java| 14 +++ .../org/apache/accumulo/tserver/ScanServer.java| 2 +- .../apache/accumulo/test/ScanServerShutdownIT.java | 138 + .../accumulo/test/SelfStoppingScanServer.java | 59 + 6 files changed, 231 insertions(+), 1 deletion(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java create mode 100644 test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java
(accumulo) branch 2.1 updated (d6eb1df8ee -> 21f04a6778)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git from d6eb1df8ee Add metrics for entries read and written during compactions (#4572) add 21f04a6778 Added ScanServerShutdownIT to confirm refs deleted on shutdown (#4615) No new revisions were added by this update. Summary of changes: .../apache/accumulo/cluster/ClusterControl.java| 11 ++ .../standalone/StandaloneClusterControl.java | 8 ++ .../MiniAccumuloClusterControl.java| 12 ++ .../org/apache/accumulo/tserver/ScanServer.java| 2 +- .../apache/accumulo/test/ScanServerShutdownIT.java | 138 + .../accumulo/test/SelfStoppingScanServer.java | 58 + 6 files changed, 228 insertions(+), 1 deletion(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java create mode 100644 test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java
(accumulo) branch main updated: Fix sserver group property argument in ScanServerGroupConfigurationIT (#4606)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/main by this push: new bd024372a1 Fix sserver group property argument in ScanServerGroupConfigurationIT (#4606) bd024372a1 is described below commit bd024372a162690792bc8b1baae4c9951e792f4b Author: Dave Marion AuthorDate: Tue May 28 10:15:04 2024 -0400 Fix sserver group property argument in ScanServerGroupConfigurationIT (#4606) --- .../org/apache/accumulo/test/ScanServerGroupConfigurationIT.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java index c18e6e1aff..6773251223 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java @@ -165,8 +165,11 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { // Bump the number of scan serves that can run to start the GROUP1 scan server getCluster().getConfig().setNumScanServers(2); +// If the following fails to start the ScanServer, it's possible that the value +// of property SSERV_GROUP_NAME has changed. If that is the case, then this test +// and the scripts need to be updated. getCluster()._exec(ScanServer.class, ServerType.SCAN_SERVER, Map.of(), -new String[] {"-g", "GROUP1"}); +new String[] {"-o", "sserver.group=GROUP1"}); Wait.waitFor(() -> zk.getChildren(scanServerRoot, false).size() == 2); Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream().anyMatch( (p) -> p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME))
(accumulo) branch main updated: Included port number in Monitor ServiceLockData (#4607)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/main by this push: new 29ff11258d Included port number in Monitor ServiceLockData (#4607) 29ff11258d is described below commit 29ff11258dc173e786a418d053141a25fb3c7f69 Author: Dave Marion AuthorDate: Tue May 28 10:06:04 2024 -0400 Included port number in Monitor ServiceLockData (#4607) Closes #4602 --- .../java/org/apache/accumulo/monitor/Monitor.java | 21 +++-- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 10ccf19c6c..2740a2049f 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -474,13 +474,6 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { log.debug("Monitor started on port {}", livePort); } -try { - getMonitorLock(); -} catch (Exception e) { - log.error("Failed to get Monitor ZooKeeper lock"); - throw new RuntimeException(e); -} - String advertiseHost = getHostname(); if (advertiseHost.equals("0.0.0.0")) { try { @@ -492,6 +485,13 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { HostAndPort monitorHostAndPort = HostAndPort.fromParts(advertiseHost, livePort); log.debug("Using {} to advertise monitor location in ZooKeeper", monitorHostAndPort); +try { + getMonitorLock(monitorHostAndPort); +} catch (Exception e) { + log.error("Failed to get Monitor ZooKeeper lock"); + throw new RuntimeException(e); +} + MetricsInfo metricsInfo = getContext().getMetricsInfo(); metricsInfo.addServiceTags(getApplicationName(), monitorHostAndPort); metricsInfo.addMetricsProducers(this); @@ -796,7 +796,8 @@ public class Monitor 
extends AbstractServer implements HighlyAvailableService { /** * Get the monitor lock in ZooKeeper */ - private void getMonitorLock() throws KeeperException, InterruptedException { + private void getMonitorLock(HostAndPort monitorLocation) + throws KeeperException, InterruptedException { ServerContext context = getContext(); final String zRoot = context.getZooKeeperRoot(); final String monitorPath = zRoot + Constants.ZMONITOR; @@ -833,8 +834,8 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { while (true) { MoniterLockWatcher monitorLockWatcher = new MoniterLockWatcher(); monitorLock = new ServiceLock(zoo.getZooKeeper(), monitorLockPath, zooLockUUID); - monitorLock.lock(monitorLockWatcher, - new ServiceLockData(zooLockUUID, getHostname(), ThriftService.NONE)); + monitorLock.lock(monitorLockWatcher, new ServiceLockData(zooLockUUID, + monitorLocation.getHost() + ":" + monitorLocation.getPort(), ThriftService.NONE)); monitorLockWatcher.waitForChange();
(accumulo) 01/03: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 7facf2f35556247dbf847d854fe995d5c20ad105 Merge: d7264bc1d0 aada55ef50 Author: Dave Marion AuthorDate: Fri May 24 17:43:12 2024 + Merge branch 'main' into elasticity core/pom.xml | 16 ++ .../accumulo/core/logging/ConditionalLogger.java | 194 + .../core/logging/DeduplicatingLoggerTest.java | 69 .../core/logging/EscalatingLoggerTest.java | 77 .../accumulo/manager/TabletGroupWatcher.java | 11 +- .../accumulo/tserver/UnloadTabletHandler.java | 1 - .../org/apache/accumulo/tserver/tablet/Tablet.java | 22 ++- 7 files changed, 386 insertions(+), 4 deletions(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 1b41145fa8,443df6c8f3..9299aab1be --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@@ -20,13 -20,10 +20,14 @@@ package org.apache.accumulo.manager import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.lang.Math.min; +import static java.util.Objects.requireNonNull; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; import java.io.IOException; + import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@@ -56,11 -56,10 +57,12 @@@ import org.apache.accumulo.core.data.Ra import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.gc.ReferenceFile; + import org.apache.accumulo.core.logging.ConditionalLogger.EscalatingLogger; 
import org.apache.accumulo.core.logging.TabletLogger; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.manager.thrift.ManagerGoalState; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.AccumuloTable; @@@ -100,31 -110,18 +102,36 @@@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.Text; import org.apache.thrift.TException; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.slf4j.event.Level; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterators; abstract class TabletGroupWatcher extends AccumuloDaemonThread { + public static class BadLocationStateException extends Exception { +private static final long serialVersionUID = 2L; + +// store as byte array because Text isn't Serializable +private final byte[] metadataTableEntry; + +public BadLocationStateException(String msg, Text row) { + super(msg); + this.metadataTableEntry = TextUtil.getBytes(requireNonNull(row)); +} + +public Text getEncodedEndRow() { + return new Text(metadataTableEntry); +} + } + + private static final Logger LOG = LoggerFactory.getLogger(TabletGroupWatcher.class); ++ + private static final Logger TABLET_UNLOAD_LOGGER = + new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, Level.INFO); ++ private final Manager manager; private final TabletStateStore store; private final TabletGroupWatcher dependentWatcher; @@@ -222,536 -182,203 +229,536 @@@ } } - @Override - public void run() { -int[] oldCounts = new int[TabletState.values().length]; -EventCoordinator.Listener eventListener = 
this.manager.nextEvent.getListener(); + class EventHandler implements EventCoordinator.Listener { -WalStateManager wals = new WalStateManager(manager.getContext()); +// Setting this to true to start with because its not know what happended before this object was +// created, so just start off with full scan. +private boolean needsFullScan = true; -while (manager.stillManager()) { - // slow things down a little, otherwise we spam the logs when there are many wake-up events - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); +private final BlockingQueue rangesToProcess; - final long waitTimeBetweenScans = manager.getConfiguration
(accumulo) 03/03: Merge remote-tracking branch 'upstream/elasticity' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 025a821d3689a2d06d2463da19ea80d4347257a4 Merge: 9edcbdc845 7af0decd1c Author: Dave Marion AuthorDate: Fri May 24 17:49:40 2024 +0000 Merge remote-tracking branch 'upstream/elasticity' into elasticity .../apache/accumulo/manager/tableOps/merge/ReserveTablets.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-)
(accumulo) branch elasticity updated (7af0decd1c -> 025a821d36)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 7af0decd1c speed up merge operation for lots of tablets (#4574) add 221259e12e Log message when Tablet has been unloading for over 15 minutes (#4558) add aada55ef50 Merge branch '2.1' new 7facf2f355 Merge branch 'main' into elasticity new 9edcbdc845 Merge remote-tracking branch 'upstream/elasticity' into elasticity new 025a821d36 Merge remote-tracking branch 'upstream/elasticity' into elasticity The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: core/pom.xml | 16 ++ .../accumulo/core/logging/ConditionalLogger.java | 194 + .../core/logging/DeduplicatingLoggerTest.java | 69 .../core/logging/EscalatingLoggerTest.java | 77 .../accumulo/manager/TabletGroupWatcher.java | 11 +- .../accumulo/tserver/UnloadTabletHandler.java | 1 - .../org/apache/accumulo/tserver/tablet/Tablet.java | 22 ++- 7 files changed, 386 insertions(+), 4 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java create mode 100644 core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java create mode 100644 core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java
(accumulo) 02/03: Merge remote-tracking branch 'upstream/elasticity' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 9edcbdc845fa8ff04308d81b2591944244d2152b Merge: 7facf2f355 479b2637fc Author: Dave Marion AuthorDate: Fri May 24 17:43:36 2024 +0000 Merge remote-tracking branch 'upstream/elasticity' into elasticity .../accumulo/core/fate/AbstractFateStore.java | 19 +- .../java/org/apache/accumulo/core/fate/Fate.java | 96 ++--- .../org/apache/accumulo/core/fate/FateCleaner.java | 2 +- .../apache/accumulo/core/fate/MetaFateStore.java | 14 +- .../accumulo/core/fate/ReadOnlyFateStore.java | 15 +- .../accumulo/core/fate/user/FateStatusFilter.java | 71 .../accumulo/core/fate/user/UserFateStore.java | 4 +- .../apache/accumulo/core/logging/FateLogger.java | 6 + .../org/apache/accumulo/core/fate/TestStore.java | 5 + .../accumulo/server/init/InitialConfiguration.java | 6 + .../accumulo/manager/tableOps/split/PreSplit.java | 5 - .../accumulo/test/fate/FateInterleavingIT.java | 396 + .../org/apache/accumulo/test/fate/FateStoreIT.java | 39 ++ .../test/fate/meta/MetaFateInterleavingIT.java | 44 +++ .../test/fate/user/UserFateInterleavingIT.java | 42 +++ .../apache/accumulo/test/metrics/MetricsIT.java| 3 +- 16 files changed, 710 insertions(+), 57 deletions(-)
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit aada55ef50bf6de51e440d9a8162c0a34424a7a9 Merge: ef213b520b 221259e12e Author: Dave Marion AuthorDate: Fri May 24 16:55:20 2024 + Merge branch '2.1' core/pom.xml | 16 ++ .../accumulo/core/logging/ConditionalLogger.java | 194 + .../core/logging/DeduplicatingLoggerTest.java | 69 .../core/logging/EscalatingLoggerTest.java | 77 .../accumulo/manager/TabletGroupWatcher.java | 11 +- .../accumulo/tserver/UnloadTabletHandler.java | 1 - .../org/apache/accumulo/tserver/tablet/Tablet.java | 22 ++- 7 files changed, 385 insertions(+), 5 deletions(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 7adba90fe0,662fca40d4..443df6c8f3 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@@ -18,10 -18,11 +18,11 @@@ */ package org.apache.accumulo.manager; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.lang.Math.min; -import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import java.io.IOException; + import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@@ -107,16 -102,17 +109,19 @@@ import org.apache.accumulo.server.table import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.thrift.TException; + import org.slf4j.Logger; + import org.slf4j.event.Level; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; abstract class TabletGroupWatcher extends AccumuloDaemonThread { - // Constants used 
to make sure assignment logging isn't excessive in quantity or size + private static final Logger TABLET_UNLOAD_LOGGER = + new EscalatingLogger(Manager.log, Duration.ofMinutes(5), 1000, Level.INFO); private final Manager manager; private final TabletStateStore store; private final TabletGroupWatcher dependentWatcher; @@@ -352,10 -350,10 +357,10 @@@ manager.tserverSet.getConnection(location.getServerInstance()); if (client != null) { try { - Manager.log.trace("[{}] Requesting TabletServer {} unload {} {}", store.name(), - location.getServerInstance(), tls.extent, goal.howUnload()); + TABLET_UNLOAD_LOGGER.trace("[{}] Requesting TabletServer {} unload {} {}", + store.name(), location.getServerInstance(), tls.extent, goal.howUnload()); client.unloadTablet(manager.managerLock, tls.extent, goal.howUnload(), -manager.getSteadyTime()); +manager.getSteadyTime().getMillis()); unloaded++; totalUnloaded++; } catch (TException tException) { diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 26a2204a08,b6ad6150cb..4ea148046b --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@@ -63,16 -64,17 +64,17 @@@ import org.apache.accumulo.core.data.Mu import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo; import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FilePrefix; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator; + import org.apache.accumulo.core.logging.ConditionalLogger.DeduplicatingLogger; import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.state.tables.TableState; -import 
org.apache.accumulo.core.master.thrift.BulkImportState; -import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.manager.thrift.BulkImportState; +import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; -import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.Dat
(accumulo) branch main updated (ef213b520b -> aada55ef50)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from ef213b520b replaces sorted map w/ list for key vals in tablet metadata (#4600) add 221259e12e Log message when Tablet has been unloading for over 15 minutes (#4558) new aada55ef50 Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: core/pom.xml | 16 ++ .../accumulo/core/logging/ConditionalLogger.java | 194 + .../core/logging/DeduplicatingLoggerTest.java | 69 .../core/logging/EscalatingLoggerTest.java | 77 .../accumulo/manager/TabletGroupWatcher.java | 11 +- .../accumulo/tserver/UnloadTabletHandler.java | 1 - .../org/apache/accumulo/tserver/tablet/Tablet.java | 22 ++- 7 files changed, 385 insertions(+), 5 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java create mode 100644 core/src/test/java/org/apache/accumulo/core/logging/DeduplicatingLoggerTest.java create mode 100644 core/src/test/java/org/apache/accumulo/core/logging/EscalatingLoggerTest.java
(accumulo) branch 2.1 updated: Log message when Tablet has been unloading for over 15 minutes (#4558)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 221259e12e Log message when Tablet has been unloading for over 15 minutes (#4558) 221259e12e is described below commit 221259e12e1cb390dcd5d2dfc93489a27d6e0d25 Author: Dave Marion AuthorDate: Fri May 24 12:46:44 2024 -0400 Log message when Tablet has been unloading for over 15 minutes (#4558) Created an abstract ConditionalLogger class with two implementations. The EscalatingLogger will conditionally log at a higher level and the deduplicating logger will conditionally suppress log messages. Wired up the deduplicating logger in the UnloadTabletHandler to suppress multiple invocations of unload and wired up the escalating logger in the TabletGroupWatcher when the same tablet has been requested to be unloaded. Closes #4539 --- core/pom.xml | 16 ++ .../accumulo/core/logging/ConditionalLogger.java | 194 + .../core/logging/DeduplicatingLoggerTest.java | 69 .../core/logging/EscalatingLoggerTest.java | 77 .../accumulo/manager/TabletGroupWatcher.java | 11 +- .../accumulo/tserver/UnloadTabletHandler.java | 1 - .../org/apache/accumulo/tserver/tablet/Tablet.java | 22 ++- 7 files changed, 385 insertions(+), 5 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 6e4ffc9883..57c8c425b4 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -152,11 +152,27 @@ hadoop-client-runtime runtime + + + biz.aQute.bnd + biz.aQute.bnd.annotation + test + org.apache.hadoop hadoop-client-minicluster test + + org.apache.logging.log4j + log4j-api + test + + + org.apache.logging.log4j + log4j-core + test + org.apache.logging.log4j log4j-slf4j2-impl diff --git a/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java new file mode 100644 index 00..6da6454f06 --- 
/dev/null +++ b/core/src/main/java/org/apache/accumulo/core/logging/ConditionalLogger.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.logging; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiFunction; + +import org.apache.accumulo.core.util.Pair; +import org.slf4j.Logger; +import org.slf4j.Marker; +import org.slf4j.event.Level; +import org.slf4j.helpers.AbstractLogger; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +/** + * Logger that wraps another Logger and only emits a log message once per the supplied duration. + * + */ +public abstract class ConditionalLogger extends AbstractLogger { + + private static final long serialVersionUID = 1L; + + /** + * A Logger implementation that will log a message at the supplied elevated level if it has not + * been seen in the supplied duration. For repeat occurrences the message will be logged at the + * level used in code (which is likely a lower level). 
Note that the first log message will be + * logged at the elevated level because it has not been seen before. + */ + public static class EscalatingLogger extends DeduplicatingLogger { + +private static final long serialVersionUID = 1L; +private final Level elevatedLevel; + +public EscalatingLogger(Logger log, Duration threshold, long maxCachedLogMessages, +Level elevatedLevel) { + super(log, threshold, maxCachedLogMessages); + this.elevatedLevel = elevatedLevel; +} + +@Override +protected void handleNormalizedLoggingCall(Level level, Marker marker, String messagePattern, +Object[] arguments, Throwable throwable) { + + if (arguments == null) { +argu
(accumulo) branch 2.1 updated: Fixed NPE in ScanServerMetrics (#4598)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 5f571506dd Fixed NPE in ScanServerMetrics (#4598) 5f571506dd is described below commit 5f571506ddf6d7dfc003c2fd9969dda783118550 Author: Dave Marion AuthorDate: Fri May 24 07:45:40 2024 -0400 Fixed NPE in ScanServerMetrics (#4598) A NPE was being raised in ScanServerMetrics.registerMetrics when the SSERV_CACHED_TABLET_METADATA_EXPIRATION value was zero, which disables the tablet metadata caching and leaves the variable tabletMetadataCache referencing null. A test was failing in ScanServerConcurrentTabletScanIT that led to this discovery. --- .../main/java/org/apache/accumulo/tserver/ScanServerMetrics.java | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java index 365c26ceee..1ba7de6e33 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServerMetrics.java @@ -63,9 +63,12 @@ public class ScanServerMetrics implements MetricsProducer { "Counts instances where file reservation attempts for scans encountered conflicts") .register(registry); -Preconditions.checkState(tabletMetadataCache.policy().isRecordingStats(), -"Attempted to instrument cache that is not recording stats."); -CaffeineCacheMetrics.monitor(registry, tabletMetadataCache, METRICS_SCAN_TABLET_METADATA_CACHE); +if (tabletMetadataCache != null) { + Preconditions.checkState(tabletMetadataCache.policy().isRecordingStats(), + "Attempted to instrument cache that is not recording stats."); + CaffeineCacheMetrics.monitor(registry, tabletMetadataCache, + METRICS_SCAN_TABLET_METADATA_CACHE); +} } 
public void recordTotalReservationTime(Duration time) {
(accumulo) 02/03: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 94f37aaddb4ed01d5f0a66f709409cc2af0051a9 Merge: 96fa64fc2a 91a2ca2349 Author: Dave Marion AuthorDate: Mon May 20 17:03:29 2024 +0000 Merge branch 'main' into elasticity .../core/clientImpl/ThriftTransportPool.java | 28 ++- .../scan/ConfigurableScanServerHostSelector.java | 157 + .../spi/scan/ConfigurableScanServerSelector.java | 49 +++--- .../ConfigurableScanServerHostSelectorTest.java| 191 + .../scan/ConfigurableScanServerSelectorTest.java | 22 +-- 5 files changed, 407 insertions(+), 40 deletions(-)
(accumulo) 01/03: Fixed ScanServerGroupConfigurationIT from error after merge
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 96fa64fc2aa18d37b882c4c040e658bf90700f1d Author: Dave Marion AuthorDate: Mon May 20 17:02:26 2024 + Fixed ScanServerGroupConfigurationIT from error after merge --- .../apache/accumulo/test/ScanServerGroupConfigurationIT.java | 10 -- 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java index 09f1cb921e..89582eeea7 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java @@ -38,7 +38,6 @@ import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.util.Wait; -import org.apache.accumulo.tserver.ScanServer; import org.apache.hadoop.conf.Configuration; import org.apache.zookeeper.ZooKeeper; import org.junit.jupiter.api.AfterAll; @@ -163,11 +162,10 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { ScanServerIT.ingest(client, tableName, 10, 10, 10, "colf", true); assertEquals(100, additionalIngest1); -// Bump the number of scan serves that can run to start the GROUP1 scan server - getCluster().getConfig().getClusterServerConfiguration().setNumDefaultScanServers(2); - -getCluster()._exec(ScanServer.class, ServerType.SCAN_SERVER, Map.of(), -new String[] {"-g", "GROUP1"}); +// A a scan server for resource group GROUP1 +getCluster().getConfig().getClusterServerConfiguration() +.addScanServerResourceGroup("GROUP1", 1); +getCluster().getClusterControl().start(ServerType.SCAN_SERVER); Wait.waitFor(() -> zk.getChildren(scanServerRoot, 
false).size() == 2); Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream().anyMatch( (p) -> p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME))
(accumulo) 03/03: Fixed post-merge issue with ConfigurableScanServerSelectorTest
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 393c7a6c2f9dd365531bf315eab926d7d528bda6 Author: Dave Marion AuthorDate: Mon May 20 17:11:19 2024 +0000 Fixed post-merge issue with ConfigurableScanServerSelectorTest --- .../accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java index 041f5e6eb3..64fe25bb43 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java @@ -505,7 +505,7 @@ public class ConfigurableScanServerSelectorTest { var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME; -var params = new DaParams(tabletId, Map.of(), Map.of()) { +var params = new SelectorParams(tabletId, Map.of(), Map.of()) { @Override public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, String description) {
(accumulo) branch elasticity updated (6c0f610ed2 -> 393c7a6c2f)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 6c0f610ed2 Merge remote-tracking branch 'upstream/main' into elasticity new 96fa64fc2a Fixed ScanServerGroupConfigurationIT from error after merge add 176ba9ea0b Improve ThriftTransportPool shutdown speed (#4561) add 73b97b8d1d Merge branch '2.1' add dd61442925 Created ScanServerSelector that tries to use servers on the same host (#4536) add 91a2ca2349 Merge branch '2.1' new 94f37aaddb Merge branch 'main' into elasticity new 393c7a6c2f Fixed post-merge issue with ConfigurableScanServerSelectorTest The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../core/clientImpl/ThriftTransportPool.java | 28 ++- .../scan/ConfigurableScanServerHostSelector.java | 157 + .../spi/scan/ConfigurableScanServerSelector.java | 49 +++--- .../ConfigurableScanServerHostSelectorTest.java| 191 + .../scan/ConfigurableScanServerSelectorTest.java | 24 +-- .../test/ScanServerGroupConfigurationIT.java | 10 +- 6 files changed, 412 insertions(+), 47 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java create mode 100644 core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelectorTest.java
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 91a2ca234903a098def9bcb5eb52a5f98a2aba2b Merge: 73b97b8d1d dd61442925 Author: Dave Marion AuthorDate: Mon May 20 16:41:27 2024 + Merge branch '2.1' .../scan/ConfigurableScanServerHostSelector.java | 157 + .../spi/scan/ConfigurableScanServerSelector.java | 49 +++--- .../ConfigurableScanServerHostSelectorTest.java| 191 + .../scan/ConfigurableScanServerSelectorTest.java | 22 +-- 4 files changed, 386 insertions(+), 33 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java index 00,d21a8799b9..f43f21e8c2 mode 00,100644..100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java @@@ -1,0 -1,155 +1,157 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.accumulo.core.spi.scan; + ++import static org.apache.accumulo.core.util.LazySingletons.RANDOM; ++ + import java.util.ArrayList; + import java.util.Collections; + import java.util.Comparator; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Map; + import java.util.Set; + + import org.apache.accumulo.core.data.TabletId; + + import com.google.common.hash.HashCode; + import com.google.common.net.HostAndPort; + + /** + * Extension of the {@code ConfigurableScanServerSelector} that can be used when there are multiple + * ScanServers running on the same host and for some reason, like using a shared off-heap cache, + * sending scans for the same tablet to the same host may provide a better experience. + * + * This implementation will initially hash a Tablet to a ScanServer. If the ScanServer is unable to + * execute the scan, this implementation will try to send the scan to a ScanServer on the same host. + * If there are no more ScanServers to try on that host, then it will fall back to trying a + * different host and the process repeats. 
+ * + */ + public class ConfigurableScanServerHostSelector extends ConfigurableScanServerSelector { + + private static final class PriorHostServersComparator implements Comparator { + + @Override + public int compare(PriorHostServers o1, PriorHostServers o2) { + return Integer.compare(o1.getPriorServers().size(), o2.getPriorServers().size()); + } + + } + + private static final class PriorHostServers { + private final String priorHost; + private final List priorServers = new ArrayList<>(); + + public PriorHostServers(String priorHost) { + this.priorHost = priorHost; + } + + public String getPriorHost() { + return priorHost; + } + + public List getPriorServers() { + return priorServers; + } + } + + @Override + protected int selectServers(SelectorParameters params, Profile profile, + List orderedScanServers, Map serversToUse) { + + // orderedScanServers is the set of ScanServers addresses (host:port) + // for the resource group designated for the profile being used for + // this scan. We want to group these scan servers by hostname and + // hash the tablet to the hostname, then randomly pick one of the + // scan servers in that group. + + final Map> scanServerHosts = new HashMap<>(); + for (final String address : orderedScanServers) { + final HostAndPort hp = HostAndPort.fromString(address); + scanServerHosts.computeIfAbsent(hp.getHost(), (k) -> { + return new ArrayList(); + }).add(address); + } + final List hostIndex = new ArrayList<>(scanServerHosts.keySet()); + + final int numberOfPreviousAttempts = params.getTablets().stream() + .mapToInt(tablet -> params.getAttempts(tablet).size()).max().orElse(0); + + final int numServersToUseInAttemptPlan = + profile.getNumServers(num
(accumulo) branch main updated (73b97b8d1d -> 91a2ca2349)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from 73b97b8d1d Merge branch '2.1' add dd61442925 Created ScanServerSelector that tries to use servers on the same host (#4536) new 91a2ca2349 Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../scan/ConfigurableScanServerHostSelector.java | 157 + .../spi/scan/ConfigurableScanServerSelector.java | 49 +++--- .../ConfigurableScanServerHostSelectorTest.java| 191 + .../scan/ConfigurableScanServerSelectorTest.java | 22 +-- 4 files changed, 386 insertions(+), 33 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java create mode 100644 core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelectorTest.java
(accumulo) branch 2.1 updated (176ba9ea0b -> dd61442925)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git from 176ba9ea0b Improve ThriftTransportPool shutdown speed (#4561) add dd61442925 Created ScanServerSelector that tries to use servers on the same host (#4536) No new revisions were added by this update. Summary of changes: .../scan/ConfigurableScanServerHostSelector.java | 155 + .../spi/scan/ConfigurableScanServerSelector.java | 51 +++--- .../ConfigurableScanServerHostSelectorTest.java| 191 + .../scan/ConfigurableScanServerSelectorTest.java | 22 +-- 4 files changed, 385 insertions(+), 34 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java create mode 100644 core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelectorTest.java
(accumulo) branch main updated (52be928da0 -> 73b97b8d1d)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from 52be928da0 Merge branch '2.1' add 176ba9ea0b Improve ThriftTransportPool shutdown speed (#4561) new 73b97b8d1d Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../core/clientImpl/ThriftTransportPool.java | 28 -- 1 file changed, 21 insertions(+), 7 deletions(-)
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 73b97b8d1d84717793e9531d9d1ca65d664b7cc2 Merge: 52be928da0 176ba9ea0b Author: Dave Marion AuthorDate: Mon May 20 15:45:44 2024 +0000 Merge branch '2.1' .../core/clientImpl/ThriftTransportPool.java | 28 -- 1 file changed, 21 insertions(+), 7 deletions(-)
(accumulo) branch 2.1 updated (c53bebc3e9 -> 176ba9ea0b)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git from c53bebc3e9 Added Scan Server Group Configuration IT (#4506) add 176ba9ea0b Improve ThriftTransportPool shutdown speed (#4561) No new revisions were added by this update. Summary of changes: .../core/clientImpl/ThriftTransportPool.java | 28 -- 1 file changed, 21 insertions(+), 7 deletions(-)
(accumulo) branch main updated (5cd9cdfc70 -> 52be928da0)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from 5cd9cdfc70 Update MetricsIT (#4576) add c53bebc3e9 Added Scan Server Group Configuration IT (#4506) new 52be928da0 Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../test/ScanServerGroupConfigurationIT.java | 197 + .../org/apache/accumulo/test/ScanServerIT.java | 4 +- 2 files changed, 199 insertions(+), 2 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 52be928da034ce1fe9dd18971171fc60feb55ed3 Merge: 5cd9cdfc70 c53bebc3e9 Author: Dave Marion AuthorDate: Mon May 20 14:53:57 2024 +0000 Merge branch '2.1' .../test/ScanServerGroupConfigurationIT.java | 197 + .../org/apache/accumulo/test/ScanServerIT.java | 4 +- 2 files changed, 199 insertions(+), 2 deletions(-)
(accumulo) branch 2.1 updated: Added Scan Server Group Configuration IT (#4506)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new c53bebc3e9 Added Scan Server Group Configuration IT (#4506) c53bebc3e9 is described below commit c53bebc3e9accf63de0c41dac3e2da5ce9597605 Author: Dave Marion AuthorDate: Mon May 20 10:43:28 2024 -0400 Added Scan Server Group Configuration IT (#4506) Closes #4504 --- .../test/ScanServerGroupConfigurationIT.java | 197 + .../org/apache/accumulo/test/ScanServerIT.java | 4 +- 2 files changed, 199 insertions(+), 2 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java new file mode 100644 index 00..c18e6e1aff --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Map; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.ClientProperty; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.scan.ScanServerSelector; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.util.Wait; +import org.apache.accumulo.tserver.ScanServer; +import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.ZooKeeper; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Iterables; + +public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { + + // @formatter:off + private static final String clientConfiguration = + "["+ + " {"+ + " \"isDefault\": true,"+ + " \"maxBusyTimeout\": \"5m\","+ + " \"busyTimeoutMultiplier\": 8,"+ + " \"scanTypeActivations\": [],"+ + " \"attemptPlans\": ["+ + " {"+ + " \"servers\": \"3\","+ + " \"busyTimeout\": \"33ms\","+ + " \"salt\": \"one\""+ + " },"+ + " {"+ + " \"servers\": \"13\","+ + " \"busyTimeout\": \"33ms\","+ + " \"salt\": \"two\""+ + " },"+ + " {"+ + " \"servers\": \"100%\","+ + " \"busyTimeout\": \"33ms\""+ + " }"+ + " ]"+ + " },"+ + " {"+ + " \"isDefault\": false,"+ + " \"maxBusyTimeout\": 
\"5m\","+ + " \"busyTimeoutMultiplier\": 8,"+ + " \"group\": \"GROUP1\","+ + " \"scanTypeActivations\": [\"use_group1\"],"+ + " \"attemptPlans\": ["+ + " {"+ + " \"servers\": \"3\","+ + "
(accumulo) 01/02: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit bd6ce7cc1cab36413fa07262111c056453a1feab Merge: f7e6183271 64de0f0aa7 Author: Dave Marion AuthorDate: Mon May 20 13:29:05 2024 +0000 Merge branch 'main' into elasticity .../org/apache/accumulo/tserver/ScanServer.java| 17 +++ .../apache/accumulo/tserver/ScanServerTest.java| 132 ++--- 2 files changed, 134 insertions(+), 15 deletions(-)
(accumulo) branch elasticity updated (f7e6183271 -> 6afdc88f8e)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from f7e6183271 speeds up tablet mgmt iterator (#4568) add a8d3a101d7 Only allow system user to perform eventual scans on root and meta (#4531) add 64de0f0aa7 Merge branch '2.1' new bd6ce7cc1c Merge branch 'main' into elasticity new 6afdc88f8e Modified ScanServer to not allow scans on Fate table as well The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/accumulo/core/dataImpl/KeyExtent.java | 4 + .../org/apache/accumulo/tserver/ScanServer.java| 17 +++ .../apache/accumulo/tserver/ScanServerTest.java| 132 ++--- 3 files changed, 138 insertions(+), 15 deletions(-)
(accumulo) 02/02: Modified ScanServer to not allow scans on Fate table as well
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 6afdc88f8e6ebfdd2a5b14c2837f1ff03c062a21 Author: Dave Marion AuthorDate: Mon May 20 14:18:55 2024 + Modified ScanServer to not allow scans on Fate table as well --- .../main/java/org/apache/accumulo/core/dataImpl/KeyExtent.java | 4 .../src/main/java/org/apache/accumulo/tserver/ScanServer.java | 4 ++-- .../test/java/org/apache/accumulo/tserver/ScanServerTest.java | 10 +- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/dataImpl/KeyExtent.java b/core/src/main/java/org/apache/accumulo/core/dataImpl/KeyExtent.java index e69068478f..133818ed5d 100644 --- a/core/src/main/java/org/apache/accumulo/core/dataImpl/KeyExtent.java +++ b/core/src/main/java/org/apache/accumulo/core/dataImpl/KeyExtent.java @@ -474,6 +474,10 @@ public class KeyExtent implements Comparable { return prevExtent.endRow().equals(prevEndRow()); } + public boolean isSystemTable() { +return AccumuloTable.allTableIds().contains(tableId()); + } + public boolean isMeta() { return tableId().equals(AccumuloTable.METADATA.tableId()) || isRootTablet(); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index c2c71d04fb..f9aed7e2cf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -933,7 +933,7 @@ public class ScanServer extends AbstractServer KeyExtent extent = getKeyExtent(textent); -if (extent.isMeta() && !isSystemUser(credentials)) { +if (extent.isSystemTable() && !isSystemUser(credentials)) { throw new TException( "Only the system user can perform eventual consistency scans on the root and metadata tables"); } @@ -1000,7 +1000,7 @@ public class 
ScanServer extends AbstractServer for (Entry> entry : tbatch.entrySet()) { KeyExtent extent = getKeyExtent(entry.getKey()); - if (extent.isMeta() && !context.getSecurityOperation().isSystemUser(credentials)) { + if (extent.isSystemTable() && !isSystemUser(credentials)) { throw new TException( "Only the system user can perform eventual consistency scans on the root and metadata tables"); } diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java index f93f095a87..bbe8ffb3fc 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java @@ -190,7 +190,7 @@ public class ScanServerTest { Map execHints = new HashMap<>(); ScanReservation reservation = createMock(ScanReservation.class); -expect(extent.isMeta()).andReturn(false).anyTimes(); +expect(extent.isSystemTable()).andReturn(false).anyTimes(); expect(extent.toThrift()).andReturn(textent).anyTimes(); expect(reservation.getFailures()).andReturn(Map.of(textent, ranges)); reservation.close(); @@ -242,7 +242,7 @@ public class ScanServerTest { }; TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); -expect(extent.isMeta()).andReturn(false).anyTimes(); +expect(extent.isSystemTable()).andReturn(false).anyTimes(); expect(reservation.newTablet(ss, extent)).andReturn(tablet); expect(reservation.getTabletMetadataExtents()).andReturn(Set.of(extent)); expect(reservation.getFailures()).andReturn(Map.of()); @@ -305,7 +305,7 @@ public class ScanServerTest { }; TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); -expect(extent.isMeta()).andReturn(false).anyTimes(); +expect(extent.isSystemTable()).andReturn(false).anyTimes(); expect(reservation.newTablet(ss, extent)).andReturn(tablet).anyTimes(); expect(reservation.getTabletMetadataExtents()).andReturn(Set.of()); 
expect(reservation.getFailures()).andReturn(Map.of(textent, ranges)).anyTimes(); @@ -395,7 +395,7 @@ public class ScanServerTest { TabletResolver resolver = createMock(TabletResolver.class); TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); -expect(sextent.isMeta()).andReturn(true).anyTimes(); +expect(sextent.isSystemTable()).andReturn(true).anyTimes(); expect(reservation.newTablet(ss, sextent)).andReturn(tablet); expect(reservation.getFailures()).andReturn(Map.of()).anyTimes(); reservation.close(); @@ -444,7 +444,7 @@ public class ScanServerTest { TabletResolver resolver = createMock(TabletR
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 64de0f0aa768c040533107f29b715d636631e41e Merge: 8fe933a671 a8d3a101d7 Author: Dave Marion AuthorDate: Mon May 20 13:21:02 2024 + Merge branch '2.1' .../org/apache/accumulo/tserver/ScanServer.java| 17 +++ .../apache/accumulo/tserver/ScanServerTest.java| 132 ++--- 2 files changed, 134 insertions(+), 15 deletions(-) diff --cc server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java index cad9d15aff,73a3e0d03b..f93f095a87 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java @@@ -66,8 -64,9 +66,9 @@@ public class ScanServerTest private KeyExtent extent; private TabletResolver resolver; private ScanReservation reservation; + private boolean systemUser; -protected TestScanServer(ScanServerOpts opts, String[] args) { +protected TestScanServer(ConfigOpts opts, String[] args) { super(opts, args); }
(accumulo) branch main updated (8fe933a671 -> 64de0f0aa7)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from 8fe933a671 Add a constraint check for suspend column (#4546) add a8d3a101d7 Only allow system user to perform eventual scans on root and meta (#4531) new 64de0f0aa7 Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../org/apache/accumulo/tserver/ScanServer.java| 17 +++ .../apache/accumulo/tserver/ScanServerTest.java| 132 ++--- 2 files changed, 134 insertions(+), 15 deletions(-)
(accumulo) branch 2.1 updated (4b5234bd87 -> a8d3a101d7)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git from 4b5234bd87 Added ZK cleanup thread to Manager for Scan Server nodes (#4562) add a8d3a101d7 Only allow system user to perform eventual scans on root and meta (#4531) No new revisions were added by this update. Summary of changes: .../org/apache/accumulo/tserver/ScanServer.java| 17 +++ .../apache/accumulo/tserver/ScanServerTest.java| 132 ++--- 2 files changed, 134 insertions(+), 15 deletions(-)
(accumulo) branch 2.1 updated: Added ZK cleanup thread to Manager for Scan Server nodes (#4562)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 4b5234bd87 Added ZK cleanup thread to Manager for Scan Server nodes (#4562) 4b5234bd87 is described below commit 4b5234bd87a46bfcd686b3db9bda9adff753f556 Author: Dave Marion AuthorDate: Thu May 16 14:54:05 2024 -0400 Added ZK cleanup thread to Manager for Scan Server nodes (#4562) Closes #4559 --- .../java/org/apache/accumulo/manager/Manager.java | 49 +- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 86a1dd71d3..84e8e68519 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -72,6 +72,7 @@ import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.LockLossReason; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; +import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -174,7 +175,7 @@ public class Manager extends AbstractServer static final Logger log = LoggerFactory.getLogger(Manager.class); static final int ONE_SECOND = 1000; - private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND; + private static final long CLEANUP_INTERVAL_MINUTES = 5; static final long WAIT_BETWEEN_ERRORS = ONE_SECOND; private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND; private static final int MAX_CLEANUP_WAIT_TIME = 
ONE_SECOND; @@ -698,7 +699,7 @@ public class Manager extends AbstractServer log.error("Error cleaning up migrations", ex); } } -sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, MILLISECONDS); +sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES); } } @@ -740,6 +741,48 @@ public class Manager extends AbstractServer } } + private class ScanServerZKCleaner implements Runnable { + +@Override +public void run() { + + final ZooReaderWriter zrw = getContext().getZooReaderWriter(); + final String sserverZNodePath = getContext().getZooKeeperRoot() + Constants.ZSSERVERS; + + while (stillManager()) { +try { + for (String sserverClientAddress : zrw.getChildren(sserverZNodePath)) { + +final String sServerZPath = sserverZNodePath + "/" + sserverClientAddress; +final var zLockPath = ServiceLock.path(sServerZPath); +ZcStat stat = new ZcStat(); +byte[] lockData = ServiceLock.getLockData(getContext().getZooCache(), zLockPath, stat); + +if (lockData == null) { + try { +log.debug("Deleting empty ScanServer ZK node {}", sServerZPath); +zrw.delete(sServerZPath); + } catch (KeeperException.NotEmptyException e) { +log.debug( +"Failed to delete ScanServer ZK node {} its not empty, likely an expected race condition.", +sServerZPath); + } +} + } +} catch (KeeperException e) { + log.error("Exception trying to delete empty scan server ZNodes, will retry", e); +} catch (InterruptedException e) { + Thread.interrupted(); + log.error("Interrupted trying to delete empty scan server ZNodes, will retry", e); +} finally { + // sleep for 5 mins + sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES); +} + } +} + + } + private class StatusThread implements Runnable { private boolean goodStats() { @@ -1118,6 +1161,8 @@ public class Manager extends AbstractServer tserverSet.startListeningForTabletServerChanges(); +Threads.createThread("ScanServer Cleanup Thread", new ScanServerZKCleaner()).start(); + try { blockForTservers(); } catch (InterruptedException ex) {
(accumulo) branch elasticity updated: Modified SetEncodingIterator to include Value (#4486)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new edda158a54 Modified SetEncodingIterator to include Value (#4486) edda158a54 is described below commit edda158a54b1769066e8d9c2dfe97fecee419dd2 Author: Dave Marion AuthorDate: Fri May 10 09:21:24 2024 -0400 Modified SetEncodingIterator to include Value (#4486) Renamed SetEqualityIterator to SetEncodingIterator. Added a mandatory iterator option to determine whether the Value should also be encoded for the equality checks that occur for the Conditional mutations. Fixes #3522 Co-authored-by: Christopher L. Shannon Co-authored-by: Dom G. --- .../metadata/ConditionalTabletMutatorImpl.java | 21 +++--- ...alityIterator.java => SetEncodingIterator.java} | 75 +++--- ...ratorTest.java => SetEncodingIteratorTest.java} | 69 +--- .../test/functional/AmpleConditionalWriterIT.java | 18 +- 4 files changed, 142 insertions(+), 41 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 3e87241c6d..381b3e112e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -55,10 +55,11 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletMutatorBase; import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; import 
org.apache.accumulo.server.metadata.iterators.LocationExistsIterator; import org.apache.accumulo.server.metadata.iterators.PresentIterator; -import org.apache.accumulo.server.metadata.iterators.SetEqualityIterator; +import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator; import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator; import com.google.common.base.Preconditions; @@ -172,16 +173,18 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase(tabletMetadata.getLogs()), +Condition c = SetEncodingIterator.createCondition(new HashSet<>(tabletMetadata.getLogs()), logEntry -> logEntry.getColumnQualifier().toString().getBytes(UTF_8), LogColumnFamily.NAME); mutation.addCondition(c); } break; case FILES: { -// ELASTICITY_TODO compare values? -Condition c = SetEqualityIterator.createCondition(tabletMetadata.getFiles(), -stf -> stf.getMetadata().getBytes(UTF_8), DataFileColumnFamily.NAME); +Condition c = + SetEncodingIterator.createConditionWithVal(tabletMetadata.getFilesMap().entrySet(), +entry -> new Pair<>(entry.getKey().getMetadata().getBytes(UTF_8), +entry.getValue().encode()), +DataFileColumnFamily.NAME); mutation.addCondition(c); } break; @@ -199,7 +202,7 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase ecid.canonical().getBytes(UTF_8), ExternalCompactionColumnFamily.NAME); mutation.addCondition(c); } @@ -212,13 +215,13 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase stf.getMetadata().getBytes(UTF_8), BulkFileColumnFamily.NAME); mutation.addCondition(c); } break; case COMPACTED: { -Condition c = SetEqualityIterator.createCondition(tabletMetadata.getCompacted(), +Condition c = SetEncodingIterator.createCondition(tabletMetadata.getCompacted(), fTid -> fTid.canonical().getBytes(UTF_8), CompactedColumnFamily.NAME); mutation.addCondition(c); } @@ -241,7 +244,7 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase fTid.canonical().getBytes(UTF_8), 
UserCompactionRequestedColumnFamily.NAME); mutation.addCondition(c); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEqualityIterator.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java similarity index 62% rename from server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEqualityIterator.java rename to server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java index c5314b4467..af878263d3 100644 --- a/server/base/src/main/java/org/a
(accumulo) branch elasticity updated: Validate seek range in TabletManagementIterator (#4507)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new b0f02ea6de Validate seek range in TabletManagementIterator (#4507) b0f02ea6de is described below commit b0f02ea6dec86297bd752aba73ee67ff11c06c16 Author: Dave Marion AuthorDate: Fri May 10 08:32:23 2024 -0400 Validate seek range in TabletManagementIterator (#4507) Closes #4496 --- .../manager/state/TabletManagementIterator.java| 21 .../state/TabletManagementIteratorTest.java| 60 ++ 2 files changed, 81 insertions(+) diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 3f5397a7a4..4b2ba5d4f7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -20,6 +20,7 @@ package org.apache.accumulo.server.manager.state; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; @@ -35,7 +36,9 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SkippingIterator; @@ -167,6 +170,23 @@ public class TabletManagementIterator extends SkippingIterator { 
balancer.init(benv); } + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) + throws IOException { +if (range != null) { + // This iterator sits on top of the WholeRowIterator (see configureScanner), so enforce + // that the start and end keys in the Range only have a row component to the key. + for (Key k : new Key[] {range.getStartKey(), range.getEndKey()}) { +if (k != null && k.compareTo(new Key(k.getRow())) != 0) { + throw new IllegalArgumentException( + "TabletManagementIterator must be seeked with keys that only contain a row, supplied range: " + + range); +} + } +} +super.seek(range, columnFamilies, inclusive); + } + @Override public Key getTopKey() { return topKey; @@ -299,4 +319,5 @@ public class TabletManagementIterator extends SkippingIterator { return ALL_COMPACTION_KINDS; } } + } diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementIteratorTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementIteratorTest.java new file mode 100644 index 00..a67321 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementIteratorTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.manager.state; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.Set; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.junit.jupiter.api.Test; + +public class TabletManagementIteratorTest { + + @Test + public void testRanges() throws IOException { +TabletManagementIterator iter = new TabletManagementIterator(); + +// We don't call init, so expect a IllegalStateException on success and +// and IllegalArgumentException on failur
(accumulo) branch elasticity updated: Remove todo from SplitUtils (#4484)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 2bee648380 Remove todo from SplitUtils (#4484) 2bee648380 is described below commit 2bee6483805ef2329aefb9385110289391c6266a Author: Dave Marion AuthorDate: Fri May 10 08:03:19 2024 -0400 Remove todo from SplitUtils (#4484) I don't know that there is a reliable way to determine what the splits should be given the information that we have in the tablet metadata. The todo suggested running tests and doing some compactions, etc. But I think that it's really going to be situation dependent. Users can apply iterators to perform aggregation, deletes, etc. at compaction time that could influence the split points greatly. I think the better option here is to document compactions should be run first to get more a [...] --- .../src/main/java/org/apache/accumulo/server/split/SplitUtils.java | 7 --- 1 file changed, 7 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java index 8f64c461bc..5cf5c9edc6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/split/SplitUtils.java @@ -188,13 +188,6 @@ public class SplitUtils { } public static int calculateDesiredSplits(long esitimatedSize, long splitThreshold) { -// ELASTICITY_TODO tablets used to always split into 2 tablets. Now the split operation will -// split into many. How does this impact a tablet with many files and the estimated sizes after -// split vs the old method. Need to run test where we add lots of data to a single tablet, -// change the split thresh, wait for splits, then look at the estimated sizes, then compact and -// look at the sizes after. 
For example if a tablet has 10M of data and the split thesh is set -// to 100K, what will the est sizes look like across the tablets after splitting and then after -// compacting? return (int) Math.floor((double) esitimatedSize / (double) splitThreshold); }
(accumulo) branch main updated (33e2d055af -> f4c1f24658)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from 33e2d055af Merge remote-tracking branch 'upstream/2.1' add 86c81e42d2 Removed duplicate call to Ample.deleteScanServerFileReferences in ScanServer (#4512) new f4c1f24658 Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../main/java/org/apache/accumulo/tserver/ScanServer.java | 13 - 1 file changed, 4 insertions(+), 9 deletions(-)
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit f4c1f2465886c5a5f6ebdc384b0a695ad0ac56ec Merge: 33e2d055af 86c81e42d2 Author: Dave Marion AuthorDate: Tue May 7 20:33:21 2024 + Merge branch '2.1' .../main/java/org/apache/accumulo/tserver/ScanServer.java | 13 - 1 file changed, 4 insertions(+), 9 deletions(-)
(accumulo) branch 2.1 updated: Removed duplicate call to Ample.deleteScanServerFileReferences in ScanServer (#4512)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 86c81e42d2 Removed duplicate call to Ample.deleteScanServerFileReferences in ScanServer (#4512) 86c81e42d2 is described below commit 86c81e42d2bb8ee7ba45f977a2757adf706c327d Author: Dave Marion AuthorDate: Tue May 7 16:26:06 2024 -0400 Removed duplicate call to Ample.deleteScanServerFileReferences in ScanServer (#4512) Closes #4508 --- .../main/java/org/apache/accumulo/tserver/ScanServer.java | 13 - 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index dabcc617d1..432d7c5ea4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -519,6 +519,10 @@ public class ScanServer extends AbstractServer extents); Map tabletsMetadata = getTabletMetadata(extents); +if (!(tabletsMetadata instanceof HashMap)) { + // the map returned by getTabletMetadata may not be mutable + tabletsMetadata = new HashMap<>(tabletsMetadata); +} for (KeyExtent extent : extents) { var tabletMetadata = tabletsMetadata.get(extent); @@ -531,10 +535,6 @@ public class ScanServer extends AbstractServer LOG.info("RFFS {} extent unable to load {} as AssignmentHandler returned false", myReservationId, extent); failures.add(extent); -if (!(tabletsMetadata instanceof HashMap)) { - // the map returned by getTabletMetadata may not be mutable - tabletsMetadata = new HashMap<>(tabletsMetadata); -} tabletsMetadata.remove(extent); } } @@ -618,14 +618,9 @@ public class ScanServer extends AbstractServer for (KeyExtent extent : tabletsToCheck) { TabletMetadata metadataAfter = tabletsToCheckMetadata.get(extent); if 
(metadataAfter == null) { -getContext().getAmple().deleteScanServerFileReferences(refs); LOG.info("RFFS {} extent unable to load {} as metadata no longer referencing files", myReservationId, extent); failures.add(extent); -if (!(tabletsMetadata instanceof HashMap)) { - // the map returned by getTabletMetadata may not be mutable - tabletsMetadata = new HashMap<>(tabletsMetadata); -} tabletsMetadata.remove(extent); } else { // remove files that are still referenced
(accumulo) branch elasticity updated: Changed default value of TSERV_PORTSEARCH from false to true (#4514)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new b32a32cb58 Changed default value of TSERV_PORTSEARCH from false to true (#4514) b32a32cb58 is described below commit b32a32cb5882aab9e611aa321b7451f62a0ace03 Author: Dave Marion AuthorDate: Tue May 7 14:11:59 2024 -0400 Changed default value of TSERV_PORTSEARCH from false to true (#4514) Changed the default value of TSERV_PORTSEARCH from false to true to provide some consistency in the PORTSEARCH properties. The default values for the Compactors and ScanServers are already set to true. Closes #4476 --- core/src/main/java/org/apache/accumulo/core/conf/Property.java | 10 ++ .../java/org/apache/accumulo/server/rpc/TServerUtilsTest.java | 8 +--- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index a708b14be1..66c27f9311 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -485,7 +485,8 @@ public enum Property { "2.1.0"), @Experimental SSERV_PORTSEARCH("sserver.port.search", "true", PropertyType.BOOLEAN, - "if the ports above are in use, search higher ports until one is available.", "2.1.0"), + "if the sserver.port.client ports are in use, search higher ports until one is available.", + "2.1.0"), @Experimental SSERV_CLIENTPORT("sserver.port.client", "9996", PropertyType.PORT, "The port used for handling client connections on the tablet servers.", "2.1.0"), @@ -553,8 +554,9 @@ public enum Property { "Specifies the size of the cache for RFile index blocks.", "1.3.5"), TSERV_SUMMARYCACHE_SIZE("tserver.cache.summary.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for 
summary data on each tablet server.", "2.0.0"), - TSERV_PORTSEARCH("tserver.port.search", "false", PropertyType.BOOLEAN, - "if the ports above are in use, search higher ports until one is available.", "1.3.5"), + TSERV_PORTSEARCH("tserver.port.search", "true", PropertyType.BOOLEAN, + "if the tserver.port.client ports are in use, search higher ports until one is available.", + "1.3.5"), TSERV_CLIENTPORT("tserver.port.client", "9997", PropertyType.PORT, "The port used for handling client connections on the tablet servers.", "1.3.5"), TSERV_TOTAL_MUTATION_QUEUE_MAX("tserver.total.mutation.queue.max", "5%", PropertyType.MEMORY, @@ -1101,7 +1103,7 @@ public enum Property { "4.0.0"), @Experimental COMPACTOR_PORTSEARCH("compactor.port.search", "true", PropertyType.BOOLEAN, - "If the compactor.port.client is in use, search higher ports until one is available.", + "If the compactor.port.client ports are in use, search higher ports until one is available.", "2.1.0"), @Experimental COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT, diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index f1a4b58cf3..5ccd6f7358 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@ -93,6 +93,7 @@ public class TServerUtilsTest { public void testStartServerZeroPort() throws Exception { TServer server = null; conf.set(Property.TSERV_CLIENTPORT, "0"); +conf.set(Property.TSERV_PORTSEARCH, "false"); try { ServerAddress address = startServer(); assertNotNull(address); @@ -111,6 +112,7 @@ public class TServerUtilsTest { TServer server = null; int port = getFreePort(1024); conf.set(Property.TSERV_CLIENTPORT, Integer.toString(port)); +conf.set(Property.TSERV_PORTSEARCH, "false"); try { ServerAddress address = startServer(); 
assertNotNull(address); @@ -131,6 +133,7 @@ public class TServerUtilsTest { InetAddress addr = InetAddress.getByName("localhost"); // Bind to the port conf.set(Property.TSERV_CLIENTPORT, Integer.toString(port)); +conf.set(Property.TSERV_PORTSEARCH, "false"); try (ServerSock
(accumulo) branch elasticity updated: Modified Tablet Mutator implementations to always update srv:lock (#4421)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 76fffdedf3 Modified Tablet Mutator implementations to always update srv:lock (#4421) 76fffdedf3 is described below commit 76fffdedf32d4df03412ade09ec367d909fe79f3 Author: Dave Marion AuthorDate: Mon Apr 29 15:29:28 2024 -0400 Modified Tablet Mutator implementations to always update srv:lock (#4421) Modified the tablet mutating implementations (TabletMutator, ConditionalTabletMutator, etc.) to always update the srv:lock column so that the MetadataConstraints filter on the server side validates that the update comes from a "valid" server. I had to modify MiniAccumuloClusterImpl to acquire a ServiceLock for the ITs to use where the test code is updating the tablet metadata. The changes in the other classes are mainly just making sure that the tablet mutator classes are getting the ServiceLock. 
Closes #4420 --- .../org/apache/accumulo/core/lock/ServiceLock.java | 12 + .../accumulo/core/metadata/schema/Ample.java | 3 - .../metadata/schema/TabletMetadataBuilder.java | 6 - .../core/metadata/schema/TabletMutatorBase.java| 3 +- .../miniclusterImpl/MiniAccumuloClusterImpl.java | 279 - .../org/apache/accumulo/server/ServerContext.java | 18 ++ .../manager/state/AbstractTabletStateStore.java| 4 +- .../server/manager/state/MetaDataStateStore.java | 5 +- .../server/manager/state/RootTabletStateStore.java | 4 +- .../metadata/ConditionalTabletMutatorImpl.java | 7 + .../server/metadata/RootTabletMutatorImpl.java | 7 + .../server/metadata/TabletMutatorImpl.java | 11 +- .../accumulo/server/util/MetadataTableUtil.java| 1 - .../manager/state/ZooTabletStateStoreTest.java | 9 +- .../ConditionalTabletsMutatorImplTest.java | 24 +- .../org/apache/accumulo/compactor/Compactor.java | 1 + .../apache/accumulo/gc/SimpleGarbageCollector.java | 7 +- .../java/org/apache/accumulo/manager/Manager.java | 1 + .../accumulo/manager/TabletGroupWatcher.java | 2 - .../manager/tableOps/create/PopulateMetadata.java | 1 - .../manager/tableOps/merge/MergeTabletsTest.java | 8 +- .../manager/tableOps/split/UpdateTabletsTest.java | 4 + .../java/org/apache/accumulo/monitor/Monitor.java | 1 + .../org/apache/accumulo/tserver/ScanServer.java| 1 + .../org/apache/accumulo/tserver/TabletServer.java | 1 + .../accumulo/tserver/tablet/ScanfileManager.java | 10 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 8 +- .../accumulo/test/functional/AccumuloClientIT.java | 10 +- .../apache/accumulo/test/functional/SplitIT.java | 9 +- .../accumulo/test/functional/SplitRecoveryIT.java | 57 + .../accumulo/test/performance/NullTserver.java | 101 ++-- 31 files changed, 380 insertions(+), 235 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java index 43388052a3..ef80d4e52d 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -558,6 +559,17 @@ public class ServiceLock implements Watcher { LOG.debug("[{}] Deleting all at path {} due to unlock", vmLockPrefix, pathToDelete); ZooUtil.recursiveDelete(zooKeeper, pathToDelete, NodeMissingPolicy.SKIP); +// Wait for the delete to happen on the server before exiting method +NanoTime start = NanoTime.now(); +while (zooKeeper.exists(pathToDelete, null) != null) { + Thread.onSpinWait(); + if (NanoTime.now().subtract(start).toSeconds() > 10) { +start = NanoTime.now(); +LOG.debug("[{}] Still waiting for zookeeper to delete all at {}", vmLockPrefix, +pathToDelete); + } +} + localLw.lostLock(LockLossReason.LOCK_DELETED); } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index d85553f75e..27856c899b 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Am
(accumulo) branch elasticity updated: Fixed check in TabletResourceGroupBalanceIT to use Wait.waitFor (#4479)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new ee04ad3aa5 Fixed check in TabletResourceGroupBalanceIT to use Wait.waitFor (#4479) ee04ad3aa5 is described below commit ee04ad3aa522404cb37a1eceefd2bec34b2d00e7 Author: Dave Marion AuthorDate: Tue Apr 23 07:28:56 2024 -0400 Fixed check in TabletResourceGroupBalanceIT to use Wait.waitFor (#4479) The assertion in testResourceGroupBalanceWithNoTServers started returning zero for the number of hosted tablets after the `waitForBalance`. Not sure which modification caused this to change behavior, but this fix is likely the correct one regardless. --- .../functional/TabletResourceGroupBalanceIT.java | 30 -- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java index 6b289a72b4..0605a69c2a 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java @@ -225,20 +225,22 @@ public class TabletResourceGroupBalanceIT extends SharedMiniClusterBase { .addTabletServerResourceGroup("GROUP2", 1); getCluster().getClusterControl().start(ServerType.TABLET_SERVER); - client.instanceOperations().waitForBalance(); - assertEquals(26, getCountOfHostedTablets(client, tableName)); - ingest.join(); - assertNull(error.get()); - - client.tableOperations().delete(tableName); - // Stop all tablet servers because there is no way to just stop - // the GROUP2 server yet. 
- getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - getCluster().getConfig().getClusterServerConfiguration().clearTServerResourceGroups(); - getCluster().getConfig().getClusterServerConfiguration() - .addTabletServerResourceGroup("GROUP1", 1); - getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + try { +client.instanceOperations().waitForBalance(); +Wait.waitFor(() -> getCountOfHostedTablets(client, tableName) == 26); +ingest.join(); +assertNull(error.get()); + } finally { +client.tableOperations().delete(tableName); +// Stop all tablet servers because there is no way to just stop +// the GROUP2 server yet. + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getConfig().getClusterServerConfiguration().clearTServerResourceGroups(); +getCluster().getConfig().getClusterServerConfiguration() +.addTabletServerResourceGroup("GROUP1", 1); +getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + } } } @@ -258,7 +260,7 @@ public class TabletResourceGroupBalanceIT extends SharedMiniClusterBase { client.tableOperations().create(tableName, ntc1); // wait for all tablets to be hosted - Wait.waitFor(() -> 26 != getCountOfHostedTablets(client, tableName)); + Wait.waitFor(() -> 26 == getCountOfHostedTablets(client, tableName)); client.instanceOperations().waitForBalance();
(accumulo) branch elasticity updated: Allow tablet refresh while in the process of closing (#4483)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 45653be366 Allow tablet refresh while in the process of closing (#4483) 45653be366 is described below commit 45653be36678c9f4fa970e7db819ad67c6b02113 Author: Keith Turner AuthorDate: Tue Apr 23 07:27:47 2024 -0400 Allow tablet refresh while in the process of closing (#4483) There was a check in the tablet refresh code that was preventing tablet refresh while a tablet was in the middle of closing. Modified the check to only prevent refresh after a tablet is completely closed. fixes #4477 --- .../src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 348807f161..38cd3cff18 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1635,7 +1635,7 @@ public class Tablet extends TabletBase { } synchronized (this) { -if (isClosed()) { +if (isCloseComplete()) { log.debug("Unable to refresh tablet {} for {} because the tablet is closed", extent, refreshPurpose); return false;
(accumulo) branch elasticity updated: Log warning when no Compactors for system tables (#4464)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 0039e5fa75 Log warning when no Compactors for system tables (#4464) 0039e5fa75 is described below commit 0039e5fa75fa78af4d6e17488f5ff82ac96062b5 Author: Dave Marion AuthorDate: Wed Apr 17 08:37:48 2024 -0400 Log warning when no Compactors for system tables (#4464) Added logic in the SimpleGarbageCollector that will periodically log a warning when there are no compactors running for the system tables' resource group. Fixes #4318 --- .../apache/accumulo/gc/SimpleGarbageCollector.java | 31 ++ 1 file changed, 31 insertions(+) diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 8f9d406488..ec1f623377 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -23,6 +23,11 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -33,6 +38,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface; import org.apache.accumulo.core.gc.thrift.GCStatus; import 
org.apache.accumulo.core.gc.thrift.GcCycleStats; @@ -45,12 +51,16 @@ import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.spi.balancer.TableLoadBalancer; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.accumulo.gc.metrics.GcCycleMetrics; import org.apache.accumulo.gc.metrics.GcMetrics; import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.rpc.ServerAddress; @@ -79,6 +89,8 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics(); + private NanoTime lastCompactorCheck = NanoTime.now(); + SimpleGarbageCollector(ConfigOpts opts, String[] args) { super("gc", opts, args); @@ -297,6 +309,25 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { gcCycleMetrics.incrementRunCycleCount(); long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); + +if (NanoTime.now().subtract(lastCompactorCheck).toMillis() > gcDelay * 3) { + Map> resourceMapping = new HashMap<>(); + for (TableId tid : AccumuloTable.allTableIds()) { +TableConfiguration tconf = getContext().getTableConfiguration(tid); +String resourceGroup = tconf.get(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY); +resourceGroup = +resourceGroup == null ? 
Constants.DEFAULT_RESOURCE_GROUP_NAME : resourceGroup; +resourceMapping.getOrDefault(resourceGroup, new HashSet<>()).add(tid); + } + for (Entry> e : resourceMapping.entrySet()) { +if (ExternalCompactionUtil.countCompactors(e.getKey(), getContext()) == 0) { + log.warn("No Compactors exist in resource group {} for system table {}", e.getKey(), + e.getValue()); +} + } + lastCompactorCheck = NanoTime.now(); +} + log.debug("Sleeping for {} milliseconds", gcDelay); Thread.sleep(gcDelay); } catch (InterruptedException e) {
(accumulo) branch elasticity updated (93c3418cea -> 8c68e4732e)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 93c3418cea Logged msg instead of throwing exception in setFutureLocations (#4442) add 8c68e4732e Fixed NPE in CompactionDriver when null passed to Text constructor (#4443) No new revisions were added by this update. Summary of changes: .../org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-)
(accumulo) branch elasticity updated (9503ebe22f -> 93c3418cea)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 9503ebe22f verifies tablets are seen by compaction driver (#4434) add 93c3418cea Logged msg instead of throwing exception in setFutureLocations (#4442) No new revisions were added by this update. Summary of changes: .../server/manager/state/AbstractTabletStateStore.java | 16 1 file changed, 12 insertions(+), 4 deletions(-)
(accumulo) branch elasticity updated: Moved removeUnusedWALEntries to Tablet (#4404)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 193c010d36 Moved removeUnusedWALEntries to Tablet (#4404) 193c010d36 is described below commit 193c010d36732b3a9a1cd602afad225498b6ac54 Author: Dave Marion AuthorDate: Mon Apr 8 10:58:42 2024 -0400 Moved removeUnusedWALEntries to Tablet (#4404) Moved MetadataTableUtil.removeUnusedWALEntries code to Tablet constructor. Changed logic to use conditional mutations. --- .../accumulo/server/util/MetadataTableUtil.java| 9 -- .../org/apache/accumulo/tserver/tablet/Tablet.java | 35 +++--- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index f87805d092..e5db26344f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -31,7 +31,6 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -244,14 +243,6 @@ public class MetadataTableUtil { return new Pair<>(result, sizes); } - public static void removeUnusedWALEntries(ServerContext context, KeyExtent extent, - final Collection entries, ServiceLock zooLock) { -TabletMutator tablet = context.getAmple().mutateTablet(extent); -entries.forEach(tablet::deleteWal); -tablet.putZooLock(context.getZooKeeperRoot(), zooLock); -tablet.mutate(); - } - private static Mutation createCloneMutation(TableId srcTableId, TableId tableId, Map tablet) { diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index a0f8b95857..414fedd105 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -59,12 +59,17 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.file.FilePrefix; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator; +import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletsMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -85,7 +90,6 @@ import org.apache.accumulo.server.problems.ProblemType; import org.apache.accumulo.server.tablets.ConditionCheckerContext.ConditionChecker; import org.apache.accumulo.server.tablets.TabletNameGenerator; import org.apache.accumulo.server.tablets.TabletTime; -import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.tserver.InMemoryMap; import org.apache.accumulo.tserver.MinorCompactionReason; import 
org.apache.accumulo.tserver.TabletServer; @@ -126,7 +130,6 @@ public class Tablet extends TabletBase { private final TabletTime tabletTime; - private Location lastLocation = null; private final Set checkedTabletDirs = new ConcurrentSkipListSet<>(); private final AtomicLong dataSourceDeletions = new AtomicLong(0); @@ -231,10 +234,6 @@ public class Tablet extends TabletBase { this.tabletServer = tabletServer; this.tabletResources = trm; this.latestMetadata = metadata; - -// TODO look into this.. also last could be null -this.lastLocation = metadata.getLast(); - this.tabletTime = TabletTime.getInstance(metadata.getTime()); this.logId = tabletServer.createLogId(); @@ -277,10 +276,25 @@ public class Tablet extends TabletBase { commitSession.updateMaxCommittedTime(tabletTime.getTime()); if (entriesUsedOnTable
(accumulo) branch elasticity updated: Existing logging in ManagerClientServiceHandler is sufficient, removed todo (#4433)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 77e95f95b9 Existing logging in ManagerClientServiceHandler is sufficient, removed todo (#4433) 77e95f95b9 is described below commit 77e95f95b9b3b76c42f5b619937dbcb58553f029 Author: Dave Marion AuthorDate: Fri Apr 5 16:28:31 2024 -0400 Existing logging in ManagerClientServiceHandler is sufficient, removed todo (#4433) --- .../java/org/apache/accumulo/manager/ManagerClientServiceHandler.java| 1 - 1 file changed, 1 deletion(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index 9a1fdaf478..dd14024db6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -648,7 +648,6 @@ public class ManagerClientServiceHandler implements ManagerClientService.Iface { inProgress.forEach(hostingRequestInProgress::remove); } -// ELASTICITY_TODO pass ranges of individual tablets manager.getEventCoordinator().event(success, "Tablet hosting requested for %d tablets in %s", success.size(), tableId); }
(accumulo) branch elasticity updated: Removed FileUtil.cleanupIndexOp to resolve TODO, related changes (#4385)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new b6769788c0 Removed FileUtil.cleanupIndexOp to resolve TODO, related changes (#4385) b6769788c0 is described below commit b6769788c054407bc1ee269758684a86b8ca6e10 Author: Dave Marion AuthorDate: Fri Apr 5 16:12:51 2024 -0400 Removed FileUtil.cleanupIndexOp to resolve TODO, related changes (#4385) The existing TODO in FileUtil was to determine if the split code in elasticity was missing something. The cleanupIndexOp method was called in earlier versions, but is no longer called in elasticity. I determined that the SplitUtils.IndexIterable.close method was a likely replacement for the cleanupIndexOp method. I removed this method and FileUtilTest as it was only testing this method. The remaining method in FileUtil is only called from Splitter, so I moved the method, related code, and associated test. I also fixed up references that were broken due to the code move. --- .../org/apache/accumulo/server/util/FileUtil.java | 135 .../apache/accumulo/server/util/FileUtilTest.java | 176 - .../apache/accumulo/manager/split/Splitter.java| 81 +- .../manager/tableOps/split/UpdateTablets.java | 6 +- .../manager/upgrade/SplitRecovery12to13.java | 6 +- .../manager/tableOps/split}/FileInfoTest.java | 4 +- .../manager/tableOps/split/UpdateTabletsTest.java | 5 +- 7 files changed, 88 insertions(+), 325 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java deleted file mode 100644 index 78a541ca6e..00 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server.util; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.metadata.TabletFile; -import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class FileUtil { - - public static class FileInfo { -final Text firstRow; -final Text lastRow; - -public FileInfo(Text firstRow, Text lastRow) { - this.firstRow = firstRow; - this.lastRow = lastRow; -} - -public Text getFirstRow() { - return firstRow; -} - -public Text getLastRow() { - return lastRow; -} - } - - private static final Logger log = LoggerFactory.getLogger(FileUtil.class); - - // ELASTICITY_TODO this is only used by test. Determine what the test are doing and if some - // functionality is missing in the new split code. 
- protected static void cleanupIndexOp(Path tmpDir, VolumeManager fs, - ArrayList readers) throws IOException { -// close all of the index sequence files -for (FileSKVIterator r : readers) { - try { -if (r != null) { - r.close(); -} - } catch (IOException e) { -// okay, try to close the rest anyway -log.error("{}", e.getMessage(), e); - } -} - -if (tmpDir != null) { - FileSystem actualFs = fs.getFileSystemByPath(tmpDir); - if (actualFs.exists(tmpDir)) { -fs.deleteRecursively(tmpDir); -return; - } - - log.error("Did not delete tmp dir because it wasn't a tmp dir {}", tmpDir); -} - } - - public static Map tryToGetFirstAndLastRows( - ServerContext context, TableConfiguration ta
(accumulo) branch elasticity updated: Removed TODO in TabletRefresher (#4417)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 995f318459 Removed TODO in TabletRefresher (#4417) 995f318459 is described below commit 995f318459efe6e1c3b29cf8fc9c0a27302f04dc Author: Dave Marion AuthorDate: Fri Apr 5 16:13:40 2024 -0400 Removed TODO in TabletRefresher (#4417) Other exceptions which could be more serious than TException would be raised as an ExecutionException when get() is called on the Future and would result in a RuntimeException being raised from TabletRefresher.refreshTablets. If a TException is thrown, then the refresh for all of the Tablets will be retried unless the TabletServer is no longer online. --- .../org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java index a3d341a12b..cb963e583a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java @@ -171,9 +171,6 @@ public class TabletRefresher { } catch (TException ex) { log.debug("rpc failed server: " + location + ", " + logId + " " + ex.getMessage(), ex); - // ELASTICITY_TODO are there any other exceptions we should catch in this method and check if - // the tserver is till alive? - // something went wrong w/ RPC return all extents as unrefreshed return refreshes; } finally {
(accumulo) branch elasticity updated (637dd0fd3f -> eea8ce48e2)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 637dd0fd3f Removed elasticity comment to do in ExternalCompaction_2_IT (#4416) add 9d4d68b2a3 Changed return variable for ExternalCompactionUtil.getCompactorAddrs (#4419) new eea8ce48e2 Merge branch 'main' into elasticity The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/accumulo/core/clientImpl/InstanceOperationsImpl.java | 2 +- .../accumulo/core/util/compaction/ExternalCompactionUtil.java | 7 --- .../src/main/java/org/apache/accumulo/monitor/Monitor.java| 2 +- .../monitor/rest/compactions/external/CoordinatorInfo.java| 4 ++-- .../monitor/rest/compactions/external/ExternalCompactionInfo.java | 8 .../org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java | 6 -- 6 files changed, 16 insertions(+), 13 deletions(-)
(accumulo) 01/01: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit eea8ce48e2d5d7bd5c6b69e05c6824f697d8de0a Merge: 637dd0fd3f 9d4d68b2a3 Author: Dave Marion AuthorDate: Fri Apr 5 15:09:17 2024 + Merge branch 'main' into elasticity .../apache/accumulo/core/clientImpl/InstanceOperationsImpl.java | 2 +- .../accumulo/core/util/compaction/ExternalCompactionUtil.java | 7 --- .../src/main/java/org/apache/accumulo/monitor/Monitor.java| 2 +- .../monitor/rest/compactions/external/CoordinatorInfo.java| 4 ++-- .../monitor/rest/compactions/external/ExternalCompactionInfo.java | 8 .../org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java | 6 -- 6 files changed, 16 insertions(+), 13 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 86181247a8,0046af7dc6..48895192bd --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@@ -105,26 -106,26 +106,26 @@@ public class ExternalCompactionUtil } /** - * @return map of queue names to compactor addresses + * @return map of group names to compactor addresses */ - public static Map> getCompactorAddrs(ClientContext context) { + public static Map> getCompactorAddrs(ClientContext context) { try { - final Map> groupsAndAddresses = new HashMap<>(); - final Map> queuesAndAddresses = new HashMap<>(); - final String compactorQueuesPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS; ++ final Map> groupsAndAddresses = new HashMap<>(); + final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS; ZooReader zooReader = context.getZooReader(); - List queues = zooReader.getChildren(compactorQueuesPath); - for (String queue : queues) { -queuesAndAddresses.putIfAbsent(queue, new HashSet<>()); + List groups 
= zooReader.getChildren(compactorGroupsPath); + for (String group : groups) { try { - List compactors = zooReader.getChildren(compactorQueuesPath + "/" + queue); + List compactors = zooReader.getChildren(compactorGroupsPath + "/" + group); for (String compactor : compactors) { // compactor is the address, we are checking to see if there is a child node which // represents the compactor's lock as a check that it's alive. List children = -zooReader.getChildren(compactorQueuesPath + "/" + queue + "/" + compactor); +zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor); if (!children.isEmpty()) { LOG.trace("Found live compactor {} ", compactor); - groupsAndAddresses.putIfAbsent(group, new ArrayList<>()); - queuesAndAddresses.get(queue).add(HostAndPort.fromString(compactor)); ++ groupsAndAddresses.putIfAbsent(group, new HashSet<>()); + groupsAndAddresses.get(group).add(HostAndPort.fromString(compactor)); } } } catch (NoNodeException e) { diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java index d17ce19f18,8724f758bb..eccda4569e --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java @@@ -33,9 -33,9 +33,9 @@@ public class CoordinatorInfo public CoordinatorInfo(Optional serverOpt, ExternalCompactionInfo ecInfo) { server = serverOpt.map(HostAndPort::toString).orElse("none"); -var queueToCompactors = ecInfo.getCompactors(); -numQueues = queueToCompactors.size(); -numCompactors = queueToCompactors.values().stream().mapToInt(Set::size).sum(); +var groupToCompactors = ecInfo.getCompactors(); +numQueues = groupToCompactors.size(); - numCompactors = groupToCompactors.values().stream().mapToInt(List::size).sum(); ++numCompactors = groupToCompactors.values().stream().mapToInt(Set::size).sum(); lastContact = System.currentTimeMillis() - 
ecInfo.getFetchedTimeMillis(); } } diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java index 03f7442c91,771d74d588..637a71eca8 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java @@@ -22,10 -22,
(accumulo) branch main updated: Changed return variable for ExternalCompactionUtil.getCompactorAddrs (#4419)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/main by this push: new 9d4d68b2a3 Changed return variable for ExternalCompactionUtil.getCompactorAddrs (#4419) 9d4d68b2a3 is described below commit 9d4d68b2a373df2c36763a5ed55675a5f8d127a3 Author: Dave Marion AuthorDate: Fri Apr 5 10:55:12 2024 -0400 Changed return variable for ExternalCompactionUtil.getCompactorAddrs (#4419) --- .../apache/accumulo/core/clientImpl/InstanceOperationsImpl.java | 2 +- .../accumulo/core/util/compaction/ExternalCompactionUtil.java | 7 --- .../org/apache/accumulo/coordinator/CompactionCoordinator.java| 8 .../src/main/java/org/apache/accumulo/monitor/Monitor.java| 2 +- .../monitor/rest/compactions/external/CoordinatorInfo.java| 4 ++-- .../monitor/rest/compactions/external/ExternalCompactionInfo.java | 8 6 files changed, 16 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 74bd2ece59..215f7c6214 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -303,7 +303,7 @@ public class InstanceOperationsImpl implements InstanceOperations { public List getActiveCompactions() throws AccumuloException, AccumuloSecurityException { -Map> compactors = ExternalCompactionUtil.getCompactorAddrs(context); +Map> compactors = ExternalCompactionUtil.getCompactorAddrs(context); List tservers = getTabletServers(); int numThreads = Math.max(4, Math.min((tservers.size() + compactors.size()) / 10, 256)); diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 35c358b7ed..0046af7dc6 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -107,14 +108,14 @@ public class ExternalCompactionUtil { /** * @return map of queue names to compactor addresses */ - public static Map> getCompactorAddrs(ClientContext context) { + public static Map> getCompactorAddrs(ClientContext context) { try { - final Map> queuesAndAddresses = new HashMap<>(); + final Map> queuesAndAddresses = new HashMap<>(); final String compactorQueuesPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS; ZooReader zooReader = context.getZooReader(); List queues = zooReader.getChildren(compactorQueuesPath); for (String queue : queues) { -queuesAndAddresses.putIfAbsent(queue, new ArrayList<>()); +queuesAndAddresses.putIfAbsent(queue, new HashSet<>()); try { List compactors = zooReader.getChildren(compactorQueuesPath + "/" + queue); for (String compactor : compactors) { diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 7358ce4416..c86a93350c 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -304,7 +304,7 @@ public class CompactionCoordinator extends AbstractServer long now = System.currentTimeMillis(); - Map> 
idleCompactors = getIdleCompactors(); + Map> idleCompactors = getIdleCompactors(); TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> { if ((now - lastCheckTime) > getMissingCompactorWarningTime() && QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) { @@ -325,16 +325,16 @@ public class CompactionCoordinator extends AbstractServer LOG.info("Shutting down"); } - private Map> getIdleCompactors() { + private Map> getIdleCompactors() { -Map> allCompactors = +Map> allCompactors = ExternalCompactionUtil.getCompactorAddrs(getContext()); Set emptyQueues = new HashSet<>();
(accumulo) branch elasticity updated: Removed elasticity comment to do in ExternalCompaction_2_IT (#4416)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 637dd0fd3f Removed elasticity comment to do in ExternalCompaction_2_IT (#4416) 637dd0fd3f is described below commit 637dd0fd3f4ce9e408b93db59816e1b2b07b6a9e Author: Dave Marion AuthorDate: Fri Apr 5 10:11:25 2024 -0400 Removed elasticity comment to do in ExternalCompaction_2_IT (#4416) The comment said that the operation id needed to be set when deleting the tablets. This is now done in ReserveTablets.isReady. Co-authored-by: Keith Turner --- .../org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java index 937cb03037..4ca3794ca9 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java @@ -238,9 +238,7 @@ public class ExternalCompaction_2_IT extends SharedMiniClusterBase { confirmCompactionCompleted(getCluster().getServerContext(), ecids, TCompactionState.CANCELLED); - // ELASTICITY_TODO make delete table fate op get operation ids before deleting - // there should be no metadata for the table, check to see if the compaction wrote anything - // after table delete + // Ensure compaction did not write anything to metadata table after delete table try (var scanner = client.createScanner(AccumuloTable.METADATA.tableName())) { scanner.setRange(MetadataSchema.TabletsSection.getRange(tid)); assertEquals(0, scanner.stream().count());
(accumulo) branch elasticity updated: Added missing Fate.shutdown in FateOpsCommandsIT (#4428)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 996952ff55 Added missing Fate.shutdown in FateOpsCommandsIT (#4428) 996952ff55 is described below commit 996952ff5515c1525788cf6cbdee4a752f440229 Author: Dave Marion AuthorDate: Fri Mar 29 09:03:48 2024 -0400 Added missing Fate.shutdown in FateOpsCommandsIT (#4428) --- test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index b716d12d5f..5bddd4f35c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -267,6 +267,8 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase || result.contains( "Fate ID Filters: [" + fateId2.canonical() + ", " + fateId1.canonical() + "]")); assertTrue(result.contains("Instance Types Filters: [" + store.type().name() + "]")); + +fate.shutdown(10, TimeUnit.MINUTES); } @Test
(accumulo) branch elasticity updated: Uncomment code in Monitor that retrieved TServer active compactions (#4407)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 98b994ce0a Uncomment code in Monitor that retrieved TServer active compactions (#4407) 98b994ce0a is described below commit 98b994ce0ad7d67a0e9d028b227909f7b7882ac7 Author: Dave Marion AuthorDate: Fri Mar 22 08:41:14 2024 -0400 Uncomment code in Monitor that retrieved TServer active compactions (#4407) Code was commented out in the Monitor that called TabletClientHandler.getActiveCompactions. This method was removed when major compactions were removed from the tserver, then restored in #3827 because we still need to report minor compaction stats to the monitor. --- .../monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java | 6 ++ 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 9f8eb68473..1743235fd5 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -762,10 +762,8 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { Client tserver = null; try { tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); -// ELASTICITY_TODO tservers no longer have any compaction information, following code was -// commented out as the thrift calls no longer exists -// var compacts = tserver.getActiveCompactions(null, context.rpcCreds()); -// allCompactions.put(parsedServer, new CompactionStats(compacts)); +var compacts = tserver.getActiveCompactions(null, context.rpcCreds()); +allCompactions.put(parsedServer, new CompactionStats(compacts)); compactsFetchedNanos = System.nanoTime(); } catch (Exception 
ex) { log.debug("Failed to get active compactions from {}", server, ex);
(accumulo) branch elasticity updated: Modified TabletGoalState log msg to remove todo (#4406)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 6176c3d07c Modified TabletGoalState log msg to remove todo (#4406) 6176c3d07c is described below commit 6176c3d07cd57d9e208a6bb8e2580a2e62ae8545 Author: Dave Marion AuthorDate: Fri Mar 22 08:38:01 2024 -0400 Modified TabletGoalState log msg to remove todo (#4406) --- .../accumulo/server/manager/state/TabletGoalState.java | 13 - 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java index 0b9b83f159..ca9b2ada8c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java @@ -146,15 +146,10 @@ public enum TabletGoalState { return UNASSIGNED; } } else { - // ELASTICITY_TODO this log level was set to error so that this case can be examined for - // bugs. A tablet server should always have a resource group. If there are unavoidable - // race conditions for getting tablet servers and their RGs, that that should be handled - // in the TabletManagementParameters data acquisition phase so that not all code has to - // deal with it. Eventually this log level should possibly be adjusted or converted to an - // exception. - log.error( - "Could not find resource group for tserver {}, so did not consult balancer. Need to determine the cause of this.", - tm.getLocation().getServerInstance()); + log.warn("Could not find resource group for tserver {}, did not consult balancer to" + + " check if tablet {} needs to be re-assigned. This tablet will be rechecked" + + " soon. 
If this condition is not transient, then it could indicate a bug so" + + " please report it.", tm.getLocation().getServerInstance(), tm.getExtent()); } }
(accumulo) branch elasticity updated: Made tablet refresh thread pool size configurable (#4405)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 1b7861b063 Made tablet refresh thread pool size configurable (#4405) 1b7861b063 is described below commit 1b7861b0633d93e3f4dc0a7c8a177dfce296ed9b Author: Dave Marion AuthorDate: Fri Mar 22 08:34:49 2024 -0400 Made tablet refresh thread pool size configurable (#4405) --- .../java/org/apache/accumulo/core/conf/Property.java | 12 .../java/org/apache/accumulo/manager/Manager.java | 12 .../manager/tableOps/bulkVer2/RefreshTablets.java | 9 ++--- .../manager/tableOps/bulkVer2/TabletRefresher.java| 19 ++- .../manager/tableOps/compact/CompactionDriver.java| 3 +-- .../manager/tableOps/compact/RefreshTablets.java | 3 +-- 6 files changed, 34 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index a3057f9d36..d5a9504f28 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -367,6 +367,18 @@ public enum Property { "Maximum number of threads the TabletGroupWatcher will use in its BatchScanner to" + " look for tablets that need maintenance.", "4.0.0"), + MANAGER_TABLET_REFRESH_MINTHREADS("manager.tablet.refresh.threads.mininum", "10", + PropertyType.COUNT, + "The Manager will notify TabletServers that a Tablet needs to be refreshed after certain operations" + + " are performed (e.g. Bulk Import). 
This property specifies the number of core threads in a" + + " ThreadPool in the Manager that will be used to request these refresh operations.", + "4.0.0"), + MANAGER_TABLET_REFRESH_MAXTHREADS("manager.tablet.refresh.threads.maximum", "10", + PropertyType.COUNT, + "The Manager will notify TabletServers that a Tablet needs to be refreshed after certain operations" + + " are performed (e.g. Bulk Import). This property specifies the maximum number of threads in a" + + " ThreadPool in the Manager that will be used to request these refresh operations.", + "4.0.0"), MANAGER_BULK_TIMEOUT("manager.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request.", "1.4.3"), MANAGER_RENAME_THREADS("manager.rename.threadpool.size", "20", PropertyType.COUNT, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 174edd3dcf..d17f5f570c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -50,6 +50,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -236,6 +237,7 @@ public class Manager extends AbstractServer private final long timeToCacheRecoveryWalExistence; private ExecutorService tableInformationStatusPool = null; + private ThreadPoolExecutor tabletRefreshThreadPool; private final TabletStateStore rootTabletStore; private final TabletStateStore metadataTabletStore; @@ -436,6 +438,10 @@ public class Manager extends AbstractServer return getContext().getTableManager(); } + public 
ThreadPoolExecutor getTabletRefreshThreadPool() { +return tabletRefreshThreadPool; + } + public static void main(String[] args) throws Exception { try (Manager manager = new Manager(new ConfigOpts(), args)) { manager.runServer(); @@ -991,6 +997,11 @@ public class Manager extends AbstractServer tableInformationStatusPool = ThreadPools.getServerThreadPools() .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE, false); +tabletRefreshThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ") + .numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS)) + .numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS)) +.build(); + Thread st
(accumulo) 01/01: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 0ea9f6a1dd9386ee1f7a8789094cc89e812923d7 Merge: 1d6fd65ded a3a6fc2634 Author: Dave Marion AuthorDate: Fri Mar 22 12:28:27 2024 + Merge branch 'main' into elasticity .../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 9 + 1 file changed, 9 insertions(+)
(accumulo) branch main updated (eae367264b -> a3a6fc2634)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from eae367264b Merge branch '2.1' add 33894e6997 Fixed race condition in TServerUtils exposed by TServerUtilsTest (#4413) new a3a6fc2634 Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 9 + 1 file changed, 9 insertions(+)
(accumulo) branch elasticity updated (1d6fd65ded -> 0ea9f6a1dd)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 1d6fd65ded Merge branch 'main' into elasticity add 33894e6997 Fixed race condition in TServerUtils exposed by TServerUtilsTest (#4413) add a3a6fc2634 Merge branch '2.1' new 0ea9f6a1dd Merge branch 'main' into elasticity The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 9 + 1 file changed, 9 insertions(+)
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit a3a6fc2634d2d067d4c951f441bcebb2a06ed7b0 Merge: eae367264b 33894e6997 Author: Dave Marion AuthorDate: Fri Mar 22 12:28:06 2024 + Merge branch '2.1' .../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 9 + 1 file changed, 9 insertions(+) diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 5f0cebd05a,9f37990926..0f28ecab8b --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@@ -46,7 -46,9 +46,8 @@@ import org.apache.accumulo.core.rpc.Ssl import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory; import org.apache.accumulo.core.util.Halt; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; + import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; @@@ -70,7 -72,7 +71,8 @@@ import org.apache.thrift.transport.TTra import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; import com.google.common.primitives.Ints; /**
(accumulo) branch 2.1 updated: Fixed race condition in TServerUtils exposed by TServerUtilsTest (#4413)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 33894e6997 Fixed race condition in TServerUtils exposed by TServerUtilsTest (#4413) 33894e6997 is described below commit 33894e69979afc70efca448ea31fb29ac73288f3 Author: Dave Marion AuthorDate: Fri Mar 22 08:14:21 2024 -0400 Fixed race condition in TServerUtils exposed by TServerUtilsTest (#4413) There are several tests in TServerUtilsTest that have the form: ``` TServer server = null; try { ServerAddress address = startServer(); server = address.getServer(); } finally { if (server != null) { server.stop(); } } ``` The TServerUtilsTest.startServer method calls TServerUtils.startServer which ends up creating a Thread that calls TServer.serve(). When TServer is a TThreadPoolServer, the serve method calls preServe first, which sets the internal boolean variable `stopped_` to false, and then calls execute which will loop while `stopped_` is false. In the case where the Thread created by TServerUtils.startServer is not started right away, then it's possible that the test method will call stop (setting `stopped_` to true) before the TServer.serve method calls preServe (setting `stopped_` back to false) resulting in the Thread being in an endless loop. This can be seen by running TServerUtilsTest, where the output contains many lines like: ``` [server.TThreadPoolServer] WARN : Transport error occurred during acceptance of message org.apache.thrift.transport.TTransportException: No underlying server socket. 
at org.apache.thrift.transport.TServerSocket.accept(TServerSocket.java:113) ~[libthrift-0.17.0.jar:0.17.0] at org.apache.thrift.transport.TServerSocket.accept(TServerSocket.java:31) ~[libthrift-0.17.0.jar:0.17.0] at org.apache.thrift.server.TThreadPoolServer.execute(TThreadPoolServer.java:162) ~[libthrift-0.17.0.jar:0.17.0] at org.apache.thrift.server.TThreadPoolServer.serve(TThreadPoolServer.java:148) ~[libthrift-0.17.0.jar:0.17.0] at org.apache.accumulo.server.rpc.TServerUtils.lambda$startTServer$9(TServerUtils.java:654) ~[classes/:?] at org.apache.accumulo.core.trace.TraceWrappedRunnable.run(TraceWrappedRunnable.java:52) [accumulo-core-2.1.3-SNAPSHOT.jar:2.1.3-SNAPSHOT] at java.base/java.lang.Thread.run(Thread.java:840) [?:?] ``` Because the surefire configuration reuses JVM forks these Threads persist for the duration of the unit tests in server base and pollute every test output file after TServerUtilsTest is executed. The surefire forkCount is set to `1C`, so the volume of output in the logs is dependent on the number of forks. 
Co-authored-by: Keith Turner --- .../main/java/org/apache/accumulo/server/rpc/TServerUtils.java | 9 + 1 file changed, 9 insertions(+) diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 8ae472cf8b..9f37990926 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -48,6 +48,7 @@ import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; @@ -71,6 +72,7 @@ import org.apache.thrift.transport.TTransportFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; /** @@ -657,6 +659,13 @@ public class TServerUtils { } }).start(); +while (!finalServer.isServing()) { + // Wait for the thread to start and for the TServer to start + // serving events + UtilWaitThread.sleep(10); + Preconditions.checkState(!finalServer.getShouldStop()); +} + // check for the special "bind to everything address" if (serverAddress.address.getHost().equals("0.0.0.0")) { // can't get the address from the bind, so we'll do our best to invent our hostname
(accumulo) branch elasticity updated: Reduced number of compactions in CompactionConfigChangeIT (#4402)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new ce83488245 Reduced number of compactions in CompactionConfigChangeIT (#4402) ce83488245 is described below commit ce834882456546df79e957f2f242adc37d059bdc Author: Dave Marion AuthorDate: Tue Mar 19 12:20:55 2024 -0400 Reduced number of compactions in CompactionConfigChangeIT (#4402) The test waits for 60s for the number of F files to reach zero. However, there are 100 files and there is a comment that says each compaction should take about 1s. I reduced the number of files from 100 to 50 to allow the test to complete in the allotted time. --- .../apache/accumulo/test/compaction/CompactionConfigChangeIT.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java index c734493287..fe1783b841 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java @@ -75,26 +75,26 @@ public class CompactionConfigChangeIT extends AccumuloClusterHarness { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { final String table = getUniqueNames(1)[0]; - createTable(client, table, "cs1", 100); + createTable(client, table, "cs1", 50); ExternalCompactionTestUtils.writeData(client, table, MAX_DATA); client.tableOperations().flush(table, null, null, true); - assertEquals(100, countFiles(client, table, "F")); + assertEquals(50, countFiles(client, table, "F")); // Start 100 slow compactions, each compaction should take ~1 second. There are 2 tservers // each with 2 threads and then 8 threads. 
CompactionConfig compactionConfig = new CompactionConfig(); IteratorSetting iteratorSetting = new IteratorSetting(100, SlowIterator.class); - SlowIterator.setSleepTime(iteratorSetting, 100); + SlowIterator.setSleepTime(iteratorSetting, 50); compactionConfig.setIterators(List.of(iteratorSetting)); compactionConfig.setWait(false); client.tableOperations().compact(table, compactionConfig); // give some time for compactions to start running - Wait.waitFor(() -> countFiles(client, table, "F") < 95); + Wait.waitFor(() -> countFiles(client, table, "F") < 45); // Change config deleting groups named small, medium, and large. There was bug where // deleting groups running compactions would leave the tablet in a bad state for future
(accumulo) branch elasticity updated: Resurrected CompactionDriverTest resolving TODO in PreDeleteTable (#4377)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new ca56493db7 Resurrected CompactionDriverTest resolving TODO in PreDeleteTable (#4377) ca56493db7 is described below commit ca56493db7eb122345a7c954c820fe9b51e4687e Author: Dave Marion AuthorDate: Tue Mar 19 12:20:17 2024 -0400 Resurrected CompactionDriverTest resolving TODO in PreDeleteTable (#4377) The TODO asked if the delete marker in ZooKeeper was still being used / tested. It is still being used by the CompactionDriver. However, the CompactionDriverTest class, which tests it, was deleted at some point. I resurrected and fixed the test. --- .../manager/tableOps/compact/CompactionDriver.java | 3 +- .../manager/tableOps/delete/PreDeleteTable.java| 1 - .../tableOps/compact/CompactionDriverTest.java | 148 + 3 files changed, 150 insertions(+), 2 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index f8ad23172b..6167ca05cf 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -129,7 +129,8 @@ class CompactionDriver extends ManagerRepo { return sleepTime; } - private boolean isCancelled(FateId fateId, ServerContext context) + // visible for testing + protected boolean isCancelled(FateId fateId, ServerContext context) throws InterruptedException, KeeperException { return CompactionConfigStorage.getConfig(context, fateId) == null; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java index 196e898c09..6094960b38 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/PreDeleteTable.java @@ -60,7 +60,6 @@ public class PreDeleteTable extends ManagerRepo { private void preventFutureCompactions(Manager environment) throws KeeperException, InterruptedException { -// ELASTICITY_TODO investigate this. Is still needed? Is it still working as expected? String deleteMarkerPath = createDeleteMarkerPath(environment.getInstanceID(), tableId); ZooReaderWriter zoo = environment.getContext().getZooReaderWriter(); zoo.putPersistentData(deleteMarkerPath, new byte[] {}, NodeExistsPolicy.SKIP); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java new file mode 100644 index 00..caf8be89d5 --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.tableOps.compact; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.UUID; + +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.clientImpl.TableOperationsImpl; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.zookeeper.ZooR
(accumulo) branch elasticity updated: Resolved todo in tabletserver.thrift, removed majors from TabletStats (#4366)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 2a2b44670e Resolved todo in tabletserver.thrift, removed majors from TabletStats (#4366) 2a2b44670e is described below commit 2a2b44670e732ad1e50c9c10ccad52d258190e28 Author: Dave Marion AuthorDate: Tue Mar 19 10:26:35 2024 -0400 Resolved todo in tabletserver.thrift, removed majors from TabletStats (#4366) --- .../core/tabletserver/thrift/TabletStats.java | 132 ++--- core/src/main/thrift/tabletserver.thrift | 4 +- .../core/spi/balancer/SimpleLoadBalancerTest.java | 2 +- .../monitor/rest/tservers/CurrentOperations.java | 12 +- .../tservers/TabletServerDetailInformation.java| 6 +- .../rest/tservers/TabletServerResource.java| 29 + .../apache/accumulo/tserver/TabletStatsKeeper.java | 2 +- .../accumulo/test/ChaoticLoadBalancerTest.java | 2 +- 8 files changed, 22 insertions(+), 167 deletions(-) diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletStats.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletStats.java index b975e7b22e..a1b5b2c2ac 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletStats.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TabletStats.java @@ -29,7 +29,6 @@ public class TabletStats implements org.apache.thrift.TBase tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.EXTENT, new org.apache.thrift.meta_data.FieldMetaData("extent", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.dataImpl.thrift.TKeyExtent.class))); -tmpMap.put(_Fields.MAJORS, new 
org.apache.thrift.meta_data.FieldMetaData("majors", org.apache.thrift.TFieldRequirementType.DEFAULT, -new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ActionStats.class))); tmpMap.put(_Fields.MINORS, new org.apache.thrift.meta_data.FieldMetaData("minors", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ActionStats.class))); tmpMap.put(_Fields.SPLITS, new org.apache.thrift.meta_data.FieldMetaData("splits", org.apache.thrift.TFieldRequirementType.DEFAULT, @@ -158,7 +151,6 @@ public class TabletStats implements org.apache.thrift.TBase tsStats = new ArrayList<>(); try { @@ -205,21 +204,11 @@ public class TabletServerResource { if (total.minors.elapsed != 0 && total.minors.num != 0) { currentMinorStdDev = stddev(total.minors.elapsed, total.minors.num, total.minors.sumDev); } -if (total.majors.num != 0) { - currentMajorAvg = total.majors.elapsed / total.majors.num; -} -if (total.majors.elapsed != 0 && total.majors.num != 0 -&& total.majors.elapsed > total.majors.num) { - currentMajorStdDev = stddev(total.majors.elapsed, total.majors.num, total.majors.sumDev); -} ActionStatsUpdator.update(total.minors, historical.minors); -ActionStatsUpdator.update(total.majors, historical.majors); minorStdDev = stddev(total.minors.elapsed, total.minors.num, total.minors.sumDev); minorQueueStdDev = stddev(total.minors.queueTime, total.minors.num, total.minors.queueSumDev); -majorStdDev = stddev(total.majors.elapsed, total.majors.num, total.majors.sumDev); -majorQueueStdDev = stddev(total.majors.queueTime, total.majors.num, total.majors.queueSumDev); splitStdDev = stddev(historical.splits.elapsed, historical.splits.num, historical.splits.sumDev); @@ -267,7 +256,7 @@ public class TabletServerResource { private TabletServerDetailInformation doDetails(int numTablets) { return new TabletServerDetailInformation(numTablets, total.numEntries, total.minors.status, 
-total.majors.status, historical.splits.status); +historical.splits.status); } private List doAllTimeResults(double majorQueueStdDev, @@ -282,12 +271,6 @@ public class TabletServerResource { minorQueueStdDev, total.minors.num != 0 ? (total.minors.elapsed / total.minors.num) : null, minorStdDev, total.minors.elapsed)); -// Major Compaction Operation -allTime.add(new AllTimeTabletResults("MajorCompaction", total.majors.num, -total.majors.fail, -total.majors.num != 0 ? (total.majors.queueTime / total.majors.num) : null, -majorQueueStdDev, total.majors.num != 0 ? (tot
(accumulo) branch elasticity updated: Call requireSame(SUSPEND) in MetaDataStateStore to resolve todo (#4381)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 5cb7f7b87b Call requireSame(SUSPEND) in MetaDataStateStore to resolve todo (#4381) 5cb7f7b87b is described below commit 5cb7f7b87b6f6b7b69dbdd6e170e537274d1d717 Author: Dave Marion AuthorDate: Tue Mar 19 10:25:04 2024 -0400 Call requireSame(SUSPEND) in MetaDataStateStore to resolve todo (#4381) --- .../MiniAccumuloClusterControl.java| 20 + .../server/manager/state/MetaDataStateStore.java | 7 +- .../metadata/ConditionalTabletMutatorImpl.java | 11 +++ .../test/functional/AmpleConditionalWriterIT.java | 85 ++ 4 files changed, 120 insertions(+), 3 deletions(-) diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index 9bf50cff2e..1e10be5faa 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -274,6 +274,26 @@ public class MiniAccumuloClusterControl implements ClusterControl { } } + public void stopTabletServerGroup(String tserverResourceGroup) { +synchronized (tabletServerProcesses) { + var group = tabletServerProcesses.get(tserverResourceGroup); + if (group == null) { +return; + } + group.forEach(process -> { +try { + cluster.stopProcessWithTimeout(process, 30, TimeUnit.SECONDS); +} catch (ExecutionException | TimeoutException e) { + log.warn("TabletServer did not fully stop after 30 seconds", e); + throw new RuntimeException(e); +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); +} + }); + tabletServerProcesses.remove(tserverResourceGroup); +} + } + @Override public synchronized void 
stop(ServerType server, String hostname) throws IOException { switch (server) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java index 689a667e4b..f69f280ed4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.server.manager.state; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; + import java.util.Collection; import java.util.List; @@ -68,9 +70,8 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState try (var tabletsMutator = ample.conditionallyMutateTablets()) { for (TabletMetadata tm : tablets) { if (tm.getSuspend() != null) { - // ELASTICITY_TODO add conditional mutation check that tls.suspend is what currently - // exists in the tablet - tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation().deleteSuspension() + tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation() + .requireSame(tm, SUSPEND).deleteSuspension() .submit(tabletMetadata -> tabletMetadata.getSuspend() == null); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 6003fd73e6..6995941089 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -24,6 +24,7 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.AVAILABILITY_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN
(accumulo) branch elasticity updated: Removed MAC methods deprecated in 3.1 (#4395)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 59b873b49c Removed MAC methods deprecated in 3.1 (#4395) 59b873b49c is described below commit 59b873b49c71649e633b95382088be6a4c94f11e Author: Dave Marion AuthorDate: Mon Mar 18 13:18:19 2024 -0400 Removed MAC methods deprecated in 3.1 (#4395) --- .../accumulo/minicluster/MiniAccumuloConfig.java | 35 -- .../accumulo/minicluster/MiniAccumuloRunner.java | 4 --- 2 files changed, 39 deletions(-) diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java index d43008be97..6bf6739d5d 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java @@ -51,31 +51,6 @@ public class MiniAccumuloConfig { this.impl = new MiniAccumuloConfigImpl(dir, rootPassword); } - /** - * Calling this method is optional. If not set, it defaults to two. - * - * @param numTservers the number of tablet servers that mini accumulo cluster should start - */ - // ELASTICITY_TODO: Deprecate in 3.0.0 and remove in elasticity on the merge - @Deprecated(since = "3.1.0") - public MiniAccumuloConfig setNumTservers(int numTservers) { -// impl.setNumTservers(numTservers); -return this; - } - - /** - * Calling this method is optional. If not set, it defaults to zero. 
- * - * @param numScanServers the number of scan servers that mini accumulo cluster should start - * @since 2.1.0 - */ - // ELASTICITY_TODO: Deprecate in 3.0.0 and remove in elasticity on the merge - @Deprecated(since = "3.1.0") - public MiniAccumuloConfig setNumScanServers(int numScanServers) { -// impl.setNumScanServers(numScanServers); -return this; - } - /** * Calling this method is optional. If not set, defaults to 'miniInstance' * @@ -225,16 +200,6 @@ public class MiniAccumuloConfig { return impl.getRootPassword(); } - /** - * @return the number of tservers configured for this cluster - */ - // ELASTICITY_TODO: Deprecate in 3.0.0 and remove in elasticity on the merge - @Deprecated(since = "3.1.0") - public int getNumTservers() { -return impl.getClusterServerConfiguration().getTabletServerConfiguration().values().stream() -.reduce(0, Integer::sum); - } - /** * @return is the current configuration in jdwpEnabled mode? * diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java index 48dcf7a337..70e8c388a9 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java @@ -150,7 +150,6 @@ public class MiniAccumuloRunner { * * @param args An optional -p argument can be specified with the path to a valid properties file. 
*/ - @SuppressWarnings("deprecation") @SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN", "UNENCRYPTED_SERVER_SOCKET"}, justification = "code runs in same security context as user who provided input file name; " + "socket need not be encrypted, since this class is provided for testing only") @@ -181,9 +180,6 @@ public class MiniAccumuloRunner { if (opts.prop.containsKey(INSTANCE_NAME_PROP)) { config.setInstanceName(opts.prop.getProperty(INSTANCE_NAME_PROP)); } -if (opts.prop.containsKey(NUM_T_SERVERS_PROP)) { - config.setNumTservers(Integer.parseInt(opts.prop.getProperty(NUM_T_SERVERS_PROP))); -} if (opts.prop.containsKey(ZOO_KEEPER_PORT_PROP)) { config.setZooKeeperPort(Integer.parseInt(opts.prop.getProperty(ZOO_KEEPER_PORT_PROP))); }
(accumulo) branch elasticity updated: Updated coordinator log warning to account for busy compactors (#4372)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 4099860261 Updated coordinator log warning to account for busy compactors (#4372) 4099860261 is described below commit 4099860261a6cdb68700176ced44eb0519420e88 Author: Dave Marion AuthorDate: Mon Mar 18 11:00:08 2024 -0400 Updated coordinator log warning to account for busy compactors (#4372) Modified the logic in CompactionCoordinator to only warn about compactors not checking in when there are idle compactors for that group. Refactored code to remove a TODO. Fixes #4219 --- .../coordinator/CompactionCoordinator.java | 84 ++ .../compaction/CompactionCoordinatorTest.java | 12 ++-- 2 files changed, 64 insertions(+), 32 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 178b4f1e95..48419a47a0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -18,11 +18,9 @@ */ package org.apache.accumulo.manager.compaction.coordinator; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toMap; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; @@ 
-37,12 +35,15 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -163,7 +164,7 @@ public class CompactionCoordinator private final CompactionJobQueues jobQueues; private final AtomicReference>> fateInstances; // Exposed for tests - protected volatile Boolean shutdown = false; + protected CountDownLatch shutdown = new CountDownLatch(1); private final ScheduledThreadPoolExecutor schedExecutor; @@ -220,7 +221,7 @@ public class CompactionCoordinator } public void shutdown() { -shutdown = true; +shutdown.countDown(); var localThread = serviceThread; if (localThread != null) { try { @@ -243,6 +244,28 @@ public class CompactionCoordinator ThreadPools.watchNonCriticalScheduledTask(future); } + protected void startIdleCompactionWatcher() { + +ScheduledFuture future = schedExecutor.scheduleWithFixedDelay(this::idleCompactionWarning, +getTServerCheckInterval(), getTServerCheckInterval(), TimeUnit.MILLISECONDS); +ThreadPools.watchNonCriticalScheduledTask(future); + } + + private void idleCompactionWarning() { + +long now = System.currentTimeMillis(); +Map> idleCompactors = getIdleCompactors(); +TIME_COMPACTOR_LAST_CHECKED.forEach((groupName, lastCheckTime) -> { + if ((now - lastCheckTime) > getMissingCompactorWarningTime() + && jobQueues.getQueuedJobs(groupName) > 0 + && idleCompactors.containsKey(groupName.canonical())) { +LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", groupName, +getMissingCompactorWarningTime()); + } +}); + + } + @Override public void run() { @@ -270,35 
+293,40 @@ public class CompactionCoordinator startDeadCompactionDetector(); -// ELASTICITY_TODO the main function of the following loop was getting group summaries from -// tservers. Its no longer doing that. May be best to remove the loop and make the remaining -// task a scheduled one. - -LOG.info("Starting loop to check for compactors not checking in"); -while (!shutdown) { - long start = System.currentTimeMillis(); - - long now = System.currentTimeMillis(); - TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> { -if ((now - v) > getMissingCompactorWarningTime()) { - // ELASTICITY_TODO may want t
(accumulo) branch elasticity updated (158bbaddf7 -> bf9a6a59fe)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 158bbaddf7 Resolve TODO in SplitCancelsMajCIT (#4382) add 8b0262d5b3 Deprecated MiniAccumuloConfig setNumServer methods (#4374) new bf9a6a59fe Merge branch 'main' into elasticity The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java | 3 +++ .../main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java | 1 + 2 files changed, 4 insertions(+)
(accumulo) 01/01: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit bf9a6a59fe34b135ba163c9e9ee4411d0f6c3c12 Merge: 158bbaddf7 8b0262d5b3 Author: Dave Marion AuthorDate: Mon Mar 18 12:03:26 2024 + Merge branch 'main' into elasticity .../main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java | 3 +++ .../main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java | 1 + 2 files changed, 4 insertions(+) diff --cc minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java index 6446dfd44c,4ee34fcf30..d43008be97 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java @@@ -56,9 -56,9 +56,10 @@@ public class MiniAccumuloConfig * * @param numTservers the number of tablet servers that mini accumulo cluster should start */ + // ELASTICITY_TODO: Deprecate in 3.0.0 and remove in elasticity on the merge + @Deprecated(since = "3.1.0") public MiniAccumuloConfig setNumTservers(int numTservers) { -impl.setNumTservers(numTservers); +// impl.setNumTservers(numTservers); return this; } @@@ -68,9 -68,9 +69,10 @@@ * @param numScanServers the number of scan servers that mini accumulo cluster should start * @since 2.1.0 */ + // ELASTICITY_TODO: Deprecate in 3.0.0 and remove in elasticity on the merge + @Deprecated(since = "3.1.0") public MiniAccumuloConfig setNumScanServers(int numScanServers) { -impl.setNumScanServers(numScanServers); +// impl.setNumScanServers(numScanServers); return this; } @@@ -226,10 -226,9 +228,11 @@@ /** * @return the number of tservers configured for this cluster */ + // ELASTICITY_TODO: Deprecate in 3.0.0 and remove in elasticity on the merge + @Deprecated(since = "3.1.0") public int getNumTservers() { -return impl.getNumTservers(); +return 
impl.getClusterServerConfiguration().getTabletServerConfiguration().values().stream() +.reduce(0, Integer::sum); } /**
(accumulo) branch main updated: Deprecated MiniAccumuloConfig setNumServer methods (#4374)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/main by this push: new 8b0262d5b3 Deprecated MiniAccumuloConfig setNumServer methods (#4374) 8b0262d5b3 is described below commit 8b0262d5b31142b5728f463adffe0bdfd301ec79 Author: Dave Marion AuthorDate: Mon Mar 18 07:54:19 2024 -0400 Deprecated MiniAccumuloConfig setNumServer methods (#4374) These methods are removed in the elasticity branch in favor of a cluster server configuration object --- .../main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java | 3 +++ .../main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java | 1 + 2 files changed, 4 insertions(+) diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java index 2945eb7650..4ee34fcf30 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloConfig.java @@ -56,6 +56,7 @@ public class MiniAccumuloConfig { * * @param numTservers the number of tablet servers that mini accumulo cluster should start */ + @Deprecated(since = "3.1.0") public MiniAccumuloConfig setNumTservers(int numTservers) { impl.setNumTservers(numTservers); return this; @@ -67,6 +68,7 @@ public class MiniAccumuloConfig { * @param numScanServers the number of scan servers that mini accumulo cluster should start * @since 2.1.0 */ + @Deprecated(since = "3.1.0") public MiniAccumuloConfig setNumScanServers(int numScanServers) { impl.setNumScanServers(numScanServers); return this; @@ -224,6 +226,7 @@ public class MiniAccumuloConfig { /** * @return the number of tservers configured for this cluster */ + @Deprecated(since = "3.1.0") public int getNumTservers() { return 
impl.getNumTservers(); } diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java index 31cd0898f9..cb4e9da20a 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloRunner.java @@ -150,6 +150,7 @@ public class MiniAccumuloRunner { * * @param args An optional -p argument can be specified with the path to a valid properties file. */ + @SuppressWarnings("deprecation") @SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN", "UNENCRYPTED_SERVER_SOCKET"}, justification = "code runs in same security context as user who provided input file name; " + "socket need not be encrypted, since this class is provided for testing only")
(accumulo) branch elasticity updated: Resolve TODO in SplitCancelsMajCIT (#4382)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 158bbaddf7 Resolve TODO in SplitCancelsMajCIT (#4382) 158bbaddf7 is described below commit 158bbaddf73717566f452d4bf3be20ff9a59151c Author: Dave Marion AuthorDate: Mon Mar 18 07:38:52 2024 -0400 Resolve TODO in SplitCancelsMajCIT (#4382) --- .../test/compaction/SplitCancelsMajCIT.java| 32 -- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/SplitCancelsMajCIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/SplitCancelsMajCIT.java index 640b52499e..e7ee6a19bb 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/SplitCancelsMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/SplitCancelsMajCIT.java @@ -19,10 +19,13 @@ package org.apache.accumulo.test.compaction; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; import java.util.EnumSet; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicReference; @@ -31,11 +34,18 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import 
org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -44,8 +54,14 @@ import org.junit.jupiter.api.Test; // ACCUMULO-2862 public class SplitCancelsMajCIT extends SharedMiniClusterBase { - // ELASTICITY_TODO: Need to check new split code to ensure that it - // still cancels running MAJC. + public static class ClusterConfigForTest implements MiniClusterConfigurationCallback { + +@Override +public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setProperty(Property.COMPACTOR_CANCEL_CHECK_INTERVAL, "10s"); +} + + } @Override protected Duration defaultTimeout() { @@ -54,7 +70,7 @@ public class SplitCancelsMajCIT extends SharedMiniClusterBase { @BeforeAll public static void setup() throws Exception { -SharedMiniClusterBase.startMiniCluster(); +SharedMiniClusterBase.startMiniClusterWithConfig(new ClusterConfigForTest()); } @AfterAll @@ -67,6 +83,7 @@ public class SplitCancelsMajCIT extends SharedMiniClusterBase { final String tableName = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { c.tableOperations().create(tableName); + TableId tid = TableId.of(c.tableOperations().tableIdMap().get(tableName)); // majc should take 100 * .5 secs IteratorSetting it = new IteratorSetting(100, SlowIterator.class); SlowIterator.setSleepTime(it, 500); @@ -90,12 +107,21 @@ public class SplitCancelsMajCIT extends SharedMiniClusterBase { }); thread.start(); + Set compactionIds = ExternalCompactionTestUtils + 
.waitForCompactionStartAndReturnEcids(getCluster().getServerContext(), tid); + assertNotNull(compactionIds); + assertEquals(1, compactionIds.size()); + long now = System.currentTimeMillis(); Thread.sleep(SECONDS.toMillis(10)); // split the table, interrupts the compaction SortedSet partitionKeys = new TreeSet<>(); partitionKeys.add(new Text("10")); c.tableOperations().addSplits(tableName, partitionKeys); + + ExternalCompactionTestUtils.confirmCompactionCompleted(getCluster().getServerContext(), + compactionIds, TCompactionState.CANCELLED); + thread.join(); // wait for the restarted compaction assertTrue(System.currentTimeMillis() - now > 59_000);
(accumulo) branch elasticity updated: Resolved TODOs in TabletGroupWatcher (#4373)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 99cee92af5 Resolved TODOs in TabletGroupWatcher (#4373) 99cee92af5 is described below commit 99cee92af5325870a01881ea442fd1a8f342ba97 Author: Dave Marion AuthorDate: Mon Mar 18 07:36:23 2024 -0400 Resolved TODOs in TabletGroupWatcher (#4373) --- .../apache/accumulo/manager/TabletGroupWatcher.java | 21 ++--- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index bd177c2deb..8779568916 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -22,6 +22,7 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup import static java.lang.Math.min; import static java.util.Objects.requireNonNull; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; import java.io.IOException; import java.util.ArrayList; @@ -285,6 +286,10 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { needsFullScan = false; } +public synchronized boolean isNeedsFullScan() { + return needsFullScan; +} + @Override public void process(EventCoordinator.Event event) { @@ -640,10 +645,12 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { boolean lookForTabletsNeedingVolReplacement = true; while (manager.stillManager()) { - // slow things down a little, otherwise we spam the logs when there are many wake-up events - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - // 
ELASTICITY_TODO above sleep in the case when not doing a full scan to make manager more - // responsive + if (!eventHandler.isNeedsFullScan()) { +// If an event handled by the EventHandler.RangeProcessor indicated +// that we need to do a full scan, then do it. Otherwise wait a bit +// before re-checking the tablets. +sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } final long waitTimeBetweenScans = manager.getConfiguration() .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); @@ -978,9 +985,9 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { private void replaceVolumes(List volumeReplacementsList) { try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { for (VolumeUtil.VolumeReplacements vr : volumeReplacementsList) { -// ELASTICITY_TODO can require same on WALS once that is implemented, see #3948 -var tabletMutator = tabletsMutator.mutateTablet(vr.tabletMeta.getExtent()) - .requireAbsentOperation().requireAbsentLocation().requireSame(vr.tabletMeta, FILES); +var tabletMutator = + tabletsMutator.mutateTablet(vr.tabletMeta.getExtent()).requireAbsentOperation() +.requireAbsentLocation().requireSame(vr.tabletMeta, FILES, LOGS); vr.logsToRemove.forEach(tabletMutator::deleteWal); vr.logsToAdd.forEach(tabletMutator::putWal);
(accumulo) branch elasticity updated: Modified UnloadTabletHandler to use Tablets metadata (#4368)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 80452089b9 Modified UnloadTabletHandler to use Tablets metadata (#4368) 80452089b9 is described below commit 80452089b9f9775d31b334d9acc06b6f006e58cd Author: Dave Marion AuthorDate: Mon Mar 18 07:34:53 2024 -0400 Modified UnloadTabletHandler to use Tablets metadata (#4368) Resolved TODO in UnloadTabletHandler by using the Tablets metadata reference, removing the code that was looking up the tablet metadata. --- .../org/apache/accumulo/tserver/UnloadTabletHandler.java | 12 +--- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java index f4ab2ec7b4..a1d46257ec 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/UnloadTabletHandler.java @@ -20,14 +20,10 @@ package org.apache.accumulo.tserver; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.thrift.TabletLoadState; -import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; import org.apache.accumulo.server.manager.state.DistributedStoreException; import org.apache.accumulo.server.manager.state.TabletStateStore; 
@@ -109,13 +105,7 @@ class UnloadTabletHandler implements Runnable { server.onlineTablets.remove(extent); try { - TServerInstance instance = server.getTabletSession(); - // ELASTICITY_TODO: Modify Tablet to keep a reference to TableMetadata so that we - // can avoid building a tablet metadata that may not have needed information, for example may - // need the last location - TabletMetadata tm = TabletMetadata.builder(extent).putLocation(Location.current(instance)) - .putTabletAvailability(TabletAvailability.ONDEMAND) - .build(ColumnType.LAST, ColumnType.SUSPEND); + TabletMetadata tm = t.getMetadata(); if (!goalState.equals(TUnloadTabletGoal.SUSPENDED) || extent.isRootTablet() || (extent.isMeta() && !server.getConfiguration().getBoolean(Property.MANAGER_METADATA_SUSPENDABLE))) {
(accumulo) branch elasticity updated (9550da7c83 -> f1914da45a)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 9550da7c83 Merge branch 'main' into elasticity add 7c9204274c Fixes ConcurrentModificationException in RunningCompaction (#4383) add fe552af0dd Merge branch '2.1' new f1914da45a Merge branch 'main' into elasticity The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/accumulo/core/util/compaction/RunningCompaction.java | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-)
(accumulo) 01/01: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit f1914da45a73f6010b5eba5827a22e90b94557be Merge: 9550da7c83 fe552af0dd Author: Dave Marion AuthorDate: Fri Mar 15 17:23:43 2024 +0000 Merge branch 'main' into elasticity .../apache/accumulo/core/util/compaction/RunningCompaction.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-)
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit fe552af0dd23d263aca96dd6d356df7070641b29 Merge: 9839e4d42f 7c9204274c Author: Dave Marion AuthorDate: Fri Mar 15 17:22:56 2024 +0000 Merge branch '2.1' .../apache/accumulo/core/util/compaction/RunningCompaction.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-)
(accumulo) branch main updated (9839e4d42f -> fe552af0dd)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from 9839e4d42f Merge branch '2.1' add 7c9204274c Fixes ConcurrentModificationException in RunningCompaction (#4383) new fe552af0dd Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/accumulo/core/util/compaction/RunningCompaction.java | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-)
(accumulo) branch 2.1 updated: Fixes ConcurrentModificationException in RunningCompaction (#4383)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 7c9204274c Fixes ConcurrentModificationException in RunningCompaction (#4383) 7c9204274c is described below commit 7c9204274c195e258c6f45dbfe93e9d720533929 Author: Dave Marion AuthorDate: Fri Mar 15 13:21:33 2024 -0400 Fixes ConcurrentModificationException in RunningCompaction (#4383) While working on #4382 I ran into an issue where a CME was being raised in a Thrift thread that was trying to serialize the updates from the RunningCompaction object. --- .../apache/accumulo/core/util/compaction/RunningCompaction.java | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java index b2e4fd1581..4d666e4962 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java @@ -43,11 +43,15 @@ public class RunningCompaction { } public Map getUpdates() { -return updates; +synchronized (updates) { + return new TreeMap<>(updates); +} } public void addUpdate(Long timestamp, TCompactionStatusUpdate update) { -this.updates.put(timestamp, update); +synchronized (updates) { + this.updates.put(timestamp, update); +} } public TExternalCompactionJob getJob() {
(accumulo) branch main updated (23e17129de -> 9839e4d42f)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from 23e17129de Revert #4358 - Replace long + TimeUnit with Duration in ReadOnlyTStore.unreserve() add 05c2f45042 Reduced warning logs under normal conditions in compaction coordinator (#4362) new 9839e4d42f Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../coordinator/CompactionCoordinator.java | 32 -- .../accumulo/coordinator/QueueSummaries.java | 8 ++ 2 files changed, 37 insertions(+), 3 deletions(-)
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 9839e4d42fae9bd20b1529ca0e5e2fed4122 Merge: 23e17129de 05c2f45042 Author: Dave Marion AuthorDate: Wed Mar 13 19:52:45 2024 + Merge branch '2.1' .../coordinator/CompactionCoordinator.java | 32 -- .../accumulo/coordinator/QueueSummaries.java | 8 ++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --cc server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index d5ecbf9ddd,b0ec498a9e..685080c5b1 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@@ -22,8 -23,7 +22,9 @@@ import static com.google.common.util.co import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; +import java.util.ArrayList; + import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@@ -321,35 -325,58 +325,57 @@@ public class CompactionCoordinator exte LOG.info("Shutting down"); } + private Map> getIdleCompactors() { + + Map> allCompactors = + ExternalCompactionUtil.getCompactorAddrs(getContext()); + + Set emptyQueues = new HashSet<>(); + + // Remove all of the compactors that are running a compaction + RUNNING_CACHE.values().forEach(rc -> { + List busyCompactors = allCompactors.get(rc.getQueueName()); + if (busyCompactors != null + && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress( { + if (busyCompactors.isEmpty()) { + emptyQueues.add(rc.getQueueName()); + } + } + }); + // Remove entries with empty queues + emptyQueues.forEach(e -> allCompactors.remove(e)); + return allCompactors; + } + private void updateSummaries() { -ExecutorService executor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(10, -"Compaction Summary Gatherer", false); -try { - Set queuesSeen = new ConcurrentSkipListSet<>(); - tserverSet.getCurrentServers().forEach(tsi -> { -executor.execute(() -> updateSummaries(tsi, queuesSeen)); - }); +final ArrayList> tasks = new ArrayList<>(); +Set queuesSeen = new ConcurrentSkipListSet<>(); - executor.shutdown(); +tserverSet.getCurrentServers().forEach(tsi -> { + tasks.add(summariesExecutor.submit(() -> updateSummaries(tsi, queuesSeen))); +}); - try { -while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {} - } catch (InterruptedException e) { -Thread.currentThread().interrupt(); -throw new RuntimeException(e); +// Wait for all tasks to complete +while (!tasks.isEmpty()) { + Iterator> iter = tasks.iterator(); + while (iter.hasNext()) { +Future f = iter.next(); +if (f.isDone()) { + iter.remove(); +} } + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); +} - // remove any queues that were seen in the past, but were not seen in the latest gathering of - // summaries - TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen); +// remove any queues that were seen in the past, but were not seen in the latest gathering of +// summaries +TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen); - // add any queues that were never seen before - queuesSeen.forEach(q -> { -TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> System.currentTimeMillis()); - }); -} finally { - executor.shutdownNow(); -} +// add any queues that were never seen before +queuesSeen.forEach(q -> { + TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> System.currentTimeMillis()); +}); } private void updateSummaries(TServerInstance tsi, Set queuesSeen) {
(accumulo) branch 2.1 updated: Reduced warning logs under normal conditions in compaction coordinator (#4362)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 05c2f45042 Reduced warning logs under normal conditions in compaction coordinator (#4362) 05c2f45042 is described below commit 05c2f45042ee91a5fe04702caa77ab19f78c0f9a Author: Dave Marion AuthorDate: Wed Mar 13 11:52:47 2024 -0400 Reduced warning logs under normal conditions in compaction coordinator (#4362) Fixes #4219 --- .../coordinator/CompactionCoordinator.java | 32 -- .../accumulo/coordinator/QueueSummaries.java | 8 ++ .../coordinator/CompactionCoordinatorTest.java | 6 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index f4819ebefb..b0ec498a9e 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -23,6 +23,7 @@ import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -303,9 +304,12 @@ public class CompactionCoordinator extends AbstractServer updateSummaries(); long now = System.currentTimeMillis(); - TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> { -if ((now - v) > getMissingCompactorWarningTime()) { - LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", k, + + Map> idleCompactors = getIdleCompactors(); + TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> { +if ((now - lastCheckTime) > 
getMissingCompactorWarningTime() +&& QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) { + LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", queue, getMissingCompactorWarningTime()); } }); @@ -321,6 +325,28 @@ public class CompactionCoordinator extends AbstractServer LOG.info("Shutting down"); } + private Map> getIdleCompactors() { + +Map> allCompactors = +ExternalCompactionUtil.getCompactorAddrs(getContext()); + +Set emptyQueues = new HashSet<>(); + +// Remove all of the compactors that are running a compaction +RUNNING_CACHE.values().forEach(rc -> { + List busyCompactors = allCompactors.get(rc.getQueueName()); + if (busyCompactors != null + && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress( { +if (busyCompactors.isEmpty()) { + emptyQueues.add(rc.getQueueName()); +} + } +}); +// Remove entries with empty queues +emptyQueues.forEach(e -> allCompactors.remove(e)); +return allCompactors; + } + private void updateSummaries() { ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(10, "Compaction Summary Gatherer", false); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java index 6edb2c0f36..1d89cd0321 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java @@ -100,6 +100,14 @@ public class QueueSummaries { } } + synchronized boolean isCompactionsQueued(String queue) { +var q = QUEUES.get(queue); +if (q == null) { + return false; +} +return !q.isEmpty(); + } + synchronized PrioTserver getNextTserver(String queue) { Entry> entry = getNextTserverEntry(queue); diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 117d50108a..87e7471bef 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -214,6 +214,7 @@ public class CompactionCoordinatorTest { var coordinator = new TestCoordinator(null, null, null
(accumulo) branch elasticity updated: Resolved elasticity TODOs in Manager (#4365)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 75b717ed5f Resolved elasticity TODOs in Manager (#4365) 75b717ed5f is described below commit 75b717ed5f0a7604d6cc25f0ce2a069d9335907b Author: Dave Marion AuthorDate: Wed Mar 13 08:28:15 2024 -0400 Resolved elasticity TODOs in Manager (#4365) Removed the TODO for the bulkImports as it is still being set by BulkImport V2 and referenced in the Monitor. Implemented the suggestion in the other TODO. Co-authored-by: Keith Turner --- .../src/main/java/org/apache/accumulo/manager/Manager.java| 11 --- .../java/org/apache/accumulo/manager/TabletGroupWatcher.java | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 6758acfc1f..45ab2b5e2b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -53,6 +53,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@ -226,7 +227,6 @@ public class Manager extends AbstractServer volatile SortedMap tserverStatusForBalancer = emptySortedMap(); volatile Map> tServerGroupingForBalancer = emptyMap(); - // ELASTICITY_TODO is this still needed? 
final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus(); private final AtomicBoolean managerInitialized = new AtomicBoolean(false); @@ -243,14 +243,11 @@ public class Manager extends AbstractServer return state; } - // ELASTICITIY_TODO it would be nice if this method could take DataLevel as an argument and only - // retrieve information about compactions in that data level. Attempted this and a lot of - // refactoring was needed to get that small bit of information to this method. Would be best to - // address this after issue. May be best to attempt this after #3576. - public Map> getCompactionHints() { + public Map> getCompactionHints(DataLevel level) { +Predicate tablePredicate = (tableId) -> DataLevel.of(tableId) == level; Map allConfig; try { - allConfig = CompactionConfigStorage.getAllConfig(getContext(), tableId -> true); + allConfig = CompactionConfigStorage.getAllConfig(getContext(), tablePredicate); } catch (InterruptedException | KeeperException e) { throw new RuntimeException(e); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index e7ea20413a..bd177c2deb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -340,7 +340,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { return new TabletManagementParameters(manager.getManagerState(), parentLevelUpgrade, manager.onlineTables(), tServersSnapshot, shutdownServers, manager.migrationsSnapshot(), -store.getLevel(), manager.getCompactionHints(), canSuspendTablets(), +store.getLevel(), manager.getCompactionHints(store.getLevel()), canSuspendTablets(), lookForTabletsNeedingVolReplacement ? manager.getContext().getVolumeReplacements() : Map.of()); }
(accumulo) branch elasticity updated: Removed special handling logic in TabletManagementIterator (#4363)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new b4a2ac45b6 Removed special handling logic in TabletManagementIterator (#4363) b4a2ac45b6 is described below commit b4a2ac45b69f2b7b6f052e029354cfebb2e66b9a Author: Dave Marion AuthorDate: Wed Mar 13 07:38:47 2024 -0400 Removed special handling logic in TabletManagementIterator (#4363) Removed the logic that always returns the TabletMetadata when the Manager state is not normal, or there are no tablet servers, or no online tables. The code now just calls computeTabletManagementActions in all cases. Closes #4256 --- .../org/apache/accumulo/core/metadata/TabletState.java | 4 .../accumulo/server/manager/state/TabletGoalState.java | 3 +++ .../server/manager/state/TabletManagementIterator.java | 14 +- 3 files changed, 4 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java index 9fcf8add3f..ba182514d1 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java @@ -21,14 +21,10 @@ package org.apache.accumulo.core.metadata; import java.util.Set; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public enum TabletState { UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED; - private static Logger log = LoggerFactory.getLogger(TabletState.class); - public static TabletState compute(TabletMetadata tm, Set liveTServers) { TabletMetadata.Location current = null; TabletMetadata.Location future = null; diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java index 81e796608c..0b9b83f159 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java @@ -103,6 +103,9 @@ public enum TabletGoalState { if (!tm.getHostingRequested()) { return UNASSIGNED; } +break; + default: +break; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 2e6627c78e..39329b0e42 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -43,7 +43,6 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; -import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletOperationType; @@ -200,18 +199,7 @@ public class TabletManagementIterator extends SkippingIterator { Exception error = null; try { LOG.trace("Evaluating extent: {}", tm); -if (tm.getExtent().isMeta()) { - computeTabletManagementActions(tm, actions); -} else { - if (tabletMgmtParams.getManagerState() != ManagerState.NORMAL - || tabletMgmtParams.getOnlineTsevers().isEmpty() - || tabletMgmtParams.getOnlineTables().isEmpty()) { -// when manager is in the process of starting up or shutting down return everything. 
-actions.add(ManagementAction.NEEDS_LOCATION_UPDATE); - } else { -computeTabletManagementActions(tm, actions); - } -} +computeTabletManagementActions(tm, actions); } catch (Exception e) { LOG.error("Error computing tablet management actions for extent: {}", tm.getExtent(), e); error = e;
(accumulo-access) branch main updated: Modified Authorizations.of to only accept a Set (#68)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo-access.git The following commit(s) were added to refs/heads/main by this push: new 33be659 Modified Authorizations.of to only accept a Set (#68) 33be659 is described below commit 33be6595160d0f62041731a720cc2b80f898c243 Author: Dave Marion AuthorDate: Tue Mar 12 13:47:23 2024 -0400 Modified Authorizations.of to only accept a Set (#68) Closes #66 - Co-authored-by: Keith Turner --- .../antlr/AccessExpressionAntlrBenchmark.java | 3 +- .../accumulo/access/grammar/antlr/Antlr4Tests.java | 7 ++-- .../apache/accumulo/access/AccessEvaluator.java| 7 .../accumulo/access/AccessEvaluatorImpl.java | 8 .../accumulo/access/AccessExpressionImpl.java | 2 +- .../org/apache/accumulo/access/Authorizations.java | 21 ++ src/test/java/example/AccessExample.java | 4 +- .../accumulo/access/AccessEvaluatorTest.java | 18 + .../accumulo/access/AccessExpressionBenchmark.java | 7 ++-- .../apache/accumulo/access/AuthorizationTest.java | 45 ++ 10 files changed, 84 insertions(+), 38 deletions(-) diff --git a/src/it/antlr4-example/src/test/java/org/apache/accumulo/access/grammar/antlr/AccessExpressionAntlrBenchmark.java b/src/it/antlr4-example/src/test/java/org/apache/accumulo/access/grammar/antlr/AccessExpressionAntlrBenchmark.java index 0c3422f..75ec0a7 100644 --- a/src/it/antlr4-example/src/test/java/org/apache/accumulo/access/grammar/antlr/AccessExpressionAntlrBenchmark.java +++ b/src/it/antlr4-example/src/test/java/org/apache/accumulo/access/grammar/antlr/AccessExpressionAntlrBenchmark.java @@ -25,6 +25,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -86,7 +87,7 @@ public class AccessExpressionAntlrBenchmark { et.expressions = new ArrayList<>(); 
et.evaluator = new AccessExpressionAntlrEvaluator( - Stream.of(testDataSet.auths).map(Authorizations::of).collect(Collectors.toList())); +Stream.of(testDataSet.auths).map(a -> Authorizations.of(Set.of(a))).collect(Collectors.toList())); for (var tests : testDataSet.tests) { if (tests.expectedResult != TestDataLoader.ExpectedResult.ERROR) { diff --git a/src/it/antlr4-example/src/test/java/org/apache/accumulo/access/grammar/antlr/Antlr4Tests.java b/src/it/antlr4-example/src/test/java/org/apache/accumulo/access/grammar/antlr/Antlr4Tests.java index be40f3b..09870ae 100644 --- a/src/it/antlr4-example/src/test/java/org/apache/accumulo/access/grammar/antlr/Antlr4Tests.java +++ b/src/it/antlr4-example/src/test/java/org/apache/accumulo/access/grammar/antlr/Antlr4Tests.java @@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -120,7 +121,7 @@ public class Antlr4Tests { @Test public void testSimpleEvaluation() throws Exception { String accessExpression = "(one)|(foo)"; -Authorizations auths = Authorizations.of("four", "three", "one", "two"); +Authorizations auths = Authorizations.of(Set.of("four", "three", "one", "two")); AccessExpressionAntlrEvaluator eval = new AccessExpressionAntlrEvaluator(List.of(auths)); assertTrue(eval.canAccess(accessExpression)); } @@ -128,7 +129,7 @@ public class Antlr4Tests { @Test public void testSimpleEvaluationFailure() throws Exception { String accessExpression = "(A)"; -Authorizations auths = Authorizations.of("A", "C"); +Authorizations auths = Authorizations.of(Set.of("A", "C")); AccessExpressionAntlrEvaluator eval = new AccessExpressionAntlrEvaluator(List.of(auths)); assertFalse(eval.canAccess(accessExpression)); } @@ -141,7 +142,7 @@ public class Antlr4Tests { for (TestDataSet testSet : testData) { List authSets = - 
Stream.of(testSet.auths).map(Authorizations::of).collect(Collectors.toList()); + Stream.of(testSet.auths).map(a -> Authorizations.of(Set.of(a))).collect(Collectors.toList()); AccessEvaluator evaluator = AccessEvaluator.of(authSets); AccessExpressionAntlrEvaluator antlr = new AccessExpressionAntlrEvaluator(authSets); diff --git a/src/main/java/org/apache/accumulo/access/AccessEvaluator.java b/src/main/java/org/apache/accumulo/access/AccessEvaluator.java index 1ea4d23..3eff0d1 100644 --- a/src/main/java/org/apache/accumu
(accumulo) branch elasticity updated: Refactor classes to use the Caches object (#4359)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 7ea10de6c8 Refactor classes to use the Caches object (#4359) 7ea10de6c8 is described below commit 7ea10de6c8af6a8c79e0ae6aadcc491566e75cee Author: Dave Marion AuthorDate: Tue Mar 12 08:10:10 2024 -0400 Refactor classes to use the Caches object (#4359) --- .../java/org/apache/accumulo/core/util/cache/Caches.java | 7 ++- .../accumulo/server/compaction/CompactionJobGenerator.java | 4 ++-- .../accumulo/server/conf/ServerConfigurationFactory.java | 14 -- .../org/apache/accumulo/server/fs/VolumeManagerImpl.java | 6 -- 4 files changed, 20 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java index a96af36bc5..f5ef8e4c8f 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java +++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java @@ -42,12 +42,14 @@ public class Caches implements MetricsProducer { COMPACTION_CONFIGS, COMPACTION_DIR_CACHE, COMPACTION_DISPATCHERS, +COMPACTION_SERVICE_UNKNOWN, COMPACTOR_GROUP_ID, COMPRESSION_ALGORITHM, CRYPT_PASSWORDS, HOST_REGEX_BALANCER_TABLE_REGEX, INSTANCE_ID, NAMESPACE_ID, +NAMESPACE_CONFIGS, PROP_CACHE, RECOVERY_MANAGER_PATH_CACHE, SCAN_SERVER_TABLET_METADATA, @@ -56,10 +58,13 @@ public class Caches implements MetricsProducer { SPLITTER_FILES, SPLITTER_STARTING, SPLITTER_UNSPLITTABLE, +TABLE_CONFIGS, TABLE_ID, +TABLE_PARENT_CONFIGS, TABLE_ZOO_HELPER_CACHE, TSRM_FILE_LENGTHS, -TINYLFU_BLOCK_CACHE; +TINYLFU_BLOCK_CACHE, +VOLUME_HDFS_CONFIGS; } private static final Logger LOG = LoggerFactory.getLogger(Caches.class); diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java index 02e3dc2fca..1d88de2eaa 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@ -55,7 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; public class CompactionJobGenerator { private static final Logger log = LoggerFactory.getLogger(CompactionJobGenerator.class); @@ -86,7 +85,8 @@ public class CompactionJobGenerator { v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v))); } unknownCompactionServiceErrorCache = -Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); + Caches.getInstance().createNewBuilder(CacheName.COMPACTION_SERVICE_UNKNOWN, false) +.expireAfterWrite(5, TimeUnit.MINUTES).build(); } public Collection generateJobs(TabletMetadata tablet, Set kinds) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java index 78b1e4f18b..c6f32946c3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java @@ -36,6 +36,8 @@ import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.util.cache.Caches; +import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.ServerContext; @@ -49,7 +51,6 
@@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -80,11 +81,12 @@ public class ServerConfigurationFactory extends ServerConfiguration { this.systemConfig = memoize(() -> new SystemConfiguration(context, SystemPropKey.of(context.getInstanceID()), siteConfig)); tableParentConfigs = -Caffeine.newBuilder().expireAfterAccess(CACHE_EXPIRATION_HRS, TimeUnit.HOURS).build(); -tableConfigs = -Caffeine.newBuilder().expireAfterAccess(CACHE_EXPIRATION_HRS, TimeUnit.HOURS).build(); -namespaceConf
(accumulo) 02/02: Merge remote-tracking branch 'upstream/elasticity' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 1688cecd5a8889bcb41cec2bdb45d2f063d1d9a9 Merge: 3849fffedd 202198a588 Author: Dave Marion AuthorDate: Mon Mar 4 20:15:25 2024 +0000 Merge remote-tracking branch 'upstream/elasticity' into elasticity .../apache/accumulo/core/logging/TabletLogger.java | 49 ++- .../coordinator/CompactionCoordinator.java | 27 +- .../coordinator/commit/CommitCompaction.java | 4 + .../manager/tableOps/compact/CompactionDriver.java | 22 +- .../manager/tableOps/split/UpdateTablets.java | 42 ++- .../compaction/CompactionCoordinatorTest.java | 148 ++ .../manager/tableOps/split/UpdateTabletsTest.java | 328 + .../apache/accumulo/test/functional/SplitIT.java | 54 test/src/main/resources/log4j2-test.properties | 3 + 9 files changed, 636 insertions(+), 41 deletions(-)
(accumulo) branch elasticity updated (202198a588 -> 1688cecd5a)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 202198a588 Updates compaction to use TabletLogger (#4333) add d91d016211 Optimized logic for getting a random TabletServer connection (#4309) add f3d5fb01d7 Merge branch '2.1' new 3849fffedd Merge branch 'main' into elasticity new 1688cecd5a Merge remote-tracking branch 'upstream/elasticity' into elasticity The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../apache/accumulo/test/TransportCachingIT.java | 42 .../test/functional/MemoryStarvedScanIT.java | 41 +--- 8 files changed, 182 insertions(+), 183 deletions(-)
(accumulo) 01/02: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 3849fffeddeb29ec92b74926c15e756f182bb7c3 Merge: 422d48a432 f3d5fb01d7 Author: Dave Marion AuthorDate: Mon Mar 4 20:14:59 2024 + Merge branch 'main' into elasticity .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../apache/accumulo/test/TransportCachingIT.java | 42 .../test/functional/MemoryStarvedScanIT.java | 41 +--- 8 files changed, 182 insertions(+), 183 deletions(-) diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java index 8273f8e5b8,59b9a535b8..0becd57120 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java @@@ -31,6 -30,6 +30,7 @@@ import java.util.Iterator import java.util.List; import java.util.Map; import java.util.Map.Entry; ++import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.DoubleAdder; @@@ -41,23 -39,12 +41,21 @@@ import org.apache.accumulo.core.client. 
import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.clientImpl.ClientContext; - import org.apache.accumulo.core.clientImpl.ThriftTransportKey; +import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.lock.ServiceLock; - import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; ++import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; - import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.MemoryUnit; @@@ -67,7 -54,6 +65,8 @@@ import org.apache.accumulo.test.metrics import org.apache.accumulo.test.metrics.TestStatsDSink; import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.transport.TTransport; ++import org.apache.thrift.transport.TTransportException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@@ -75,6 -61,6 +74,8 @@@ import org.junit.jupiter.api.Test import org.slf4j.Logger; import org.slf4j.LoggerFactory; ++import com.google.common.net.HostAndPort; ++ public class 
MemoryStarvedScanIT extends SharedMiniClusterBase { public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback { @@@ -187,25 -173,10 +188,35 @@@ } static void freeServerMemory(AccumuloClient client) throws Exception { -// Instantiating this class on the TabletServer will free the memory as it -// frees the buffers created by the MemoryConsumingIterator in its constructor. - client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(), -WrappingIterator.class.getName()); + ++// This does not call ThriftClientTypes.CLIENT.execute because ++// we only want to communicate with the TabletServer for this test +final ClientContext context = (ClientContext) client; +final long rpcTimeout = context.getClientTimeoutInMillis(); - final ArrayList servers = new ArrayList<>(); +final String serverPath = context.getZooKeeperRoot() + Constants.ZTSERVERS; +final ZooCache zc = context.getZooCache(); + +for (String server : zc.getChildren(serverPath)) { - ServiceLockPath zLocPath = ServiceLock.path(serverPath + "/" + server); - zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.CLIENT)) - .map(address -> new ThriftTransportKey(address, rpcTimeout, context))
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit f3d5fb01d701a6932d37d2f67f52cc0eefa64d50 Merge: ebf7054d1f d91d016211 Author: Dave Marion AuthorDate: Mon Mar 4 18:14:16 2024 + Merge branch '2.1' .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../coordinator/CompactionCoordinator.java | 4 +- .../apache/accumulo/test/TransportCachingIT.java | 42 8 files changed, 158 insertions(+), 170 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java index f332a09492,f4c7047d6d..0f84154a15 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java @@@ -24,9 -24,10 +24,10 @@@ import java.util.Objects import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.util.HostAndPort; import com.google.common.annotations.VisibleForTesting; +import com.google.common.net.HostAndPort; @VisibleForTesting public class ThriftTransportKey { @@@ -54,12 -58,18 +58,18 @@@ this.saslParams = saslParams; if (saslParams != null && sslParams != null) { // TSasl and TSSL transport factories don't play nicely together - throw new RuntimeException("Cannot use both SSL and SASL thrift transports"); + throw new IllegalArgumentException("Cannot use both SSL and SASL thrift transports"); } - this.hash = Objects.hash(server, timeout, sslParams, saslParams); + this.hash = Objects.hash(type, server, 
timeout, sslParams, saslParams); } - HostAndPort getServer() { + @VisibleForTesting + public ThriftClientTypes getType() { + return type; + } + + @VisibleForTesting + public HostAndPort getServer() { return server; } diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java index d1bc17e945,a3d38aa10a..b3f205fa2a --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java @@@ -41,6 -41,8 +41,7 @@@ import java.util.function.LongSupplier import java.util.function.Supplier; import org.apache.accumulo.core.rpc.ThriftUtil; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.Threads; import org.apache.thrift.TConfiguration; @@@ -49,9 -51,7 +50,8 @@@ import org.apache.thrift.transport.TTra import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@@ -109,71 -110,40 +109,40 @@@ public class ThriftTransportPool return pool; } - public TTransport getTransport(HostAndPort location, long milliseconds, ClientContext context) - throws TTransportException { - ThriftTransportKey cacheKey = new ThriftTransportKey(location, milliseconds, context); + public TTransport getTransport(ThriftClientTypes type, HostAndPort location, long milliseconds, + ClientContext context, boolean preferCached) throws TTransportException { - CachedConnection connection = connectionPool.reserveAny(cacheKey); - - if (connection != null) { - log.trace("Using existing connection to {}", cacheKey.getServer()); - return connection.transport; - } else { - return createNewTransport(cacheKey); + ThriftTransportKey 
cacheKey = new ThriftTransportKey(type, location, milliseconds, context); + if (preferCached) { + CachedConnection connection = connectionPool.reserveAny(cacheKey); + if (connection != null) { + log.trace("Using existing connection to {}", cacheKey.getServer()); + return connection.transport; + } } + return createNewTransport(cacheKey); } - @VisibleForTesting - public Pair getAnyTransport(List servers, - boolean preferCachedConnection) throws TTransportException { - - servers = new ArrayList<>(servers); - - if (preferCachedConnection
(accumulo) branch main updated (ebf7054d1f -> f3d5fb01d7)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from ebf7054d1f Merge branch '2.1' add d91d016211 Optimized logic for getting a random TabletServer connection (#4309) new f3d5fb01d7 Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../coordinator/CompactionCoordinator.java | 4 +- .../apache/accumulo/test/TransportCachingIT.java | 42 8 files changed, 158 insertions(+), 170 deletions(-)
(accumulo) branch 2.1 updated: Optimized logic for getting a random TabletServer connection (#4309)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new d91d016211 Optimized logic for getting a random TabletServer connection (#4309) d91d016211 is described below commit d91d0162115ae66112a104278bcd14e8085936d3 Author: Dave Marion AuthorDate: Mon Mar 4 09:16:58 2024 -0500 Optimized logic for getting a random TabletServer connection (#4309) The previous logic in this class would gather all of the Tserver ZNodes in ZooKeeper, then get the data for each ZNode and validate their ServiceLock. Then, after all of that it would randomly pick one of the TabletServers to connect to. It did this through the ZooCache object which on an initial connection would be empty, causing a lot of back and forth to ZooKeeper. The side effect of this is that the ZooCache would be populated with TabletServer information. This change modifies TServerClient such that it no longer populates ZooCache information for each TabletServer and modifies the default logic for getting a connection to a TabletServer. The new logic will make 3 calls to ZooKeeper in the best case scenario, one to get the list of TServer ZNodes in ZooKeeper, one to get the ServiceLock for a random TServer and another to get the ZNode data for it. This is all done through ZooCache, so it is lazily populated over time instead of incurring the penalty when getting the first TabletServer connection. 
Fixes #4303 --- .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 73 -- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../coordinator/CompactionCoordinator.java | 4 +- .../apache/accumulo/test/TransportCachingIT.java | 42 8 files changed, 142 insertions(+), 154 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java index 8be320dcc5..f4c7047d6d 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java @@ -24,12 +24,14 @@ import java.util.Objects; import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.util.HostAndPort; import com.google.common.annotations.VisibleForTesting; @VisibleForTesting public class ThriftTransportKey { + private final ThriftClientTypes type; private final HostAndPort server; private final long timeout; private final SslConnectionParams sslParams; @@ -38,16 +40,18 @@ public class ThriftTransportKey { private final int hash; @VisibleForTesting - public ThriftTransportKey(HostAndPort server, long timeout, ClientContext context) { -this(server, timeout, context.getClientSslParams(), context.getSaslParams()); + public ThriftTransportKey(ThriftClientTypes type, HostAndPort server, long timeout, + ClientContext context) { +this(type, server, timeout, context.getClientSslParams(), context.getSaslParams()); } /** * Visible only for testing */ - ThriftTransportKey(HostAndPort server, long timeout, SslConnectionParams sslParams, - 
SaslConnectionParams saslParams) { + ThriftTransportKey(ThriftClientTypes type, HostAndPort server, long timeout, + SslConnectionParams sslParams, SaslConnectionParams saslParams) { requireNonNull(server, "location is null"); +this.type = type; this.server = server; this.timeout = timeout; this.sslParams = sslParams; @@ -56,14 +60,21 @@ public class ThriftTransportKey { // TSasl and TSSL transport factories don't play nicely together throw new RuntimeException("Cannot use both SSL and SASL thrift transports"); } -this.hash = Objects.hash(server, timeout, sslParams, saslParams); +this.hash = Objects.hash(type, server, timeout, sslParams, saslParams); } - HostAndPort getServer() { + @VisibleForTesting + public ThriftClientTypes getType() { +return type; + } + + @VisibleForTesting + public HostAndPort getServer() { return server; } - long getTimeout() { + @VisibleForTesting + public long getTimeout() { return timeout; } @@ -81,7 +92,7 @@ public class ThriftTransportKey { return false; } ThriftTransportKey ttk = (T
(accumulo) 01/01: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 422d48a4325bb11b9aaae1ec8c61bdae4afca214 Merge: afe2857935 ebf7054d1f Author: Dave Marion AuthorDate: Mon Mar 4 12:51:34 2024 + Merge branch 'main' into elasticity assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 8 ++-- .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 25 insertions(+), 7 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java index 0c465564d9,e00570154a..c4ebf8c7ec --- a/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java @@@ -39,23 -39,13 +39,24 @@@ import edu.umd.cs.findbugs.annotations. 
public class ClusterConfigParser { private static final String PROPERTY_FORMAT = "%s=\"%s\"%n"; - private static final String[] SECTIONS = new String[] {"manager", "monitor", "gc", "tserver"}; - - private static final Set VALID_CONFIG_KEYS = Set.of("manager", "monitor", "gc", "tserver", - "tservers_per_host", "sservers_per_host", "compaction.coordinator", "compactors_per_host"); + private static final String COMPACTOR_PREFIX = "compactor."; ++ private static final String COMPACTORS_PER_HOST_KEY = "compactors_per_host"; + private static final String GC_KEY = "gc"; + private static final String MANAGER_KEY = "manager"; + private static final String MONITOR_KEY = "monitor"; + private static final String SSERVER_PREFIX = "sserver."; + private static final String SSERVERS_PER_HOST_KEY = "sservers_per_host"; + private static final String TSERVER_PREFIX = "tserver."; + private static final String TSERVERS_PER_HOST_KEY = "tservers_per_host"; + + private static final String[] UNGROUPED_SECTIONS = + new String[] {MANAGER_KEY, MONITOR_KEY, GC_KEY}; + - private static final Set VALID_CONFIG_KEYS = - Set.of(MANAGER_KEY, MONITOR_KEY, GC_KEY, SSERVERS_PER_HOST_KEY, TSERVERS_PER_HOST_KEY); ++ private static final Set VALID_CONFIG_KEYS = Set.of(MANAGER_KEY, MONITOR_KEY, GC_KEY, ++ SSERVERS_PER_HOST_KEY, TSERVERS_PER_HOST_KEY, COMPACTORS_PER_HOST_KEY); private static final Set VALID_CONFIG_PREFIXES = - Set.of("compaction.compactor.", "sserver."); + Set.of(COMPACTOR_PREFIX, SSERVER_PREFIX, TSERVER_PREFIX); private static final Predicate VALID_CONFIG_SECTIONS = section -> VALID_CONFIG_KEYS.contains(section) diff --cc core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java index 148b5e4f24,1410dc569a..189e48afc3 --- a/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java @@@ -63,15 -63,17 +63,16 @@@ public class 
ClusterConfigParserTest assertEquals("localhost1 localhost2", contents.get("monitor")); assertTrue(contents.containsKey("gc")); assertEquals("localhost", contents.get("gc")); -assertTrue(contents.containsKey("tserver")); -assertEquals("localhost1 localhost2 localhost3 localhost4", contents.get("tserver")); -assertFalse(contents.containsKey("compaction")); -assertFalse(contents.containsKey("compaction.coordinator")); -assertFalse(contents.containsKey("compaction.compactor")); -assertFalse(contents.containsKey("compaction.compactor.queue")); -assertFalse(contents.containsKey("compaction.compactor.q1")); -assertFalse(contents.containsKey("compaction.compactor.q2")); +assertFalse(contents.containsKey("tserver")); +assertTrue(contents.containsKey("tserver.default")); +assertEquals("localhost1 localhost2 localhost3 localhost4", contents.get("tserver.default")); +assertFalse(contents.containsKey("compactor")); +assertFalse(contents.containsKey("compactor.queue")); +assertFalse(contents.containsKey("compactor.q1")); +assertFalse(contents.containsKey("compactor.q2")); assertFalse(contents.containsKey("tservers_per_host")); assertFalse(contents.containsKey("sservers_per_host")); + assertFalse(co
(accumulo) branch elasticity updated (afe2857935 -> 422d48a432)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from afe2857935 Merge branch 'main' into elasticity add 18b745466e Added compactors_per_host to accumulo-cluster script (#4329) add ebf7054d1f Merge branch '2.1' new 422d48a432 Merge branch 'main' into elasticity The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 8 ++-- .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 25 insertions(+), 7 deletions(-)
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit ebf7054d1f6bf62d5170dcbbb07c7b0ee572cf2c Merge: c976af383f 18b745466e Author: Dave Marion AuthorDate: Mon Mar 4 12:34:51 2024 +0000 Merge branch '2.1' assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 5 - .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 23 insertions(+), 6 deletions(-)
(accumulo) branch main updated (c976af383f -> ebf7054d1f)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from c976af383f Merge branch '2.1' add 18b745466e Added compactors_per_host to accumulo-cluster script (#4329) new ebf7054d1f Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 5 - .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 23 insertions(+), 6 deletions(-)
(accumulo) branch 2.1 updated: Added compactors_per_host to accumulo-cluster script (#4329)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 18b745466e Added compactors_per_host to accumulo-cluster script (#4329) 18b745466e is described below commit 18b745466eee10c41f72012908b867005ca89881 Author: Dave Marion AuthorDate: Mon Mar 4 07:22:46 2024 -0500 Added compactors_per_host to accumulo-cluster script (#4329) --- assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 5 - .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/assemble/bin/accumulo-cluster b/assemble/bin/accumulo-cluster index c9936cb78b..5dd9de7e4e 100755 --- a/assemble/bin/accumulo-cluster +++ b/assemble/bin/accumulo-cluster @@ -119,6 +119,11 @@ function parse_config { if [[ -z $NUM_SSERVERS ]]; then echo "INFO: ${NUM_SSERVERS} sservers will be started per host" fi + + if [[ -z $NUM_COMPACTORS ]]; then +echo "INFO: ${NUM_COMPACTORS} compactors will be started per host" + fi + } function control_service() { @@ -130,6 +135,7 @@ function control_service() { last_instance_id=1 [[ $service == "tserver" ]] && last_instance_id=${NUM_TSERVERS:-1} [[ $service == "sserver" ]] && last_instance_id=${NUM_SSERVERS:-1} + [[ $service == "compactor" ]] && last_instance_id=${NUM_COMPACTORS:-1} for ((inst_id = 1; inst_id <= last_instance_id; inst_id++)); do ACCUMULO_SERVICE_INSTANCE="" @@ -510,10 +516,12 @@ tserver: # to start on each host. If the following variables are not set, then they default to 1. # If the environment variable NUM_TSERVERS is set when running accumulo_cluster # then its value will override what is set in this file for tservers_per_host. Likewise if -# NUM_SSERVERS is set then it will override sservers_per_host. 
+# NUM_SSERVERS or NUM_COMPACTORS are set then it will override sservers_per_host and +# compactors_per_host. # tservers_per_host: 1 sservers_per_host: 1 +compactors_per_host: 1 EOF ;; diff --git a/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java b/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java index 4f41cbef3e..5790fa7fd3 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java @@ -42,7 +42,7 @@ public class ClusterConfigParser { private static final String[] SECTIONS = new String[] {"manager", "monitor", "gc", "tserver"}; private static final Set VALID_CONFIG_KEYS = Set.of("manager", "monitor", "gc", "tserver", - "tservers_per_host", "sservers_per_host", "compaction.coordinator"); + "tservers_per_host", "sservers_per_host", "compaction.coordinator", "compactors_per_host"); private static final Set VALID_CONFIG_PREFIXES = Set.of("compaction.compactor.", "sserver."); @@ -150,6 +150,9 @@ public class ClusterConfigParser { String numSservers = config.getOrDefault("sservers_per_host", "1"); out.print("NUM_SSERVERS=\"${NUM_SSERVERS:=" + numSservers + "}\"\n"); +String numCompactors = config.getOrDefault("compactors_per_host", "1"); +out.print("NUM_COMPACTORS=\"${NUM_COMPACTORS:=" + numCompactors + "}\"\n"); + out.flush(); } diff --git a/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java b/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java index ef2c2382bc..1410dc569a 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java @@ -73,6 +73,7 @@ public class ClusterConfigParserTest { assertFalse(contents.containsKey("compaction.compactor.q2")); 
assertFalse(contents.containsKey("tservers_per_host")); assertFalse(contents.containsKey("sservers_per_host")); +assertFalse(contents.containsKey("compactors_per_host")); } @Test @@ -84,7 +85,7 @@ public class ClusterConfigParserTest { Map contents = ClusterConfigParser.parseConfiguration(new File(configFile.toURI()).getAbsolutePath()); -assertEquals(12, con