This is an automated email from the ASF dual-hosted git repository. krathbun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit c170d2912ca5f5204f41fc169037a185fc109ffe Merge: 9827d8475d 8c25167e20 Author: Kevin Rathbun <[email protected]> AuthorDate: Fri Jul 11 10:57:58 2025 -0400 Merge branch '2.1' .../accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java | 8 ++++---- .../accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java | 2 +- .../main/java/org/apache/accumulo/test/TotalQueuedIT.java | 15 ++++++++++++--- 3 files changed, 17 insertions(+), 8 deletions(-) diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 7f80211d8c,c360bf269d..e02ce199c5 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@@ -200,23 -177,21 +200,23 @@@ public class MiniAccumuloClusterImpl im if (!config.useExistingInstance()) { if (!config.useExistingZooKeepers()) { - mkdirs(config.getZooKeeperDir()); + mkdirs(config.getZooKeeperDir().toPath()); } - mkdirs(config.getAccumuloDir()); + mkdirs(config.getAccumuloDir().toPath()); } + java.nio.file.Path confDir = config.getConfDir().toPath(); - if (config.useMiniDFS()) { + if (config.getUseMiniDFS()) { - File nn = new File(config.getAccumuloDir(), "nn"); + java.nio.file.Path configPath = config.getAccumuloDir().toPath(); + java.nio.file.Path nn = configPath.resolve("nn"); mkdirs(nn); - File dn = new File(config.getAccumuloDir(), "dn"); + java.nio.file.Path dn = configPath.resolve("dn"); mkdirs(dn); - File dfs = new File(config.getAccumuloDir(), "dfs"); + java.nio.file.Path dfs = configPath.resolve("dfs"); mkdirs(dfs); Configuration conf = new Configuration(); - conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nn.getAbsolutePath()); - conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dn.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nn.toAbsolutePath().toString()); + conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dn.toAbsolutePath().toString()); conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "1"); conf.set(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, "1"); conf.set("dfs.support.append", "true"); @@@ -514,10 -480,7 +514,10 @@@ justification = "insecure socket used for reservation") @Override public synchronized void start() throws IOException, InterruptedException { + Preconditions.checkState(clusterState != State.TERMINATED, + "Cannot start a cluster that is terminated."); + - if (config.useMiniDFS() && miniDFS.get() == null) { + if (config.getUseMiniDFS() && miniDFS.get() == null) { throw new IllegalStateException("Cannot restart mini when using miniDFS"); } @@@ -1170,10 -979,10 +1170,10 @@@ @Override public Path getTemporaryPath() { String p; - if (config.useMiniDFS()) { + if (config.getUseMiniDFS()) { p = "/tmp/"; } else { - File tmp = new File(config.getDir(), "tmp"); + java.nio.file.Path tmp = config.getDir().toPath().resolve("tmp"); mkdirs(tmp); p = tmp.toString(); } diff --cc test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java index 85afbdf2eb,9b99c5f24e..7f85448fa3 --- a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java @@@ -28,13 -28,14 +28,14 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.master.thrift.TabletServerStatus; +import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; -import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; + import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.functional.ConfigurableMacBase; @@@ -52,8 -51,11 +53,11 @@@ public class TotalQueuedIT extends Conf @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setNumTservers(1); + cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); - cfg.useMiniDFS(); + // use mini DFS so walogs sync and flush will work + cfg.useMiniDFS(true); + // property is a fixed property (requires restart to be picked up), so set here in initial cfg + cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + SMALL_QUEUE_SIZE); } private int SMALL_QUEUE_SIZE = 100000; @@@ -102,7 -102,14 +104,14 @@@ c.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), "" + LARGE_QUEUE_SIZE); c.tableOperations().flush(tableName, null, null, true); - sleepUninterruptibly(1, TimeUnit.SECONDS); + Thread.sleep(SECONDS.toMillis(1)); + + // property changed is a fixed property (requires restart to be picked up), restart TServers + c.tableOperations().offline(tableName, true); + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + c.tableOperations().online(tableName, true); + try (BatchWriter bw = c.createBatchWriter(tableName, cfg)) { now = System.currentTimeMillis(); bytesSent = 0;
