This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new c08139caf5 executes user initiated splits in manager (#3425) c08139caf5 is described below commit c08139caf511dee6b7166c781a6169aa420dbdc4 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu May 25 14:27:29 2023 -0400 executes user initiated splits in manager (#3425) Modifies the user API for adding splits to execute a fate operation instead of calling the tablet server. Now user initiated splits can happen without having to host a tablet. --- .../core/clientImpl/TableOperationsImpl.java | 274 ++++++++------------- .../core/manager/state/TabletManagement.java | 9 +- .../core/metadata/schema/TabletMetadata.java | 7 +- .../core/clientImpl/thrift/TableOperation.java | 5 +- .../core/manager/thrift/FateOperation.java | 5 +- core/src/main/thrift/client.thrift | 1 + core/src/main/thrift/manager.thrift | 1 + .../manager/state/TabletManagementIterator.java | 17 +- .../accumulo/manager/FateServiceHandler.java | 53 ++++ .../java/org/apache/accumulo/manager/Manager.java | 4 + .../manager/tableOps/split/DeleteOperationIds.java | 6 + .../accumulo/manager/tableOps/split/PreSplit.java | 2 +- .../test/functional/ManagerAssignmentIT.java | 61 ++++- .../functional/TabletManagementIteratorIT.java | 28 ++- 14 files changed, 285 insertions(+), 188 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 37dbe8488a..a063cff241 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -59,10 +59,11 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Predicate; @@ -140,8 +141,6 @@ import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.summary.SummarizerConfigurationUtil; import org.apache.accumulo.core.summary.SummaryCollection; -import org.apache.accumulo.core.tablet.thrift.TabletManagementClientService; -import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; @@ -155,7 +154,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.transport.TTransportException; @@ -164,7 +162,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.net.HostAndPort; public class TableOperationsImpl extends TableOperationsHelper { @@ -435,198 +432,143 @@ public class TableOperationsImpl extends TableOperationsHelper { } } - private static class SplitEnv { - private final String tableName; - private final TableId tableId; - private final ExecutorService executor; - private final CountDownLatch latch; - private final AtomicReference<Exception> exception; - - SplitEnv(String tableName, TableId tableId, ExecutorService executor, CountDownLatch latch, - AtomicReference<Exception> exception) { - this.tableName = 
tableName; - this.tableId = tableId; - this.executor = executor; - this.latch = latch; - this.exception = exception; - } - } - - private class SplitTask implements Runnable { - - private List<Text> splits; - private SplitEnv env; - - SplitTask(SplitEnv env, List<Text> splits) { - this.env = env; - this.splits = splits; - } + /** + * On the server side the fate operation will exit w/o an error if the tablet requested to split + * does not exist. When this happens it will also return an empty string. In the case where the + * fate operation successfully splits the tablet it will return the following string. This code + * uses this return value to see if it needs to retry finding the tablet. + */ + public static final String SPLIT_SUCCESS_MSG = "SPLIT_SUCCEEDED"; - @Override - public void run() { - try { - if (env.exception.get() != null) { - return; - } + @Override + public void addSplits(String tableName, SortedSet<Text> splits) + throws AccumuloException, TableNotFoundException, AccumuloSecurityException { - if (splits.size() <= 2) { - addSplits(env, new TreeSet<>(splits)); - splits.forEach(s -> env.latch.countDown()); - return; - } + EXISTING_TABLE_NAME.validate(tableName); - int mid = splits.size() / 2; + TableId tableId = context.getTableId(tableName); - // split the middle split point to ensure that child task split - // different tablets and can therefore run in parallel - addSplits(env, new TreeSet<>(splits.subList(mid, mid + 1))); - env.latch.countDown(); + // TODO should there be a server side check for this? 
+ context.requireNotOffline(tableId, tableName); - env.executor.execute(new SplitTask(env, splits.subList(0, mid))); - env.executor.execute(new SplitTask(env, splits.subList(mid + 1, splits.size()))); + ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId); - } catch (Exception t) { - env.exception.compareAndSet(null, t); - } - } + SortedSet<Text> splitsTodo = new TreeSet<>(splits); + ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false); + try { + while (!splitsTodo.isEmpty()) { - } + tabLocator.invalidateCache(); - @Override - public void addSplits(String tableName, SortedSet<Text> partitionKeys) - throws TableNotFoundException, AccumuloException, AccumuloSecurityException { - EXISTING_TABLE_NAME.validate(tableName); + Map<KeyExtent,List<Text>> tabletSplits = + mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo); - TableId tableId = context.getTableId(tableName); - List<Text> splits = new ArrayList<>(partitionKeys); + List<Future<List<Text>>> splitTasks = new ArrayList<>(); - // should be sorted because we copied from a sorted set, but that makes - // assumptions about how the copy was done so resort to be sure. - Collections.sort(splits); - CountDownLatch latch = new CountDownLatch(splits.size()); - AtomicReference<Exception> exception = new AtomicReference<>(null); + for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) { + Callable<List<Text>> splitTask = createSplitTask(tableName, splitsForTablet); + splitTasks.add(executor.submit(splitTask)); + } - ExecutorService executor = context.threadPools().createFixedThreadPool(16, "addSplits", false); - try { - executor.execute( - new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits)); - - while (!latch.await(100, MILLISECONDS)) { - if (exception.get() != null) { - executor.shutdownNow(); - Throwable excep = exception.get(); - // Below all exceptions are wrapped and rethrown. 
This is done so that the user knows what - // code path got them here. If the wrapping was not done, the - // user would only have the stack trace for the background thread. - if (excep instanceof TableNotFoundException) { - TableNotFoundException tnfe = (TableNotFoundException) excep; - throw new TableNotFoundException(tableId.canonical(), tableName, - "Table not found by background thread", tnfe); - } else if (excep instanceof TableOfflineException) { - log.debug("TableOfflineException occurred in background thread. Throwing new exception", - excep); - throw new TableOfflineException(tableId, tableName); - } else if (excep instanceof AccumuloSecurityException) { - // base == background accumulo security exception - AccumuloSecurityException base = (AccumuloSecurityException) excep; - throw new AccumuloSecurityException(base.getUser(), base.asThriftException().getCode(), - base.getTableInfo(), excep); - } else if (excep instanceof AccumuloServerException) { - throw new AccumuloServerException((AccumuloServerException) excep); - } else if (excep instanceof Error) { - throw new Error(excep); - } else { - throw new AccumuloException(excep); + for (var future : splitTasks) { + try { + var completedSplits = future.get(); + completedSplits.forEach(splitsTodo::remove); + } catch (ExecutionException ee) { + Throwable excep = ee.getCause(); + // Below all exceptions are wrapped and rethrown. This is done so that the user knows + // what + // code path got them here. If the wrapping was not done, the user would only have the + // stack trace for the background thread. + if (excep instanceof TableNotFoundException) { + TableNotFoundException tnfe = (TableNotFoundException) excep; + throw new TableNotFoundException(tableId.canonical(), tableName, + "Table not found by background thread", tnfe); + } else if (excep instanceof TableOfflineException) { + log.debug( + "TableOfflineException occurred in background thread. 
Throwing new exception", + excep); + throw new TableOfflineException(tableId, tableName); + } else if (excep instanceof AccumuloSecurityException) { + // base == background accumulo security exception + AccumuloSecurityException base = (AccumuloSecurityException) excep; + throw new AccumuloSecurityException(base.getUser(), + base.asThriftException().getCode(), base.getTableInfo(), excep); + } else if (excep instanceof AccumuloServerException) { + throw new AccumuloServerException((AccumuloServerException) excep); + } else { + throw new AccumuloException(excep); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); } } } - } catch (InterruptedException e) { - throw new IllegalStateException(e); } finally { - executor.shutdown(); + executor.shutdownNow(); } } - private void addSplits(SplitEnv env, SortedSet<Text> partitionKeys) - throws AccumuloException, AccumuloSecurityException, TableNotFoundException, - AccumuloServerException, InvalidTabletHostingRequestException { - - ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, env.tableId); - for (Text split : partitionKeys) { - boolean successful = false; - int attempt = 0; - long locationFailures = 0; + private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId, + ClientTabletCache tabLocator, SortedSet<Text> splitsTodo) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + Map<KeyExtent,List<Text>> tabletSplits = new HashMap<>(); - while (!successful) { + var iterator = splitsTodo.iterator(); + while (iterator.hasNext()) { + var split = iterator.next(); - if (attempt > 0) { - sleepUninterruptibly(100, MILLISECONDS); + try { + var tablet = tabLocator.findTablet(context, split, false, LocationNeed.NOT_REQUIRED); + if (tablet == null) { + context.requireTableExists(tableId, tableName); + throw new IllegalStateException("Unable to find a tablet for split " + split + + " in table " + tableName + " " + tableId); } - 
attempt++; - - CachedTablet tl = tabLocator.findTablet(context, split, false, LocationNeed.REQUIRED); - - if (tl == null) { - context.requireTableExists(env.tableId, env.tableName); - context.requireNotOffline(env.tableId, env.tableName); + if (split.equals(tablet.getExtent().endRow())) { + // split already exists, so remove it + iterator.remove(); continue; } - HostAndPort address = HostAndPort.fromString(tl.getTserverLocation().orElseThrow()); - - try { - TabletManagementClientService.Client client = - ThriftUtil.getClient(ThriftClientTypes.TABLET_MGMT, address, context); - try { - - OpTimer timer = null; - - if (log.isTraceEnabled()) { - log.trace("tid={} Splitting tablet {} on {} at {}", Thread.currentThread().getId(), - tl.getExtent(), address, split); - timer = new OpTimer().start(); - } - - client.splitTablet(TraceUtil.traceInfo(), context.rpcCreds(), tl.getExtent().toThrift(), - TextUtil.getByteBuffer(split)); + tabletSplits.computeIfAbsent(tablet.getExtent(), k -> new ArrayList<>()).add(split); - // just split it, might as well invalidate it in the cache - tabLocator.invalidateCache(tl.getExtent()); + } catch (InvalidTabletHostingRequestException e) { + // not expected + throw new AccumuloException(e); + } + } + return tabletSplits; + } - if (timer != null) { - timer.stop(); - log.trace("Split tablet in {}", String.format("%.3f secs", timer.scale(SECONDS))); - } + private Callable<List<Text>> createSplitTask(String tableName, + Entry<KeyExtent,List<Text>> splitsForTablet) { + Callable<List<Text>> splitTask = () -> { + var extent = splitsForTablet.getKey(); - } finally { - ThriftUtil.returnClient(client, context); - } + ByteBuffer EMPTY = ByteBuffer.allocate(0); - } catch (TApplicationException tae) { - throw new AccumuloServerException(address.toString(), tae); - } catch (ThriftSecurityException e) { - context.clearTableListCache(); - context.requireTableExists(env.tableId, env.tableName); - throw new AccumuloSecurityException(e.user, e.code, e); - } catch 
(NotServingTabletException e) { - // Do not silently spin when we repeatedly fail to get the location for a tablet - locationFailures++; - if (locationFailures == 5 || locationFailures % 50 == 0) { - log.warn("Having difficulty locating hosting tabletserver for split {} on table {}." - + " Seen {} failures.", split, env.tableName, locationFailures); - } + List<ByteBuffer> args = new ArrayList<>(); + args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8))); + args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow())); + args.add(extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow())); + splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split))); - tabLocator.invalidateCache(tl.getExtent()); - continue; - } catch (TException e) { - tabLocator.invalidateCache(context, tl.getTserverLocation().orElseThrow()); - continue; + try { + String status = doFateOperation(FateOperation.TABLE_SPLIT, args, Map.of(), tableName); + if (SPLIT_SUCCESS_MSG.equals(status)) { + // the fate operation successfully created the splits, so these splits are done + return splitsForTablet.getValue(); + } else { + // splits did not succeed + return List.of(); } - - successful = true; + } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException e) { + throw new RuntimeException(e); } - } + }; + return splitTask; } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java index a9233819d6..73d7971898 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java @@ -41,14 +41,15 @@ import com.google.common.base.Splitter; */ public class TabletManagement { - public static final EnumSet<ColumnType> CONFIGURED_COLUMNS = 
EnumSet.of(ColumnType.PREV_ROW, - ColumnType.LOCATION, ColumnType.SUSPEND, ColumnType.LOGS, ColumnType.CHOPPED, - ColumnType.HOSTING_GOAL, ColumnType.HOSTING_REQUESTED, ColumnType.FILES, ColumnType.LAST); + public static final EnumSet<ColumnType> CONFIGURED_COLUMNS = + EnumSet.of(ColumnType.PREV_ROW, ColumnType.LOCATION, ColumnType.SUSPEND, ColumnType.LOGS, + ColumnType.CHOPPED, ColumnType.HOSTING_GOAL, ColumnType.HOSTING_REQUESTED, + ColumnType.FILES, ColumnType.LAST, ColumnType.OPID); private static final String REASONS_COLUMN_NAME = "REASONS"; public static enum ManagementAction { - BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, IS_MERGING, IS_MIGRATING, NEEDS_SPLITTING; + BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, IS_MERGING, NEEDS_SPLITTING; } public static void addActions(final SortedMap<Key,Value> decodedRow, diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index 829ea52c02..7e043c258a 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -76,7 +76,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Se import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -559,11 +558,7 @@ public class TabletMetadata { case HostingColumnFamily.STR_NAME: switch (qual) { case GOAL_QUAL: - if (StringUtils.isEmpty(kv.getValue().toString())) { - te.goal = TabletHostingGoal.ONDEMAND; - } else { - te.goal = 
TabletHostingGoalUtil.fromValue(kv.getValue()); - } + te.goal = TabletHostingGoalUtil.fromValue(kv.getValue()); break; case REQUESTED_QUAL: te.onDemandHostingRequested = true; diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/TableOperation.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/TableOperation.java index bc2deb978b..673a1dec18 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/TableOperation.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/clientImpl/thrift/TableOperation.java @@ -43,7 +43,8 @@ public enum TableOperation implements org.apache.thrift.TEnum { IMPORT(14), EXPORT(15), COMPACT_CANCEL(16), - SET_HOSTING_GOAL(17); + SET_HOSTING_GOAL(17), + SPLIT(18); private final int value; @@ -102,6 +103,8 @@ public enum TableOperation implements org.apache.thrift.TEnum { return COMPACT_CANCEL; case 17: return SET_HOSTING_GOAL; + case 18: + return SPLIT; default: return null; } diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java index d93a7eafaa..29c0cbe578 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java @@ -43,7 +43,8 @@ public enum FateOperation implements org.apache.thrift.TEnum { NAMESPACE_DELETE(14), NAMESPACE_RENAME(15), TABLE_BULK_IMPORT2(16), - TABLE_HOSTING_GOAL(17); + TABLE_HOSTING_GOAL(17), + TABLE_SPLIT(18); private final int value; @@ -102,6 +103,8 @@ public enum FateOperation implements org.apache.thrift.TEnum { return TABLE_BULK_IMPORT2; case 17: return TABLE_HOSTING_GOAL; + case 18: + return TABLE_SPLIT; default: return null; } diff --git a/core/src/main/thrift/client.thrift b/core/src/main/thrift/client.thrift index 9c430d4001..700ca7a6db 
100644 --- a/core/src/main/thrift/client.thrift +++ b/core/src/main/thrift/client.thrift @@ -40,6 +40,7 @@ enum TableOperation { EXPORT COMPACT_CANCEL SET_HOSTING_GOAL + SPLIT } enum TableOperationExceptionType { diff --git a/core/src/main/thrift/manager.thrift b/core/src/main/thrift/manager.thrift index 3e97977258..51738e7094 100644 --- a/core/src/main/thrift/manager.thrift +++ b/core/src/main/thrift/manager.thrift @@ -68,6 +68,7 @@ enum FateOperation { NAMESPACE_RENAME TABLE_BULK_IMPORT2 TABLE_HOSTING_GOAL + TABLE_SPLIT } enum ManagerState { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index fb393ce25a..0556e68726 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -57,6 +57,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Fu import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -217,10 +218,16 @@ public class TabletManagementIterator extends SkippingIterator { .collect(Collectors.summarizingLong(Long::longValue)).getSum() > splitThreshold; } - private static boolean shouldReturnDueToLocation(final 
TabletMetadata tm, + private boolean shouldReturnDueToLocation(final TabletMetadata tm, final Set<TableId> onlineTables, final Set<TServerInstance> current, final boolean debug) { + + if (migrations.contains(tm.getExtent())) { + return true; + } + // is the table supposed to be online or offline? - final boolean shouldBeOnline = onlineTables.contains(tm.getTableId()); + final boolean shouldBeOnline = + onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null; TabletState state = tm.getTabletState(current); if (debug) { @@ -262,6 +269,7 @@ public class TabletManagementIterator extends SkippingIterator { scanner.fetchColumnFamily(LogColumnFamily.NAME); scanner.fetchColumnFamily(ChoppedColumnFamily.NAME); scanner.fetchColumnFamily(HostingColumnFamily.NAME); + ServerColumnFamily.OPID_COLUMN.fetch(scanner); scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class)); IteratorSetting tabletChange = new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class); @@ -393,11 +401,6 @@ public class TabletManagementIterator extends SkippingIterator { reasonsToReturnThisTablet.add(ManagementAction.IS_MERGING); } - // always return the information for migrating tablets - if (migrations.contains(tm.getExtent())) { - reasonsToReturnThisTablet.add(ManagementAction.IS_MIGRATING); - } - if (shouldReturnDueToLocation(tm, onlineTables, current, debug)) { reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index 785c056874..9cff4251dc 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -41,6 +41,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; 
+import java.util.SortedSet; +import java.util.TreeSet; import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -63,6 +65,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; import org.apache.accumulo.core.manager.thrift.BulkImportState; @@ -89,6 +92,7 @@ import org.apache.accumulo.manager.tableOps.namespace.create.CreateNamespace; import org.apache.accumulo.manager.tableOps.namespace.delete.DeleteNamespace; import org.apache.accumulo.manager.tableOps.namespace.rename.RenameNamespace; import org.apache.accumulo.manager.tableOps.rename.RenameTable; +import org.apache.accumulo.manager.tableOps.split.PreSplit; import org.apache.accumulo.manager.tableOps.tableExport.ExportTable; import org.apache.accumulo.manager.tableOps.tableImport.ImportTable; import org.apache.accumulo.server.client.ClientServiceHandler; @@ -694,6 +698,55 @@ class FateServiceHandler implements FateService.Iface { goalMessage); break; } + case TABLE_SPLIT: { + TableOperation tableOp = TableOperation.SPLIT; + + // ELASTICITY_TODO this does not check if table is offline for now, that is usually done in + // FATE operation with a table lock. Deferring that check for now as its possible tablet + // locks may not be needed. 
+ + int SPLIT_OFFSET = 3; // offset where split data begins in arguments list + if (arguments.size() < (SPLIT_OFFSET + 1)) { + throw new ThriftTableOperationException(null, null, tableOp, + TableOperationExceptionType.OTHER, + "Expected at least " + (SPLIT_OFFSET + 1) + " arguments, saw :" + arguments.size()); + } + + var tableId = validateTableIdArgument(arguments.get(0), tableOp, NOT_ROOT_TABLE_ID); + NamespaceId namespaceId = getNamespaceIdFromTableId(tableOp, tableId); + + boolean canSplit; + + try { + canSplit = manager.security.canSplitTablet(c, tableId, namespaceId); + } catch (ThriftSecurityException e) { + throwIfTableMissingSecurityException(e, tableId, null, TableOperation.SPLIT); + throw e; + } + + if (!canSplit) { + throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); + } + + var endRow = ByteBufferUtil.toText(arguments.get(1)); + var prevEndRow = ByteBufferUtil.toText(arguments.get(2)); + + endRow = endRow.getLength() == 0 ? null : endRow; + prevEndRow = prevEndRow.getLength() == 0 ? 
null : prevEndRow; + + // ELASTICITY_TODO create table stores splits in a file, maybe this operation should do the + // same + SortedSet<Text> splits = arguments.subList(SPLIT_OFFSET, arguments.size()).stream() + .map(ByteBufferUtil::toText).collect(Collectors.toCollection(TreeSet::new)); + + KeyExtent extent = new KeyExtent(tableId, endRow, prevEndRow); + manager.requestUnassignment(extent, opid); + + goalMessage = "Splitting " + extent + " for user into " + (splits.size() + 1) + " tablets"; + manager.fate().seedTransaction(op.toString(), opid, new PreSplit(extent, splits), + autoCleanup, goalMessage); + break; + } default: throw new UnsupportedOperationException(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index ee84a35273..401d73d91d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -706,6 +706,10 @@ public class Manager extends AbstractServer return TabletGoalState.UNASSIGNED; } + if (tm.getOperationId() != null) { + return TabletGoalState.UNASSIGNED; + } + if (tm.hasCurrent() && serversToShutdown.contains(tm.getLocation().getServerInstance())) { return TabletGoalState.SUSPENDED; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java index 6528b7e21c..6ccba834fa 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java @@ -20,6 +20,7 @@ package org.apache.accumulo.manager.tableOps.split; import java.util.stream.Collectors; +import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.fate.Repo; import 
org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; @@ -68,4 +69,9 @@ public class DeleteOperationIds extends ManagerRepo { return null; } + + @Override + public String getReturn() { + return TableOperationsImpl.SPLIT_SUCCESS_MSG; + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index ae0bfae78a..592f552692 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -121,7 +121,7 @@ public class PreSplit extends ManagerRepo { var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid); - if (tabletMetadata == null || !tabletMetadata.getOperationId().equals(opid)) { + if (tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId())) { // the tablet no longer exists or we could not set the operation id, maybe another operation // was running, lets not proceed with the split. 
var optMeta = Optional.ofNullable(tabletMetadata); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java index 1b0bba78fa..93626de958 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java @@ -28,8 +28,10 @@ import static org.junit.jupiter.api.Assertions.fail; import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.TreeSet; import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; @@ -50,13 +52,19 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.spi.ondemand.DefaultOnDemandTabletUnloader; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; @@ 
-192,7 +200,6 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { assertNull(ondemand.getLocation()); assertEquals(flushed.getLocation().getHostPort(), ondemand.getLast().getHostPort()); assertEquals(TabletHostingGoal.ONDEMAND, ondemand.getHostingGoal()); - } } @@ -357,6 +364,49 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { } } + @Test + public void testOpidPreventsAssignment() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = super.getUniqueNames(1)[0]; + + var tableId = TableId.of(prepTableForScanTest(c, tableName)); + assertEquals(0, countTabletsWithLocation(c, tableId)); + + assertEquals(Set.of("f", "m", "t"), c.tableOperations().listSplits(tableName).stream() + .map(Text::toString).collect(Collectors.toSet())); + + c.securityOperations().grantTablePermission(getPrincipal(), MetadataTable.NAME, + TablePermission.WRITE); + + try (var writer = c.createBatchWriter(MetadataTable.NAME)) { + var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L); + Mutation m = new Mutation(extent.toMetaRow()); + TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical())); + writer.addMutation(m); + } + + c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS); + + Wait.waitFor(() -> countTabletsWithLocation(c, tableId) >= 3); + + // there are four tablets, but one has an operation id set and should not be assigned + assertEquals(3, countTabletsWithLocation(c, tableId)); + + try (var writer = c.createBatchWriter(MetadataTable.NAME)) { + var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); + Mutation m = new Mutation(extent.toMetaRow()); + TabletsSection.ServerColumnFamily.OPID_COLUMN.putDelete(m); + writer.addMutation(m); + } + + Wait.waitFor(() -> countTabletsWithLocation(c, tableId) >= 4); + + // after the operation id is 
deleted the tablet should be assigned + assertEquals(4, countTabletsWithLocation(c, tableId)); + } + } + public static void loadDataForScan(AccumuloClient c, String tableName) throws MutationsRejectedException, TableNotFoundException { final byte[] empty = new byte[0]; @@ -373,6 +423,15 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { } } + public static Ample getAmple(AccumuloClient c) { + return ((ClientContext) c).getAmple(); + } + + public static long countTabletsWithLocation(AccumuloClient c, TableId tableId) { + return getAmple(c).readTablets().forTable(tableId).fetch(TabletMetadata.ColumnType.LOCATION) + .build().stream().filter(tabletMetadata -> tabletMetadata.getLocation() != null).count(); + } + public static List<TabletStats> getTabletStats(AccumuloClient c, String tableId) throws AccumuloException, AccumuloSecurityException { return ThriftClientTypes.TABLET_SERVER.execute((ClientContext) c, client -> client diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index a62100715b..1b3a14df5e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -62,8 +62,11 @@ import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import 
org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterHarness; @@ -148,10 +151,16 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, state), "Should have two tablets without a loc"); + // Test setting the operation id on one of the tablets in table t1. Table t1 has two tablets + // w/o a location. Only one should need attention because of the operation id. + setOperationId(client, metaCopy1, t1); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, state), + "Should have one tablet needing attention because of operation id"); + // test the cases where the assignment is to a dead tserver reassignLocation(client, metaCopy2, t3); assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, state), - "Should have one tablet that needs to be unassigned"); + "Only 1 of 2 tablets in table t1 should be returned"); // test the cases where there is ongoing merges state = new State(client) { @@ -225,6 +234,23 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { } } + private void setOperationId(AccumuloClient client, String table, String tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L); + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + scanner.setRange(new KeyExtent(tableIdToModify, null, null).toMetaRange()); + Entry<Key,Value> entry = scanner.iterator().next(); + Mutation m = new Mutation(entry.getKey().getRow()); + MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, + new Value(opid.canonical())); + try (BatchWriter bw =
client.createBatchWriter(table)) { + bw.addMutation(m); + } + } + } + private void removeLocation(AccumuloClient client, String table, String tableNameToModify) throws TableNotFoundException, MutationsRejectedException { TableId tableIdToModify =