This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new c478645 Handle splits, merges, and canceling of user compactions (all untested) c478645 is described below commit c478645f984ca0eee8533073d545a20ced29a77d Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Apr 23 19:54:40 2021 -0400 Handle splits, merges, and canceling of user compactions (all untested) --- .../accumulo/core/metadata/schema/AmpleImpl.java | 4 + .../thrift/TExternalCompactionJob.java | 104 ++++++++++++++- core/src/main/thrift/tabletserver.thrift | 2 +- .../accumulo/server/util/MetadataTableUtil.java | 7 +- .../accumulo/compactor/CompactionEnvironment.java | 11 +- .../accumulo/compactor/CompactionJobHolder.java | 27 ++-- .../org/apache/accumulo/compactor/Compactor.java | 145 ++++++++++----------- .../apache/accumulo/compactor/CompactorTest.java | 3 +- .../accumulo/manager/TabletGroupWatcher.java | 16 ++- .../compactions/ExternalCompactionExecutor.java | 4 +- .../tserver/compactions/ExternalCompactionJob.java | 18 ++- .../accumulo/tserver/tablet/CompactableImpl.java | 3 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 34 ++++- .../accumulo/test/functional/SplitRecoveryIT.java | 3 +- 14 files changed, 264 insertions(+), 117 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java index 863a6eb..b8cc498 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.core.metadata.schema; +import java.util.NoSuchElementException; + import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -43,6 +45,8 @@ public class AmpleImpl implements Ample { try (TabletsMetadata tablets = builder.build()) { return Iterables.getOnlyElement(tablets); + } catch (NoSuchElementException e) { + return null; } } diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java index 3345f6a..a89f563 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java @@ -40,6 +40,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal private static final org.apache.thrift.protocol.TField OUTPUT_FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("outputFile", org.apache.thrift.protocol.TType.STRING, (short)10); private static final org.apache.thrift.protocol.TField PROPAGATE_DELETES_FIELD_DESC = new org.apache.thrift.protocol.TField("propagateDeletes", org.apache.thrift.protocol.TType.BOOL, (short)11); private static final org.apache.thrift.protocol.TField KIND_FIELD_DESC = new org.apache.thrift.protocol.TField("kind", org.apache.thrift.protocol.TType.I32, (short)12); + private static final org.apache.thrift.protocol.TField USER_COMPACTION_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("userCompactionId", org.apache.thrift.protocol.TType.I64, (short)13); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new TExternalCompactionJobStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new TExternalCompactionJobTupleSchemeFactory(); @@ -64,6 +65,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal public @org.apache.thrift.annotation.Nullable java.lang.String outputFile; // required public boolean propagateDeletes; // required public @org.apache.thrift.annotation.Nullable TCompactionKind kind; // required + public long userCompactionId; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -86,7 +88,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal REASON((short)9, "reason"), OUTPUT_FILE((short)10, "outputFile"), PROPAGATE_DELETES((short)11, "propagateDeletes"), - KIND((short)12, "kind"); + KIND((short)12, "kind"), + USER_COMPACTION_ID((short)13, "userCompactionId"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -126,6 +129,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return PROPAGATE_DELETES; case 12: // KIND return KIND; + case 13: // USER_COMPACTION_ID + return USER_COMPACTION_ID; default: return null; } @@ -171,6 +176,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal private static final int __READRATE_ISSET_ID = 1; private static final int __WRITERATE_ISSET_ID = 2; private static final int __PROPAGATEDELETES_ISSET_ID = 3; + private static final int __USERCOMPACTIONID_ISSET_ID = 4; private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -200,6 +206,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.KIND, new org.apache.thrift.meta_data.FieldMetaData("kind", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.ENUM , "TCompactionKind"))); + tmpMap.put(_Fields.USER_COMPACTION_ID, new org.apache.thrift.meta_data.FieldMetaData("userCompactionId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TExternalCompactionJob.class, metaDataMap); } @@ -219,7 +227,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal TCompactionReason reason, java.lang.String outputFile, boolean propagateDeletes, - TCompactionKind kind) + TCompactionKind kind, + long userCompactionId) { this(); this.externalCompactionId = externalCompactionId; @@ -238,6 +247,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal this.propagateDeletes = propagateDeletes; setPropagateDeletesIsSet(true); this.kind = kind; + this.userCompactionId = userCompactionId; + setUserCompactionIdIsSet(true); } /** @@ -277,6 +288,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (other.isSetKind()) { this.kind = other.kind; } + this.userCompactionId = other.userCompactionId; } public TExternalCompactionJob deepCopy() { @@ -301,6 +313,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal setPropagateDeletesIsSet(false); this.propagateDeletes = false; this.kind = null; + setUserCompactionIdIsSet(false); + this.userCompactionId = 0; } @org.apache.thrift.annotation.Nullable @@ -627,6 +641,29 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } } + public long getUserCompactionId() { + return this.userCompactionId; + } + + public TExternalCompactionJob setUserCompactionId(long userCompactionId) { + this.userCompactionId = userCompactionId; + setUserCompactionIdIsSet(true); + return this; + } + + public void unsetUserCompactionId() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __USERCOMPACTIONID_ISSET_ID); + } + + /** Returns true if field userCompactionId is set (has been assigned a value) and false otherwise */ + public boolean isSetUserCompactionId() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __USERCOMPACTIONID_ISSET_ID); + } + + public void setUserCompactionIdIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __USERCOMPACTIONID_ISSET_ID, value); + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case EXTERNAL_COMPACTION_ID: @@ -725,6 +762,14 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal } break; + case USER_COMPACTION_ID: + if (value == null) { + unsetUserCompactionId(); + } else { + setUserCompactionId((java.lang.Long)value); + } + break; + } } @@ -767,6 +812,9 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal case KIND: return getKind(); + case USER_COMPACTION_ID: + return getUserCompactionId(); + } throw new java.lang.IllegalStateException(); } @@ -802,6 +850,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return isSetPropagateDeletes(); case KIND: return isSetKind(); + case USER_COMPACTION_ID: + return isSetUserCompactionId(); } throw new java.lang.IllegalStateException(); } @@ -929,6 +979,15 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return false; } + boolean this_present_userCompactionId = true; + boolean that_present_userCompactionId = true; + if (this_present_userCompactionId || that_present_userCompactionId) { + if (!(this_present_userCompactionId && that_present_userCompactionId)) + return false; + if (this.userCompactionId != that.userCompactionId) + return false; + } + return true; } @@ -976,6 +1035,8 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (isSetKind()) hashCode = hashCode * 8191 + kind.getValue(); + hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(userCompactionId); + return hashCode; } @@ -1107,6 +1168,16 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal return lastComparison; } } + lastComparison = java.lang.Boolean.valueOf(isSetUserCompactionId()).compareTo(other.isSetUserCompactionId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetUserCompactionId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.userCompactionId, other.userCompactionId); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1207,6 +1278,10 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal sb.append(this.kind); } first = false; + if (!first) sb.append(", "); + sb.append("userCompactionId:"); + sb.append(this.userCompactionId); + first = false; sb.append(")"); return sb.toString(); } @@ -1367,6 +1442,14 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 13: // USER_COMPACTION_ID + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.userCompactionId = iprot.readI64(); + struct.setUserCompactionIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1441,6 +1524,9 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal oprot.writeI32(struct.kind.getValue()); oprot.writeFieldEnd(); } + oprot.writeFieldBegin(USER_COMPACTION_ID_FIELD_DESC); + oprot.writeI64(struct.userCompactionId); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1495,7 +1581,10 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (struct.isSetKind()) { optionals.set(11); } - oprot.writeBitSet(optionals, 12); + if (struct.isSetUserCompactionId()) { + optionals.set(12); + } + oprot.writeBitSet(optionals, 13); if (struct.isSetExternalCompactionId()) { oprot.writeString(struct.externalCompactionId); } @@ -1538,12 +1627,15 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal if (struct.isSetKind()) { oprot.writeI32(struct.kind.getValue()); } + if (struct.isSetUserCompactionId()) { + oprot.writeI64(struct.userCompactionId); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, TExternalCompactionJob struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(12); + java.util.BitSet incoming = iprot.readBitSet(13); if (incoming.get(0)) { struct.externalCompactionId = iprot.readString(); struct.setExternalCompactionIdIsSet(true); @@ -1604,6 +1696,10 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase<TExternal struct.kind = org.apache.accumulo.core.tabletserver.thrift.TCompactionKind.findByValue(iprot.readI32()); struct.setKindIsSet(true); } + if (incoming.get(12)) { + struct.userCompactionId = iprot.readI64(); + struct.setUserCompactionIdIsSet(true); + } } } diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index 8447426..f8b181a 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -180,6 +180,7 @@ struct TExternalCompactionJob { 10:string outputFile 11:bool propagateDeletes 12:TCompactionKind kind + 13:i64 userCompactionId } enum TCompactionKind { @@ -558,7 +559,6 @@ service TabletClientService extends client.ClientService { 3:string externalCompactionId 4:data.TKeyExtent extent ) - } typedef i32 TabletID diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index 27c53ac..14a7ab4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -70,6 +70,7 @@ import org.apache.accumulo.core.metadata.TabletFileUtil; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; @@ -77,6 +78,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ch import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; @@ -243,7 +245,7 @@ public class MetadataTableUtil { } public static void splitTablet(KeyExtent extent, Text oldPrevEndRow, double splitRatio, - ServerContext context, ServiceLock zooLock) { + ServerContext context, ServiceLock zooLock, Set<ExternalCompactionId> ecids) { Mutation m = TabletColumnFamily.createPrevRowMutation(extent); TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value(Double.toString(splitRatio))); @@ -251,6 +253,9 @@ public class MetadataTableUtil { TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, TabletColumnFamily.encodePrevEndRow(oldPrevEndRow)); ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m); + + ecids.forEach(ecid -> m.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid.canonical())); + update(context, zooLock, m, extent); } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java index 31ebe8d..7e4e0de 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory; import org.apache.accumulo.server.ServerContext; @@ -43,6 +44,7 @@ public class CompactionEnvironment implements Closeable, CompactionEnv { private final ServerContext context; private final CompactionJobHolder jobHolder; private final SharedRateLimiterFactory limiter; + private TExternalCompactionJob job; private String queueName; public static class CompactorIterEnv extends TabletIteratorEnvironment { @@ -64,6 +66,7 @@ public class CompactionEnvironment implements Closeable, CompactionEnv { CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder, String queueName) { this.context = context; this.jobHolder = jobHolder; + this.job = jobHolder.getJob(); this.limiter = SharedRateLimiterFactory.getInstance(this.context.getConfiguration()); this.queueName = queueName; } @@ -86,12 +89,12 @@ public class CompactionEnvironment implements Closeable, CompactionEnv { @Override public RateLimiter getReadLimiter() { - return limiter.create("read_rate_limiter", () -> jobHolder.getJob().getReadRate()); + return limiter.create("read_rate_limiter", () -> job.getReadRate()); } @Override public RateLimiter getWriteLimiter() { - return limiter.create("write_rate_limiter", () -> jobHolder.getJob().getWriteRate()); + return limiter.create("write_rate_limiter", () -> job.getWriteRate()); } @Override @@ -99,7 +102,7 @@ public class CompactionEnvironment implements Closeable, CompactionEnv { AccumuloConfiguration acuTableConf, TableId tableId) { return new CompactorIterEnv(context, IteratorScope.majc, !jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId, - CompactionKind.valueOf(jobHolder.getJob().getKind().name()), queueName); + CompactionKind.valueOf(job.getKind().name()), queueName); } @Override @@ -109,7 +112,7 @@ public class CompactionEnvironment implements Closeable, CompactionEnv { @Override public TCompactionReason getReason() { - switch (jobHolder.getJob().getKind()) { + switch (job.getKind()) { case USER: return TCompactionReason.USER; case CHOP: diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java index b94e033..b45b128 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java @@ -27,26 +27,22 @@ public class CompactionJobHolder { private TExternalCompactionJob job; private Thread compactionThread; - private volatile Boolean cancelled = Boolean.FALSE; - private TCompactionStats stats = null; + private volatile boolean cancelled = false; + private volatile TCompactionStats stats = null; CompactionJobHolder() {} - public void reset() { + public synchronized void reset() { job = null; compactionThread = null; - cancelled = Boolean.FALSE; + cancelled = false; stats = null; } - public TExternalCompactionJob getJob() { + public synchronized TExternalCompactionJob getJob() { return job; } - public Thread getThread() { - return compactionThread; - } - public TCompactionStats getStats() { return stats; } @@ -55,19 +51,24 @@ public class CompactionJobHolder { this.stats = stats; } - public void cancel() { - cancelled = Boolean.TRUE; + public synchronized boolean cancel(String extCompId) { + if (isSet() && getJob().getExternalCompactionId().equals(extCompId)) { + cancelled = true; + compactionThread.interrupt(); + return true; + } + return false; } public boolean isCancelled() { return cancelled; } - public boolean isSet() { + public synchronized boolean isSet() { return (null != this.job); } - public void set(TExternalCompactionJob job, Thread compactionThread) { + public synchronized void set(TExternalCompactionJob job, Thread compactionThread) { Objects.requireNonNull(job, "CompactionJob is null"); Objects.requireNonNull(compactionThread, "Compaction thread is null"); this.job = job; diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 1fbb3b6..6f98923 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -52,8 +53,11 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; import org.apache.accumulo.core.trace.TraceUtil; @@ -71,7 +75,6 @@ import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.GarbageCollectionLogger; -import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.compaction.CompactionInfo; import org.apache.accumulo.server.compaction.ExternalCompactionUtil; @@ -90,10 +93,6 @@ import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.WatcherType; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +112,8 @@ public class Compactor extends AbstractServer private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); private static final long TIME_BETWEEN_GC_CHECKS = 5000; + private static final long TIME_BETWEEN_CANCEL_CHECKS = 5 * 60 * 1000; + private static final long TEN_MEGABYTES = 10485760; private static final CompactionCoordinator.Client.Factory COORDINATOR_CLIENT_FACTORY = new CompactionCoordinator.Client.Factory(); @@ -140,7 +141,9 @@ public class Compactor extends AbstractServer queueName = opts.getQueueName(); aconf = getConfiguration(); setupSecurity(); - startGCLogger(); + var schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + startGCLogger(schedExecutor); + startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); printStartupMsg(); } @@ -149,7 +152,9 @@ public class Compactor extends AbstractServer queueName = opts.getQueueName(); aconf = conf; setupSecurity(); - startGCLogger(); + var schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + startGCLogger(schedExecutor); + startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS); printStartupMsg(); } @@ -158,12 +163,62 @@ public class Compactor extends AbstractServer security = AuditedSecurityOperation.getInstance(getContext()); } - protected void startGCLogger() { - ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay( - () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_GC_CHECKS, + protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { + schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, + TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); + } + + protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor, + long timeBetweenChecks) { + schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0, timeBetweenChecks, TimeUnit.MILLISECONDS); } + private void checkIfCanceled() { + TExternalCompactionJob job = JOB_HOLDER.getJob(); + if (job != null) { + try { + var extent = KeyExtent.fromThrift(job.getExtent()); + var ecid = ExternalCompactionId.of(job.getExternalCompactionId()); + + TabletMetadata tabletMeta = + getContext().getAmple().readTablet(extent, ColumnType.ECOMP, ColumnType.PREV_ROW); + if (tabletMeta == null || !tabletMeta.getExtent().equals(extent) + || !tabletMeta.getExternalCompactions().containsKey(ecid)) { + // table was deleted OR tablet was split or merged OR tablet no longer thinks compaction + // is running for some reason + LOG.info("Cancelling compaction {} that no longer has a metadata entry at {}", ecid, + extent); + JOB_HOLDER.cancel(job.getExternalCompactionId()); + return; + } + + if (job.getKind() == TCompactionKind.USER) { + String zTablePath = Constants.ZROOT + "/" + getContext().getInstanceID() + + Constants.ZTABLES + "/" + extent.tableId() + Constants.ZTABLE_COMPACT_CANCEL_ID; + byte[] id = getContext().getZooCache().get(zTablePath); + if (id == null) { + // table probably deleted + LOG.info("Cancelling compaction {} for table that no longer exists {}", ecid, extent); + JOB_HOLDER.cancel(job.getExternalCompactionId()); + return; + } else { + var cancelId = Long.parseLong(new String(id, UTF_8)); + + if (cancelId >= job.getUserCompactionId()) { + LOG.info("Cancelling compaction {} because user compaction was canceled"); + JOB_HOLDER.cancel(job.getExternalCompactionId()); + return; + } + } + } + } catch (RuntimeException e) { + LOG.warn("Failed to check if compaction {} for {} was canceled.", + job.getExternalCompactionId(), KeyExtent.fromThrift(job.getExtent()), e); + } + } + } + protected void printStartupMsg() { LOG.info("Version " + Constants.VERSION); LOG.info("Instance " + getContext().getInstanceID()); @@ -300,17 +355,11 @@ public class Compactor extends AbstractServer * thrift error */ private void cancel(String externalCompactionId) throws TException { - synchronized (JOB_HOLDER) { - if (JOB_HOLDER.isSet() - && JOB_HOLDER.getJob().getExternalCompactionId().equals(externalCompactionId)) { - LOG.info("Cancel requested for compaction job {}", externalCompactionId); - JOB_HOLDER.cancel(); - JOB_HOLDER.getThread().interrupt(); - } else { - throw new UnknownCompactionIdException(); - } + if (JOB_HOLDER.cancel(externalCompactionId)) { + LOG.info("Cancel requested for compaction job {}", externalCompactionId); + } else { + throw new UnknownCompactionIdException(); } - } /** @@ -603,57 +652,9 @@ public class Compactor extends AbstractServer "Compaction job for tablet " + job.getExtent().toString(), createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err)); - synchronized (JOB_HOLDER) { - JOB_HOLDER.set(job, compactionThread); - } + JOB_HOLDER.set(job, compactionThread); - final String tableId = new String(job.getExtent().getTable(), UTF_8); - final ServerContext ctxRef = getContext(); - final String tablePath = - getContext().getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId; - Watcher tableNodeWatcher = new Watcher() { - @Override - public void process(WatchedEvent event) { - switch (event.getType()) { - case NodeDeleted: - LOG.info("Zookeeper node for table {} deleted, cancelling compaction.", tableId); - JOB_HOLDER.cancel(); - break; - default: - // Watcher got fired for some other event, need to recreate the Watcher - try { - Stat s = ctxRef.getZooReaderWriter().getZooKeeper().exists(tablePath, this); - if (s == null) { - LOG.info("Zookeeper node for table {} deleted before compaction started.", - tableId); - // if stat is null from the zookeeper.exists(path, Watcher) call, then we just - // created a Watcher on a node that does not exist. Delete the watcher we just - // created. - ctxRef.getZooReaderWriter().getZooKeeper().removeWatches(tablePath, this, - WatcherType.Any, true); - } - } catch (Exception e) { - LOG.error("Error communicating with ZooKeeper and unable to recreate Watcher", e); - // CBUG: Should we exit? - } - break; - } - } - }; try { - // Add a watcher in ZooKeeper on the table id so that we can cancel this compaction - // if the table is deleted - Stat s = - getContext().getZooReaderWriter().getZooKeeper().exists(tablePath, tableNodeWatcher); - if (s == null) { - LOG.info("Zookeeper node for table {} deleted before compaction started.", tableId); - // if stat is null from the zookeeper.exists(path, Watcher) call, then we just - // created a Watcher on a node that does not exist. Delete the watcher we just created. - getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath, - tableNodeWatcher, WatcherType.Any, true); - continue; - } - compactionThread.start(); // start the compactionThread started.await(); // wait until the compactor is started final long inputEntries = totalInputEntries.sum(); @@ -740,12 +741,6 @@ public class Compactor extends AbstractServer LOG.error("Error cancelling compaction.", e2); } } finally { - try { - getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath, - tableNodeWatcher, WatcherType.Any, true); - } catch (KeeperException e) { - LOG.error("Error removing watch from {}.", tablePath, e); - } currentCompactionId.set(null); } diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index 36c7cbc..2e27ffc 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -27,6 +27,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Supplier; @@ -191,7 +192,7 @@ public class CompactorTest { protected void setupSecurity() {} @Override - protected void startGCLogger() {} + protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {} @Override protected void printStartupMsg() {} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index d3fefbf..8df449d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -68,6 +69,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; @@ -746,16 +748,24 @@ abstract class TabletGroupWatcher extends Thread { scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY); scanner.setRange(new Range(stopRow)); ServerColumnFamily.TIME_COLUMN.fetch(scanner); + scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); + Set<String> extCompIds = new HashSet<>(); for (Entry<Key,Value> entry : scanner) { if (ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) { maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, MetadataTime.parse(entry.getValue().toString())); + } else if (ExternalCompactionColumnFamily.NAME.equals(entry.getKey().getColumnFamily())) { + extCompIds.add(entry.getKey().getColumnQualifierData().toString()); } } if (maxLogicalTime != null) ServerColumnFamily.TIME_COLUMN.put(m, new Value(maxLogicalTime.encode())); + // delete any entries for external compactions + extCompIds.stream() + .forEach(ecid -> m.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid)); + if (!m.getUpdates().isEmpty()) { bw.addMutation(m); } @@ -779,9 +789,9 @@ abstract class TabletGroupWatcher extends Thread { deleteTablets(info, scanRange, bw, client); // Clean-up the last chopped marker - m = new Mutation(stopRow); - ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m); - bw.addMutation(m); + var m2 = new Mutation(stopRow); + ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m2); + bw.addMutation(m2); bw.flush(); } catch (Exception ex) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java index 04591ae..6d7757c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java @@ -152,10 +152,12 @@ public class ExternalCompactionExecutor implements CompactionExecutor { if (extJob.getJob().getPriority() >= priority) { if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) { + queuedTask.remove(extJob); var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, extJob.getJob(), compactorId, externalCompactionId); + if (ecj == null) + return null; extJob.ecid = ecj.getExternalCompactionId(); - queuedTask.remove(extJob); return ecj; } else { // TODO could this cause a stack overflow? diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java index fdb58e1..4343448 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java @@ -36,6 +36,8 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason; import org.apache.accumulo.core.tabletserver.thrift.TCompactionType; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import com.google.common.base.Preconditions; + public class ExternalCompactionJob { private Set<StoredTabletFile> jobFiles; @@ -46,12 +48,13 @@ public class ExternalCompactionJob { private long priority; private CompactionKind kind; private List<IteratorSetting> iters; + private long userCompactionId; public ExternalCompactionJob() {} public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean propogateDeletes, TabletFile compactTmpName, KeyExtent extent, ExternalCompactionId externalCompactionId, - long priority, CompactionKind kind, List<IteratorSetting> iters) { + long priority, CompactionKind kind, List<IteratorSetting> iters, Long userCompactionId) { this.jobFiles = Objects.requireNonNull(jobFiles); this.propogateDeletes = propogateDeletes; this.compactTmpName = Objects.requireNonNull(compactTmpName); @@ -60,6 +63,13 @@ public class ExternalCompactionJob { this.priority = priority; this.kind = Objects.requireNonNull(kind); this.iters = Objects.requireNonNull(iters); + if (kind == CompactionKind.USER) { + Preconditions.checkArgument(userCompactionId != null && userCompactionId > 0); + this.userCompactionId = userCompactionId; + } else { + this.userCompactionId = 0; + } + } public TExternalCompactionJob toThrift() { @@ -85,20 +95,18 @@ public class ExternalCompactionJob { default: throw new IllegalStateException(); } - IteratorConfig iteratorSettings = SystemIteratorUtil.toIteratorConfig(iters); // TODO what are things that are zeros below needed for List<InputFile> files = jobFiles.stream().map(stf -> new InputFile(stf.getPathStr(), 0, 0, 0)) .collect(Collectors.toList()); - // CBUG there seem to be two CompactionKind thrift types - // CBUG rename CompactionKind thrift type to TCompactionKind // TODO priority cast and compactionId cast... compactionId could be null I think return new TExternalCompactionJob(externalCompactionId.toString(), extent.toThrift(), files, (int) priority, readRate, writeRate, iteratorSettings, type, reason, compactTmpName.getPathStr(), propogateDeletes, - org.apache.accumulo.core.tabletserver.thrift.TCompactionKind.valueOf(kind.name())); + org.apache.accumulo.core.tabletserver.thrift.TCompactionKind.valueOf(kind.name()), + userCompactionId); } public ExternalCompactionId getExternalCompactionId() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index c25dfd8..f40ec6a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -959,7 +959,8 @@ public class CompactableImpl implements Compactable { externalCompactions.put(externalCompactionId, ecInfo); return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, compactTmpName, - getExtent(), externalCompactionId, job.getPriority(), job.getKind(), cInfo.iters); + getExtent(), externalCompactionId, job.getPriority(), job.getKind(), cInfo.iters, + cInfo.checkCompactionId); } catch (Exception e) { // CBUG unreserve files for compaction! diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 376ec6e..f096e4a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -81,6 +81,7 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; import org.apache.accumulo.core.security.Authorizations; @@ -1352,19 +1353,35 @@ public class Tablet { } try { - Pair<List<LogEntry>,SortedMap<StoredTabletFile,DataFileValue>> fileLog = - MetadataTableUtil.getFileAndLogEntries(context, extent); + var tabletMeta = context.getAmple().readTablet(extent, ColumnType.FILES, ColumnType.LOGS, + ColumnType.ECOMP, ColumnType.PREV_ROW); - if (!fileLog.getFirst().isEmpty()) { + if (!tabletMeta.getExtent().equals(extent)) { + String msg = "Closed tablet " + extent + " does not match extent in metadata table " + + tabletMeta.getExtent(); + log.error(msg); + throw new RuntimeException(msg); + } + + HashSet<ExternalCompactionId> ecids = new HashSet<>(); + compactable.getExternalCompactionIds(ecids::add); + if (!tabletMeta.getExternalCompactions().keySet().equals(ecids)) { + String msg = "Closed tablet " + extent + " external compaction ids differ " + ecids + " != " + + tabletMeta.getExternalCompactions().keySet(); + log.error(msg); + throw new RuntimeException(msg); + } + + if (!tabletMeta.getLogs().isEmpty()) { String msg = "Closed tablet " + extent + " has walog entries in " + MetadataTable.NAME + " " - + fileLog.getFirst(); + + tabletMeta.getLogs(); log.error(msg); throw new RuntimeException(msg); } - if (!fileLog.getSecond().equals(getDatafileManager().getDatafileSizes())) { + if (!tabletMeta.getFilesMap().equals(getDatafileManager().getDatafileSizes())) { String msg = "Data files in differ from in memory data " + extent + " " - + fileLog.getSecond() + " " + getDatafileManager().getDatafileSizes(); + + tabletMeta.getFilesMap() + " " + getDatafileManager().getDatafileSizes(); log.error(msg); throw new RuntimeException(msg); } @@ -1705,8 +1722,11 @@ public class Tablet { MetadataTime time = tabletTime.getMetadataTime(); + HashSet<ExternalCompactionId> ecids = new HashSet<>(); + compactable.getExternalCompactionIds(ecids::add); + MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio, - getTabletServer().getContext(), getTabletServer().getLock()); + getTabletServer().getContext(), getTabletServer().getLock(), ecids); ManagerMetadataUtil.addNewTablet(getTabletServer().getContext(), low, lowDirectoryName, getTabletServer().getTabletSession(), lowDatafileSizes, bulkImported, time, lastFlushID, lastCompactID, getTabletServer().getLock()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java index 759f87f..e8e2017 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; @@ -205,7 +206,7 @@ public class SplitRecoveryIT extends ConfigurableMacBase { MetadataTableUtil.splitDatafiles(midRow, splitRatio, new HashMap<>(), mapFiles, lowDatafileSizes, highDatafileSizes, highDatafilesToRemove); - MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio, context, zl); + MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio, context, zl, Set.of()); TServerInstance instance = new TServerInstance(location, zl.getSessionId()); Assignment assignment = new Assignment(high, instance);