This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 7551e7c Add missing class, addd Property and set it to lower value in IT to run faster 7551e7c is described below commit 7551e7c09fff294eabe0ed3b085dd6df26a543db Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon May 10 14:17:52 2021 +0000 Add missing class, addd Property and set it to lower value in IT to run faster --- .../org/apache/accumulo/core/conf/Property.java | 3 ++ .../util/compaction/CompactionExecutorIdImpl.java | 51 ++++++++++++++++++++++ .../coordinator/CompactionCoordinator.java | 13 +++--- .../coordinator/CompactionCoordinatorTest.java | 3 ++ .../apache/accumulo/test/ExternalCompactionIT.java | 1 + 5 files changed, 66 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 130e10c..5dba145 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1062,6 +1062,9 @@ public enum Property { COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL("coordinator.server.finalizer.check.interval", "60s", PropertyType.TIMEDURATION, "The interval at which to check for external compaction final state markers in the metadata table."), + COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL( + "coordinator.server.tserver.compaction.check.interval", "1m", PropertyType.TIMEDURATION, + "The interval at which to check the tservers for external compactions."), // deprecated properties grouped at the end to reference property that replaces them @Deprecated(since = "1.6.0") @ReplacedBy(property = INSTANCE_VOLUMES) diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionExecutorIdImpl.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionExecutorIdImpl.java new file mode 100644 index 0000000..fa205b4 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionExecutorIdImpl.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.compaction; + +import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; + +import com.google.common.base.Preconditions; + +public class CompactionExecutorIdImpl extends CompactionExecutorId { + + protected CompactionExecutorIdImpl(String canonical) { + super(canonical); + } + + private static final long serialVersionUID = 1L; + + public boolean isExernalId() { + return canonical().startsWith("e."); + } + + public String getExernalName() { + Preconditions.checkState(isExernalId()); + return canonical().substring("e.".length()); + } + + public static CompactionExecutorId internalId(CompactionServiceId csid, String executorName) { + return new CompactionExecutorIdImpl("i." + csid + "." + executorName); + } + + public static CompactionExecutorId externalId(String executorName) { + return new CompactionExecutorIdImpl("e." + executorName); + } + +} diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index b7dffb1..309c70a 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -88,8 +88,7 @@ public class CompactionCoordinator extends AbstractServer LiveTServerSet.Listener { private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); - private static final long TIME_BETWEEN_CHECKS = 5000; - public static final long TSERVER_CHECK_INTERVAL = 60000; + private static final long TIME_BETWEEN_GC_CHECKS = 5000; private static final long FIFTEEN_MINUTES = TimeUnit.MILLISECONDS.convert(Duration.of(15, TimeUnit.MINUTES.toChronoUnit())); @@ -155,7 +154,7 @@ public class CompactionCoordinator extends AbstractServer protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, - TIME_BETWEEN_CHECKS, TimeUnit.MILLISECONDS); + TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); } private void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { @@ -365,7 +364,7 @@ public class CompactionCoordinator extends AbstractServer } tserverSet.startListeningForTabletServerChanges(); - new DeadCompactionDetector(getContext(), compactionFinalizer, schedExecutor).start(); + startDeadCompactionDetector(); LOG.info("Starting loop to check tservers for compaction summaries"); while (!shutdown) { @@ -416,12 +415,16 @@ public class CompactionCoordinator extends AbstractServer LOG.info("Shutting down"); } + protected void startDeadCompactionDetector() { + new DeadCompactionDetector(getContext(), compactionFinalizer, schedExecutor).start(); + } + protected long getMissingCompactorWarningTime() { return FIFTEEN_MINUTES; } protected long getTServerCheckInterval() { - return TSERVER_CHECK_INTERVAL; + return this.aconf.getTimeInMillis(Property.COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL); } protected TabletMetadata getMetadataEntryForExtent(KeyExtent extent) { diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 172bed2..3dd3ecd 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -101,6 +101,9 @@ public class CompactionCoordinatorTest { } @Override + protected void startDeadCompactionDetector() {} + + @Override protected long getTServerCheckInterval() { this.shutdown = true; return 0L; diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java index 8ae648d..141b6cc 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -145,6 +145,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase { cfg.setProperty("tserver.compaction.major.service.cs2.planner.opts.executors", "[{'name':'all','externalQueue':'DCQ2'}]"); cfg.setProperty(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL.getKey(), "30s"); + cfg.setProperty(Property.COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL, "10s"); // use raw local file system so walogs sync and flush will work hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); }