This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 2357912 re #1451: wip on coordinator 2357912 is described below commit 235791246bf9c888898f6a15ddd24f94197144c9 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Mar 2 22:16:04 2021 +0000 re #1451: wip on coordinator --- .../java/org/apache/accumulo/core/Constants.java | 3 + .../org/apache/accumulo/core/conf/Property.java | 16 +- pom.xml | 1 + .../accumulo/server/manager/LiveTServerSet.java | 4 + server/compaction-coordinator/.gitignore | 28 ++ server/compaction-coordinator/pom.xml | 34 ++ .../coordinator/CompactionCoordinator.java | 518 +++++++++++++++++++++ 7 files changed, 603 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 96b8b43..70353f2 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -59,6 +59,9 @@ public class Constants { public static final String ZTSERVERS = "/tservers"; + public static final String ZCOORDINATOR = "/coordinators"; + public static final String ZCOORDINATOR_LOCK = "/coordinators/lock"; + public static final String ZDEAD = "/dead"; public static final String ZDEADTSERVERS = ZDEAD + "/tservers"; diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 04015e3..f05794f 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1024,7 +1024,21 @@ public enum Property { REPLICATION_RPC_TIMEOUT("replication.rpc.timeout", "2m", PropertyType.TIMEDURATION, "Amount of time for a single replication RPC call to last before failing" + " the attempt. 
See replication.work.attempts."), - // deprecated properties grouped at the end to reference property that replaces them + // CompactionCoordinator properties + COORDINATOR_PREFIX("coordinator.", null, PropertyType.PREFIX, + "Properties in this category affect the behavior of the accumulo compaction coordinator server."), + COORDINATOR_PORTSEARCH("coordinator.port.search", "false", PropertyType.BOOLEAN, + "if the ports above are in use, search higher ports until one is available"), + COORDINATOR_CLIENTPORT("coordinator.port.client", "9101", PropertyType.PORT, + "The port used for handling client connections on the compactor servers"), + COORDINATOR_MINTHREADS("coordinator.server.threads.minimum", "1", PropertyType.COUNT, + "The minimum number of threads to use to handle incoming requests."), + COORDINATOR_MINTHREADS_TIMEOUT("coordinator.server.threads.timeout", "0s", PropertyType.TIMEDURATION, + "The time after which incoming request threads terminate with no work available. Zero (0) will keep the threads alive indefinitely."), + COORDINATOR_THREADCHECK("coordinator.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, + "The time between adjustments of the server thread pool."), + COORDINATOR_MAX_MESSAGE_SIZE("coordinator.server.message.size.max", "10M", PropertyType.BYTES, + "The maximum size of a message that can be sent to a tablet server."), // deprecated properties grouped at the end to reference property that replaces them @Deprecated(since = "1.6.0") @ReplacedBy(property = INSTANCE_VOLUMES) INSTANCE_DFS_URI("instance.dfs.uri", "", PropertyType.URI, diff --git a/pom.xml b/pom.xml index 0eb0c5c..d2bd521 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ <module>iterator-test-harness</module> <module>minicluster</module> <module>server/base</module> + <module>server/compaction-coordinator</module> <module>server/gc</module> <module>server/manager</module> <module>server/master</module> diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index 8e85250..b318835 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -77,6 +77,10 @@ public class LiveTServerSet implements Watcher { public TServerConnection(HostAndPort addr) { address = addr; } + + public HostAndPort getAddress() { + return address; + } private String lockString(ZooLock mlock) { return mlock.getLockID().serialize(context.getZooKeeperRoot() + Constants.ZMANAGER_LOCK); diff --git a/server/compaction-coordinator/.gitignore b/server/compaction-coordinator/.gitignore new file mode 100644 index 0000000..e77a822 --- /dev/null +++ b/server/compaction-coordinator/.gitignore @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Maven ignores +/target/ + +# IDE ignores +/.settings/ +/.project +/.classpath +/.pydevproject +/.idea +/*.iml +/nbproject/ +/nbactions.xml +/nb-configuration.xml diff --git a/server/compaction-coordinator/pom.xml b/server/compaction-coordinator/pom.xml new file mode 100644 index 0000000..9f25e5d --- /dev/null +++ b/server/compaction-coordinator/pom.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-project</artifactId> + <version>2.1.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <artifactId>accumulo-compaction-coordinator</artifactId> + <name>Apache Accumulo Compaction Coordinator</name> + <dependencies> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-server-base</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java new file mode 100644 index 0000000..73670c0 --- /dev/null +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.coordinator; + +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.clientImpl.ThriftTransportPool; +import org.apache.accumulo.core.compaction.thrift.CompactionState; +import org.apache.accumulo.core.compaction.thrift.Status; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.dataImpl.thrift.CompactionStats; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.tabletserver.thrift.CompactionJob; +import org.apache.accumulo.core.tabletserver.thrift.CompactionQueueSummary; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.GarbageCollectionLogger; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.manager.LiveTServerSet; +import 
org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server process that periodically asks every live tablet server for its external compaction
 * queue summaries, remembers which tablet servers have work for which queue and priority, and
 * hands out compaction jobs to compactors that call in over Thrift.
 *
 * <p>
 * The bookkeeping maps {@code QUEUES} and {@code INDEX} are plain, non-concurrent collections;
 * every read and write of either map is performed while holding the {@code QUEUES} monitor.
 */
public class CompactionCoordinator extends AbstractServer implements
    org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface,
    LiveTServerSet.Listener {

  /**
   * Value object pairing an external queue name with a priority. Instances compare by queue name
   * first and then by priority in descending numeric order, i.e. a larger priority value sorts
   * first.
   */
  private static class QueueAndPriority implements Comparable<QueueAndPriority> {

    // Keys are created fresh on every lookup, so the weak map may drop entries at any time. That
    // is harmless because equality is value based; the cache only reduces allocations.
    private static final Map<Pair<String,Long>,QueueAndPriority> CACHE = new WeakHashMap<>();

    /**
     * Returns an instance for the given queue and priority, creating one when none is cached.
     *
     * @param queue
     *          queue name
     * @param priority
     *          priority within the queue
     * @return never {@code null}
     */
    public static synchronized QueueAndPriority get(String queue, Long priority) {
      // computeIfAbsent returns the current (possibly just created) value, whereas putIfAbsent
      // returns the PREVIOUS value and therefore null on first insertion.
      return CACHE.computeIfAbsent(new Pair<>(queue, priority),
          k -> new QueueAndPriority(queue, priority));
    }

    private final String queue;
    private final Long priority;

    private QueueAndPriority(String queue, Long priority) {
      super();
      this.queue = queue;
      this.priority = priority;
    }

    public String getQueue() {
      return queue;
    }

    public Long getPriority() {
      return priority;
    }

    @Override
    public int hashCode() {
      return queue.hashCode() + priority.hashCode();
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder();
      buf.append("queue: ").append(queue);
      buf.append(", priority: ").append(priority);
      return buf.toString();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == this) {
        return true;
      }
      if (!(obj instanceof QueueAndPriority)) {
        // also covers obj == null
        return false;
      }
      QueueAndPriority other = (QueueAndPriority) obj;
      return this.queue.equals(other.queue) && this.priority.equals(other.priority);
    }

    @Override
    public int compareTo(QueueAndPriority other) {
      int result = this.queue.compareTo(other.queue);
      if (result == 0) {
        // reversing order such that if other priority is lower, then this has a higher priority
        return Long.compare(other.priority, this.priority);
      }
      return result;
    }

  }

  /**
   * Lock watcher that halts the process when the coordinator lock is lost or can no longer be
   * monitored.
   */
  private static class CoordinatorLockWatcher implements ZooLock.AccumuloLockWatcher {

    @Override
    public void lostLock(LockLossReason reason) {
      Halt.halt("Coordinator lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
    }

    @Override
    public void unableToMonitorLockNode(final Exception e) {
      // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
      Halt.halt(-1, () -> LOG.error("FATAL: No longer able to monitor Coordinator lock node", e));
    }

    @Override
    public synchronized void acquiredLock() {
      // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
    }

    @Override
    public synchronized void failedToAcquireLock(Exception e) {
      // This is overridden by the LockWatcherWrapper in ZooLock.tryLock()
    }

  }

  /**
   * Utility for returning the address in the form host:port
   *
   * @param address
   *          address to format, may be {@code null}
   * @return {@code host:port}, or {@code null} when {@code address} is {@code null}
   */
  private static String getHostPortString(HostAndPort address) {
    if (address == null) {
      return null;
    }
    return address.getHost() + ":" + address.getPort();
  }

  /** A single status message reported by a compactor for a running compaction. */
  private static class CompactionUpdate {
    private final Long timestamp;
    private final String message;
    private final CompactionState state;

    public CompactionUpdate(Long timestamp, String message, CompactionState state) {
      super();
      this.timestamp = timestamp;
      this.message = message;
      this.state = state;
    }

    public Long getTimestamp() {
      return timestamp;
    }

    public String getMessage() {
      return message;
    }

    public CompactionState getState() {
      return state;
    }
  }

  /**
   * Bookkeeping for one compaction job that has been handed to a compactor: the job itself, the
   * compactor that asked for it, the tablet server it was reserved on, the status updates
   * received so far (keyed by timestamp) and, once reported, the final stats.
   */
  private static class RunningCompaction {
    private final CompactionJob job;
    private final String compactor;
    private final TServerInstance tserver;
    private Map<Long,CompactionUpdate> updates = new TreeMap<>();
    private CompactionStats stats = null;

    public RunningCompaction(CompactionJob job, String compactor, TServerInstance tserver) {
      super();
      this.job = job;
      this.compactor = compactor;
      this.tserver = tserver;
    }

    public Map<Long,CompactionUpdate> getUpdates() {
      return updates;
    }

    public void addUpdate(Long timestamp, String message, CompactionState state) {
      this.updates.put(timestamp, new CompactionUpdate(timestamp, message, state));
    }

    public void setUpdates(Map<Long,CompactionUpdate> updates) {
      this.updates = updates;
    }

    public CompactionStats getStats() {
      return stats;
    }

    public void setStats(CompactionStats stats) {
      this.stats = stats;
    }

    public CompactionJob getJob() {
      return job;
    }

    public String getCompactor() {
      return compactor;
    }

    public TServerInstance getTserver() {
      return tserver;
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
  private static final long TIME_BETWEEN_CHECKS = 5000;
  /* Milliseconds to wait between polls of the tablet servers for queue summaries */
  private static final int QUEUE_POLL_INTERVAL_MS = 60000;

  /* Map of external queue name -> priority -> tservers */
  private static final Map<String,TreeMap<Long,LinkedHashSet<TServerInstance>>> QUEUES =
      new HashMap<>();
  /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */
  private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX = new HashMap<>();
  /* Map of compactionId to RunningCompactions */
  private static final Map<Long,RunningCompaction> RUNNING = new ConcurrentHashMap<>();

  private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
  private final AccumuloConfiguration aconf;

  private ZooLock coordinatorLock;
  private LiveTServerSet tserverSet;

  protected CompactionCoordinator(ServerOpts opts, String[] args) {
    super("compaction-coordinator", opts, args);
    ServerContext context = super.getContext();
    context.setupCrypto();

    aconf = getConfiguration();
    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
        () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_CHECKS,
        TimeUnit.MILLISECONDS);
    LOG.info("Version " + Constants.VERSION);
    LOG.info("Instance " + context.getInstanceID());
  }

  /**
   * Set up nodes and locks in ZooKeeper for this CompactionCoordinator
   *
   * @param clientAddress
   *          address of this coordinator's client service; stored as the lock data
   * @return true if lock was acquired, else false
   * @throws KeeperException
   *           propagated from the ZooKeeper lock attempt
   * @throws InterruptedException
   *           propagated from the ZooKeeper lock attempt
   */
  private boolean getCoordinatorLock(HostAndPort clientAddress)
      throws KeeperException, InterruptedException {
    LOG.info("trying to get coordinator lock");

    final String coordinatorClientAddress = getHostPortString(clientAddress);
    final String lockPath = getContext().getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
    final UUID zooLockUUID = UUID.randomUUID();

    CoordinatorLockWatcher coordinatorLockWatcher = new CoordinatorLockWatcher();
    coordinatorLock = new ZooLock(getContext().getSiteConfiguration(), lockPath, zooLockUUID);
    // Explicit charset so the bytes written to ZooKeeper do not depend on the platform default
    return coordinatorLock.tryLock(coordinatorLockWatcher,
        coordinatorClientAddress.getBytes(java.nio.charset.StandardCharsets.UTF_8));
  }

  /**
   * Start this CompactionCoordinator thrift service to handle incoming client requests
   *
   * @return address of this CompactionCoordinator client service
   * @throws UnknownHostException
   *           propagated from starting the Thrift server
   */
  private ServerAddress startCompactorClientService() throws UnknownHostException {
    // The wrapped service is held by its Thrift interface type rather than by this concrete
    // class. NOTE(review): TraceUtil.wrapService presumably returns an interface-based proxy, in
    // which case assigning it to a CompactionCoordinator variable would fail with a
    // ClassCastException - confirm against TraceUtil.
    org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface rpcProxy =
        TraceUtil.wrapService(this);
    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
      rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, CompactionCoordinator.class,
          getConfiguration());
    }
    final org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<
        org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface> processor =
            new org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Processor<>(
                rpcProxy);

    Property maxMessageSizeProperty = (aconf.get(Property.COORDINATOR_MAX_MESSAGE_SIZE) != null
        ? Property.COORDINATOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
    ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), getHostname(),
        Property.COORDINATOR_CLIENTPORT, processor, this.getClass().getSimpleName(),
        "Thrift Client Server", Property.COORDINATOR_PORTSEARCH, Property.COORDINATOR_MINTHREADS,
        Property.COORDINATOR_MINTHREADS_TIMEOUT, Property.COORDINATOR_THREADCHECK,
        maxMessageSizeProperty);
    LOG.info("address = {}", sp.address);
    return sp;
  }

  /**
   * Starts the Thrift service, acquires the coordinator lock, starts listening for tablet server
   * changes and then loops forever, polling every current tablet server for its compaction queue
   * summaries and sleeping between rounds.
   */
  @Override
  public void run() {

    final ServerAddress coordinatorAddress;
    try {
      coordinatorAddress = startCompactorClientService();
    } catch (UnknownHostException e1) {
      throw new RuntimeException("Failed to start the coordinator service", e1);
    }
    final HostAndPort clientAddress = coordinatorAddress.address;

    try {
      if (!getCoordinatorLock(clientAddress)) {
        throw new RuntimeException("Unable to get Coordinator lock.");
      }
    } catch (KeeperException | InterruptedException e) {
      if (e instanceof InterruptedException) {
        // keep the interrupt visible to whoever handles the exception below
        Thread.currentThread().interrupt();
      }
      throw new IllegalStateException("Exception getting Coordinator lock", e);
    }

    tserverSet = new LiveTServerSet(getContext(), this);

    // TODO: On initial startup contact all running tservers to get information about the
    // compactions that are current running in external queues to populate the RUNNING map. This
    // is to handle the case where the coordinator dies or is restarted at runtime

    tserverSet.startListeningForTabletServerChanges();

    while (true) {
      tserverSet.getCurrentServers().forEach(this::refreshQueueSummaries);
      UtilWaitThread.sleep(QUEUE_POLL_INTERVAL_MS);
    }

  }

  /**
   * Asks one tablet server for its compaction queue summaries and records, for every summary,
   * that the tablet server has work for that queue and priority. RPC failures are logged and
   * otherwise ignored so that one unreachable tablet server does not stop the polling loop.
   *
   * @param tsi
   *          tablet server to poll
   */
  private void refreshQueueSummaries(TServerInstance tsi) {
    final List<CompactionQueueSummary> summaries;
    try {
      // The remote call is made before taking the QUEUES monitor so that a slow tablet server
      // does not block getCompactionJob() or update().
      // TODO: the transport obtained in getTabletServerConnection is never handed back to the
      // transport pool
      TabletClientService.Client client = getTabletServerConnection(tsi);
      summaries = client.getCompactionQueueInfo();
    } catch (TException e) {
      LOG.warn("Error getting compaction summaries from tablet server: {}", tsi.getHostAndPort(),
          e);
      return;
    }
    synchronized (QUEUES) {
      summaries.forEach(summary -> {
        QueueAndPriority qp =
            QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority());
        // computeIfAbsent, not putIfAbsent: the latter returns null when the key was absent
        QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>())
            .computeIfAbsent(qp.getPriority(), k -> new LinkedHashSet<>()).add(tsi);
        INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp);
      });
    }
  }

  /**
   * Callback for the LiveTServerSet object to update us current set of tablet servers, including
   * ones that were deleted and added
   *
   * @param current
   *          current set of live tservers
   * @param deleted
   *          set of tservers that were removed from current since last update
   * @param added
   *          set of tservers that were added to current since last update
   */
  @Override
  public void update(LiveTServerSet current, Set<TServerInstance> deleted,
      Set<TServerInstance> added) {

    // run() will iterate over the current and added tservers and add them to the internal
    // data structures. For tservers that are deleted, we need to remove them from the
    // internal data structures
    synchronized (QUEUES) {
      deleted.forEach(tsi -> {
        // remove() both fetches and drops the index entry; it is null for a tserver that never
        // reported a queue summary
        HashSet<QueueAndPriority> indexed = INDEX.remove(tsi);
        if (null == indexed) {
          return;
        }
        indexed.forEach(qp -> {
          TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(qp.getQueue());
          if (null != m) {
            LinkedHashSet<TServerInstance> tservers = m.get(qp.getPriority());
            if (null != tservers) {
              tservers.remove(tsi);
            }
          }
        });
      });
    }
  }

  /**
   * Return the next compaction job for the queue to a Compactor
   *
   * @param queueName
   *          queue
   * @param compactor
   *          compactor address
   * @return compaction job, or {@code null} when no tablet server currently has work recorded
   *         for the queue
   * @throws TException
   *           when the job cannot be reserved on the selected tablet server
   */
  @Override
  public CompactionJob getCompactionJob(String queueName, String compactor) throws TException {
    final String queue = queueName.intern();
    TServerInstance tserver = null;
    Long priority = null;
    synchronized (QUEUES) {
      TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue);
      if (null == m) {
        return null; // TODO: or should we throw an error for no tserver for this queue?
      }
      while (tserver == null) {
        // The map is sorted ascending by priority value and, as in QueueAndPriority.compareTo, a
        // larger value is the higher priority, so the highest priority is the LAST entry.
        Entry<Long,LinkedHashSet<TServerInstance>> entry = m.lastEntry();
        if (null == entry) {
          // every priority for this queue has been drained
          return null;
        }
        LinkedHashSet<TServerInstance> tservers = entry.getValue();
        if (null == tservers || tservers.isEmpty()) {
          // Clean up the map entry when no tservers for this queue and priority
          m.remove(entry.getKey());
          continue;
        }
        priority = entry.getKey();
        tserver = tservers.iterator().next();
        // Remove the tserver from the list, we are going to run a compaction on this server
        tservers.remove(tserver);
        if (tservers.isEmpty()) {
          // Clean up the map entry when no tservers remaining for this queue and priority
          m.remove(entry.getKey());
        }
        HashSet<QueueAndPriority> qp = INDEX.get(tserver);
        if (null != qp) {
          qp.remove(QueueAndPriority.get(queue, priority));
          if (qp.isEmpty()) {
            // Remove the tserver from the index
            INDEX.remove(tserver);
          }
        }
      }
    }

    try {
      TabletClientService.Client client = getTabletServerConnection(tserver);
      CompactionJob job = client.reserveCompactionJob(queue, priority, compactor);
      RUNNING.put(job.getCompactionId(), new RunningCompaction(job, compactor, tserver));
      return job;
    } catch (TException e) {
      // Report the failure to the calling compactor instead of swallowing it.
      LOG.warn("Error reserving compaction job for queue {} on tablet server: {}", queue,
          tserver.getHostAndPort(), e);
      throw e;
    }
  }

  /**
   * Opens a Thrift client to the given tablet server.
   *
   * @param tserver
   *          tablet server to connect to
   * @return client bound to a transport taken from the transport pool
   * @throws TTransportException
   *           when the tablet server is not known to the live tserver set or the transport
   *           cannot be obtained
   */
  private TabletClientService.Client getTabletServerConnection(TServerInstance tserver)
      throws TTransportException {
    TServerConnection connection = tserverSet.getConnection(tserver);
    if (null == connection) {
      throw new TTransportException("No connection available for tablet server " + tserver);
    }
    TTransport transport = ThriftTransportPool.getInstance()
        .getTransport(connection.getAddress(), 0, getContext());
    return ThriftUtil.createClient(new TabletClientService.Client.Factory(), transport);
  }

  @Override
  public void cancelCompaction(TKeyExtent extent, String queueName, long priority)
      throws TException {
    // TODO Auto-generated method stub

  }

  @Override
  public List<Status> getCompactionStatus(TKeyExtent extent, String queueName, long priority)
      throws TException {
    // TODO Auto-generated method stub
    // Until implemented, report "no status" as an empty list rather than null
    return List.of();
  }

  /**
   * Compactor calls compactionCompleted passing in the CompactionStats
   *
   * @param job
   *          compaction job
   * @param stats
   *          compaction stats
   */
  @Override
  public void compactionCompleted(CompactionJob job, CompactionStats stats) throws TException {
    RunningCompaction rc = RUNNING.get(job.getCompactionId());
    if (null == rc) {
      // Nothing is known about this job, so there is no tablet server to notify
      LOG.warn("Received completion for unknown compaction id {}", job.getCompactionId());
      return;
    }
    rc.setStats(stats);
    // TODO: What happens if tserver is no longer hosting tablet? I wonder if we should not notify
    // the tserver that the compaction has finished and instead let the tserver that is hosting
    // the tablet poll for state updates. That way if the tablet is re-hosted, the tserver can
    // check as part of the tablet loading process. This would also enable us to remove the
    // running compaction from RUNNING when the tserver makes the call and gets the stats.
    TabletClientService.Client client = getTabletServerConnection(rc.getTserver());
    client.compactionJobFinished(rc.getJob());
  }

  /**
   * Compactor calls to update the status of the assigned compaction
   *
   * @param job
   *          compaction job
   * @param state
   *          compaction state
   * @param message
   *          informational message
   * @param timestamp
   *          timestamp of the message
   */
  @Override
  public void updateCompactionStatus(CompactionJob job, CompactionState state, String message,
      long timestamp) throws TException {
    RunningCompaction rc = RUNNING.get(job.getCompactionId());
    if (null != rc) {
      rc.addUpdate(timestamp, message, state);
    }
  }

  public static void main(String[] args) throws Exception {
    try (CompactionCoordinator coordinator = new CompactionCoordinator(new ServerOpts(), args)) {
      coordinator.runServer();
    }
  }

}