This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 4ba6c499e28ef347e7438b2ca3dc2b6fc79348b2 Merge: 729115a63b 96814b81f9 Author: Dave Marion <[email protected]> AuthorDate: Mon Jun 5 13:17:00 2023 +0000 Merge branch 'main' into elasticity .../org/apache/accumulo/core/cli/ConfigOpts.java | 34 --- .../accumulo/core/clientImpl/OfflineIterator.java | 5 +- .../accumulo/core/file/BloomFilterLayer.java | 6 +- .../accumulo/core/file/DispatchingFileFactory.java | 4 +- .../apache/accumulo/core/file/FileOperations.java | 26 +-- .../accumulo/core/file/rfile/RFileOperations.java | 4 +- .../apache/accumulo/core/logging/TabletLogger.java | 6 +- .../accumulo/core/metadata/AbstractTabletFile.java | 11 +- .../{TabletFile.java => ReferencedTabletFile.java} | 18 +- .../core/metadata/ScanServerRefTabletFile.java | 4 +- .../accumulo/core/metadata/StoredTabletFile.java | 55 ++++- .../apache/accumulo/core/metadata/TabletFile.java | 126 +---------- .../core/metadata/UnreferencedTabletFile.java | 4 +- .../accumulo/core/metadata/schema/Ample.java | 16 +- .../schema/ExternalCompactionMetadata.java | 15 +- .../core/metadata/schema/TabletMetadata.java | 7 +- .../org/apache/accumulo/core/summary/Gatherer.java | 43 ++-- ...FileTest.java => ReferencedTabletFileTest.java} | 20 +- .../accumulo/server/compaction/CompactionInfo.java | 4 +- .../accumulo/server/compaction/FileCompactor.java | 20 +- .../org/apache/accumulo/server/fs/FileManager.java | 66 +++--- .../org/apache/accumulo/server/fs/VolumeUtil.java | 6 +- .../server/init/FileSystemInitializer.java | 4 +- .../iterators/TabletIteratorEnvironment.java | 6 +- .../metadata/ConditionalTabletMutatorImpl.java | 4 +- .../server/metadata/TabletMutatorBase.java | 20 +- .../org/apache/accumulo/server/util/FileUtil.java | 47 ++-- .../accumulo/server/util/ListVolumesUsed.java | 3 +- .../accumulo/server/util/ManagerMetadataUtil.java | 13 +- .../accumulo/server/util/MetadataTableUtil.java | 15 +- .../accumulo/server/util/TableDiskUsage.java | 9 +- .../manager/state/RootTabletStateStoreTest.java | 3 +- 
.../accumulo/server/util/TableDiskUsageTest.java | 12 +- .../org/apache/accumulo/compactor/Compactor.java | 5 +- .../main/java/org/apache/accumulo/gc/GCRun.java | 2 +- .../apache/accumulo/manager/split/Splitter.java | 9 +- .../tableOps/bulkVer2/CleanUpBulkImport.java | 3 +- .../manager/tableOps/bulkVer2/LoadFiles.java | 13 +- .../manager/tableOps/split/UpdateTablets.java | 2 +- .../org/apache/accumulo/tserver/ScanServer.java | 7 +- .../accumulo/tserver/TabletClientHandler.java | 9 +- .../tserver/compactions/ExternalCompactionJob.java | 10 +- .../accumulo/tserver/tablet/CompactableImpl.java | 7 +- .../accumulo/tserver/tablet/CompactableUtils.java | 13 +- .../accumulo/tserver/tablet/DatafileManager.java | 27 +-- .../tserver/tablet/MinorCompactionTask.java | 9 +- .../accumulo/tserver/tablet/MinorCompactor.java | 5 +- .../accumulo/tserver/tablet/ScanDataSource.java | 6 +- .../accumulo/tserver/tablet/SnapshotTablet.java | 6 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 57 ++--- .../apache/accumulo/tserver/tablet/TabletBase.java | 3 +- .../apache/accumulo/tserver/tablet/TabletData.java | 10 +- .../tserver/compaction/CompactableUtilsTest.java | 9 +- .../tserver/tablet/CompactableImplTest.java | 7 +- .../test/functional/AmpleConditionalWriterIT.java | 8 +- .../test/functional/FileNormalizationIT.java | 238 +++++++++++++++++++++ .../test/functional/GarbageCollectorTrashBase.java | 4 +- .../accumulo/test/functional/PerTableCryptoIT.java | 2 +- .../accumulo/test/functional/SplitRecoveryIT.java | 22 +- .../test/performance/scan/CollectTabletStats.java | 25 +-- 60 files changed, 660 insertions(+), 494 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 6935d1591b,1ae49e3382..bc7dcdc559 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@@ -303,63 -242,54 +303,65 @@@ public interface Ample /** * Interface for changing a 
tablets persistent data. */ - interface TabletMutator { - TabletMutator putPrevEndRow(Text per); + interface TabletUpdates<T> { + T putPrevEndRow(Text per); + - T putFile(TabletFile path, DataFileValue dfv); ++ T putFile(ReferencedTabletFile path, DataFileValue dfv); ++ ++ T putFile(StoredTabletFile path, DataFileValue dfv); - TabletMutator putFile(ReferencedTabletFile path, DataFileValue dfv); + T deleteFile(StoredTabletFile path); - T putScan(TabletFile path); - TabletMutator putFile(StoredTabletFile path, DataFileValue dfv); ++ T putScan(StoredTabletFile path); - TabletMutator deleteFile(StoredTabletFile path); + T deleteScan(StoredTabletFile path); - TabletMutator putScan(StoredTabletFile path); + T putCompactionId(long compactionId); - TabletMutator deleteScan(StoredTabletFile path); + T putFlushId(long flushId); - TabletMutator putCompactionId(long compactionId); + T putLocation(Location location); - TabletMutator putFlushId(long flushId); + T deleteLocation(Location location); - TabletMutator putLocation(Location location); + T putZooLock(ServiceLock zooLock); - TabletMutator deleteLocation(Location location); + T putDirName(String dirName); - TabletMutator putZooLock(ServiceLock zooLock); + T putWal(LogEntry logEntry); - TabletMutator putDirName(String dirName); + T deleteWal(String wal); - TabletMutator putWal(LogEntry logEntry); + T deleteWal(LogEntry logEntry); - TabletMutator deleteWal(String wal); + T putTime(MetadataTime time); - T putBulkFile(TabletFile bulkref, long tid); - TabletMutator deleteWal(LogEntry logEntry); ++ T putBulkFile(ReferencedTabletFile bulkref, long tid); - T deleteBulkFile(TabletFile bulkref); - TabletMutator putTime(MetadataTime time); ++ T deleteBulkFile(ReferencedTabletFile bulkref); - TabletMutator putBulkFile(ReferencedTabletFile bulkref, long tid); + T putChopped(); - TabletMutator deleteBulkFile(ReferencedTabletFile bulkref); + T putSuspension(TServerInstance tserver, long suspensionTime); - TabletMutator putChopped(); + T 
deleteSuspension(); - TabletMutator putSuspension(TServerInstance tserver, long suspensionTime); + T putExternalCompaction(ExternalCompactionId ecid, ExternalCompactionMetadata ecMeta); - TabletMutator deleteSuspension(); + T deleteExternalCompaction(ExternalCompactionId ecid); - TabletMutator putExternalCompaction(ExternalCompactionId ecid, - ExternalCompactionMetadata ecMeta); + T putHostingGoal(TabletHostingGoal goal); - TabletMutator deleteExternalCompaction(ExternalCompactionId ecid); + T setHostingRequested(); + T deleteHostingRequested(); + + T putOperation(TabletOperationId opId); + + T deleteOperation(); + } + + interface TabletMutator extends TabletUpdates<TabletMutator> { /** * This method persists (or queues for persisting) previous put and deletes against this object. * Unless this method is called, previous calls will never be persisted. The purpose of this @@@ -375,161 -305,6 +377,161 @@@ void mutate(); } + /** + * A tablet operation is a mutually exclusive action that is running against a tablet. It's very + * important that every conditional mutation specifies requirements about operations in order to + * satisfy the mutual exclusion goal. This interface forces those requirements to be specified by + * making it the only choice available before specifying other tablet requirements or mutations. + * + * @see MetadataSchema.TabletsSection.ServerColumnFamily#OPID_COLUMN + */ + interface OperationRequirements { + + /** + * Require a specific operation with a unique id is present. This would normally be called by + * the code executing that operation. + */ + ConditionalTabletMutator requireOperation(TabletOperationId operationId); + + /** + * Require that no mutually exclusive operations are running against this tablet. + */ + ConditionalTabletMutator requireAbsentOperation(); + + /** + * Require an entire tablet is absent, so the tablet row has no columns.
If the entire tablet is + * absent, then this implies the tablet operation is also absent so there is no need to specify + * that. + */ + ConditionalTabletMutator requireAbsentTablet(); + } + + /** + * Convenience interface for handling conditional mutations with a status of REJECTED. + */ + interface RejectionHandler extends Predicate<TabletMetadata> {} + + interface ConditionalTabletMutator extends TabletUpdates<ConditionalTabletMutator> { + + /** + * Require that a tablet has no future or current location set. + */ + ConditionalTabletMutator requireAbsentLocation(); + + /** + * Require that a tablet currently has the specified future or current location. + */ + ConditionalTabletMutator requireLocation(Location location); + + /** + * Require that a tablet currently has the specified file. + */ + ConditionalTabletMutator requireFile(StoredTabletFile path); + + /** + * Require that a tablet does not have the specified bulk load marker. + */ - ConditionalTabletMutator requireAbsentBulkFile(TabletFile bulkref); ++ ConditionalTabletMutator requireAbsentBulkFile(ReferencedTabletFile bulkref); + + /** + * Require that a tablet has the specified previous end row. + */ + ConditionalTabletMutator requirePrevEndRow(Text per); + + /** + * Requires the tablet to have the specified hosting goal before any changes are made. + */ + ConditionalTabletMutator requireHostingGoal(TabletHostingGoal tabletHostingGoal); + + /** + * <p> + * Ample provides the following features on top of the conditional writer to help automate + * handling of edge cases that arise when using the conditional writer.
+ * <ul> + * <li>Automatically resubmit conditional mutations with a status of + * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#UNKNOWN}.</li> + * <li>When a mutation is rejected (status of + * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#REJECTED}) it will read the + * tablet's metadata and call the passed rejectionHandler to determine if the mutation should be + * considered as accepted.</li> + * <li>For status of + * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#INVISIBLE_VISIBILITY} and + * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#VIOLATED} ample will throw an + * exception. This is done so that all code does not have to deal with these unexpected + * statuses.</li> + * </ul> + * + * <p> + * The motivation behind the rejectionHandler is to help sort things out when conditional + * mutations are submitted twice and the subsequent submission is rejected even though the first + * submission was accepted. There are two causes for this. First, when a thread is running in + * something like FATE it may submit a mutation and the thread dies before it sees the response. + * Later FATE will run the code again for a second time, submitting a second mutation. The + * second cause is ample resubmitting on unknown as mentioned above. Below are a few examples + * that go over how Ample will handle these different situations. + * + * <h3>Example 1</h3> + * + * <ul> + * <li>Conditional mutation CM1 with a condition requiring an absent location that sets a future + * location is submitted. When it's submitted to ample a rejectionHandler is set that checks the + * future location.</li> + * <li>Inside Ample CM1 is submitted to a conditional writer and returns a status of UNKNOWN, + * but it actually succeeded. This could be caused by the mutation succeeding and the tablet + * server dying just before it reports back.</li> + * <li>Ample sees the UNKNOWN status and resubmits CM1 for a second time.
Because the future + * location was set, the mutation is returned to ample with a status of rejected by the + * conditional writer.</li> + * <li>Because the mutation was rejected, ample reads the tablet metadata and calls the + * rejectionHandler. The rejectionHandler sees the future location was set and reports that + * everything is ok, therefore ample reports the status as ACCEPTED.</li> + * </ul> + * + * <h3>Example 2</h3> + * + * <ul> + * <li>Conditional mutation CM2 with a condition requiring an absent location that sets a future + * location is submitted. When it's submitted to ample a rejectionHandler is set that checks the + * future location.</li> + * <li>Inside Ample CM2 is submitted to a conditional writer and returns a status of UNKNOWN, + * but it actually never made it to the tserver. This could be caused by the tablet server dying + * just after a network connection was established to send the mutation.</li> + * <li>Ample sees the UNKNOWN status and resubmits CM2 for a second time. There is no future + * location set so the mutation is returned to ample with a status of accepted by the + * conditional writer.</li> + * <li>Because the mutation was accepted, ample never calls the rejectionHandler and returns it + * as accepted.</li> + * </ul> + * + * <h3>Example 3</h3> + * + * <ul> + * <li>Conditional mutation CM3 with a condition requiring an absent operation that sets the + * operation id to a fate transaction id is submitted.
When it's submitted to ample a + * rejectionHandler is set that checks if the operation id equals the fate transaction id.</li> + * <li>The thread running the fate operation dies after submitting the mutation but before + * seeing it was actually accepted.</li> + * <li>Later fate creates an identical mutation to CM3, let's call it CM3.2, and resubmits it + * with the same rejection handler.</li> + * <li>CM3.2 is rejected because the operation id is not absent.</li> + * <li>Because the mutation was rejected, ample calls the rejectionHandler. The rejectionHandler + * sees in the tablet metadata that the operation id is its fate transaction id and reports back + * true.</li> + * <li>When rejectionHandler reports true, ample reports the mutation as accepted.</li> + * </ul> + * + * @param rejectionHandler if the conditional mutation comes back with a status of + * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#REJECTED} then read + * the tablet's metadata and apply this check to see if it should be considered as + * {@link org.apache.accumulo.core.client.ConditionalWriter.Status#ACCEPTED} in the + * return of {@link ConditionalTabletsMutator#process()}. The rejection handler is only + * called when a tablet's metadata exists. If ample reads a tablet's metadata and the + * tablet no longer exists, then ample will not call the rejectionHandler with null. It + * will let the rejected status carry forward in this case.
+ */ + void submit(RejectionHandler rejectionHandler); + } + /** * Insert ScanServer references to Tablet files * diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index 7e043c258a,29243b92fa..f4757919f3 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@@ -59,7 -52,7 +59,6 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.SuspendingTServer; import org.apache.accumulo.core.metadata.TServerInstance; - import org.apache.accumulo.core.metadata.TabletFile; -import org.apache.accumulo.core.metadata.TabletLocationState; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; @@@ -96,15 -87,15 +95,15 @@@ public class TabletMetadata private boolean sawPrevEndRow = false; private Text oldPrevEndRow; private boolean sawOldPrevEndRow = false; - private Text endRow; - private Location location; + protected Text endRow; + protected Location location; private Map<StoredTabletFile,DataFileValue> files; private List<StoredTabletFile> scans; - private Map<TabletFile,Long> loadedFiles; + private Map<StoredTabletFile,Long> loadedFiles; - private EnumSet<ColumnType> fetchedCols; - private KeyExtent extent; - private Location last; - private SuspendingTServer suspend; + protected EnumSet<ColumnType> fetchedCols; + protected KeyExtent extent; + protected Location last; + protected SuspendingTServer suspend; private String dirName; private MetadataTime time; private String cloned; @@@ -458,9 -397,8 +457,9 @@@ final var logsBuilder = ImmutableList.<LogEntry>builder(); final var extCompBuilder = 
ImmutableMap.<ExternalCompactionId,ExternalCompactionMetadata>builder(); - final var loadedFilesBuilder = ImmutableMap.<TabletFile,Long>builder(); + final var loadedFilesBuilder = ImmutableMap.<StoredTabletFile,Long>builder(); ByteSequence row = null; + final var requestIdsBuilder = ImmutableMap.<Long,TServerInstance>builder(); while (rowIter.hasNext()) { final Entry<Key,Value> kv = rowIter.next(); diff --cc server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index 5f784790e2,0000000000..45e6664f46 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@@ -1,172 -1,0 +1,172 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.accumulo.server.metadata; + +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.clientImpl.TabletHostingGoalUtil; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.dataImpl.KeyExtent; ++import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; - import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.metadata.iterators.LocationExistsIterator; +import org.apache.accumulo.server.metadata.iterators.PresentIterator; +import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +public class 
ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.ConditionalTabletMutator> + implements Ample.ConditionalTabletMutator, Ample.OperationRequirements { + + private static final int INITIAL_ITERATOR_PRIO = 1000000; + + private final ConditionalMutation mutation; + private final Consumer<ConditionalMutation> mutationConsumer; + private final Ample.ConditionalTabletsMutator parent; + + private final BiConsumer<KeyExtent,Ample.RejectionHandler> rejectionHandlerConsumer; + + private final KeyExtent extent; + + private boolean sawOperationRequirement = false; + + protected ConditionalTabletMutatorImpl(Ample.ConditionalTabletsMutator parent, + ServerContext context, KeyExtent extent, Consumer<ConditionalMutation> mutationConsumer, + BiConsumer<KeyExtent,Ample.RejectionHandler> rejectionHandlerConsumer) { + super(context, new ConditionalMutation(extent.toMetaRow())); + this.mutation = (ConditionalMutation) super.mutation; + this.mutationConsumer = mutationConsumer; + this.parent = parent; + this.rejectionHandlerConsumer = rejectionHandlerConsumer; + this.extent = extent; + } + + @Override + public Ample.ConditionalTabletMutator requireAbsentLocation() { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, LocationExistsIterator.class); + Condition c = new Condition("", "").setIterators(is); + mutation.addCondition(c); + return this; + } + + @Override + public Ample.ConditionalTabletMutator requireLocation(Location location) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + Preconditions.checkArgument(location.getType() == TabletMetadata.LocationType.FUTURE + || location.getType() == TabletMetadata.LocationType.CURRENT); + Condition c = new Condition(getLocationFamily(location.getType()), location.getSession()) + .setValue(location.getHostPort()); + mutation.addCondition(c); + return this; + } + + @Override + public 
Ample.ConditionalTabletMutator requireFile(StoredTabletFile path) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, PresentIterator.class); + Condition c = new Condition(DataFileColumnFamily.NAME, path.getMetaUpdateDeleteText()) + .setValue(PresentIterator.VALUE).setIterators(is); + mutation.addCondition(c); + return this; + } + + @Override - public Ample.ConditionalTabletMutator requireAbsentBulkFile(TabletFile bulkref) { ++ public Ample.ConditionalTabletMutator requireAbsentBulkFile(ReferencedTabletFile bulkref) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + Condition c = new Condition(BulkFileColumnFamily.NAME, bulkref.getMetaInsertText()); + mutation.addCondition(c); + return this; + } + + @Override + public Ample.ConditionalTabletMutator requirePrevEndRow(Text per) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + Condition c = + new Condition(PREV_ROW_COLUMN.getColumnFamily(), PREV_ROW_COLUMN.getColumnQualifier()) + .setValue(encodePrevEndRow(per).get()); + mutation.addCondition(c); + return this; + } + + @Override + public Ample.ConditionalTabletMutator requireHostingGoal(TabletHostingGoal goal) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + Condition c = new Condition(GOAL_COLUMN.getColumnFamily(), GOAL_COLUMN.getColumnQualifier()) + .setValue(TabletHostingGoalUtil.toValue(goal).get()); + mutation.addCondition(c); + return this; + } + + @Override + public Ample.ConditionalTabletMutator requireAbsentTablet() { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, TabletExistsIterator.class); + Condition c = new Condition("", "").setIterators(is); + mutation.addCondition(c); + sawOperationRequirement = true; + return this; 
+ } + + @Override + public Ample.ConditionalTabletMutator requireAbsentOperation() { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + Condition c = new Condition(OPID_COLUMN.getColumnFamily(), OPID_COLUMN.getColumnQualifier()); + mutation.addCondition(c); + sawOperationRequirement = true; + return this; + } + + @Override + public Ample.ConditionalTabletMutator requireOperation(TabletOperationId opid) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + Condition c = new Condition(OPID_COLUMN.getColumnFamily(), OPID_COLUMN.getColumnQualifier()) + .setValue(opid.canonical()); + mutation.addCondition(c); + sawOperationRequirement = true; + return this; + } + + @Override + public void submit(Ample.RejectionHandler rejectionCheck) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + Preconditions.checkState(sawOperationRequirement, "No operation requirements were seen"); + getMutation(); + mutationConsumer.accept(mutation); + rejectionHandlerConsumer.accept(extent, rejectionCheck); + } +} diff --cc server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java index a19905e3d2,620bf36ccf..0e3a48102e --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java @@@ -28,8 -27,8 +29,7 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.SuspendingTServer; import org.apache.accumulo.core.metadata.TServerInstance; - import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; 
import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; @@@ -98,24 -81,32 +98,32 @@@ public abstract class TabletMutatorBase } @Override - public T putFile(TabletFile path, DataFileValue dfv) { - public Ample.TabletMutator putFile(ReferencedTabletFile path, DataFileValue dfv) { ++ public T putFile(ReferencedTabletFile path, DataFileValue dfv) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); mutation.put(DataFileColumnFamily.NAME, path.getMetaInsertText(), new Value(dfv.encode())); - return this; + return getThis(); } + @Override - public Ample.TabletMutator putFile(StoredTabletFile path, DataFileValue dfv) { ++ public T putFile(StoredTabletFile path, DataFileValue dfv) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + mutation.put(DataFileColumnFamily.NAME, path.getMetaUpdateDeleteText(), + new Value(dfv.encode())); - return this; ++ return getThis(); + } + @Override - public Ample.TabletMutator deleteFile(StoredTabletFile path) { + public T deleteFile(StoredTabletFile path) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); mutation.putDelete(DataFileColumnFamily.NAME, path.getMetaUpdateDeleteText()); - return this; + return getThis(); } @Override - public T putScan(TabletFile path) { - public Ample.TabletMutator putScan(StoredTabletFile path) { ++ public T putScan(StoredTabletFile path) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); - mutation.put(ScanFileColumnFamily.NAME, path.getMetaInsertText(), new Value()); + mutation.put(ScanFileColumnFamily.NAME, path.getMetaUpdateDeleteText(), new Value()); - return this; + return getThis(); } @Override @@@ -204,7 -195,7 +212,7 @@@ } @Override - public T putBulkFile(TabletFile bulkref, long tid) { - public Ample.TabletMutator putBulkFile(ReferencedTabletFile bulkref, long tid) { ++ public T putBulkFile(ReferencedTabletFile bulkref, long tid) 
{ Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); mutation.put(BulkFileColumnFamily.NAME, bulkref.getMetaInsertText(), new Value(FateTxId.formatTid(tid))); @@@ -212,10 -203,10 +220,10 @@@ } @Override - public T deleteBulkFile(TabletFile bulkref) { - public Ample.TabletMutator deleteBulkFile(ReferencedTabletFile bulkref) { ++ public T deleteBulkFile(ReferencedTabletFile bulkref) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); mutation.putDelete(BulkFileColumnFamily.NAME, bulkref.getMetaInsertText()); - return this; + return getThis(); } @Override diff --cc server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java index f578e976a4,670802a3ec..4320f61e2e --- a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java @@@ -72,13 -72,9 +72,13 @@@ public class ManagerMetadataUtil public static void addNewTablet(ServerContext context, KeyExtent extent, String dirName, TServerInstance tServerInstance, Map<StoredTabletFile,DataFileValue> datafileSizes, - Map<Long,? extends Collection<TabletFile>> bulkLoadedFiles, MetadataTime time, + Map<Long,? extends Collection<ReferencedTabletFile>> bulkLoadedFiles, MetadataTime time, long lastFlushID, long lastCompactID, ServiceLock zooLock) { + // ELASTICITY_TODO intentionally not using conditional mutations for this code because its only + // called when tablets split. Tablet splitting will drastically change, so there is no need to + // update this to use conditional mutations ATM. 
+ TabletMutator tablet = context.getAmple().mutateTablet(extent); tablet.putPrevEndRow(extent.prevEndRow()); tablet.putZooLock(zooLock); @@@ -98,10 -94,11 +98,11 @@@ tablet.deleteLocation(Location.future(tServerInstance)); } - datafileSizes.forEach(tablet::putFile); - datafileSizes.forEach((key, value) -> tablet.putFile(key.getTabletFile(), value)); ++ datafileSizes.forEach((key, value) -> tablet.putFile(key, value)); - for (Entry<Long,? extends Collection<TabletFile>> entry : bulkLoadedFiles.entrySet()) { - for (TabletFile ref : entry.getValue()) { + for (Entry<Long,? extends Collection<ReferencedTabletFile>> entry : bulkLoadedFiles + .entrySet()) { + for (ReferencedTabletFile ref : entry.getValue()) { tablet.putBulkFile(ref, entry.getKey()); } } diff --cc server/base/src/test/java/org/apache/accumulo/server/manager/state/RootTabletStateStoreTest.java index 9bfa59dfe7,ae5aad3688..90272b3bfd --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/RootTabletStateStoreTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/RootTabletStateStoreTest.java @@@ -22,40 -22,21 +22,39 @@@ import static java.nio.charset.Standard import static org.apache.accumulo.server.init.ZooKeeperInitializer.getInitialRootTabletJson; import static org.easymock.EasyMock.expect; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.fail; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.SortedMap; -import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.client.ConditionalWriter; 
+import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.SuspendingTServer; import org.apache.accumulo.core.metadata.TServerInstance; - import org.apache.accumulo.core.metadata.TabletFile; -import org.apache.accumulo.core.metadata.TabletLocationState; -import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException; +import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; +import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.RootTabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@@ -76,142 -53,6 +75,142 @@@ import com.google.common.net.HostAndPor public class RootTabletStateStoreTest { + private static class BrokenTabletMetadata extends TabletMetadata { + private final TabletMetadata tm; + + BrokenTabletMetadata(TabletMetadata tm) { + this.tm = tm; + } + + public boolean equals(Object obj) { + return tm.equals(obj); + } + + public TableId getTableId() { + return TableId.of("0"); + } + + public KeyExtent getExtent() { + return new KeyExtent(TableId.of("0"), null, null); + } + + public Text 
getPrevEndRow() { + return tm.getPrevEndRow(); + } + + public boolean sawPrevEndRow() { + return tm.sawPrevEndRow(); + } + + public Text getOldPrevEndRow() { + return tm.getOldPrevEndRow(); + } + + public boolean sawOldPrevEndRow() { + return tm.sawOldPrevEndRow(); + } + + public Text getEndRow() { + return tm.getEndRow(); + } + + public Location getLocation() { + return tm.getLocation(); + } + + public boolean hasCurrent() { + return tm.hasCurrent(); + } + - public Map<TabletFile,Long> getLoaded() { ++ public Map<StoredTabletFile,Long> getLoaded() { + return tm.getLoaded(); + } + + public Location getLast() { + return tm.getLast(); + } + + public SuspendingTServer getSuspend() { + return tm.getSuspend(); + } + + public Collection<StoredTabletFile> getFiles() { + return tm.getFiles(); + } + + public Map<StoredTabletFile,DataFileValue> getFilesMap() { + return tm.getFilesMap(); + } + + public Collection<LogEntry> getLogs() { + return tm.getLogs(); + } + + public List<StoredTabletFile> getScans() { + return tm.getScans(); + } + + public String getDirName() { + return tm.getDirName(); + } + + public MetadataTime getTime() { + return tm.getTime(); + } + + public String getCloned() { + return tm.getCloned(); + } + + public OptionalLong getFlushId() { + return tm.getFlushId(); + } + + public OptionalLong getCompactId() { + return tm.getCompactId(); + } + + public Double getSplitRatio() { + return tm.getSplitRatio(); + } + + public boolean hasChopped() { + return tm.hasChopped(); + } + + public TabletHostingGoal getHostingGoal() { + return tm.getHostingGoal(); + } + + public boolean getHostingRequested() { + return tm.getHostingRequested(); + } + + public SortedMap<Key,Value> getKeyValues() { + return tm.getKeyValues(); + } + + public TabletState getTabletState(Set<TServerInstance> liveTServers) { + return tm.getTabletState(liveTServers); + } + + public Map<ExternalCompactionId,ExternalCompactionMetadata> getExternalCompactions() { + return tm.getExternalCompactions(); + 
} + + public TabletOperationId getOperationId() { + return tm.getOperationId(); + } + + public int hashCode() { + return tm.hashCode(); + } + + public String toString() { + return tm.toString(); + } + } + private static class TestAmple implements Ample { private String json = diff --cc server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index 6fc79a4b81,0000000000..c52bf4e2c5 mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@@ -1,198 -1,0 +1,199 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.split; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.util.FileUtil; +import org.apache.accumulo.server.util.FileUtil.FileInfo; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; + +public class Splitter { + + private final ExecutorService splitExecutor; + + Cache<KeyExtent,KeyExtent> splitsStarting; + + Cache<KeyExtent,HashCode> unsplittable; + + private static class CacheKey { + + final TableId tableId; + final TabletFile tabletFile; + + public CacheKey(TableId tableId, TabletFile tabletFile) { + this.tableId = tableId; + this.tabletFile = tabletFile; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(tableId, cacheKey.tableId) + && Objects.equals(tabletFile, cacheKey.tabletFile); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, tabletFile); + } + + } + + LoadingCache<CacheKey,FileInfo> splitFileCache; + + public static int 
weigh(KeyExtent keyExtent) { + int size = 0; + size += keyExtent.tableId().toString().length(); + if (keyExtent.endRow() != null) { + size += keyExtent.endRow().getLength(); + } + if (keyExtent.prevEndRow() != null) { + size += keyExtent.prevEndRow().getLength(); + } + return size; + } + + public Splitter(ServerContext context) { + this.splitExecutor = context.threadPools().createExecutorService(context.getConfiguration(), + Property.MANAGER_SPLIT_WORKER_THREADS, true); + - Weigher<CacheKey,FileInfo> weigher = - (key, info) -> key.tableId.canonical().length() + key.tabletFile.getPathStr().length() - + info.getFirstRow().getLength() + info.getLastRow().getLength(); ++ Weigher<CacheKey, ++ FileInfo> weigher = (key, info) -> key.tableId.canonical().length() ++ + key.tabletFile.getPath().toString().length() + info.getFirstRow().getLength() ++ + info.getLastRow().getLength(); + + CacheLoader<CacheKey,FileInfo> loader = new CacheLoader<>() { + @Override + public FileInfo load(CacheKey key) throws Exception { + TableConfiguration tableConf = context.getTableConfiguration(key.tableId); + return FileUtil.tryToGetFirstAndLastRows(context, tableConf, Set.of(key.tabletFile)) + .get(key.tabletFile); + } + }; + + splitFileCache = Caffeine.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES) + .maximumWeight(10_000_000L).weigher(weigher).build(loader); + + Weigher<KeyExtent,KeyExtent> weigher2 = (keyExtent, keyExtent2) -> weigh(keyExtent); + + // Tracks splits starting, but not forever in case something in the code does not remove it. 
+ splitsStarting = Caffeine.newBuilder().expireAfterAccess(3, TimeUnit.HOURS) + .maximumWeight(10_000_000L).weigher(weigher2).build(); + + Weigher<KeyExtent,HashCode> weigher3 = (keyExtent, hc) -> { + return weigh(keyExtent) + hc.bits() / 8; + }; + + unsplittable = Caffeine.newBuilder().expireAfterAccess(24, TimeUnit.HOURS) + .maximumWeight(10_000_000L).weigher(weigher3).build(); + } + + public synchronized void start() {} + + public synchronized void stop() { + splitExecutor.shutdownNow(); + } + + public FileInfo getCachedFileInfo(TableId tableId, TabletFile tabletFile) { + return splitFileCache.get(new CacheKey(tableId, tabletFile)); + } + + private HashCode caclulateFilesHash(TabletMetadata tabletMetadata) { + var hasher = Hashing.goodFastHash(128).newHasher(); - tabletMetadata.getFiles().stream().map(StoredTabletFile::getPathStr).sorted() ++ tabletMetadata.getFiles().stream().map(StoredTabletFile::getNormalizedPathStr).sorted() + .forEach(path -> hasher.putString(path, UTF_8)); + return hasher.hash(); + } + + /** + * This tablet met the criteria for split but inspection could not find a split point. Remember + * this to avoid wasting time on future inspections until its files change. + */ + public void rememberUnsplittable(TabletMetadata tablet) { + unsplittable.put(tablet.getExtent(), caclulateFilesHash(tablet)); + } + + /** + * If tablet has not been marked as unsplittable, or file set has changed since being marked + * unsplittable, then return true. Else false. + */ + public boolean isSplittable(TabletMetadata tablet) { + if (splitsStarting.getIfPresent(tablet.getExtent()) != null) { + return false; + } + + var hashCode = unsplittable.getIfPresent(tablet.getExtent()); + + if (hashCode != null) { + if (hashCode.equals(caclulateFilesHash(tablet))) { + return false; + } else { + // We know that the list of files for this tablet has changed + // so we can remove it from the set of unsplittable tablets. 
+ unsplittable.invalidate(tablet.getExtent()); + } + } + + return true; + } + + /** + * Temporarily remember that the process of splitting is starting for this tablet making + * {@link #isSplittable(TabletMetadata)} return false in the future. + */ + public boolean addSplitStarting(KeyExtent extent) { + Objects.requireNonNull(extent); + return splitsStarting.asMap().put(extent, extent) == null; + } + + public void removeSplitStarting(KeyExtent extent) { + splitsStarting.invalidate(extent); + } + + public void executeSplit(SplitTask splitTask) { + splitExecutor.execute(splitTask); + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java index 139762d557,4a1def08d7..5d34279653 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java @@@ -91,54 -86,7 +91,55 @@@ public class CleanUpBulkImport extends manager.removeBulkImportStatus(info.sourceDir); return null; } + + private static void removeBulkLoadEntries(Ample ample, TableId tableId, long tid, Text firstSplit, + Text lastSplit) { + + Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) + .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) + .logInterval(3, MINUTES).createRetry(); + + while (true) { + try ( + var tablets = ample.readTablets().forTable(tableId).overlapping(firstSplit, lastSplit) + .checkConsistency().fetch(ColumnType.PREV_ROW, ColumnType.LOADED).build(); + var tabletsMutator = ample.conditionallyMutateTablets()) { + + for (var tablet : tablets) { + if (tablet.getLoaded().values().stream().anyMatch(l -> l == tid)) { + var tabletMutator = + tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation(); + tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue() == tid) - 
.map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile); ++ .map(Map.Entry::getKey) ++ .forEach(stf -> tabletMutator.deleteBulkFile(stf.getTabletFile())); + tabletMutator.submit(tm -> false); + } + } + + var results = tabletsMutator.process(); + + if (results.values().stream() + .anyMatch(condResult -> condResult.getStatus() != Status.ACCEPTED)) { + + results.forEach((extent, condResult) -> { + if (condResult.getStatus() != Status.ACCEPTED) { + var metadata = condResult.readMetadata(); + log.debug("Tablet update failed {} {} {} {} ", FateTxId.formatTid(tid), extent, + condResult.getStatus(), metadata.getOperationId()); + } + }); + + try { + retry.waitForNextAttempt(log, + String.format("%s tableId:%s conditional mutations to delete load markers failed.", + FateTxId.formatTid(tid), tableId)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + break; + } + } + } + } } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 1efd621a0e,27d060ad00..d69edbdc8e --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@@ -34,20 -39,31 +34,18 @@@ import org.apache.accumulo.core.clientI import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files; import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize; import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.BulkImportState; - import 
org.apache.accumulo.core.metadata.TabletFile; -import org.apache.accumulo.core.metadata.MetadataTable; + import org.apache.accumulo.core.metadata.ReferencedTabletFile; -import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.DataFileValue; --import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import org.apache.accumulo.core.rpc.ThriftUtil; -import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.tabletingest.thrift.DataFileInfo; -import org.apache.accumulo.core.tabletingest.thrift.TabletIngestClientService.Client; -import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.core.util.PeekingIterator; --import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.server.fs.VolumeManager; @@@ -109,38 -128,175 +107,39 @@@ class LoadFiles extends ManagerRepo this.manager = manager; this.tid = tid; this.setTime = setTime; + conditionalMutator = manager.getContext().getAmple().conditionallyMutateTablets(); } - abstract void load(List<TabletMetadata> tablets, Files files) throws Exception; - - abstract long finish() throws Exception; - } - - private static class OnlineLoader extends Loader { - - long timeInMillis; - String fmtTid; - int locationLess = 0; - - // track how many tablets were sent load messages per tablet server - MapCounter<HostAndPort> loadMsgs; - - // Each RPC to a tablet server needs to check in zookeeper to 
see if the transaction is still - // active. The purpose of this map is to group load request by tablet servers inorder to do less - // RPCs. Less RPCs will result in less calls to Zookeeper. - Map<HostAndPort,Map<TKeyExtent,Map<String,DataFileInfo>>> loadQueue; - private int queuedDataSize = 0; - - @Override - void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception { - super.start(bulkDir, manager, tid, setTime); - - timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT); - fmtTid = FateTxId.formatTid(tid); - - loadMsgs = new MapCounter<>(); - - loadQueue = new HashMap<>(); - } - - private void sendQueued(int threshhold) { - if (queuedDataSize > threshhold || threshhold == 0) { - loadQueue.forEach((server, tabletFiles) -> { - - if (log.isTraceEnabled()) { - log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server, - tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size()); - } - - Client client = null; - try { - client = ThriftUtil.getClient(ThriftClientTypes.TABLET_INGEST, server, - manager.getContext(), timeInMillis); - client.loadFiles(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid, - bulkDir.toString(), tabletFiles, setTime); - } catch (TException ex) { - log.debug("rpc failed server: " + server + ", " + fmtTid + " " + ex.getMessage(), ex); - } finally { - ThriftUtil.returnClient(client, manager.getContext()); - } - }); - - loadQueue.clear(); - queuedDataSize = 0; - } - } - - private void addToQueue(HostAndPort server, KeyExtent extent, - Map<String,DataFileInfo> thriftImports) { - if (!thriftImports.isEmpty()) { - loadMsgs.increment(server, 1); - - Map<String,DataFileInfo> prev = loadQueue.computeIfAbsent(server, k -> new HashMap<>()) - .putIfAbsent(extent.toThrift(), thriftImports); - - Preconditions.checkState(prev == null, "Unexpectedly saw extent %s twice", extent); - - // keep a very rough estimate of how much is memory so we can send 
if over a few megs is - // buffered - queuedDataSize += thriftImports.keySet().stream().mapToInt(String::length).sum() - + server.getHost().length() + 4 + thriftImports.size() * 32; - } - } - - @Override void load(List<TabletMetadata> tablets, Files files) { - byte[] fam = TextUtil.getBytes(DataFileColumnFamily.NAME); - for (TabletMetadata tablet : tablets) { - // send files to tablet sever - // ideally there should only be one tablet location to send all the files - Location location = tablet.getLocation(); - HostAndPort server = null; - if (location == null) { - locationLess++; - continue; - } else { - server = location.getHostAndPort(); - } - - Set<ReferencedTabletFile> loadedFiles = tablet.getLoaded().keySet().stream() - .map(StoredTabletFile::getTabletFile).collect(Collectors.toSet()); - - Map<String,DataFileInfo> thriftImports = new HashMap<>(); + for (TabletMetadata tablet : tablets) { - Map<TabletFile,DataFileValue> filesToLoad = new HashMap<>(); ++ Map<ReferencedTabletFile,DataFileValue> filesToLoad = new HashMap<>(); for (final Bulk.FileInfo fileInfo : files) { - filesToLoad.put(new TabletFile(new Path(bulkDir, fileInfo.getFileName())), - Path fullPath = new Path(bulkDir, fileInfo.getFileName()); - ReferencedTabletFile bulkFile = new ReferencedTabletFile(fullPath); - - if (!loadedFiles.contains(bulkFile)) { - thriftImports.put(fileInfo.getFileName(), new DataFileInfo(fileInfo.getEstFileSize())); - } ++ filesToLoad.put(new ReferencedTabletFile(new Path(bulkDir, fileInfo.getFileName())), + new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries())); } - addToQueue(server, tablet.getExtent(), thriftImports); - } - - sendQueued(4 * 1024 * 1024); - } - - @Override - long finish() { - - sendQueued(0); - - long sleepTime = 0; - if (loadMsgs.size() > 0) { - // find which tablet server had the most load messages sent to it and sleep 13ms for each - // load message - sleepTime = loadMsgs.max() * 13; - } - - if (locationLess > 0) { - sleepTime = 
Math.max(Math.max(100L, locationLess), sleepTime); - } - - return sleepTime; - } - - } - - private static class OfflineLoader extends Loader { + // remove any files that were already loaded - filesToLoad.keySet().removeAll(tablet.getLoaded().keySet()); ++ tablet.getLoaded().keySet().forEach(stf -> { ++ filesToLoad.keySet().remove(stf.getTabletFile()); ++ }); - BatchWriter bw; + if (!filesToLoad.isEmpty()) { + // ELASTICITY_TODO lets automatically call require prev end row + var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent()) + .requireAbsentOperation().requirePrevEndRow(tablet.getExtent().prevEndRow()); - // track how many tablets were sent load messages per tablet server - MapCounter<HostAndPort> unloadingTablets; + filesToLoad.forEach((f, v) -> { + // ELASTICITY_TODO should not expect to see the bulk files there (as long there is only + // a single thread running this), not sure if the following require absent is needed + tabletMutator.requireAbsentBulkFile(f); + tabletMutator.putBulkFile(f, tid); + tabletMutator.putFile(f, v); + }); - @Override - void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception { - Preconditions.checkArgument(!setTime); - super.start(bulkDir, manager, tid, setTime); - bw = manager.getContext().createBatchWriter(MetadataTable.NAME); - unloadingTablets = new MapCounter<>(); - } - - @Override - void load(List<TabletMetadata> tablets, Files files) throws MutationsRejectedException { - byte[] fam = TextUtil.getBytes(DataFileColumnFamily.NAME); - - for (TabletMetadata tablet : tablets) { - if (tablet.getLocation() != null) { - unloadingTablets.increment(tablet.getLocation().getHostAndPort(), 1L); - continue; + tabletMutator.submit(tm -> false); } - - Mutation mutation = new Mutation(tablet.getExtent().toMetaRow()); - - for (final Bulk.FileInfo fileInfo : files) { - String fullPath = new Path(bulkDir, fileInfo.getFileName()).toString(); - byte[] val = - new 
DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries()).encode(); - mutation.put(fam, fullPath.getBytes(UTF_8), val); - } - - bw.addMutation(mutation); } } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index 1ed5de53b9,0000000000..cdc4ac012a mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@@ -1,245 -1,0 +1,245 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.tableOps.split; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; + +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.server.util.FileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; + +public class UpdateTablets extends ManagerRepo { + private static final Logger log = LoggerFactory.getLogger(UpdateTablets.class); + private static final long serialVersionUID = 1L; + private final SplitInfo splitInfo; + private final List<String> dirNames; + + public UpdateTablets(SplitInfo splitInfo, List<String> dirNames) { + this.splitInfo = splitInfo; + this.dirNames = dirNames; + } + + @Override + public Repo<Manager> call(long tid, Manager manager) throws Exception { + TabletMetadata tabletMetadata = + manager.getContext().getAmple().readTablet(splitInfo.getOriginal()); + + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, tid); + + if (tabletMetadata == null) { + // check to see if this operation has already succeeded. 
+ TabletMetadata newTabletMetadata = + manager.getContext().getAmple().readTablet(splitInfo.getTablets().last()); + + if (newTabletMetadata != null && opid.equals(newTabletMetadata.getOperationId())) { + // have already created the new tablet and failed before we could return the next step, so + // let's go ahead and return the next step. + log.trace( + "{} creating new tablet was rejected because it existed, operation probably failed before.", + FateTxId.formatTid(tid)); + return new DeleteOperationIds(splitInfo); + } else { + throw new IllegalStateException("Tablet is in an unexpected condition " + + splitInfo.getOriginal() + " " + (newTabletMetadata == null) + " " + + (newTabletMetadata == null ? null : newTabletMetadata.getOperationId())); + } + } + + Preconditions.checkState(tabletMetadata.getOperationId().equals(opid), + "Tablet %s does not have expected operation id %s it has %s", splitInfo.getOriginal(), opid, + tabletMetadata.getOperationId()); + + var newTablets = splitInfo.getTablets(); + + var newTabletsFiles = getNewTabletFiles(newTablets, tabletMetadata, + file -> manager.getSplitter().getCachedFileInfo(splitInfo.getOriginal().tableId(), file)); + + addNewTablets(tid, manager, tabletMetadata, opid, newTablets, newTabletsFiles); + + // Only update the original tablet after successfully creating the new tablets, this is + // important for failure cases where this operation partially runs and then runs again. + + updateExistingTablet(manager, tabletMetadata, opid, newTablets, newTabletsFiles); + + return new DeleteOperationIds(splitInfo); + } + + /** + * Determine which files from the original tablet go to each new tablet being created by the + * split. 
+ */ + static Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> getNewTabletFiles( + Set<KeyExtent> newTablets, TabletMetadata tabletMetadata, + Function<StoredTabletFile,FileUtil.FileInfo> fileInfoProvider) { + + Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> tabletsFiles = new TreeMap<>(); + + newTablets.forEach(extent -> tabletsFiles.put(extent, new HashMap<>())); + + // determine which files overlap which tablets and their estimated sizes + tabletMetadata.getFilesMap().forEach((file, dataFileValue) -> { + FileUtil.FileInfo fileInfo = fileInfoProvider.apply(file); + + Range fileRange; + if (fileInfo != null) { + fileRange = new Range(fileInfo.getFirstRow(), fileInfo.getLastRow()); + } else { + fileRange = new Range(); + } + + // count how many of the new tablets the file will overlap + double numOverlapping = newTablets.stream().map(KeyExtent::toDataRange) + .filter(range -> range.clip(fileRange, true) != null).count(); + + Preconditions.checkState(numOverlapping > 0); + + // evenly split the tablets estimates between the number of tablets it actually overlaps + double sizePerTablet = dataFileValue.getSize() / numOverlapping; + double entriesPerTablet = dataFileValue.getNumEntries() / numOverlapping; + + // add the file to the tablets it overlaps + newTablets.forEach(newTablet -> { + if (newTablet.toDataRange().clip(fileRange, true) != null) { + DataFileValue ndfv = new DataFileValue((long) sizePerTablet, (long) entriesPerTablet, + dataFileValue.getTime()); + tabletsFiles.get(newTablet).put(file, ndfv); + } + }); + }); + + if (log.isTraceEnabled()) { + tabletMetadata.getFilesMap().forEach((f, v) -> { + log.trace("{} original file {} {} {}", tabletMetadata.getExtent(), f.getFileName(), + v.getSize(), v.getNumEntries()); + }); + + tabletsFiles.forEach((extent, files) -> { + files.forEach((f, v) -> { + log.trace("{} split file {} {} {}", extent, f.getFileName(), v.getSize(), + v.getNumEntries()); + }); + }); + } + + return tabletsFiles; + } + + private void 
addNewTablets(long tid, Manager manager, TabletMetadata tabletMetadata, + TabletOperationId opid, SortedSet<KeyExtent> newTablets, + Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) { + Iterator<String> dirNameIter = dirNames.iterator(); + + try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + for (var newExtent : newTablets) { + if (newExtent.equals(newTablets.last())) { + // Skip the last tablet, its done after successfully adding all new tablets + continue; + } + + var mutator = tabletsMutator.mutateTablet(newExtent).requireAbsentTablet(); + + mutator.putOperation(opid); + mutator.putDirName(dirNameIter.next()); + mutator.putTime(tabletMetadata.getTime()); + tabletMetadata.getFlushId().ifPresent(mutator::putFlushId); + mutator.putPrevEndRow(newExtent.prevEndRow()); + tabletMetadata.getCompactId().ifPresent(mutator::putCompactionId); + mutator.putHostingGoal(tabletMetadata.getHostingGoal()); + - tabletMetadata.getLoaded().forEach(mutator::putBulkFile); ++ tabletMetadata.getLoaded().forEach((k, v) -> mutator.putBulkFile(k.getTabletFile(), v)); + tabletMetadata.getLogs().forEach(mutator::putWal); + + newTabletsFiles.get(newExtent).forEach(mutator::putFile); + + mutator.submit(afterMeta -> opid.equals(afterMeta.getOperationId())); + } + + var results = tabletsMutator.process(); + results.values().forEach(result -> { + var status = result.getStatus(); + + Preconditions.checkState(status == Status.ACCEPTED, "Failed to add new tablet %s %s %s", + status, splitInfo.getOriginal(), result.getExtent()); + }); + } + } + + private void updateExistingTablet(Manager manager, TabletMetadata tabletMetadata, + TabletOperationId opid, SortedSet<KeyExtent> newTablets, + Map<KeyExtent,Map<StoredTabletFile,DataFileValue>> newTabletsFiles) { + try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + var newExtent = newTablets.last(); + + var mutator = 
tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireOperation(opid) + .requirePrevEndRow(splitInfo.getOriginal().prevEndRow()); + + mutator.putPrevEndRow(newExtent.prevEndRow()); + + newTabletsFiles.get(newExtent).forEach(mutator::putFile); + + // remove the files from the original tablet that did not end up in the tablet + tabletMetadata.getFiles().forEach(existingFile -> { + if (!newTabletsFiles.get(newExtent).containsKey(existingFile)) { + mutator.deleteFile(existingFile); + } + }); + + mutator.submit(tm -> false); + + var result = tabletsMutator.process().get(splitInfo.getOriginal()); + + if (result.getStatus() == Status.REJECTED) { + // Can not use Ample's built in code for checking rejected because we are changing the prev + // end row and Ample would try to read the old tablet, so must check it manually. + + var tabletMeta = manager.getContext().getAmple().readTablet(newExtent); + + if (tabletMeta == null || !tabletMeta.getOperationId().equals(opid)) { + throw new IllegalStateException("Failed to update existing tablet in split " + + splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent()); + } + } else if (result.getStatus() != Status.ACCEPTED) { + // maybe this step is being run again and the update was already made + throw new IllegalStateException("Failed to update existing tablet in split " + + splitInfo.getOriginal() + " " + result.getStatus() + " " + result.getExtent()); + } + } + } +} diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java index ad7059e238,5963dc113f..2648185dac --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java @@@ -26,9 -26,8 +26,9 @@@ import java.util.Map import java.util.SortedMap; import java.util.TreeMap; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; + import org.apache.accumulo.core.metadata.ReferencedTabletFile; import 
org.apache.accumulo.core.metadata.StoredTabletFile; - import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; @@@ -81,7 -78,7 +81,7 @@@ public class TabletData // Data pulled from an existing tablet to make a split public TabletData(String dirName, SortedMap<StoredTabletFile,DataFileValue> highDatafileSizes, MetadataTime time, long lastFlushID, long lastCompactID, Location lastLocation, - Map<Long,List<TabletFile>> bulkIngestedFiles, TabletHostingGoal goal) { - Map<Long,List<ReferencedTabletFile>> bulkIngestedFiles) { ++ Map<Long,List<ReferencedTabletFile>> bulkIngestedFiles, TabletHostingGoal goal) { this.directoryName = dirName; this.dataFiles = highDatafileSizes; this.time = time; diff --cc test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index ee18a6133a,0000000000..5c25fd9fd7 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@@ -1,428 -1,0 +1,428 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl; +import org.apache.accumulo.server.zookeeper.TransactionWatcher; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class AmpleConditionalWriterIT extends AccumuloClusterHarness { + + @Test + public void testLocations() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + String tableName = 
getUniqueNames(1)[0]; + + SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new Text("f"), new Text("j"))); + c.tableOperations().create(tableName, + new NewTableConfiguration().withSplits(splits).createOffline()); + + c.securityOperations().grantTablePermission("root", MetadataTable.NAME, + TablePermission.WRITE); + + var tid = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + var e1 = new KeyExtent(tid, new Text("c"), null); + + var ts1 = new TServerInstance("localhost:9997", 5000L); + var ts2 = new TServerInstance("localhost:9997", 6000L); + + var context = cluster.getServerContext(); + + assertNull(context.getAmple().readTablet(e1).getLocation()); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts1)).submit(tm -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Location.future(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts2)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Location.future(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1)) + .putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Location.current(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts1)) + 
.putLocation(Location.current(ts1)).deleteLocation(Location.future(ts1)) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Location.current(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.future(ts2)) + .putLocation(Location.current(ts2)).deleteLocation(Location.future(ts2)) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Location.current(ts1), context.getAmple().readTablet(e1).getLocation()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1)) + .deleteLocation(Location.current(ts1)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertNull(context.getAmple().readTablet(e1).getLocation()); + } + } + + @Test + public void testFiles() throws Exception { + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + + SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new Text("f"), new Text("j"))); + c.tableOperations().create(tableName, + new NewTableConfiguration().withSplits(splits).createOffline()); + + c.securityOperations().grantTablePermission("root", MetadataTable.NAME, + TablePermission.WRITE); + + var tid = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + var e1 = new KeyExtent(tid, new Text("c"), null); + + var ts1 = new TServerInstance("localhost:9997", 5000L); + var ts2 = new TServerInstance("localhost:9997", 6000L); + + var context = cluster.getServerContext(); + + var stf1 = new StoredTabletFile( + "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf"); + var stf2 = new StoredTabletFile( + 
"hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"); + var stf3 = new StoredTabletFile( + "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf"); + var stf4 = new StoredTabletFile( + "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf"); + + System.out.println(context.getAmple().readTablet(e1).getLocation()); + + // simulate a compaction where the tablet location is not set + var ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireFile(stf1).requireFile(stf2) + .requireFile(stf3).putFile(stf4, new DataFileValue(0, 0)).submit(tm -> false); + var results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Set.of(), context.getAmple().readTablet(e1).getFiles()); + + // simulate minor compacts where the tablet location is not set + for (StoredTabletFile file : List.of(stf1, stf2, stf3)) { + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1)) + .putFile(file, new DataFileValue(0, 0)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + } + + assertEquals(Set.of(), context.getAmple().readTablet(e1).getFiles()); + + // set the location + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.current(ts1)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + // simulate minor compacts where the tablet location is wrong + for (StoredTabletFile file : List.of(stf1, stf2, stf3)) { + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts2)) + .putFile(file, new DataFileValue(0, 0)).submit(tm -> false); + results = ctmi.process(); + 
assertEquals(Status.REJECTED, results.get(e1).getStatus()); + } + + assertEquals(Set.of(), context.getAmple().readTablet(e1).getFiles()); + + // simulate minor compacts where the tablet location is set + for (StoredTabletFile file : List.of(stf1, stf2, stf3)) { + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireLocation(Location.current(ts1)) + .putFile(file, new DataFileValue(0, 0)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + } + + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + + // simulate a compaction + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireFile(stf1).requireFile(stf2) + .requireFile(stf3).putFile(stf4, new DataFileValue(0, 0)).deleteFile(stf1) + .deleteFile(stf2).deleteFile(stf3).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf4), context.getAmple().readTablet(e1).getFiles()); + + // without this the metadata constraint will not allow the bulk file to be added to metadata + TransactionWatcher.ZooArbitrator.start(context, Constants.BULK_ARBITRATOR_TYPE, 9L); + + // simulate a bulk import + var stf5 = + new StoredTabletFile("hdfs://localhost:8020/accumulo/tables/2a/b-0000009/I0000074.rf"); + ctmi = new ConditionalTabletsMutatorImpl(context); - ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentBulkFile(stf5) - .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5, 9L) ++ ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentBulkFile(stf5.getTabletFile()) ++ .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5.getTabletFile(), 9L) + .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf4, stf5), 
context.getAmple().readTablet(e1).getFiles()); + + // simulate a compaction + var stf6 = new StoredTabletFile( + "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/A0000075.rf"); + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireFile(stf4).requireFile(stf5) + .putFile(stf6, new DataFileValue(0, 0)).deleteFile(stf4).deleteFile(stf5) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf6), context.getAmple().readTablet(e1).getFiles()); + + // simulate trying to re bulk import file after a compaction + ctmi = new ConditionalTabletsMutatorImpl(context); - ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentBulkFile(stf5) - .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5, 9L) ++ ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentBulkFile(stf5.getTabletFile()) ++ .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5.getTabletFile(), 9L) + .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + assertEquals(Set.of(stf6), context.getAmple().readTablet(e1).getFiles()); + } + } + + @Test + public void testMultipleExtents() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + + SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new Text("f"), new Text("j"))); + c.tableOperations().create(tableName, + new NewTableConfiguration().withSplits(splits).createOffline()); + + c.securityOperations().grantTablePermission("root", MetadataTable.NAME, + TablePermission.WRITE); + + var tid = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + var e1 = new KeyExtent(tid, new Text("c"), null); + var e2 = new KeyExtent(tid, new Text("f"), new Text("c")); + var e3 = new KeyExtent(tid, new Text("j"), new 
Text("f")); + var e4 = new KeyExtent(tid, null, new Text("j")); + + var ts1 = new TServerInstance("localhost:9997", 5000L); + var ts2 = new TServerInstance("localhost:9997", 6000L); + + var context = cluster.getServerContext(); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts1)).submit(tm -> false); + ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts2)).submit(tm -> false); + var results = ctmi.process(); + + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + + assertEquals(Location.future(ts1), context.getAmple().readTablet(e1).getLocation()); + assertEquals(Location.future(ts2), context.getAmple().readTablet(e2).getLocation()); + assertNull(context.getAmple().readTablet(e3).getLocation()); + assertNull(context.getAmple().readTablet(e4).getLocation()); + + assertEquals(Set.of(e1, e2), results.keySet()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts2)).submit(tm -> false); + ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts1)).submit(tm -> false); + ctmi.mutateTablet(e3).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts1)).submit(tm -> false); + ctmi.mutateTablet(e4).requireAbsentOperation().requireAbsentLocation() + .putLocation(Location.future(ts2)).submit(tm -> false); + results = ctmi.process(); + + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Status.REJECTED, results.get(e2).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e3).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e4).getStatus()); + + assertEquals(Location.future(ts1), 
context.getAmple().readTablet(e1).getLocation()); + assertEquals(Location.future(ts2), context.getAmple().readTablet(e2).getLocation()); + assertEquals(Location.future(ts1), context.getAmple().readTablet(e3).getLocation()); + assertEquals(Location.future(ts2), context.getAmple().readTablet(e4).getLocation()); + + assertEquals(Set.of(e1, e2, e3, e4), results.keySet()); + + } + } + + @Test + public void testOperations() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + + SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c"), new Text("f"), new Text("j"))); + c.tableOperations().create(tableName, + new NewTableConfiguration().withSplits(splits).createOffline()); + + c.securityOperations().grantTablePermission("root", MetadataTable.NAME, + TablePermission.WRITE); + + var tid = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + + var e1 = new KeyExtent(tid, new Text("c"), null); + var e2 = new KeyExtent(tid, new Text("f"), new Text("c")); + var e3 = new KeyExtent(tid, new Text("j"), new Text("f")); + + var context = cluster.getServerContext(); + + var opid1 = TabletOperationId.from("SPLITTING:FATE[1234]"); + var opid2 = TabletOperationId.from("MERGING:FATE[5678]"); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm -> false); + ctmi.mutateTablet(e2).requireAbsentOperation().putOperation(opid2).submit(tm -> false); + ctmi.mutateTablet(e3).requireOperation(opid1).deleteOperation().submit(tm -> false); + var results = ctmi.process(); + + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + assertEquals(Status.REJECTED, results.get(e3).getStatus()); + assertEquals(TabletOperationType.SPLITTING, + context.getAmple().readTablet(e1).getOperationId().getType()); + assertEquals(opid1, 
context.getAmple().readTablet(e1).getOperationId()); + assertEquals(TabletOperationType.MERGING, + context.getAmple().readTablet(e2).getOperationId().getType()); + assertEquals(opid2, context.getAmple().readTablet(e2).getOperationId()); + assertNull(context.getAmple().readTablet(e3).getOperationId()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireOperation(opid2).deleteOperation().submit(tm -> false); + ctmi.mutateTablet(e2).requireOperation(opid1).deleteOperation().submit(tm -> false); + results = ctmi.process(); + + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + assertEquals(Status.REJECTED, results.get(e2).getStatus()); + assertEquals(TabletOperationType.SPLITTING, + context.getAmple().readTablet(e1).getOperationId().getType()); + assertEquals(TabletOperationType.MERGING, + context.getAmple().readTablet(e2).getOperationId().getType()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireOperation(opid1).deleteOperation().submit(tm -> false); + ctmi.mutateTablet(e2).requireOperation(opid2).deleteOperation().submit(tm -> false); + results = ctmi.process(); + + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); + assertNull(context.getAmple().readTablet(e1).getOperationId()); + assertNull(context.getAmple().readTablet(e2).getOperationId()); + } + } + + @Test + public void testRootTabletUpdate() throws Exception { + var context = cluster.getServerContext(); + + var rootMeta = context.getAmple().readTablet(RootTable.EXTENT); + var loc = rootMeta.getLocation(); + + assertEquals(LocationType.CURRENT, loc.getType()); + assertFalse(rootMeta.getCompactId().isPresent()); + + var ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation() + .putCompactionId(7).submit(tm -> false); + var results = ctmi.process(); + 
assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus()); + assertFalse(context.getAmple().readTablet(RootTable.EXTENT).getCompactId().isPresent()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation() + .requireLocation(Location.future(loc.getServerInstance())).putCompactionId(7) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.REJECTED, results.get(RootTable.EXTENT).getStatus()); + assertFalse(context.getAmple().readTablet(RootTable.EXTENT).getCompactId().isPresent()); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation() + .requireLocation(Location.current(loc.getServerInstance())).putCompactionId(7) + .submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(RootTable.EXTENT).getStatus()); + assertEquals(7L, context.getAmple().readTablet(RootTable.EXTENT).getCompactId().getAsLong()); + } + +}
