This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to
refs/heads/1451-external-compactions-feature by this push:
new c478645 Handle splits, merges, and canceling of user compactions (all
untested)
c478645 is described below
commit c478645f984ca0eee8533073d545a20ced29a77d
Author: Keith Turner <[email protected]>
AuthorDate: Fri Apr 23 19:54:40 2021 -0400
Handle splits, merges, and canceling of user compactions (all untested)
---
.../accumulo/core/metadata/schema/AmpleImpl.java | 4 +
.../thrift/TExternalCompactionJob.java | 104 ++++++++++++++-
core/src/main/thrift/tabletserver.thrift | 2 +-
.../accumulo/server/util/MetadataTableUtil.java | 7 +-
.../accumulo/compactor/CompactionEnvironment.java | 11 +-
.../accumulo/compactor/CompactionJobHolder.java | 27 ++--
.../org/apache/accumulo/compactor/Compactor.java | 145 ++++++++++-----------
.../apache/accumulo/compactor/CompactorTest.java | 3 +-
.../accumulo/manager/TabletGroupWatcher.java | 16 ++-
.../compactions/ExternalCompactionExecutor.java | 4 +-
.../tserver/compactions/ExternalCompactionJob.java | 18 ++-
.../accumulo/tserver/tablet/CompactableImpl.java | 3 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 34 ++++-
.../accumulo/test/functional/SplitRecoveryIT.java | 3 +-
14 files changed, 264 insertions(+), 117 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java
index 863a6eb..b8cc498 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/AmpleImpl.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.core.metadata.schema;
+import java.util.NoSuchElementException;
+
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
@@ -43,6 +45,8 @@ public class AmpleImpl implements Ample {
try (TabletsMetadata tablets = builder.build()) {
return Iterables.getOnlyElement(tablets);
+ } catch (NoSuchElementException e) {
+ return null;
}
}
diff --git
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java
index 3345f6a..a89f563 100644
---
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java
+++
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java
@@ -40,6 +40,7 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
private static final org.apache.thrift.protocol.TField
OUTPUT_FILE_FIELD_DESC = new org.apache.thrift.protocol.TField("outputFile",
org.apache.thrift.protocol.TType.STRING, (short)10);
private static final org.apache.thrift.protocol.TField
PROPAGATE_DELETES_FIELD_DESC = new
org.apache.thrift.protocol.TField("propagateDeletes",
org.apache.thrift.protocol.TType.BOOL, (short)11);
private static final org.apache.thrift.protocol.TField KIND_FIELD_DESC = new
org.apache.thrift.protocol.TField("kind", org.apache.thrift.protocol.TType.I32,
(short)12);
+ private static final org.apache.thrift.protocol.TField
USER_COMPACTION_ID_FIELD_DESC = new
org.apache.thrift.protocol.TField("userCompactionId",
org.apache.thrift.protocol.TType.I64, (short)13);
private static final org.apache.thrift.scheme.SchemeFactory
STANDARD_SCHEME_FACTORY = new TExternalCompactionJobStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory
TUPLE_SCHEME_FACTORY = new TExternalCompactionJobTupleSchemeFactory();
@@ -64,6 +65,7 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
public @org.apache.thrift.annotation.Nullable java.lang.String outputFile;
// required
public boolean propagateDeletes; // required
public @org.apache.thrift.annotation.Nullable TCompactionKind kind; //
required
+ public long userCompactionId; // required
/** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -86,7 +88,8 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
REASON((short)9, "reason"),
OUTPUT_FILE((short)10, "outputFile"),
PROPAGATE_DELETES((short)11, "propagateDeletes"),
- KIND((short)12, "kind");
+ KIND((short)12, "kind"),
+ USER_COMPACTION_ID((short)13, "userCompactionId");
private static final java.util.Map<java.lang.String, _Fields> byName = new
java.util.HashMap<java.lang.String, _Fields>();
@@ -126,6 +129,8 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
return PROPAGATE_DELETES;
case 12: // KIND
return KIND;
+ case 13: // USER_COMPACTION_ID
+ return USER_COMPACTION_ID;
default:
return null;
}
@@ -171,6 +176,7 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
private static final int __READRATE_ISSET_ID = 1;
private static final int __WRITERATE_ISSET_ID = 2;
private static final int __PROPAGATEDELETES_ISSET_ID = 3;
+ private static final int __USERCOMPACTIONID_ISSET_ID = 4;
private byte __isset_bitfield = 0;
public static final java.util.Map<_Fields,
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
@@ -200,6 +206,8 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
tmpMap.put(_Fields.KIND, new
org.apache.thrift.meta_data.FieldMetaData("kind",
org.apache.thrift.TFieldRequirementType.DEFAULT,
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.ENUM
, "TCompactionKind")));
+ tmpMap.put(_Fields.USER_COMPACTION_ID, new
org.apache.thrift.meta_data.FieldMetaData("userCompactionId",
org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TExternalCompactionJob.class,
metaDataMap);
}
@@ -219,7 +227,8 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
TCompactionReason reason,
java.lang.String outputFile,
boolean propagateDeletes,
- TCompactionKind kind)
+ TCompactionKind kind,
+ long userCompactionId)
{
this();
this.externalCompactionId = externalCompactionId;
@@ -238,6 +247,8 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
this.propagateDeletes = propagateDeletes;
setPropagateDeletesIsSet(true);
this.kind = kind;
+ this.userCompactionId = userCompactionId;
+ setUserCompactionIdIsSet(true);
}
/**
@@ -277,6 +288,7 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
if (other.isSetKind()) {
this.kind = other.kind;
}
+ this.userCompactionId = other.userCompactionId;
}
public TExternalCompactionJob deepCopy() {
@@ -301,6 +313,8 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
setPropagateDeletesIsSet(false);
this.propagateDeletes = false;
this.kind = null;
+ setUserCompactionIdIsSet(false);
+ this.userCompactionId = 0;
}
@org.apache.thrift.annotation.Nullable
@@ -627,6 +641,29 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
}
}
+ public long getUserCompactionId() {
+ return this.userCompactionId;
+ }
+
+ public TExternalCompactionJob setUserCompactionId(long userCompactionId) {
+ this.userCompactionId = userCompactionId;
+ setUserCompactionIdIsSet(true);
+ return this;
+ }
+
+ public void unsetUserCompactionId() {
+ __isset_bitfield =
org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield,
__USERCOMPACTIONID_ISSET_ID);
+ }
+
+ /** Returns true if field userCompactionId is set (has been assigned a
value) and false otherwise */
+ public boolean isSetUserCompactionId() {
+ return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield,
__USERCOMPACTIONID_ISSET_ID);
+ }
+
+ public void setUserCompactionIdIsSet(boolean value) {
+ __isset_bitfield =
org.apache.thrift.EncodingUtils.setBit(__isset_bitfield,
__USERCOMPACTIONID_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field,
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
case EXTERNAL_COMPACTION_ID:
@@ -725,6 +762,14 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
}
break;
+ case USER_COMPACTION_ID:
+ if (value == null) {
+ unsetUserCompactionId();
+ } else {
+ setUserCompactionId((java.lang.Long)value);
+ }
+ break;
+
}
}
@@ -767,6 +812,9 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
case KIND:
return getKind();
+ case USER_COMPACTION_ID:
+ return getUserCompactionId();
+
}
throw new java.lang.IllegalStateException();
}
@@ -802,6 +850,8 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
return isSetPropagateDeletes();
case KIND:
return isSetKind();
+ case USER_COMPACTION_ID:
+ return isSetUserCompactionId();
}
throw new java.lang.IllegalStateException();
}
@@ -929,6 +979,15 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
return false;
}
+ boolean this_present_userCompactionId = true;
+ boolean that_present_userCompactionId = true;
+ if (this_present_userCompactionId || that_present_userCompactionId) {
+ if (!(this_present_userCompactionId && that_present_userCompactionId))
+ return false;
+ if (this.userCompactionId != that.userCompactionId)
+ return false;
+ }
+
return true;
}
@@ -976,6 +1035,8 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
if (isSetKind())
hashCode = hashCode * 8191 + kind.getValue();
+ hashCode = hashCode * 8191 +
org.apache.thrift.TBaseHelper.hashCode(userCompactionId);
+
return hashCode;
}
@@ -1107,6 +1168,16 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
return lastComparison;
}
}
+ lastComparison =
java.lang.Boolean.valueOf(isSetUserCompactionId()).compareTo(other.isSetUserCompactionId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetUserCompactionId()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(this.userCompactionId,
other.userCompactionId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1207,6 +1278,10 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
sb.append(this.kind);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("userCompactionId:");
+ sb.append(this.userCompactionId);
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -1367,6 +1442,14 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
break;
+ case 13: // USER_COMPACTION_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.userCompactionId = iprot.readI64();
+ struct.setUserCompactionIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
@@ -1441,6 +1524,9 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
oprot.writeI32(struct.kind.getValue());
oprot.writeFieldEnd();
}
+ oprot.writeFieldBegin(USER_COMPACTION_ID_FIELD_DESC);
+ oprot.writeI64(struct.userCompactionId);
+ oprot.writeFieldEnd();
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1495,7 +1581,10 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
if (struct.isSetKind()) {
optionals.set(11);
}
- oprot.writeBitSet(optionals, 12);
+ if (struct.isSetUserCompactionId()) {
+ optionals.set(12);
+ }
+ oprot.writeBitSet(optionals, 13);
if (struct.isSetExternalCompactionId()) {
oprot.writeString(struct.externalCompactionId);
}
@@ -1538,12 +1627,15 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
if (struct.isSetKind()) {
oprot.writeI32(struct.kind.getValue());
}
+ if (struct.isSetUserCompactionId()) {
+ oprot.writeI64(struct.userCompactionId);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot,
TExternalCompactionJob struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TTupleProtocol iprot =
(org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet incoming = iprot.readBitSet(12);
+ java.util.BitSet incoming = iprot.readBitSet(13);
if (incoming.get(0)) {
struct.externalCompactionId = iprot.readString();
struct.setExternalCompactionIdIsSet(true);
@@ -1604,6 +1696,10 @@ public class TExternalCompactionJob implements
org.apache.thrift.TBase<TExternal
struct.kind =
org.apache.accumulo.core.tabletserver.thrift.TCompactionKind.findByValue(iprot.readI32());
struct.setKindIsSet(true);
}
+ if (incoming.get(12)) {
+ struct.userCompactionId = iprot.readI64();
+ struct.setUserCompactionIdIsSet(true);
+ }
}
}
diff --git a/core/src/main/thrift/tabletserver.thrift
b/core/src/main/thrift/tabletserver.thrift
index 8447426..f8b181a 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -180,6 +180,7 @@ struct TExternalCompactionJob {
10:string outputFile
11:bool propagateDeletes
12:TCompactionKind kind
+ 13:i64 userCompactionId
}
enum TCompactionKind {
@@ -558,7 +559,6 @@ service TabletClientService extends client.ClientService {
3:string externalCompactionId
4:data.TKeyExtent extent
)
-
}
typedef i32 TabletID
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 27c53ac..14a7ab4 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -70,6 +70,7 @@ import org.apache.accumulo.core.metadata.TabletFileUtil;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
@@ -77,6 +78,7 @@ import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ch
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
@@ -243,7 +245,7 @@ public class MetadataTableUtil {
}
public static void splitTablet(KeyExtent extent, Text oldPrevEndRow, double
splitRatio,
- ServerContext context, ServiceLock zooLock) {
+ ServerContext context, ServiceLock zooLock, Set<ExternalCompactionId>
ecids) {
Mutation m = TabletColumnFamily.createPrevRowMutation(extent);
TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new
Value(Double.toString(splitRatio)));
@@ -251,6 +253,9 @@ public class MetadataTableUtil {
TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m,
TabletColumnFamily.encodePrevEndRow(oldPrevEndRow));
ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
+
+ ecids.forEach(ecid -> m.putDelete(ExternalCompactionColumnFamily.STR_NAME,
ecid.canonical()));
+
update(context, zooLock, m, extent);
}
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
index 31ebe8d..7e4e0de 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionEnvironment.java
@@ -29,6 +29,7 @@ import
org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
import org.apache.accumulo.server.ServerContext;
@@ -43,6 +44,7 @@ public class CompactionEnvironment implements Closeable,
CompactionEnv {
private final ServerContext context;
private final CompactionJobHolder jobHolder;
private final SharedRateLimiterFactory limiter;
+ private TExternalCompactionJob job;
private String queueName;
public static class CompactorIterEnv extends TabletIteratorEnvironment {
@@ -64,6 +66,7 @@ public class CompactionEnvironment implements Closeable,
CompactionEnv {
CompactionEnvironment(ServerContext context, CompactionJobHolder jobHolder,
String queueName) {
this.context = context;
this.jobHolder = jobHolder;
+ this.job = jobHolder.getJob();
this.limiter =
SharedRateLimiterFactory.getInstance(this.context.getConfiguration());
this.queueName = queueName;
}
@@ -86,12 +89,12 @@ public class CompactionEnvironment implements Closeable,
CompactionEnv {
@Override
public RateLimiter getReadLimiter() {
- return limiter.create("read_rate_limiter", () ->
jobHolder.getJob().getReadRate());
+ return limiter.create("read_rate_limiter", () -> job.getReadRate());
}
@Override
public RateLimiter getWriteLimiter() {
- return limiter.create("write_rate_limiter", () ->
jobHolder.getJob().getWriteRate());
+ return limiter.create("write_rate_limiter", () -> job.getWriteRate());
}
@Override
@@ -99,7 +102,7 @@ public class CompactionEnvironment implements Closeable,
CompactionEnv {
AccumuloConfiguration acuTableConf, TableId tableId) {
return new CompactorIterEnv(context, IteratorScope.majc,
!jobHolder.getJob().isPropagateDeletes(), acuTableConf, tableId,
- CompactionKind.valueOf(jobHolder.getJob().getKind().name()),
queueName);
+ CompactionKind.valueOf(job.getKind().name()), queueName);
}
@Override
@@ -109,7 +112,7 @@ public class CompactionEnvironment implements Closeable,
CompactionEnv {
@Override
public TCompactionReason getReason() {
- switch (jobHolder.getJob().getKind()) {
+ switch (job.getKind()) {
case USER:
return TCompactionReason.USER;
case CHOP:
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
index b94e033..b45b128 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/CompactionJobHolder.java
@@ -27,26 +27,22 @@ public class CompactionJobHolder {
private TExternalCompactionJob job;
private Thread compactionThread;
- private volatile Boolean cancelled = Boolean.FALSE;
- private TCompactionStats stats = null;
+ private volatile boolean cancelled = false;
+ private volatile TCompactionStats stats = null;
CompactionJobHolder() {}
- public void reset() {
+ public synchronized void reset() {
job = null;
compactionThread = null;
- cancelled = Boolean.FALSE;
+ cancelled = false;
stats = null;
}
- public TExternalCompactionJob getJob() {
+ public synchronized TExternalCompactionJob getJob() {
return job;
}
- public Thread getThread() {
- return compactionThread;
- }
-
public TCompactionStats getStats() {
return stats;
}
@@ -55,19 +51,24 @@ public class CompactionJobHolder {
this.stats = stats;
}
- public void cancel() {
- cancelled = Boolean.TRUE;
+ public synchronized boolean cancel(String extCompId) {
+ if (isSet() && getJob().getExternalCompactionId().equals(extCompId)) {
+ cancelled = true;
+ compactionThread.interrupt();
+ return true;
+ }
+ return false;
}
public boolean isCancelled() {
return cancelled;
}
- public boolean isSet() {
+ public synchronized boolean isSet() {
return (null != this.job);
}
- public void set(TExternalCompactionJob job, Thread compactionThread) {
+ public synchronized void set(TExternalCompactionJob job, Thread
compactionThread) {
Objects.requireNonNull(job, "CompactionJob is null");
Objects.requireNonNull(compactionThread, "Compaction thread is null");
this.job = job;
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 1fbb3b6..6f98923 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
@@ -52,8 +53,11 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.trace.TraceUtil;
@@ -71,7 +75,6 @@ import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.AbstractServer;
import org.apache.accumulo.server.GarbageCollectionLogger;
-import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.compaction.CompactionInfo;
import org.apache.accumulo.server.compaction.ExternalCompactionUtil;
@@ -90,10 +93,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.WatcherType;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,6 +112,8 @@ public class Compactor extends AbstractServer
private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
private static final long TIME_BETWEEN_GC_CHECKS = 5000;
+ private static final long TIME_BETWEEN_CANCEL_CHECKS = 5 * 60 * 1000;
+
private static final long TEN_MEGABYTES = 10485760;
private static final CompactionCoordinator.Client.Factory
COORDINATOR_CLIENT_FACTORY =
new CompactionCoordinator.Client.Factory();
@@ -140,7 +141,9 @@ public class Compactor extends AbstractServer
queueName = opts.getQueueName();
aconf = getConfiguration();
setupSecurity();
- startGCLogger();
+ var schedExecutor =
ThreadPools.createGeneralScheduledExecutorService(aconf);
+ startGCLogger(schedExecutor);
+ startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
printStartupMsg();
}
@@ -149,7 +152,9 @@ public class Compactor extends AbstractServer
queueName = opts.getQueueName();
aconf = conf;
setupSecurity();
- startGCLogger();
+ var schedExecutor =
ThreadPools.createGeneralScheduledExecutorService(aconf);
+ startGCLogger(schedExecutor);
+ startCancelChecker(schedExecutor, TIME_BETWEEN_CANCEL_CHECKS);
printStartupMsg();
}
@@ -158,12 +163,62 @@ public class Compactor extends AbstractServer
security = AuditedSecurityOperation.getInstance(getContext());
}
- protected void startGCLogger() {
-
ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
- () -> gcLogger.logGCInfo(getConfiguration()), 0,
TIME_BETWEEN_GC_CHECKS,
+ protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
+ schedExecutor.scheduleWithFixedDelay(() ->
gcLogger.logGCInfo(getConfiguration()), 0,
+ TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+ }
+
+ protected void startCancelChecker(ScheduledThreadPoolExecutor schedExecutor,
+ long timeBetweenChecks) {
+ schedExecutor.scheduleWithFixedDelay(() -> checkIfCanceled(), 0,
timeBetweenChecks,
TimeUnit.MILLISECONDS);
}
+ private void checkIfCanceled() {
+ TExternalCompactionJob job = JOB_HOLDER.getJob();
+ if (job != null) {
+ try {
+ var extent = KeyExtent.fromThrift(job.getExtent());
+ var ecid = ExternalCompactionId.of(job.getExternalCompactionId());
+
+ TabletMetadata tabletMeta =
+ getContext().getAmple().readTablet(extent, ColumnType.ECOMP,
ColumnType.PREV_ROW);
+ if (tabletMeta == null || !tabletMeta.getExtent().equals(extent)
+ || !tabletMeta.getExternalCompactions().containsKey(ecid)) {
+ // table was deleted OR tablet was split or merged OR tablet no
longer thinks compaction
+ // is running for some reason
+ LOG.info("Cancelling compaction {} that no longer has a metadata
entry at {}", ecid,
+ extent);
+ JOB_HOLDER.cancel(job.getExternalCompactionId());
+ return;
+ }
+
+ if (job.getKind() == TCompactionKind.USER) {
+ String zTablePath = Constants.ZROOT + "/" +
getContext().getInstanceID()
+ + Constants.ZTABLES + "/" + extent.tableId() +
Constants.ZTABLE_COMPACT_CANCEL_ID;
+ byte[] id = getContext().getZooCache().get(zTablePath);
+ if (id == null) {
+ // table probably deleted
+ LOG.info("Cancelling compaction {} for table that no longer exists
{}", ecid, extent);
+ JOB_HOLDER.cancel(job.getExternalCompactionId());
+ return;
+ } else {
+ var cancelId = Long.parseLong(new String(id, UTF_8));
+
+ if (cancelId >= job.getUserCompactionId()) {
+ LOG.info("Cancelling compaction {} because user compaction was
canceled");
+ JOB_HOLDER.cancel(job.getExternalCompactionId());
+ return;
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to check if compaction {} for {} was canceled.",
+ job.getExternalCompactionId(),
KeyExtent.fromThrift(job.getExtent()), e);
+ }
+ }
+ }
+
protected void printStartupMsg() {
LOG.info("Version " + Constants.VERSION);
LOG.info("Instance " + getContext().getInstanceID());
@@ -300,17 +355,11 @@ public class Compactor extends AbstractServer
* thrift error
*/
private void cancel(String externalCompactionId) throws TException {
- synchronized (JOB_HOLDER) {
- if (JOB_HOLDER.isSet()
- &&
JOB_HOLDER.getJob().getExternalCompactionId().equals(externalCompactionId)) {
- LOG.info("Cancel requested for compaction job {}",
externalCompactionId);
- JOB_HOLDER.cancel();
- JOB_HOLDER.getThread().interrupt();
- } else {
- throw new UnknownCompactionIdException();
- }
+ if (JOB_HOLDER.cancel(externalCompactionId)) {
+ LOG.info("Cancel requested for compaction job {}", externalCompactionId);
+ } else {
+ throw new UnknownCompactionIdException();
}
-
}
/**
@@ -603,57 +652,9 @@ public class Compactor extends AbstractServer
"Compaction job for tablet " + job.getExtent().toString(),
createCompactionJob(job, totalInputEntries, totalInputBytes,
started, stopped, err));
- synchronized (JOB_HOLDER) {
- JOB_HOLDER.set(job, compactionThread);
- }
+ JOB_HOLDER.set(job, compactionThread);
- final String tableId = new String(job.getExtent().getTable(), UTF_8);
- final ServerContext ctxRef = getContext();
- final String tablePath =
- getContext().getZooKeeperRoot() + Constants.ZTABLES + "/" +
tableId;
- Watcher tableNodeWatcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- switch (event.getType()) {
- case NodeDeleted:
- LOG.info("Zookeeper node for table {} deleted, cancelling
compaction.", tableId);
- JOB_HOLDER.cancel();
- break;
- default:
- // Watcher got fired for some other event, need to recreate
the Watcher
- try {
- Stat s =
ctxRef.getZooReaderWriter().getZooKeeper().exists(tablePath, this);
- if (s == null) {
- LOG.info("Zookeeper node for table {} deleted before
compaction started.",
- tableId);
- // if stat is null from the zookeeper.exists(path,
Watcher) call, then we just
- // created a Watcher on a node that does not exist. Delete
the watcher we just
- // created.
-
ctxRef.getZooReaderWriter().getZooKeeper().removeWatches(tablePath, this,
- WatcherType.Any, true);
- }
- } catch (Exception e) {
- LOG.error("Error communicating with ZooKeeper and unable to
recreate Watcher", e);
- // CBUG: Should we exit?
- }
- break;
- }
- }
- };
try {
- // Add a watcher in ZooKeeper on the table id so that we can cancel
this compaction
- // if the table is deleted
- Stat s =
-
getContext().getZooReaderWriter().getZooKeeper().exists(tablePath,
tableNodeWatcher);
- if (s == null) {
- LOG.info("Zookeeper node for table {} deleted before compaction
started.", tableId);
- // if stat is null from the zookeeper.exists(path, Watcher) call,
then we just
- // created a Watcher on a node that does not exist. Delete the
watcher we just created.
-
getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath,
- tableNodeWatcher, WatcherType.Any, true);
- continue;
- }
-
compactionThread.start(); // start the compactionThread
started.await(); // wait until the compactor is started
final long inputEntries = totalInputEntries.sum();
@@ -740,12 +741,6 @@ public class Compactor extends AbstractServer
LOG.error("Error cancelling compaction.", e2);
}
} finally {
- try {
-
getContext().getZooReaderWriter().getZooKeeper().removeWatches(tablePath,
- tableNodeWatcher, WatcherType.Any, true);
- } catch (KeeperException e) {
- LOG.error("Error removing watch from {}.", tablePath, e);
- }
currentCompactionId.set(null);
}
diff --git
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 36c7cbc..2e27ffc 100644
---
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -27,6 +27,7 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
@@ -191,7 +192,7 @@ public class CompactorTest {
protected void setupSecurity() {}
@Override
- protected void startGCLogger() {}
+ protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {}
@Override
protected void printStartupMsg() {}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index d3fefbf..8df449d 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -68,6 +69,7 @@ import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
@@ -746,16 +748,24 @@ abstract class TabletGroupWatcher extends Thread {
scanner = client.createScanner(targetSystemTable, Authorizations.EMPTY);
scanner.setRange(new Range(stopRow));
ServerColumnFamily.TIME_COLUMN.fetch(scanner);
+ scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+ Set<String> extCompIds = new HashSet<>();
for (Entry<Key,Value> entry : scanner) {
if (ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) {
maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime,
MetadataTime.parse(entry.getValue().toString()));
+ } else if
(ExternalCompactionColumnFamily.NAME.equals(entry.getKey().getColumnFamily())) {
+ extCompIds.add(entry.getKey().getColumnQualifierData().toString());
}
}
if (maxLogicalTime != null)
ServerColumnFamily.TIME_COLUMN.put(m, new
Value(maxLogicalTime.encode()));
+ // delete any entries for external compactions
+ extCompIds.stream()
+ .forEach(ecid ->
m.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid));
+
if (!m.getUpdates().isEmpty()) {
bw.addMutation(m);
}
@@ -779,9 +789,9 @@ abstract class TabletGroupWatcher extends Thread {
deleteTablets(info, scanRange, bw, client);
// Clean-up the last chopped marker
- m = new Mutation(stopRow);
- ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
- bw.addMutation(m);
+ var m2 = new Mutation(stopRow);
+ ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m2);
+ bw.addMutation(m2);
bw.flush();
} catch (Exception ex) {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 04591ae..6d7757c 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -152,10 +152,12 @@ public class ExternalCompactionExecutor implements
CompactionExecutor {
if (extJob.getJob().getPriority() >= priority) {
if (extJob.status.compareAndSet(Status.QUEUED, Status.RUNNING)) {
+ queuedTask.remove(extJob);
var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid,
extJob.getJob(),
compactorId, externalCompactionId);
+ if (ecj == null)
+ return null;
extJob.ecid = ecj.getExternalCompactionId();
- queuedTask.remove(extJob);
return ecj;
} else {
// TODO could this cause a stack overflow?
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
index fdb58e1..4343448 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java
@@ -36,6 +36,8 @@ import
org.apache.accumulo.core.tabletserver.thrift.TCompactionReason;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionType;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import com.google.common.base.Preconditions;
+
public class ExternalCompactionJob {
private Set<StoredTabletFile> jobFiles;
@@ -46,12 +48,13 @@ public class ExternalCompactionJob {
private long priority;
private CompactionKind kind;
private List<IteratorSetting> iters;
+ private long userCompactionId;
public ExternalCompactionJob() {}
public ExternalCompactionJob(Set<StoredTabletFile> jobFiles, boolean
propogateDeletes,
TabletFile compactTmpName, KeyExtent extent, ExternalCompactionId
externalCompactionId,
- long priority, CompactionKind kind, List<IteratorSetting> iters) {
+ long priority, CompactionKind kind, List<IteratorSetting> iters, Long
userCompactionId) {
this.jobFiles = Objects.requireNonNull(jobFiles);
this.propogateDeletes = propogateDeletes;
this.compactTmpName = Objects.requireNonNull(compactTmpName);
@@ -60,6 +63,13 @@ public class ExternalCompactionJob {
this.priority = priority;
this.kind = Objects.requireNonNull(kind);
this.iters = Objects.requireNonNull(iters);
+ if (kind == CompactionKind.USER) {
+ Preconditions.checkArgument(userCompactionId != null && userCompactionId
> 0);
+ this.userCompactionId = userCompactionId;
+ } else {
+ this.userCompactionId = 0;
+ }
+
}
public TExternalCompactionJob toThrift() {
@@ -85,20 +95,18 @@ public class ExternalCompactionJob {
default:
throw new IllegalStateException();
}
-
IteratorConfig iteratorSettings =
SystemIteratorUtil.toIteratorConfig(iters);
// TODO what are things that are zeros below needed for
List<InputFile> files = jobFiles.stream().map(stf -> new
InputFile(stf.getPathStr(), 0, 0, 0))
.collect(Collectors.toList());
- // CBUG there seem to be two CompactionKind thrift types
- // CBUG rename CompactionKind thrift type to TCompactionKind
// TODO priority cast and compactionId cast... compactionId could be null
I think
return new TExternalCompactionJob(externalCompactionId.toString(),
extent.toThrift(), files,
(int) priority, readRate, writeRate, iteratorSettings, type, reason,
compactTmpName.getPathStr(), propogateDeletes,
-
org.apache.accumulo.core.tabletserver.thrift.TCompactionKind.valueOf(kind.name()));
+
org.apache.accumulo.core.tabletserver.thrift.TCompactionKind.valueOf(kind.name()),
+ userCompactionId);
}
public ExternalCompactionId getExternalCompactionId() {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index c25dfd8..f40ec6a 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -959,7 +959,8 @@ public class CompactableImpl implements Compactable {
externalCompactions.put(externalCompactionId, ecInfo);
return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes,
compactTmpName,
- getExtent(), externalCompactionId, job.getPriority(), job.getKind(),
cInfo.iters);
+ getExtent(), externalCompactionId, job.getPriority(), job.getKind(),
cInfo.iters,
+ cInfo.checkCompactionId);
} catch (Exception e) {
// CBUG unreserve files for compaction!
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 376ec6e..f096e4a 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -81,6 +81,7 @@ import
org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
import org.apache.accumulo.core.security.Authorizations;
@@ -1352,19 +1353,35 @@ public class Tablet {
}
try {
- Pair<List<LogEntry>,SortedMap<StoredTabletFile,DataFileValue>> fileLog =
- MetadataTableUtil.getFileAndLogEntries(context, extent);
+ var tabletMeta = context.getAmple().readTablet(extent, ColumnType.FILES,
ColumnType.LOGS,
+ ColumnType.ECOMP, ColumnType.PREV_ROW);
- if (!fileLog.getFirst().isEmpty()) {
+ if (!tabletMeta.getExtent().equals(extent)) {
+ String msg = "Closed tablet " + extent + " does not match extent in
metadata table "
+ + tabletMeta.getExtent();
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+
+ HashSet<ExternalCompactionId> ecids = new HashSet<>();
+ compactable.getExternalCompactionIds(ecids::add);
+ if (!tabletMeta.getExternalCompactions().keySet().equals(ecids)) {
+ String msg = "Closed tablet " + extent + " external compaction ids
differ " + ecids + " != "
+ + tabletMeta.getExternalCompactions().keySet();
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+
+ if (!tabletMeta.getLogs().isEmpty()) {
String msg = "Closed tablet " + extent + " has walog entries in " +
MetadataTable.NAME + " "
- + fileLog.getFirst();
+ + tabletMeta.getLogs();
log.error(msg);
throw new RuntimeException(msg);
}
- if
(!fileLog.getSecond().equals(getDatafileManager().getDatafileSizes())) {
+ if
(!tabletMeta.getFilesMap().equals(getDatafileManager().getDatafileSizes())) {
String msg = "Data files in differ from in memory data " + extent + "
"
- + fileLog.getSecond() + " " +
getDatafileManager().getDatafileSizes();
+ + tabletMeta.getFilesMap() + " " +
getDatafileManager().getDatafileSizes();
log.error(msg);
throw new RuntimeException(msg);
}
@@ -1705,8 +1722,11 @@ public class Tablet {
MetadataTime time = tabletTime.getMetadataTime();
+ HashSet<ExternalCompactionId> ecids = new HashSet<>();
+ compactable.getExternalCompactionIds(ecids::add);
+
MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio,
- getTabletServer().getContext(), getTabletServer().getLock());
+ getTabletServer().getContext(), getTabletServer().getLock(), ecids);
ManagerMetadataUtil.addNewTablet(getTabletServer().getContext(), low,
lowDirectoryName,
getTabletServer().getTabletSession(), lowDatafileSizes,
bulkImported, time, lastFlushID,
lastCompactID, getTabletServer().getLock());
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index 759f87f..e8e2017 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
@@ -205,7 +206,7 @@ public class SplitRecoveryIT extends ConfigurableMacBase {
MetadataTableUtil.splitDatafiles(midRow, splitRatio, new HashMap<>(),
mapFiles,
lowDatafileSizes, highDatafileSizes, highDatafilesToRemove);
- MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio,
context, zl);
+ MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio,
context, zl, Set.of());
TServerInstance instance = new TServerInstance(location,
zl.getSessionId());
Assignment assignment = new Assignment(high, instance);