(accumulo) branch elasticity updated: avoids a metadata lookup per mutation for batch writer (#4280)

2024-02-16 Thread kturner
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
 new af9b0602db avoids a metadata lookup per mutation for batch writer (#4280)
af9b0602db is described below

commit af9b0602dbb3debb34828d7d3116ea310e175163
Author: Keith Turner 
AuthorDate: Fri Feb 16 19:01:03 2024 -0500

avoids a metadata lookup per mutation for batch writer (#4280)

When writing to unhosted ondemand tablets, the batch writer was performing
a metadata lookup per mutation before it requested hosting.  This was of
course extremely slow.  This commit makes a quick change to perform a
metadata lookup per extent instead of per mutation.  This could be
further improved because it's still not as good as the pre-elasticity
code.  There is an ELASTICITY_TODO in the commit about further
improvements.

This is a massive improvement.  I was running FateStarvationIT, which
writes 100K rows.  Without this change it took 53 secs; with this
change it took 1.7 secs.

This mostly solves #3708; there may be some further minor improvements
that could be made.
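The effect of the lastTablet check can be seen in a small standalone model. This is a sketch under stated assumptions, not the ClientTabletCacheImpl code: Tablet, FakeMetadata and bin are hypothetical names, a tablet is modelled as the row range (prevEndRow, endRow] with null meaning unbounded, and each FakeMetadata.lookup call stands in for one metadata table read done by _findTablet.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LastTabletBinning {

  /** Hypothetical tablet covering rows in (prevEndRow, endRow]; null means unbounded. */
  static final class Tablet {
    final String prevEndRow;
    final String endRow;

    Tablet(String prevEndRow, String endRow) {
      this.prevEndRow = prevEndRow;
      this.endRow = endRow;
    }

    boolean contains(String row) {
      boolean afterPrev = prevEndRow == null || row.compareTo(prevEndRow) > 0;
      boolean atOrBeforeEnd = endRow == null || row.compareTo(endRow) <= 0;
      return afterPrev && atOrBeforeEnd;
    }

    @Override
    public String toString() {
      return "(" + prevEndRow + ", " + endRow + "]";
    }
  }

  /** Hypothetical stand-in for the metadata table that counts how often it is read. */
  static final class FakeMetadata {
    final List<Tablet> tablets;
    int lookups = 0;

    FakeMetadata(List<Tablet> tablets) {
      this.tablets = tablets;
    }

    Tablet lookup(String row) {
      lookups++; // one call models one (expensive) metadata table read
      for (Tablet t : tablets) {
        if (t.contains(row)) {
          return t;
        }
      }
      throw new IllegalStateException("no tablet contains row " + row);
    }
  }

  /** Groups rows by tablet, reusing the previous tablet while it still contains the row. */
  static Map<Tablet, List<String>> bin(List<String> rows, FakeMetadata metadata) {
    Map<Tablet, List<String>> binned = new LinkedHashMap<>();
    Tablet lastTablet = null;
    for (String row : rows) {
      Tablet tl;
      if (lastTablet != null && lastTablet.contains(row)) {
        tl = lastTablet; // reuse: no metadata read for this row
      } else {
        tl = metadata.lookup(row);
        lastTablet = tl;
      }
      binned.computeIfAbsent(tl, k -> new ArrayList<>()).add(row);
    }
    return binned;
  }

  static FakeMetadata threeTablets() {
    return new FakeMetadata(
        List.of(new Tablet(null, "g"), new Tablet("g", "p"), new Tablet("p", null)));
  }

  public static void main(String[] args) {
    FakeMetadata metadata = threeTablets();
    List<String> rows = List.of("a", "b", "c", "h", "i", "q", "r", "z");
    System.out.println(bin(rows, metadata));
    System.out.println("metadata lookups: " + metadata.lookups);
  }
}
```

With the eight sorted rows in main, only three lookups happen (one per tablet) instead of eight. If consecutive rows alternate between tablets, the reuse check misses on every switch, which is the remaining per-tablet cost the ELASTICITY_TODO describes.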
---
 .../accumulo/core/clientImpl/ClientTabletCacheImpl.java   | 15 +--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
index 4539a60b3a..10fb3aa21e 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCacheImpl.java
@@ -234,12 +234,23 @@ public class ClientTabletCacheImpl extends ClientTabletCache {
 
       wLock.lock();
       try {
+        CachedTablet lastTablet = null;
         for (T mutation : notInCache) {
 
           row.set(mutation.getRow());
 
-          CachedTablet tl =
-              _findTablet(context, row, false, false, false, lcSession, LocationNeed.REQUIRED);
+          // ELASTICITY_TODO using lastTablet avoids doing a metadata table lookup per mutation.
+          // However this still does at least one metadata lookup per tablet. This is not as good as
+          // the pre-elasticity code that would lookup N tablets at once and use them to bin
+          // mutations. So there is further room for improvement in the way this code interacts with
+          // cache and metadata table.
+          CachedTablet tl;
+          if (lastTablet != null && lastTablet.getExtent().contains(row)) {
+            tl = lastTablet;
+          } else {
+            tl = _findTablet(context, row, false, false, false, lcSession, LocationNeed.REQUIRED);
+            lastTablet = tl;
+          }
 
           if (!addMutation(binnedMutations, mutation, tl, lcSession)) {
             failures.add(mutation);



(accumulo) branch elasticity updated: Fix build from broken merge

2024-02-16 Thread ddanielr

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
 new abad2b2be9 Fix build from broken merge
abad2b2be9 is described below

commit abad2b2be9306de653924d88cb5ce77011b13741
Author: Daniel Roberts 
AuthorDate: Fri Feb 16 23:20:34 2024 +

Fix build from broken merge
---
 .../accumulo/manager/compaction/coordinator/CompactionCoordinator.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index bf2e9009f3..3bf393d1eb 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -296,7 +296,7 @@ public class CompactionCoordinator
   }
 
   protected long getMissingCompactorWarningTime() {
-    return getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
+    return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
   }
 
   protected long getTServerCheckInterval() {



(accumulo) branch elasticity updated (3de72f63c4 -> 9f073b4838)

2024-02-16 Thread ddanielr

ddanielr pushed a change to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


from 3de72f63c4 fixes FateStarvationIT (#4278)
 add 3298d6db89 Add Compaction Job Min & Max Wait properties (#4223)
 add 9f430a2695 Merge branch '2.1'
 add 4886e821ed Remove unused var added by merge
 new 9f073b4838 Merge branch 'main' into elasticity

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/accumulo/core/conf/Property.java  |  4 ++--
 .../main/java/org/apache/accumulo/compactor/Compactor.java | 14 --
 .../compaction/coordinator/CompactionCoordinator.java  |  3 +--
 3 files changed, 11 insertions(+), 10 deletions(-)



(accumulo) 01/01: Merge branch 'main' into elasticity

2024-02-16 Thread ddanielr

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 9f073b4838ee87847d51fc94878e6aeffae85a78
Merge: 3de72f63c4 4886e821ed
Author: Daniel Roberts 
AuthorDate: Fri Feb 16 22:39:39 2024 +

Merge branch 'main' into elasticity

 .../main/java/org/apache/accumulo/core/conf/Property.java  |  4 ++--
 .../main/java/org/apache/accumulo/compactor/Compactor.java | 14 --
 .../compaction/coordinator/CompactionCoordinator.java  |  3 +--
 3 files changed, 11 insertions(+), 10 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 22c2606c85,6cb1207f40..394b79746e
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -1112,12 -1130,20 +1112,12 @@@ public enum Property {
    COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", PropertyType.TIMEDURATION,
        "The minimum amount of time to wait between checks for the next compaction job, backing off"
            + "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.",
-       "4.0.0"),
+       "2.1.3"),
 -  @Experimental
    COMPACTOR_MAX_JOB_WAIT_TIME("compactor.wait.time.job.max", "5m", PropertyType.TIMEDURATION,
        "Compactors do exponential backoff when their request for work repeatedly come back empty. "
            + "This is the maximum amount of time to wait between checks for the next compaction job.",
-       "4.0.0"),
+       "2.1.3"),
    @Experimental
 -  COMPACTOR_PORTSEARCH("compactor.port.search", "false", PropertyType.BOOLEAN,
 -      "If the compactor.port.client is in use, search higher ports until one is available.",
 -      "2.1.0"),
 -  @Experimental
 -  COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT,
 -      "The port used for handling client connections on the compactor servers.", "2.1.0"),
 -  @Experimental
    COMPACTOR_MINTHREADS("compactor.threads.minimum", "1", PropertyType.COUNT,
        "The minimum number of threads to use to handle incoming requests.", "2.1.0"),
    @Experimental
diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 1a1c2ece58,b81c30dee3..114dd59519
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -589,15 -582,16 +589,17 @@@ public class Compactor extends Abstract
    }
  
    protected long getWaitTimeBetweenCompactionChecks() {
 -    // get the total number of compactors assigned to this queue
 -    int numCompactors = ExternalCompactionUtil.countCompactors(queueName, getContext());
 +    // get the total number of compactors assigned to this group
 +    int numCompactors =
 +        ExternalCompactionUtil.countCompactors(this.getResourceGroup(), getContext());
-     // Aim for around 3 compactors checking in every second
-     long sleepTime = numCompactors * 1000L / 3;
-     // Ensure a compactor sleeps at least around a second
-     sleepTime = Math.max(1000, sleepTime);
-     // Ensure a compactor sleep not too much more than 5 mins
-     sleepTime = Math.min(300_000L, sleepTime);
+     long minWait = getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
+     // Aim for around 3 compactors checking in per min wait time.
+     long sleepTime = numCompactors * minWait / 3;
+     // Ensure a compactor waits at least the minimum time
+     sleepTime = Math.max(minWait, sleepTime);
+     // Ensure a sleeping compactor has a configurable max sleep time
+     sleepTime = Math.min(getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME),
+         sleepTime);
      // Add some random jitter to the sleep time, that averages out to sleep time. This will spread
      // compactors out evenly over time.
      sleepTime = (long) (.9 * sleepTime + sleepTime * .2 * RANDOM.get().nextDouble());
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index fdbf009352,00..bf2e9009f3
mode 100644,00..100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@@ -1,1012 -1,0 +1,1011 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   

(accumulo) branch main updated: Remove unused var added by merge

2024-02-16 Thread ddanielr

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
 new 4886e821ed Remove unused var added by merge
4886e821ed is described below

commit 4886e821ed87ab1934e03540826c6b20863101d3
Author: Daniel Roberts 
AuthorDate: Fri Feb 16 22:27:55 2024 +

Remove unused var added by merge
---
 .../main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 97b09b5e89..1af899acd5 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -104,7 +104,6 @@ public class CompactionCoordinator extends AbstractServer
     implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener {
 
   private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
-  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
   protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
 
   /*



(accumulo) 01/01: Merge branch '2.1'

2024-02-16 Thread ddanielr

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 9f430a26956449133d13c1a35c367df7ce33dc7e
Merge: 978f8b9eca 3298d6db89
Author: Daniel Roberts 
AuthorDate: Fri Feb 16 22:12:55 2024 +

Merge branch '2.1'

 .../org/apache/accumulo/core/conf/Property.java | 10 ++
 .../accumulo/coordinator/CompactionCoordinator.java |  5 ++---
 .../org/apache/accumulo/compactor/Compactor.java| 21 ++---
 3 files changed, 26 insertions(+), 10 deletions(-)

diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 00956edbf1,1434d2a1f1..b81c30dee3
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -579,15 -592,17 +584,17 @@@ public class Compactor extends Abstract
    protected long getWaitTimeBetweenCompactionChecks() {
      // get the total number of compactors assigned to this queue
      int numCompactors = ExternalCompactionUtil.countCompactors(queueName, getContext());
-     // Aim for around 3 compactors checking in every second
-     long sleepTime = numCompactors * 1000L / 3;
-     // Ensure a compactor sleeps at least around a second
-     sleepTime = Math.max(1000, sleepTime);
-     // Ensure a compactor sleep not too much more than 5 mins
-     sleepTime = Math.min(300_000L, sleepTime);
+     long minWait = getConfiguration().getTimeInMillis(Property.COMPACTOR_MIN_JOB_WAIT_TIME);
+     // Aim for around 3 compactors checking in per min wait time.
+     long sleepTime = numCompactors * minWait / 3;
+     // Ensure a compactor waits at least the minimum time
+     sleepTime = Math.max(minWait, sleepTime);
+     // Ensure a sleeping compactor has a configurable max sleep time
+     sleepTime = Math.min(getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME),
+         sleepTime);
      // Add some random jitter to the sleep time, that averages out to sleep time. This will spread
      // compactors out evenly over time.
 -    sleepTime = (long) (.9 * sleepTime + sleepTime * .2 * random.nextDouble());
 +    sleepTime = (long) (.9 * sleepTime + sleepTime * .2 * RANDOM.get().nextDouble());
      LOG.trace("Sleeping {}ms based on {} compactors", sleepTime, numCompactors);
      return sleepTime;
    }



(accumulo) branch main updated (978f8b9eca -> 9f430a2695)

2024-02-16 Thread ddanielr
This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


from 978f8b9eca Merge branch '2.1'
 add 3298d6db89 Add Compaction Job Min & Max Wait properties (#4223)
 new 9f430a2695 Merge branch '2.1'



Summary of changes:
 .../org/apache/accumulo/core/conf/Property.java | 10 ++
 .../accumulo/coordinator/CompactionCoordinator.java |  5 ++---
 .../org/apache/accumulo/compactor/Compactor.java| 21 ++---
 3 files changed, 26 insertions(+), 10 deletions(-)



(accumulo) branch 2.1 updated: Add Compaction Job Min & Max Wait properties (#4223)

2024-02-16 Thread ddanielr
This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
 new 3298d6db89 Add Compaction Job Min & Max Wait properties (#4223)
3298d6db89 is described below

commit 3298d6db89ad5cd2cf0e73173088df317a4d1fe2
Author: Daniel Roberts 
AuthorDate: Fri Feb 16 17:02:50 2024 -0500

Add Compaction Job Min & Max Wait properties (#4223)

* Add Compaction Min Wait and Max Wait properties

* Adds min and max wait properties to configure the minimum and maximum
  wait intervals in the compactor.
* Changes the logic in compaction-coordinator to use these new properties
  when calculating the wait period for sending warning messages.

* Also uses the MAX_JOB_WAIT_TIME prop for the thrift retry interval when
  the compactor is unable to communicate with the compaction-coordinator.
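The wait-time arithmetic these properties feed can be restated outside the Compactor. In this sketch the class and method names are hypothetical and the configuration lookups are replaced by plain millisecond arguments; the formulas follow the getWaitTimeBetweenCompactionChecks() and getMissingCompactorWarningTime() bodies in the diffs. With the defaults of 1s and 5m, the coordinator warning threshold works out to three times 5m, i.e. 15 minutes, the same value as the FIFTEEN_MINUTES constant this commit removes.

```java
import java.time.Duration;
import java.util.Random;

final class CompactionWaitTime {

  /**
   * Base wait between checks for a compaction job: scale the minimum wait by the number of
   * compactors in the group divided by 3, then clamp the result to [minWait, maxWait].
   */
  static long baseSleepMillis(int numCompactors, long minWaitMillis, long maxWaitMillis) {
    // Aim for around 3 compactors checking in per min wait time.
    long sleepTime = numCompactors * minWaitMillis / 3;
    // Ensure a compactor waits at least the minimum time.
    sleepTime = Math.max(minWaitMillis, sleepTime);
    // Cap the wait at the configured maximum.
    return Math.min(maxWaitMillis, sleepTime);
  }

  /** Jitter uniformly in [0.9 * sleepTime, 1.1 * sleepTime), averaging out to sleepTime. */
  static long withJitter(long sleepTime, Random random) {
    return (long) (.9 * sleepTime + sleepTime * .2 * random.nextDouble());
  }

  /** Coordinator threshold for warning about a missing compactor: three times the max wait. */
  static long missingCompactorWarningMillis(long maxWaitMillis) {
    return maxWaitMillis * 3;
  }

  public static void main(String[] args) {
    long minWait = Duration.ofSeconds(1).toMillis(); // default of compactor.wait.time.job.min
    long maxWait = Duration.ofMinutes(5).toMillis(); // default of compactor.wait.time.job.max
    Random random = new Random(42);
    for (int numCompactors : new int[] {1, 3, 30, 3000}) {
      long base = baseSleepMillis(numCompactors, minWait, maxWait);
      System.out.println(numCompactors + " compactors: base " + base + " ms, with jitter "
          + withJitter(base, random) + " ms");
    }
    System.out.println("missing compactor warning after "
        + missingCompactorWarningMillis(maxWait) + " ms");
  }
}
```

For 1, 3, 30 and 3000 compactors the base waits come out to 1000, 1000, 10000 and 300000 ms: small groups sit at the minimum, large groups hit the maximum, and the jitter then spreads each value over roughly plus or minus 10%.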
---
 .../org/apache/accumulo/core/conf/Property.java| 10 
 .../coordinator/CompactionCoordinator.java |  4 +--
 .../coordinator/CompactionCoordinatorTest.java | 21 
 .../org/apache/accumulo/compactor/Compactor.java   | 21 ++--
 .../apache/accumulo/compactor/CompactorTest.java   | 29 ++
 5 files changed, 75 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 1fa04490fb..08a93ae6b3 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1433,6 +1433,16 @@ public enum Property {
   COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
       "Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"),
   @Experimental
+  COMPACTOR_MIN_JOB_WAIT_TIME("compactor.wait.time.job.min", "1s", PropertyType.TIMEDURATION,
+      "The minimum amount of time to wait between checks for the next compaction job, backing off"
+          + "exponentially until COMPACTOR_MAX_JOB_WAIT_TIME is reached.",
+      "2.1.3"),
+  @Experimental
+  COMPACTOR_MAX_JOB_WAIT_TIME("compactor.wait.time.job.max", "5m", PropertyType.TIMEDURATION,
+      "Compactors do exponential backoff when their request for work repeatedly come back empty. "
+          + "This is the maximum amount of time to wait between checks for the next compaction job.",
+      "2.1.3"),
+  @Experimental
   COMPACTOR_PORTSEARCH("compactor.port.search", "false", PropertyType.BOOLEAN,
       "If the compactor.port.client is in use, search higher ports until one is available.",
       "2.1.0"),
diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index df94ccb824..337f5bc685 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -101,8 +101,6 @@ public class CompactionCoordinator extends AbstractServer
 
   private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
-  private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
-
   protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
 
   /*
@@ -383,7 +381,7 @@ public class CompactionCoordinator extends AbstractServer
   }
 
   protected long getMissingCompactorWarningTime() {
-    return FIFTEEN_MINUTES;
+    return getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3;
   }
 
   protected long getTServerCheckInterval() {
diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 7f46d68e7f..117d50108a 100644
--- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -39,7 +39,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import 

(accumulo) branch elasticity updated: fixes FateStarvationIT (#4278)

2024-02-16 Thread kturner

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
 new 3de72f63c4 fixes FateStarvationIT (#4278)
3de72f63c4 is described below

commit 3de72f63c434b977af43b7536d11e35123aa9347
Author: Keith Turner 
AuthorDate: Fri Feb 16 16:45:19 2024 -0500

fixes FateStarvationIT (#4278)

This test had two problems.  First, it was using the offline operation to
check if all compactions were finished.  However, compactions that were
queued in fate when the offline started would start after the offline
and could still hold fate locks when the test checked for them at the end.
Second, the test starts a large number of compactions, but there was only
one compactor process.  Started multiple compactor processes to process
compactions faster.
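The replacement for the offline-based check is a general pattern: run each blocking call on a thread pool, then call get() on every Future so the test only proceeds once all of them have finished, with any failure rethrown. In the diff the blocking call is TableOperations.compact(...) with its final wait argument changed from false to true. The sketch below uses a hypothetical compactAndWait placeholder instead of an Accumulo client, so it runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class WaitForAllCompactions {

  /** Hypothetical stand-in for a compact call with wait=true: blocks until "done". */
  static void compactAndWait(int rangeId, AtomicInteger completed) throws InterruptedException {
    Thread.sleep(5); // pretend compacting range rangeId takes a while
    completed.incrementAndGet();
  }

  /** Starts count blocking calls concurrently and returns once every one has completed. */
  static int runAll(int count) {
    AtomicInteger completed = new AtomicInteger();
    List<Future<?>> futures = new ArrayList<>();
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      for (int i = 0; i < count; i++) {
        final int rangeId = i;
        // Returning null makes the lambda a Callable, which may throw checked exceptions.
        futures.add(executor.submit(() -> {
          compactAndWait(rangeId, completed);
          return null;
        }));
      }
      // Waiting on every future, rather than on an unrelated operation, guarantees all work
      // has finished; get() also rethrows any failure from the task.
      for (Future<?> future : futures) {
        try {
          future.get();
        } catch (ExecutionException e) {
          throw new IllegalStateException("compaction failed", e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting", e);
        }
      }
    } finally {
      executor.shutdown();
    }
    return completed.get();
  }

  public static void main(String[] args) {
    System.out.println("completed: " + runAll(100));
  }
}
```

Calling get() in submission order is fine because the goal is to wait for all tasks, not to react to whichever finishes first. The finally block shuts the pool down so its threads do not linger after the last get().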
---
 .../accumulo/test/functional/FateStarvationIT.java | 43 --
 1 file changed, 40 insertions(+), 3 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
index 92a2d8251c..6b6da95020 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateStarvationIT.java
@@ -23,21 +23,43 @@ import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.TestIngest;
 import org.apache.accumulo.test.TestIngest.IngestParams;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
 
 public class FateStarvationIT extends AccumuloClusterHarness {
 
+  private static final Logger log = LoggerFactory.getLogger(FateStarvationIT.class);
+
   @Override
   protected Duration defaultTimeout() {
-    return Duration.ofMinutes(2);
+    return Duration.ofMinutes(4);
+  }
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    var groupName = "user_small";
+    // Add this check in case the config changes
+    Preconditions.checkState(
+        Property.COMPACTION_SERVICE_DEFAULT_GROUPS.getDefaultValue().contains(groupName));
+    // This test creates around ~1300 compaction task, so start more compactors. There is randomness
+    // so the exact number of task varies.
+    cfg.getClusterServerConfiguration().addCompactorResourceGroup(groupName, 4);
   }
 
   @Test
@@ -53,19 +75,34 @@ public class FateStarvationIT extends AccumuloClusterHarness {
       params.dataSize = 50;
       params.cols = 1;
       TestIngest.ingest(c, params);
+      log.debug("Ingest complete");
 
       c.tableOperations().flush(tableName, null, null, true);
+      log.debug("Flush complete");
 
       List<Text> splits = new ArrayList<>(TestIngest.getSplitPoints(0, 10, 67));
 
+      List<Future<?>> futures = new ArrayList<>();
+      var executor = Executors.newCachedThreadPool();
+
       for (int i = 0; i < 100; i++) {
         int idx1 = RANDOM.get().nextInt(splits.size() - 1);
         int idx2 = RANDOM.get().nextInt(splits.size() - (idx1 + 1)) + idx1 + 1;
 
-        c.tableOperations().compact(tableName, splits.get(idx1), splits.get(idx2), false, false);
+        var future = executor.submit(() -> {
+          c.tableOperations().compact(tableName, splits.get(idx1), splits.get(idx2), false, true);
+          return null;
+        });
+
+        futures.add(future);
       }
 
-      c.tableOperations().offline(tableName);
+      log.debug("Started compactions");
+
+      // wait for all compactions to complete
+      for (var future : futures) {
+        future.get();
+      }
 
       FunctionalTestUtils.assertNoDanglingFateLocks(getCluster());
     }



(accumulo) branch elasticity updated: Globally Unique FATE Transaction Ids - Part 4 (#4258)

2024-02-16 Thread kturner

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
 new a3ec20e41a Globally Unique FATE Transaction Ids - Part 4 (#4258)
a3ec20e41a is described below

commit a3ec20e41a8058106d829f73346f65f1734d1860
Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com>
AuthorDate: Fri Feb 16 14:24:58 2024 -0500

Globally Unique FATE Transaction Ids - Part 4 (#4258)

This addresses several previously deferred changes for issue #4044.
Changes:
- ZooReservation now uses FateId (used in Utils)
- TabletOperationId now uses FateId
- TExternalCompactionJob now uses FateId
- VolumeManager and VolumeManagerImpl now use FateId
- Utils.getLock() lockData now uses the full FateId
- TabletRefresher now uses FateId
- Classes which used the above classes updated
- Several test changes to reflect new changes
- Deferred a couple of changes in Compactor and CompactionCoordinator
  (need pull/4247 merged first)
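The new FateId.toThrift() converts between two parallel enums with an explicit switch and a fail-fast default branch. The same pattern in isolation is sketched below; InstanceType, ThriftInstanceType and ThriftFateId are hypothetical stand-ins for FateInstanceType, TFateInstanceType and TFateId, and the transaction id is assumed to be a long.

```java
final class FateTypeMapping {

  /** Hypothetical stand-in for the internal FateInstanceType enum. */
  enum InstanceType {
    USER, META
  }

  /** Hypothetical stand-in for the Thrift-generated TFateInstanceType enum. */
  enum ThriftInstanceType {
    USER, META
  }

  /** Hypothetical stand-in for the Thrift TFateId struct: a type plus a transaction id. */
  static final class ThriftFateId {
    final ThriftInstanceType type;
    final long tid;

    ThriftFateId(ThriftInstanceType type, long tid) {
      this.type = type;
      this.tid = tid;
    }
  }

  /** Explicit mapping with a fail-fast default, following the switch in FateId.toThrift(). */
  static ThriftInstanceType toThrift(InstanceType type) {
    switch (type) {
      case USER:
        return ThriftInstanceType.USER;
      case META:
        return ThriftInstanceType.META;
      default:
        // Only reachable if a constant is added to InstanceType without updating this mapping.
        throw new IllegalArgumentException("Invalid FateInstanceType: " + type);
    }
  }

  static ThriftFateId toThriftId(InstanceType type, long tid) {
    return new ThriftFateId(toThrift(type), tid);
  }

  public static void main(String[] args) {
    ThriftFateId id = toThriftId(InstanceType.META, 42L);
    System.out.println(id.type + " " + id.tid);
  }
}
```

A shorter alternative is ThriftInstanceType.valueOf(type.name()), which also throws IllegalArgumentException for an unknown name, but it quietly relies on both enums keeping identical constant names.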
---
 .../java/org/apache/accumulo/core/fate/FateId.java |  23 +++-
 .../core/fate/zookeeper/ZooReservation.java|  24 ++--
 .../core/metadata/schema/TabletOperationId.java|  12 +-
 .../thrift/TExternalCompactionJob.java | 126 -
 core/src/main/thrift/tabletserver.thrift   |   2 +-
 .../core/metadata/schema/TabletMetadataTest.java   |   3 +-
 .../server/constraints/MetadataConstraints.java|   2 +-
 .../apache/accumulo/server/fs/VolumeManager.java   |   5 +-
 .../accumulo/server/fs/VolumeManagerImpl.java  |  11 +-
 .../constraints/MetadataConstraintsTest.java   |   2 +-
 .../org/apache/accumulo/compactor/Compactor.java   |  13 +--
 .../coordinator/CompactionCoordinator.java |   3 +-
 .../apache/accumulo/manager/tableOps/Utils.java|   8 +-
 .../manager/tableOps/bulkVer2/BulkImportMove.java  |   3 +-
 .../manager/tableOps/bulkVer2/RefreshTablets.java  |   3 +-
 .../manager/tableOps/bulkVer2/TabletRefresher.java |  10 +-
 .../manager/tableOps/compact/CompactionDriver.java |   5 +-
 .../manager/tableOps/compact/RefreshTablets.java   |   5 +-
 .../manager/tableOps/delete/ReserveTablets.java|   3 +-
 .../manager/tableOps/merge/DeleteRows.java |   3 +-
 .../manager/tableOps/merge/DeleteTablets.java  |   3 +-
 .../manager/tableOps/merge/FinishTableRangeOp.java |   3 +-
 .../manager/tableOps/merge/MergeTablets.java   |   3 +-
 .../manager/tableOps/merge/ReserveTablets.java |   3 +-
 .../manager/tableOps/split/DeleteOperationIds.java |   3 +-
 .../accumulo/manager/tableOps/split/PreSplit.java  |   6 +-
 .../manager/tableOps/split/UpdateTablets.java  |   3 +-
 .../tableOps/tableImport/MoveExportedFiles.java|   3 +-
 .../compaction/CompactionCoordinatorTest.java  |  10 +-
 .../org/apache/accumulo/test/ScanServerIT.java |   6 +-
 .../test/functional/AmpleConditionalWriterIT.java  |  18 ++-
 .../test/functional/ManagerAssignmentIT.java   |  10 +-
 .../functional/TabletManagementIteratorIT.java |   6 +-
 33 files changed, 191 insertions(+), 152 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java
index 5be742d2fd..8907c6879c 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java
@@ -24,6 +24,7 @@ import java.util.stream.Stream;
 
 import org.apache.accumulo.core.data.AbstractId;
 import org.apache.accumulo.core.manager.thrift.TFateId;
+import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
 import org.apache.accumulo.core.util.FastFormat;
 
 /**
@@ -107,7 +108,7 @@ public class FateId extends AbstractId {
    * @param fateIdStr the string representation of the FateId
    * @return true if the string is a valid FateId, false otherwise
    */
-  public static boolean isFormattedTid(String fateIdStr) {
+  public static boolean isFateId(String fateIdStr) {
     return FATEID_PATTERN.matcher(fateIdStr).matches();
   }
 
@@ -133,6 +134,26 @@ public class FateId extends AbstractId {
     return new FateId(PREFIX + type + ":" + formatTid(tid));
   }
 
+  /**
+   *
+   * @return the TFateId equivalent of the FateId
+   */
+  public TFateId toThrift() {
+    TFateInstanceType thriftType;
+    FateInstanceType type = getType();
+    switch (type) {
+      case USER:
+        thriftType = TFateInstanceType.USER;
+        break;
+      case META:
+        thriftType = TFateInstanceType.META;
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid FateInstanceType: " + type);
+    }
+    return new TFateId(thriftType, getTid());
+  }
+
   /**
    * Returns the hex string equivalent of the tid
    */
diff --git