(accumulo) branch main updated (23e17129de -> 9839e4d42f)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from 23e17129de Revert #4358 - Replace long + TimeUnit with Duration in ReadOnlyTStore.unreserve() add 05c2f45042 Reduced warning logs under normal conditions in compaction coordinator (#4362) new 9839e4d42f Merge branch '2.1' The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../coordinator/CompactionCoordinator.java | 32 -- .../accumulo/coordinator/QueueSummaries.java | 8 ++ 2 files changed, 37 insertions(+), 3 deletions(-)
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 9839e4d42fae9bd20b1529ca0e5e2fed4122 Merge: 23e17129de 05c2f45042 Author: Dave Marion AuthorDate: Wed Mar 13 19:52:45 2024 + Merge branch '2.1' .../coordinator/CompactionCoordinator.java | 32 -- .../accumulo/coordinator/QueueSummaries.java | 8 ++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --cc server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index d5ecbf9ddd,b0ec498a9e..685080c5b1 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@@ -22,8 -23,7 +22,9 @@@ import static com.google.common.util.co import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; +import java.util.ArrayList; + import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@@ -321,35 -325,58 +325,57 @@@ public class CompactionCoordinator exte LOG.info("Shutting down"); } + private Map> getIdleCompactors() { + + Map> allCompactors = + ExternalCompactionUtil.getCompactorAddrs(getContext()); + + Set emptyQueues = new HashSet<>(); + + // Remove all of the compactors that are running a compaction + RUNNING_CACHE.values().forEach(rc -> { + List busyCompactors = allCompactors.get(rc.getQueueName()); + if (busyCompactors != null + && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress( { + if (busyCompactors.isEmpty()) { + emptyQueues.add(rc.getQueueName()); + } + } + }); + // Remove entries with empty queues + emptyQueues.forEach(e -> allCompactors.remove(e)); + return allCompactors; + } + private void updateSummaries() { -ExecutorService executor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(10, -"Compaction Summary Gatherer", false); -try { - Set queuesSeen = new ConcurrentSkipListSet<>(); - tserverSet.getCurrentServers().forEach(tsi -> { -executor.execute(() -> updateSummaries(tsi, queuesSeen)); - }); +final ArrayList> tasks = new ArrayList<>(); +Set queuesSeen = new ConcurrentSkipListSet<>(); - executor.shutdown(); +tserverSet.getCurrentServers().forEach(tsi -> { + tasks.add(summariesExecutor.submit(() -> updateSummaries(tsi, queuesSeen))); +}); - try { -while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {} - } catch (InterruptedException e) { -Thread.currentThread().interrupt(); -throw new RuntimeException(e); +// Wait for all tasks to complete +while (!tasks.isEmpty()) { + Iterator> iter = tasks.iterator(); + while (iter.hasNext()) { +Future f = iter.next(); +if (f.isDone()) { + iter.remove(); +} } + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); +} - // remove any queues that were seen in the past, but were not seen in the latest gathering of - // summaries - TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen); +// remove any queues that were seen in the past, but were not seen in the latest gathering of +// summaries +TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen); - // add any queues that were never seen before - queuesSeen.forEach(q -> { -TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> System.currentTimeMillis()); - }); -} finally { - executor.shutdownNow(); -} +// add any queues that were never seen before +queuesSeen.forEach(q -> { + TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> System.currentTimeMillis()); +}); } private void updateSummaries(TServerInstance tsi, Set queuesSeen) {
(accumulo) branch main updated: Revert #4358 - Replace long + TimeUnit with Duration in ReadOnlyTStore.unreserve()
This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/main by this push: new 23e17129de Revert #4358 - Replace long + TimeUnit with Duration in ReadOnlyTStore.unreserve() 23e17129de is described below commit 23e17129de0350d5496345c78d7fd86080bb1115 Author: Dom G AuthorDate: Wed Mar 13 15:02:23 2024 -0400 Revert #4358 - Replace long + TimeUnit with Duration in ReadOnlyTStore.unreserve() --- .../java/org/apache/accumulo/core/fate/AdminUtil.java | 8 .../org/apache/accumulo/core/fate/AgeOffStore.java | 10 +- .../main/java/org/apache/accumulo/core/fate/Fate.java | 14 +++--- .../org/apache/accumulo/core/fate/ReadOnlyTStore.java | 5 +++-- .../java/org/apache/accumulo/core/fate/ZooStore.java | 10 ++ .../org/apache/accumulo/core/logging/FateLogger.java | 6 +++--- .../org/apache/accumulo/core/fate/AgeOffStoreTest.java | 18 +- .../java/org/apache/accumulo/core/fate/TestStore.java | 4 ++-- 8 files changed, 39 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 7cc0a9c004..858e6e6998 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -20,7 +20,6 @@ package org.apache.accumulo.core.fate; import static java.nio.charset.StandardCharsets.UTF_8; -import java.time.Duration; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -33,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.FateLock; @@ -368,7 +368,7 @@ public class AdminUtil { long 
timeCreated = zs.timeCreated(tid); - zs.unreserve(tid, Duration.ZERO); + zs.unreserve(tid, 0, TimeUnit.MILLISECONDS); if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) { statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated)); @@ -451,7 +451,7 @@ public class AdminUtil { break; } -zs.unreserve(txid, Duration.ZERO); +zs.unreserve(txid, 0, TimeUnit.MILLISECONDS); return state; } @@ -495,7 +495,7 @@ public class AdminUtil { break; } -zs.unreserve(txid, Duration.ZERO); +zs.unreserve(txid, 0, TimeUnit.MILLISECONDS); return state; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index bd2bd5208b..ca016d0c9c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -19,13 +19,13 @@ package org.apache.accumulo.core.fate; import java.io.Serializable; -import java.time.Duration; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,7 +108,7 @@ public class AgeOffStore implements TStore { } } finally { - store.unreserve(txid, Duration.ZERO); + store.unreserve(txid, 0, TimeUnit.MILLISECONDS); } } catch (Exception e) { log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e); @@ -138,7 +138,7 @@ public class AgeOffStore implements TStore { break; } } finally { -store.unreserve(txid, Duration.ZERO); +store.unreserve(txid, 0, TimeUnit.MILLISECONDS); } } } @@ -166,8 +166,8 @@ public class AgeOffStore implements TStore { } @Override - public void unreserve(long tid, Duration deferTime) { -store.unreserve(tid, deferTime); + public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) { +store.unreserve(tid, 
deferTime, deferTimeUnit); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 4fe07bb8b2..1a14418b1a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -30,12 +30,12 @@ import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUCCESSFUL; import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.UNKNOWN; import static org.apache.accumulo.core.util.ShutdownUtil.isIOException; -import java.time.Duration; import
(accumulo) branch elasticity updated (8384b9be6a -> ae49a7ff02)
This is an automated email from the ASF dual-hosted git repository. kturner pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 8384b9be6a Fate Op Command Updates and Tests (#4350) add 162b8effb1 Replace long + TimeUnit with Duration in ReadOnlyTStore.unreserve() (#4358) add 92331ea113 Throw error when non-standard chars exist (#4348) add 171a1e144c Merge branch '2.1' add ae6085c346 Adds NanoTime wrapper for System.nanoTime (#4364) new ae49a7ff02 Merge branch 'main' into elasticity The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../accumulo/core/fate/AbstractFateStore.java | 24 +-- .../org/apache/accumulo/core/fate/AdminUtil.java | 6 +- .../java/org/apache/accumulo/core/fate/Fate.java | 15 +- .../org/apache/accumulo/core/fate/FateCleaner.java | 3 +- .../org/apache/accumulo/core/fate/FateStore.java | 6 +- .../accumulo/core/fate/WrappedFateTxStore.java | 6 +- .../accumulo/core/file/rfile/GenerateSplits.java | 32 +++- .../apache/accumulo/core/util/time/NanoTime.java | 104 + .../apache/accumulo/core/fate/FateCleanerTest.java | 21 ++- .../org/apache/accumulo/core/fate/TestStore.java | 4 +- .../core/file/rfile/GenerateSplitsTest.java| 64 +++- .../accumulo/core/util/time/NanoTimeTest.java | 162 + .../java/org/apache/accumulo/manager/Manager.java | 9 +- .../test/compaction/ExternalCompaction_1_IT.java | 6 +- .../accumulo/test/fate/accumulo/FateStoreIT.java | 23 ++- 15 files changed, 410 insertions(+), 75 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/util/time/NanoTime.java create mode 100644 core/src/test/java/org/apache/accumulo/core/util/time/NanoTimeTest.java
(accumulo) 01/01: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit ae49a7ff027178a387d39f048e1090bcffe11162 Merge: 8384b9be6a ae6085c346 Author: Keith Turner AuthorDate: Wed Mar 13 15:12:41 2024 -0400 Merge branch 'main' into elasticity .../accumulo/core/fate/AbstractFateStore.java | 24 +-- .../org/apache/accumulo/core/fate/AdminUtil.java | 6 +- .../java/org/apache/accumulo/core/fate/Fate.java | 15 +- .../org/apache/accumulo/core/fate/FateCleaner.java | 3 +- .../org/apache/accumulo/core/fate/FateStore.java | 6 +- .../accumulo/core/fate/WrappedFateTxStore.java | 6 +- .../accumulo/core/file/rfile/GenerateSplits.java | 32 +++- .../apache/accumulo/core/util/time/NanoTime.java | 104 + .../apache/accumulo/core/fate/FateCleanerTest.java | 21 ++- .../org/apache/accumulo/core/fate/TestStore.java | 4 +- .../core/file/rfile/GenerateSplitsTest.java| 64 +++- .../accumulo/core/util/time/NanoTimeTest.java | 162 + .../java/org/apache/accumulo/manager/Manager.java | 9 +- .../test/compaction/ExternalCompaction_1_IT.java | 6 +- .../accumulo/test/fate/accumulo/FateStoreIT.java | 23 ++- 15 files changed, 410 insertions(+), 75 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 702313f5ab,00..0cad25f857 mode 100644,00..100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@@ -1,494 -1,0 +1,494 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.io.UncheckedIOException; ++import java.time.Duration; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; - import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.util.Pair; ++import org.apache.accumulo.core.util.time.NanoTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public abstract class AbstractFateStore implements FateStore { + + private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class); + + // Default maximum size of 100,000 transactions before deferral is stopped and + // all existing transactions are processed immediately again + public static final int DEFAULT_MAX_DEFERRED = 100_000; + + public static final FateIdGenerator 
DEFAULT_FATE_ID_GENERATOR = new FateIdGenerator() { +@Override +public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { + HashCode hashCode = Hashing.murmur3_128().hashBytes(fateKey.getSerialized()); + long tid = hashCode.asLong() & 0x7fffL; + return FateId.from(instanceType, tid); +} + }; + + protected final Set reserved; - protected final Map deferred; ++ protected final Map deferred; + private final int maxDeferred; + private final AtomicBoolean deferredOverflow = new AtomicBoolean(); + private final FateIdGenerator fateIdGenerator; + + // This is incremented each time a transaction was unreserved that was non new + protected final SignalCount unreservedNonNewCount = new SignalCount(); + + // This is incremented each time a
(accumulo) branch main updated: Adds NanoTime wrapper for System.nanoTime (#4364)
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/main by this push: new ae6085c346 Adds NanoTime wrapper for System.nanoTime (#4364) ae6085c346 is described below commit ae6085c34677e752949ef093cb350281b41a275f Author: Keith Turner AuthorDate: Wed Mar 13 14:08:17 2024 -0400 Adds NanoTime wrapper for System.nanoTime (#4364) * Adds NanoTime wrapper for System.nanoTime Adds a strong type for System.nanoTime() and uses it in a few places. Could be used in many more places if this is merged. Co-authored-by: EdColeman --- .../org/apache/accumulo/core/fate/ZooStore.java| 15 +- .../apache/accumulo/core/util/time/NanoTime.java | 104 + .../accumulo/core/util/time/NanoTimeTest.java | 162 + .../java/org/apache/accumulo/manager/Manager.java | 9 +- 4 files changed, 280 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 941c04c241..c3de5f29df 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -39,12 +39,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.core.util.FastFormat; +import org.apache.accumulo.core.util.time.NanoTime; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -64,7 +64,7 @@ public class ZooStore implements TStore { private 
ZooReaderWriter zk; private String lastReserved = ""; private Set reserved; - private Map deferred; // use Long here to properly handle System.nanoTime() + private Map deferred; private long statusChangeEvents = 0; private int reservationsWaiting = 0; @@ -164,7 +164,7 @@ public class ZooStore implements TStore { } if (deferred.containsKey(tid)) { - if (deferred.get(tid) - System.nanoTime() < 0) { + if (deferred.get(tid).elapsed().compareTo(Duration.ZERO) > 0) { deferred.remove(tid); } else { continue; @@ -203,10 +203,11 @@ public class ZooStore implements TStore { if (deferred.isEmpty()) { this.wait(5000); } else { - final long now = System.nanoTime(); - long minWait = deferred.values().stream().mapToLong(l -> l - now).min().orElseThrow(); + var now = NanoTime.now(); + long minWait = deferred.values().stream() + .mapToLong(nanoTime -> nanoTime.subtract(now).toMillis()).min().orElseThrow(); if (minWait > 0) { -this.wait(Math.min(TimeUnit.NANOSECONDS.toMillis(minWait), 5000)); +this.wait(Math.min(minWait, 5000)); } } } @@ -284,7 +285,7 @@ public class ZooStore implements TStore { } if (deferTime.compareTo(Duration.ZERO) > 0) { -deferred.put(tid, deferTime.toNanos() + System.nanoTime()); +deferred.put(tid, NanoTime.nowPlus(deferTime)); } this.notifyAll(); diff --git a/core/src/main/java/org/apache/accumulo/core/util/time/NanoTime.java b/core/src/main/java/org/apache/accumulo/core/util/time/NanoTime.java new file mode 100644 index 00..f081278589 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/time/NanoTime.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.time; + +import java.time.Duration; + +import
(accumulo) branch elasticity updated: Fate Op Command Updates and Tests (#4350)
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 8384b9be6a Fate Op Command Updates and Tests (#4350) 8384b9be6a is described below commit 8384b9be6abe786ad855d5457eba6bceb3868025 Author: Kevin Rathbun <43969518+kevinrr...@users.noreply.github.com> AuthorDate: Wed Mar 13 14:07:33 2024 -0400 Fate Op Command Updates and Tests (#4350) Changes: - Update summary FateOpsCommand - Prints full FateId not just the long - Option to filter based on FateInstanceType - Option to filter based on FateId - Originally, the summary command printed all Fate transactions, thought it might be good to optionally allow filtering by certain Fate transactions like print allows - Update print FateOpsCommand - Prints full FateId not just the long - Option to filter based on FateInstanceType - Option to filter based on FateId - Originally, could filter by long id, but with the replacement of long transaction id by FateId, this had to be updated. 
- Instead of receiving "" or "FATE[]" on cmd line, now expects a FateId - Update cancel, delete, and fail FateOpsCommand - Now work with both ZooStore and AccumuloStore by taking the full FateId on the command line and determining the store based on the FateInstanceType of the FateId - Added tests for all 5 FateOpsCommands (FateOpsCommandsIT) - Tests using both AccumuloStore (AccumuloFateOpsCommandsIT) and ZooStore (ZookeeperFateOpsCommandsIT) - Deleted FateSummaryIT, moved the test to FateOpsCommands * Changes: - Wait for condition after stopping ServerType.COMPACTOR instead of sleep - Cleaned up tests to use fewer assertions - Added attempt to --delete and --fail a transaction when the Manager is still alive in testFateDeleteCommand() and testFateFailCommand() --- .../org/apache/accumulo/core/fate/AdminUtil.java | 239 + .../org/apache/accumulo/core/fate/FateTxId.java| 8 - .../org/apache/accumulo/server/util/Admin.java | 110 ++-- .../server/util/fateCommand/FateSummaryReport.java | 34 +- .../server/util/fateCommand/FateTxnDetails.java| 24 +- .../server/util/fateCommand/SummaryReportTest.java | 11 +- .../server/util/fateCommand/TxnDetailsTest.java| 7 +- .../manager/metrics/fate/FateMetricValues.java | 2 +- .../org/apache/accumulo/test/FateSummaryIT.java| 156 -- .../accumulo/test/fate/FateOpsCommandsIT.java | 564 + .../fate/accumulo/AccumuloFateOpsCommandsIT.java | 32 ++ .../fate/zookeeper/ZookeeperFateOpsCommandsIT.java | 37 ++ .../test/functional/FateConcurrencyIT.java | 14 +- .../test/functional/FunctionalTestUtils.java | 2 +- 14 files changed, 888 insertions(+), 352 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 2a436b3444..c18defb1ac 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -46,7 +46,6 @@ import 
org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; -import org.apache.accumulo.core.util.FastFormat; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,7 +74,7 @@ public class AdminUtil { */ public static class TransactionStatus { -private final long txid; +private final FateId fateId; private final FateInstanceType instanceType; private final TStatus status; private final String txName; @@ -84,10 +83,10 @@ public class AdminUtil { private final String top; private final long timeCreated; -private TransactionStatus(Long tid, FateInstanceType instanceType, TStatus status, +private TransactionStatus(FateId fateId, FateInstanceType instanceType, TStatus status, String txName, List hlocks, List wlocks, String top, Long timeCreated) { - this.txid = tid; + this.fateId = fateId; this.instanceType = instanceType; this.status = status; this.txName = txName; @@ -102,8 +101,8 @@ public class AdminUtil { * @return This fate operations transaction id, formatted in the same way as FATE transactions * are in the Accumulo logs. */ -public String getTxid() { - return FastFormat.toHexString(txid); +public FateId getFateId() { + return fateId; }
(accumulo) branch 2.1 updated: Reduced warning logs under normal conditions in compaction coordinator (#4362)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 05c2f45042 Reduced warning logs under normal conditions in compaction coordinator (#4362) 05c2f45042 is described below commit 05c2f45042ee91a5fe04702caa77ab19f78c0f9a Author: Dave Marion AuthorDate: Wed Mar 13 11:52:47 2024 -0400 Reduced warning logs under normal conditions in compaction coordinator (#4362) Fixes #4219 --- .../coordinator/CompactionCoordinator.java | 32 -- .../accumulo/coordinator/QueueSummaries.java | 8 ++ .../coordinator/CompactionCoordinatorTest.java | 6 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index f4819ebefb..b0ec498a9e 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -23,6 +23,7 @@ import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -303,9 +304,12 @@ public class CompactionCoordinator extends AbstractServer updateSummaries(); long now = System.currentTimeMillis(); - TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> { -if ((now - v) > getMissingCompactorWarningTime()) { - LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", k, + + Map> idleCompactors = getIdleCompactors(); + TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> { +if ((now - lastCheckTime) > 
getMissingCompactorWarningTime() +&& QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) { + LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", queue, getMissingCompactorWarningTime()); } }); @@ -321,6 +325,28 @@ public class CompactionCoordinator extends AbstractServer LOG.info("Shutting down"); } + private Map> getIdleCompactors() { + +Map> allCompactors = +ExternalCompactionUtil.getCompactorAddrs(getContext()); + +Set emptyQueues = new HashSet<>(); + +// Remove all of the compactors that are running a compaction +RUNNING_CACHE.values().forEach(rc -> { + List busyCompactors = allCompactors.get(rc.getQueueName()); + if (busyCompactors != null + && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress( { +if (busyCompactors.isEmpty()) { + emptyQueues.add(rc.getQueueName()); +} + } +}); +// Remove entries with empty queues +emptyQueues.forEach(e -> allCompactors.remove(e)); +return allCompactors; + } + private void updateSummaries() { ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(10, "Compaction Summary Gatherer", false); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java index 6edb2c0f36..1d89cd0321 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java @@ -100,6 +100,14 @@ public class QueueSummaries { } } + synchronized boolean isCompactionsQueued(String queue) { +var q = QUEUES.get(queue); +if (q == null) { + return false; +} +return !q.isEmpty(); + } + synchronized PrioTserver getNextTserver(String queue) { Entry> entry = getNextTserverEntry(queue); diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 117d50108a..87e7471bef 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -214,6 +214,7 @@ public class CompactionCoordinatorTest { var coordinator = new TestCoordinator(null, null, null, null, context, null); // Should be equal to 3 * 15_000 milliseconds assertEquals(45_000,
(accumulo) branch elasticity updated: Update dead compaction detector to handle metadata/root (#4354)
This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 22621ea501 Update dead compaction detector to handle metadata/root (#4354) 22621ea501 is described below commit 22621ea50190389131146b92546bf5b9c2ab47cf Author: Christopher L. Shannon AuthorDate: Wed Mar 13 11:43:02 2024 -0400 Update dead compaction detector to handle metadata/root (#4354) This updates DeadCompactionDetector to also look at the root and metadata table metadata for dead compactions on those tables This closes #4340 Co-authored-by: Keith Turner --- .../coordinator/CompactionCoordinator.java | 19 +- .../coordinator/DeadCompactionDetector.java| 20 +- .../test/compaction/ExternalCompaction_1_IT.java | 231 +++-- 3 files changed, 197 insertions(+), 73 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 2a12f08708..97397c019b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -20,6 +20,9 @@ package org.apache.accumulo.manager.compaction.coordinator; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; @@ -81,6 +84,7 @@ import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler; import org.apache.accumulo.core.metadata.schema.CompactionMetadata; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -577,7 +581,7 @@ public class CompactionCoordinator var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile); return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), dfv.getNumEntries(), dfv.getTime()); -}).collect(Collectors.toList()); +}).collect(toList()); FateInstanceType type = FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId()); FateId fateId = FateId.from(type, 0); @@ -707,10 +711,19 @@ public class CompactionCoordinator KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent); LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, fromThriftExtent); final var ecid = ExternalCompactionId.of(externalCompactionId); -compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); +compactionsFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); } - void compactionFailed(Map compactions) { + void compactionsFailed(Map compactions) { +// Need to process each level by itself because the conditional tablet mutator does not support +// mutating multiple data levels at the same time +compactions.entrySet().stream() +.collect(groupingBy(entry -> DataLevel.of(entry.getValue().tableId()), +Collectors.toMap(Entry::getKey, Entry::getValue))) +.forEach((level, compactionsByLevel) -> compactionFailedForLevel(compactionsByLevel)); + } + + void compactionFailedForLevel(Map compactions) { try (var tabletsMutator = 
ctx.getAmple().conditionallyMutateTablets()) { compactions.forEach((ecid, extent) -> { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java index 0857385239..87a4cef78b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; import
(accumulo) branch elasticity updated: Resolved elasticity TODOs in Manager (#4365)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 75b717ed5f Resolved elasticity TODOs in Manager (#4365) 75b717ed5f is described below commit 75b717ed5f0a7604d6cc25f0ce2a069d9335907b Author: Dave Marion AuthorDate: Wed Mar 13 08:28:15 2024 -0400 Resolved elasticity TODOs in Manager (#4365) Removed the TODO for the bulkImports as it is still being set by BulkImport V2 and referenced in the Monitor. Implemented the suggestion in the other TODO. Co-authored-by: Keith Turner --- .../src/main/java/org/apache/accumulo/manager/Manager.java| 11 --- .../java/org/apache/accumulo/manager/TabletGroupWatcher.java | 2 +- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 6758acfc1f..45ab2b5e2b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -53,6 +53,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@ -226,7 +227,6 @@ public class Manager extends AbstractServer volatile SortedMap tserverStatusForBalancer = emptySortedMap(); volatile Map> tServerGroupingForBalancer = emptyMap(); - // ELASTICITY_TODO is this still needed? 
final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus(); private final AtomicBoolean managerInitialized = new AtomicBoolean(false); @@ -243,14 +243,11 @@ public class Manager extends AbstractServer return state; } - // ELASTICITIY_TODO it would be nice if this method could take DataLevel as an argument and only - // retrieve information about compactions in that data level. Attempted this and a lot of - // refactoring was needed to get that small bit of information to this method. Would be best to - // address this after issue. May be best to attempt this after #3576. - public Map> getCompactionHints() { + public Map> getCompactionHints(DataLevel level) { +Predicate tablePredicate = (tableId) -> DataLevel.of(tableId) == level; Map allConfig; try { - allConfig = CompactionConfigStorage.getAllConfig(getContext(), tableId -> true); + allConfig = CompactionConfigStorage.getAllConfig(getContext(), tablePredicate); } catch (InterruptedException | KeeperException e) { throw new RuntimeException(e); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index e7ea20413a..bd177c2deb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -340,7 +340,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { return new TabletManagementParameters(manager.getManagerState(), parentLevelUpgrade, manager.onlineTables(), tServersSnapshot, shutdownServers, manager.migrationsSnapshot(), -store.getLevel(), manager.getCompactionHints(), canSuspendTablets(), +store.getLevel(), manager.getCompactionHints(store.getLevel()), canSuspendTablets(), lookForTabletsNeedingVolReplacement ? manager.getContext().getVolumeReplacements() : Map.of()); }
(accumulo) branch elasticity updated: Removed special handling logic in TabletManagementIterator (#4363)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new b4a2ac45b6 Removed special handling logic in TabletManagementIterator (#4363) b4a2ac45b6 is described below commit b4a2ac45b69f2b7b6f052e029354cfebb2e66b9a Author: Dave Marion AuthorDate: Wed Mar 13 07:38:47 2024 -0400 Removed special handling logic in TabletManagementIterator (#4363) Removed the logic that always returns the TabletMetadata when the Manager state is not normal, or there are no tablet servers, or no online tables. The code now just calls computeTabletManagementActions in all cases. Closes #4256 --- .../org/apache/accumulo/core/metadata/TabletState.java | 4 .../accumulo/server/manager/state/TabletGoalState.java | 3 +++ .../server/manager/state/TabletManagementIterator.java | 14 +- 3 files changed, 4 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java index 9fcf8add3f..ba182514d1 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java @@ -21,14 +21,10 @@ package org.apache.accumulo.core.metadata; import java.util.Set; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public enum TabletState { UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED; - private static Logger log = LoggerFactory.getLogger(TabletState.class); - public static TabletState compute(TabletMetadata tm, Set liveTServers) { TabletMetadata.Location current = null; TabletMetadata.Location future = null; diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java index 81e796608c..0b9b83f159 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java @@ -103,6 +103,9 @@ public enum TabletGoalState { if (!tm.getHostingRequested()) { return UNASSIGNED; } +break; + default: +break; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 2e6627c78e..39329b0e42 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -43,7 +43,6 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; -import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletOperationType; @@ -200,18 +199,7 @@ public class TabletManagementIterator extends SkippingIterator { Exception error = null; try { LOG.trace("Evaluating extent: {}", tm); -if (tm.getExtent().isMeta()) { - computeTabletManagementActions(tm, actions); -} else { - if (tabletMgmtParams.getManagerState() != ManagerState.NORMAL - || tabletMgmtParams.getOnlineTsevers().isEmpty() - || tabletMgmtParams.getOnlineTables().isEmpty()) { -// when manager is in the process of starting up or shutting down return everything. 
-actions.add(ManagementAction.NEEDS_LOCATION_UPDATE); - } else { -computeTabletManagementActions(tm, actions); - } -} +computeTabletManagementActions(tm, actions); } catch (Exception e) { LOG.error("Error computing tablet management actions for extent: {}", tm.getExtent(), e); error = e;