This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 34823acfc1 Enable clients to scan offline tables using ScanServers
(#6156)
34823acfc1 is described below
commit 34823acfc1cc506637427eeda4e519de1fc320c7
Author: Dave Marion <[email protected]>
AuthorDate: Wed Mar 4 07:18:24 2026 -0500
Enable clients to scan offline tables using ScanServers (#6156)
During a normal client scan the TabletLocator resolves
tablets (key extent and location) for a given search range.
The location is necessary for the client to be able to create
a connection with a tablet server to perform the scan, but
the location is not needed when the client is using scan
servers. The TabletLocator does not resolve tablets for
offline tables.
This change introduces the OfflineTabletLocatorImpl that
performs this resolution (range -> key extents) and does
not provide any location information. This change also
modifies the client to allow scans on offline tables
when using scan servers and uses the new OfflineTabletLocatorImpl
in that code path. The new OfflineTabletLocatorImpl uses
new client properties to configure the caching of offline
extent information.
---
.../accumulo/core/clientImpl/ClientContext.java | 7 +-
.../core/clientImpl/OfflineTabletLocatorImpl.java | 378 +++++++++++++++++++++
.../accumulo/core/clientImpl/ScannerImpl.java | 10 +
.../accumulo/core/clientImpl/TabletLocator.java | 42 ++-
.../core/clientImpl/TabletServerBatchReader.java | 4 +
.../apache/accumulo/core/conf/ClientProperty.java | 24 +-
.../main/java/org/apache/accumulo/gc/GCRun.java | 4 +-
.../accumulo/gc/GarbageCollectWriteAheadLogs.java | 4 +-
.../accumulo/test/ScanServerAllowedTablesIT.java | 4 +-
.../org/apache/accumulo/test/ScanServerIT.java | 21 +-
.../accumulo/test/ScanServerOfflineTableIT.java | 220 ++++++++++++
.../org/apache/accumulo/test/VerifyIngest.java | 9 +
.../accumulo/test/functional/ReadWriteIT.java | 14 +-
13 files changed, 693 insertions(+), 48 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index e5b059a80f..7dc37c3545 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -710,8 +710,8 @@ public class ClientContext implements AccumuloClient {
int numQueryThreads) throws TableNotFoundException {
ensureOpen();
checkArgument(authorizations != null, "authorizations is null");
- return new TabletServerBatchReader(this,
requireNotOffline(getTableId(tableName), tableName),
- tableName, authorizations, numQueryThreads);
+ return new TabletServerBatchReader(this, getTableId(tableName), tableName,
authorizations,
+ numQueryThreads);
}
@Override
@@ -796,8 +796,7 @@ public class ClientContext implements AccumuloClient {
throws TableNotFoundException {
ensureOpen();
checkArgument(authorizations != null, "authorizations is null");
- Scanner scanner =
- new ScannerImpl(this, requireNotOffline(getTableId(tableName),
tableName), authorizations);
+ Scanner scanner = new ScannerImpl(this, getTableId(tableName),
authorizations);
Integer batchSize =
ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties());
if (batchSize != null) {
scanner.setBatchSize(batchSize);
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java
new file mode 100644
index 0000000000..de7b592d26
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.ClientProperty;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.util.Timer;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Policy.Eviction;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Preconditions;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter;
+
+public class OfflineTabletLocatorImpl extends TabletLocator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OfflineTabletLocatorImpl.class);
+
+ public static class OfflineTabletLocation extends TabletLocation {
+
+ public static final String SERVER = "offline_table_marker";
+
+ public OfflineTabletLocation(KeyExtent tablet_extent) {
+ super(tablet_extent, SERVER, SERVER);
+ }
+
+ }
+
+ private class OfflineTabletsCache implements
RemovalListener<KeyExtent,KeyExtent> {
+
+ private final ClientContext context;
+ private final int maxCacheSize;
+ private final int prefetch;
+ private final Cache<KeyExtent,KeyExtent> cache;
+ private final LinkedBlockingQueue<KeyExtent> evictions = new
LinkedBlockingQueue<>();
+ private final TreeSet<KeyExtent> extents = new TreeSet<>();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Timer scanTimer = Timer.startNew();
+ private final AtomicInteger cacheCount = new AtomicInteger(0);
+ private final Eviction<KeyExtent,KeyExtent> evictionPolicy;
+
+ private OfflineTabletsCache(ClientContext context)
+ throws TableNotFoundException, AccumuloSecurityException,
AccumuloException {
+ this.context = context;
+ Properties clientProperties = context.getProperties();
+ Duration cacheDuration = Duration.ofMillis(
+
ClientProperty.OFFLINE_LOCATOR_CACHE_DURATION.getTimeInMillis(clientProperties));
+ maxCacheSize =
+
Integer.parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_SIZE.getValue(clientProperties));
+ prefetch = Integer
+
.parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_PREFETCH.getValue(clientProperties));
+
+ // This cache is used to evict KeyExtents from the extents TreeSet when
+ // they have not been accessed in cacheDuration. We aim to have
+ // maxCacheSize objects in the cache, but are not using the Cache's
maximumSize
+ // to achieve this as the Cache will remove things from the Cache that
were
+ // newly inserted and not yet used. This negates the pre-fetching feature
+ // that we have added into this TabletLocator for offline tables. Here we
+ // set the maximum size much larger than the property and use the
cacheCount
+ // variable to manage the max size manually.
+ // @formatter:off
+ Caffeine<KeyExtent,KeyExtent> builder = Caffeine.newBuilder()
+ .expireAfterAccess(cacheDuration)
+ .removalListener(this)
+ .scheduler(Scheduler.systemScheduler())
+ .recordStats(() -> new CaffeineStatsCounter(Metrics.globalRegistry,
+ OfflineTabletsCache.class.getSimpleName()));
+ if (maxCacheSize > 0) {
+ builder.initialCapacity(maxCacheSize).maximumSize(maxCacheSize * 2);
+ } else {
+ String tname = context.getTableName(tid);
+
builder.initialCapacity(context.tableOperations().listSplits(tname).size());
+ }
+ cache = builder.build();
+ // @formatter:on
+ if (maxCacheSize > 0) {
+ evictionPolicy = cache.policy().eviction().orElseThrow();
+ } else {
+ evictionPolicy = null;
+ }
+ }
+
+ @Override
+ public void onRemoval(KeyExtent key, KeyExtent value, RemovalCause cause) {
+ if (cause == RemovalCause.REPLACED) {
+ // Don't remove from `extents` if the object was replaced in the cache
+ return;
+ }
+ LOG.trace("Extent {} was evicted from cache for {} ", key, cause);
+ cacheCount.decrementAndGet();
+ evictions.add(key);
+ try {
+ if (lock.writeLock().tryLock(1, TimeUnit.MILLISECONDS)) {
+ try {
+ processRecentCacheEvictions();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting to acquire write
lock", e);
+ }
+ }
+
+ private void processRecentCacheEvictions() {
+ Preconditions.checkArgument(lock.writeLock().isHeldByCurrentThread());
+ Set<KeyExtent> copy = new HashSet<>();
+ evictions.drainTo(copy);
+ int numEvictions = copy.size();
+ if (numEvictions > 0) {
+ LOG.trace("Processing {} prior evictions", numEvictions);
+ extents.removeAll(copy);
+ }
+ }
+
+ private KeyExtent findOrLoadExtent(KeyExtent searchKey) {
+ lock.readLock().lock();
+ try {
+ KeyExtent match = extents.ceiling(searchKey);
+ if (match != null && match.contains(searchKey.endRow())) {
+ // update access time in cache
+ @SuppressWarnings("unused")
+ var unused = cache.getIfPresent(match);
+ LOG.trace("Extent {} found in cache for start row {}", match,
searchKey);
+ return match;
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ lock.writeLock().lock();
+ try {
+ // process prior evictions since we have the write lock
+ processRecentCacheEvictions();
+ // The following block of code fixes an issue with
+ // the cache where recently pre-fetched extents
+ // will be evicted from the cache when it reaches
+ // the maxCacheSize. This is because from the cache's
+ // perspective they are the coldest objects. The code
+ // below manually removes the coldest extents that are
+ // before the searchKey.endRow to make room for the next
+ // batch of extents that we are going to load into the
+ // cache so that they are not immediately evicted.
+ if (maxCacheSize > 0 && cacheCount.get() + prefetch + 1 >=
maxCacheSize) {
+ int evictionSize = prefetch * 2;
+ Set<KeyExtent> candidates = new
HashSet<>(evictionPolicy.coldest(evictionSize).keySet());
+ LOG.trace("Cache near max size, evaluating {} coldest entries",
candidates);
+ candidates.removeIf(ke -> ke.contains(searchKey.endRow()) ||
ke.endRow() == null
+ || ke.endRow().compareTo(searchKey.endRow()) >= 0);
+ LOG.trace("Manually evicting coldest entries: {}", candidates);
+ cache.invalidateAll(candidates);
+ cache.cleanUp();
+ }
+ // Load TabletMetadata
+ if (LOG.isDebugEnabled()) {
+ scanTimer.restart();
+ }
+ int added = 0;
+ try (TabletsMetadata tm =
context.getAmple().readTablets().forTable(tid)
+ .overlapping(searchKey.endRow(), true, null)
+ .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) {
+ Iterator<TabletMetadata> iter = tm.iterator();
+ for (int i = 0; i < prefetch && iter.hasNext(); i++) {
+ TabletMetadata t = iter.next();
+ KeyExtent ke = t.getExtent();
+ LOG.trace("Caching extent: {}", ke);
+ cache.put(ke, ke);
+ cacheCount.incrementAndGet();
+ TabletLocatorImpl.removeOverlapping(extents, ke);
+ extents.add(ke);
+ added++;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Took {}ms to scan and load {} metadata tablets for
table {}",
+ scanTimer.elapsed(TimeUnit.MILLISECONDS), added, tid);
+ }
+ return extents.ceiling(searchKey);
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void invalidate(KeyExtent failedExtent) {
+ cache.invalidate(failedExtent);
+ }
+
+ private void invalidate(Collection<KeyExtent> keySet) {
+ cache.invalidateAll(keySet);
+ }
+
+ private void invalidateAll() {
+ cache.invalidateAll();
+ }
+
+ }
+
+ private final TableId tid;
+ private final OfflineTabletsCache extentCache;
+
+ public OfflineTabletLocatorImpl(ClientContext context, TableId tableId) {
+ tid = tableId;
+ if (context.getTableState(tid) != TableState.OFFLINE) {
+ throw new IllegalStateException("Table " + tableId + " is not offline");
+ }
+ try {
+ extentCache = new OfflineTabletsCache(context);
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException("Table " + tableId + " does not exist",
e);
+ } catch (AccumuloSecurityException | AccumuloException e) {
+ throw new IllegalStateException("Unable to get split points for table: "
+ tableId, e);
+ }
+ }
+
+ @Override
+ public TabletLocation locateTablet(ClientContext context, Text row, boolean
skipRow,
+ boolean retry) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
+
+ if (skipRow) {
+ row = new Text(row);
+ row.append(new byte[] {0}, 0, 1);
+ }
+
+ Text metadataRow = new Text(tid.canonical());
+ metadataRow.append(new byte[] {';'}, 0, 1);
+ metadataRow.append(row.getBytes(), 0, row.getLength());
+
+ LOG.trace("Locating offline tablet for row: {}", metadataRow);
+ KeyExtent searchKey = KeyExtent.fromMetaRow(metadataRow);
+ KeyExtent match = extentCache.findOrLoadExtent(searchKey);
+ if (match != null) {
+ if (match.contains(row)) {
+ LOG.trace("Found match for row: {}, extent = {}", row, match);
+ return new OfflineTabletLocation(match);
+ }
+ }
+ LOG.trace("Found no matching extent for row: {}", row);
+ return null;
+ }
+
+ @Override
+ public List<Range> binRanges(ClientContext context, List<Range> ranges,
+ Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
+ throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
+
+ List<TabletLocation> tabletLocations = new ArrayList<>(ranges.size());
+ List<Range> failures = new ArrayList<>();
+
+ l1: for (Range r : ranges) {
+ LOG.trace("Looking up locations for range: {}", r);
+ tabletLocations.clear();
+ Text startRow;
+
+ if (r.getStartKey() != null) {
+ startRow = r.getStartKey().getRow();
+ } else {
+ startRow = new Text();
+ }
+
+ TabletLocation tl = this.locateTablet(context, startRow, false, false);
+ if (tl == null) {
+ LOG.trace("NOT FOUND first tablet in range: {}", r);
+ failures.add(r);
+ continue;
+ }
+ LOG.trace("Found first tablet in range: {}, extent: {}", r,
tl.tablet_extent);
+ tabletLocations.add(tl);
+
+ while (tl.tablet_extent.endRow() != null
+ && !r.afterEndKey(new
Key(tl.tablet_extent.endRow()).followingKey(PartialKey.ROW))) {
+ KeyExtent priorExtent = tl.tablet_extent;
+ tl = locateTablet(context, tl.tablet_extent.endRow(), true, false);
+
+ if (tl == null) {
+ LOG.trace("NOT FOUND tablet following {} in range: {}", priorExtent,
r);
+ failures.add(r);
+ continue l1;
+ }
+ LOG.trace("Found following tablet in range: {}, extent: {}", r,
tl.tablet_extent);
+ tabletLocations.add(tl);
+ }
+
+ // Ensure the extents found are non overlapping and have no holes. When
reading some extents
+ // from the cache and others from the metadata table in the loop above we
may end up with
+ // non-contiguous extents. This can happen when a subset of extents are
placed in the cache and
+ // then after that merges and splits happen.
+ if (TabletLocatorImpl.isContiguous(tabletLocations)) {
+ for (TabletLocation tl2 : tabletLocations) {
+ TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location,
tl2.tablet_extent, r);
+ }
+ } else {
+ LOG.trace("Found non-contiguous tablet in range: {}", r);
+ failures.add(r);
+ }
+
+ }
+ return failures;
+ }
+
+ @Override
+ public <T extends Mutation> void binMutations(ClientContext context, List<T>
mutations,
+ Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
+ throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void invalidateCache(KeyExtent failedExtent) {
+ extentCache.invalidate(failedExtent);
+ }
+
+ @Override
+ public void invalidateCache(Collection<KeyExtent> keySet) {
+ extentCache.invalidate(keySet);
+ }
+
+ @Override
+ public void invalidateCache() {
+ extentCache.invalidateAll();
+ }
+
+ @Override
+ public void invalidateCache(ClientContext context, String server) {
+ invalidateCache();
+ }
+
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
index b4dfaa6258..0fef7ecaab 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
@@ -163,6 +164,15 @@ public class ScannerImpl extends ScannerOptions implements
Scanner {
@Override
public synchronized Iterator<Entry<Key,Value>> iterator() {
ensureOpen();
+ if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) {
+ try {
+ String tableName = context.getTableName(tableId);
+ context.requireNotOffline(tableId, tableName);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException("Table not found", e);
+ }
+ }
+
ScannerIterator iter = new ScannerIterator(context, tableId,
authorizations, range, size,
Duration.ofMillis(getTimeout(MILLISECONDS)), this, isolated,
readaheadThreshold,
new Reporter());
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
index c4c1dcdcd9..353bfd6da0 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataLocationObtainer;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
@@ -110,6 +111,7 @@ public abstract class TabletLocator {
}
private static final HashMap<LocatorKey,TabletLocator> locators = new
HashMap<>();
+ private static final HashMap<TableId,OfflineTabletLocatorImpl>
offlineLocators = new HashMap<>();
private static boolean enabled = true;
public static synchronized void clearLocators() {
@@ -117,6 +119,7 @@ public abstract class TabletLocator {
locator.isValid = false;
}
locators.clear();
+ offlineLocators.clear();
}
static synchronized boolean isEnabled() {
@@ -135,24 +138,33 @@ public abstract class TabletLocator {
public static synchronized TabletLocator getLocator(ClientContext context,
TableId tableId) {
Preconditions.checkState(enabled, "The Accumulo singleton that that tracks
tablet locations is "
+ "disabled. This is likely caused by all AccumuloClients being closed
or garbage collected");
- LocatorKey key = new LocatorKey(context.getInstanceID(), tableId);
- TabletLocator tl = locators.get(key);
- if (tl == null) {
- MetadataLocationObtainer mlo = new MetadataLocationObtainer();
-
- if (RootTable.ID.equals(tableId)) {
- tl = new RootTabletLocator(context.getTServerLockChecker());
- } else if (MetadataTable.ID.equals(tableId)) {
- tl = new TabletLocatorImpl(MetadataTable.ID, getLocator(context,
RootTable.ID), mlo,
- context.getTServerLockChecker());
- } else {
- tl = new TabletLocatorImpl(tableId, getLocator(context,
MetadataTable.ID), mlo,
- context.getTServerLockChecker());
+ TableState state = context.getTableState(tableId);
+ if (state == TableState.OFFLINE) {
+ LocatorKey key = new LocatorKey(context.getInstanceID(), tableId);
+ locators.remove(key);
+ return offlineLocators.computeIfAbsent(tableId,
+ f -> new OfflineTabletLocatorImpl(context, tableId));
+ } else {
+ offlineLocators.remove(tableId);
+ LocatorKey key = new LocatorKey(context.getInstanceID(), tableId);
+ TabletLocator tl = locators.get(key);
+ if (tl == null) {
+ MetadataLocationObtainer mlo = new MetadataLocationObtainer();
+
+ if (RootTable.ID.equals(tableId)) {
+ tl = new RootTabletLocator(context.getTServerLockChecker());
+ } else if (MetadataTable.ID.equals(tableId)) {
+ tl = new TabletLocatorImpl(MetadataTable.ID, getLocator(context,
RootTable.ID), mlo,
+ context.getTServerLockChecker());
+ } else {
+ tl = new TabletLocatorImpl(tableId, getLocator(context,
MetadataTable.ID), mlo,
+ context.getTServerLockChecker());
+ }
+ locators.put(key, tl);
}
- locators.put(key, tl);
+ return tl;
}
- return tl;
}
static {
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
index 8b149da57f..cb03b0b4d8 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
@@ -118,6 +118,10 @@ public class TabletServerBatchReader extends
ScannerOptions implements BatchScan
throw new IllegalStateException("batch reader closed");
}
+ if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) {
+ context.requireNotOffline(tableId, tableName);
+ }
+
return new TabletServerBatchReaderIterator(context, tableId, tableName,
authorizations, ranges,
numThreads, queryThreadPool, this, retryTimeout);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
index 34780c8a06..1bcf48c78c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
@@ -141,7 +141,29 @@ public enum ClientProperty {
"A list of span receiver classes to send trace spans"),
@Deprecated(since = "2.1.0", forRemoval = true)
TRACE_ZOOKEEPER_PATH("trace.zookeeper.path", Constants.ZTRACERS,
PropertyType.PATH,
- "The zookeeper node where tracers are registered", "2.0.0", false);
+ "The zookeeper node where tracers are registered", "2.0.0", false),
+
+ /*
+ * For use with OfflineTabletLocatorImpl
+ */
+ @Experimental
+ OFFLINE_LOCATOR_CACHE_DURATION("offline.locator.cache.duration", "10m",
PropertyType.TIMEDURATION,
+ "The client caches extent information for offline tables for use with
eventually consistent"
+ + " scans and Scan Servers. This property controls how long the
extent information is cached"
+ + " in the client after it's last use.",
+ "2.1.5", false),
+ @Experimental
+ OFFLINE_LOCATOR_CACHE_PREFETCH("offline.locator.cache.prefetch", "10",
PropertyType.COUNT,
+ "The number of offline extents that should be pre-loaded into the cache.
This may reduce"
+ + " the load on the metadata table when looking up extent
information. Smaller values"
+ + " may make sense here in a random lookup workload, larger values
in sequential scans"
+ + " over multiple tablets.",
+ "2.1.5", false),
+ @Experimental
+ OFFLINE_LOCATOR_CACHE_SIZE("offline.locator.cache.size", "0",
PropertyType.COUNT,
+ "The number of offline extents that should be cached in the client. The"
+ + " value zero disables the size limitation.",
+ "2.1.5", false);
@Deprecated(since = "2.1.0", forRemoval = true)
public static final String TRACE_SPAN_RECEIVER_PREFIX =
"trace.span.receiver";
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
index 22275c9e11..f56d2a21a9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
@@ -448,7 +449,8 @@ public class GCRun implements GarbageCollectionEnvironment {
}
return Maps.immutableEntry(file, stat);
});
- } catch
(org.apache.accumulo.core.replication.ReplicationTableOfflineException e) {
+ } catch
(org.apache.accumulo.core.replication.ReplicationTableOfflineException
+ | TableOfflineException e) {
// No elements that we need to preclude
return Collections.emptyIterator();
}
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 4d9c4e745a..39aec22ca2 100644
---
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -38,6 +38,7 @@ import java.util.stream.Stream;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -448,7 +449,8 @@ public class GarbageCollectWriteAheadLogs {
candidates.remove(id);
log.info("Ignore closed log " + id + " because it is being
replicated");
}
- } catch
(org.apache.accumulo.core.replication.ReplicationTableOfflineException ex) {
+ } catch
(org.apache.accumulo.core.replication.ReplicationTableOfflineException
+ | TableOfflineException ex) {
return candidates.size();
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java
index e14834cf48..a4039ca36a 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java
@@ -154,8 +154,8 @@ public class ScanServerAllowedTablesIT extends
SharedMiniClusterBase {
BATCH_SCANNER, SCANNER;
}
- private ScannerBase createScanner(AccumuloClient client, ScannerType stype,
String tableName)
- throws TableNotFoundException {
+ public static ScannerBase createScanner(AccumuloClient client, ScannerType
stype,
+ String tableName) throws TableNotFoundException {
switch (stype) {
case BATCH_SCANNER:
BatchScanner batchScanner = client.createBatchScanner(tableName,
Authorizations.EMPTY);
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
index 77cdc8c647..b271ae2286 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
@@ -35,7 +35,6 @@ import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
-import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.TimedOutException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.conf.ClientProperty;
@@ -146,24 +145,6 @@ public class ScanServerIT extends SharedMiniClusterBase {
}
}
- @Test
- public void testScanOfflineTable() throws Exception {
- try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
- String tableName = getUniqueNames(1)[0];
-
- createTableAndIngest(client, tableName, null, 10, 10, "colf");
- client.tableOperations().offline(tableName, true);
-
- assertThrows(TableOfflineException.class, () -> {
- try (Scanner scanner = client.createScanner(tableName,
Authorizations.EMPTY)) {
- scanner.setRange(new Range());
- scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
- assertEquals(100, Iterables.size(scanner));
- } // when the scanner is closed, all open sessions should be closed
- });
- }
- }
-
@Test
@Timeout(value = 20)
public void testBatchScannerTimeout() throws Exception {
@@ -232,7 +213,7 @@ public class ScanServerIT extends SharedMiniClusterBase {
*/
public static int ingest(AccumuloClient client, String tableName, int
rowCount, int colCount,
int offset, String colf, boolean shouldFlush) throws Exception {
- ReadWriteIT.ingest(client, colCount, rowCount, 50, offset, colf,
tableName);
+ ReadWriteIT.ingest(client, rowCount, colCount, 50, offset, colf,
tableName);
final int ingestedEntriesCount = colCount * rowCount;
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java
new file mode 100644
index 0000000000..e9056cea5c
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.TabletLocator;
+import org.apache.accumulo.core.conf.ClientProperty;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.ScanServerAllowedTablesIT.ScannerType;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import com.google.common.collect.Iterables;
+
+public class ScanServerOfflineTableIT extends SharedMiniClusterBase {
+
+ private static class ScanServerOfflineITConfiguration
+ implements MiniClusterConfigurationCallback {
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+ org.apache.hadoop.conf.Configuration coreSite) {
+ cfg.setNumScanServers(1);
+
+      // Time out scan sessions after they have been idle for 3 seconds
+      cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+
+ // Configure the scan server to only have 1 scan executor thread. This
means
+ // that the scan server will run scans serially, not concurrently.
+ cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
+ }
+ }
+
+ @BeforeAll
+ public static void start() throws Exception {
+ ScanServerOfflineITConfiguration c = new
ScanServerOfflineITConfiguration();
+ SharedMiniClusterBase.startMiniClusterWithConfig(c);
+
SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
+ "localhost");
+
+ String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+ ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+ String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+ while (zrw.getChildren(scanServerRoot).size() == 0) {
+ Thread.sleep(500);
+ }
+ }
+
+ @AfterAll
+ public static void stop() throws Exception {
+ SharedMiniClusterBase.stopMiniCluster();
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = ScanServerAllowedTablesIT.ScannerType.class)
+ public void testSimpleScan(ScannerType stype) throws Exception {
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ String tableName = getUniqueNames(1)[0] + stype.name();
+
+ final int ingestedEntryCount =
+ ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10,
"colf");
+ client.tableOperations().offline(tableName, true);
+
+      // Not strictly necessary, but invalidate the TabletLocator cache here so
+      // that no stale tablet locations remain for the now-offline table
+ TabletLocator.getLocator((ClientContext) client,
+
TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache();
+
+ try (
+ ScannerBase scanner =
ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The scan server scanner should have seen all ingested and flushed
entries");
+ } // when the scanner is closed, all open sessions should be closed
+ }
+ }
+
+ public static class ArgProvider implements ArgumentsProvider {
+
+ @Override
+ public Stream<? extends Arguments> provideArguments(ExtensionContext
context) throws Exception {
+ List<Arguments> args = new ArrayList<>();
+ args.add(Arguments.of(ScannerType.BATCH_SCANNER, 0));
+ args.add(Arguments.of(ScannerType.SCANNER, 0));
+ args.add(Arguments.of(ScannerType.BATCH_SCANNER, 100));
+ args.add(Arguments.of(ScannerType.SCANNER, 100));
+ return args.stream();
+ }
+
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(ArgProvider.class)
+ public void testScan(ScannerType stype, int maxCacheSize) throws Exception {
+
+ final int rows = 1000;
+
+ final Properties p = getClientProps();
+ p.put(ClientProperty.OFFLINE_LOCATOR_CACHE_SIZE.getKey(),
Integer.toString(maxCacheSize));
+
+ try (AccumuloClient client = Accumulo.newClient().from(p).build()) {
+ String tableName = getUniqueNames(1)[0] + stype.name() + "_" +
maxCacheSize;
+
+ final int ingestedEntryCount =
+ ScanServerIT.createTableAndIngest(client, tableName, null, rows, 10,
"colf");
+
+ try (
+ ScannerBase scanner =
ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The tablet server scanner should have seen all ingested and
flushed entries");
+ } // when the scanner is closed, all open sessions should be closed
+ ReadWriteIT.verify(client, rows, 10, 50, 0, tableName);
+
+ client.tableOperations().offline(tableName, true);
+
+      // Not strictly necessary, but invalidate the TabletLocator cache here so
+      // that no stale tablet locations remain for the now-offline table
+ TabletLocator.getLocator((ClientContext) client,
+
TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache();
+
+ try (
+ ScannerBase scanner =
ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The scan server scanner should have seen all ingested and flushed
entries");
+ } // when the scanner is closed, all open sessions should be closed
+ ReadWriteIT.verifyEventual(client, rows, 10, 50, 0, tableName);
+
+ client.tableOperations().online(tableName, true);
+
+ try (
+ ScannerBase scanner =
ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The tablet server scanner should have seen all ingested and
flushed entries");
+ } // when the scanner is closed, all open sessions should be closed
+ ReadWriteIT.verify(client, rows, 10, 50, 0, tableName);
+
+ // Add some splits to the table
+ SortedSet<Text> splits = new TreeSet<>();
+ for (int i = 0; i < rows; i++) {
+ splits.add(new Text("row_" + String.format("%010d", i)));
+ }
+ client.tableOperations().addSplits(tableName, splits);
+ client.instanceOperations().waitForBalance();
+
+ try (
+ ScannerBase scanner =
ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The tablet server scanner should have seen all ingested and
flushed entries");
+ } // when the scanner is closed, all open sessions should be closed
+ ReadWriteIT.verify(client, rows, 10, 50, 0, tableName);
+
+ client.tableOperations().offline(tableName, true);
+
+      // Not strictly necessary, but invalidate the TabletLocator cache here so
+      // that no stale tablet locations remain for the now-offline table
+ TabletLocator.getLocator((ClientContext) client,
+
TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache();
+
+ try (
+ ScannerBase scanner =
ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) {
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The scan server scanner should have seen all ingested and flushed
entries");
+ } // when the scanner is closed, all open sessions should be closed
+ ReadWriteIT.verifyEventual(client, rows, 10, 50, 0, tableName);
+
+ }
+
+ }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
index 092ea14a75..527ac4a95f 100644
--- a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.data.Key;
@@ -109,6 +110,12 @@ public class VerifyIngest {
public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams
params)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
+ verifyIngest(accumuloClient, params, ConsistencyLevel.IMMEDIATE);
+ }
+
+ public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams
params,
+ ConsistencyLevel cl)
+ throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {
byte[][] bytevals = TestIngest.generateValues(params.dataSize);
Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2");
@@ -136,6 +143,7 @@ public class VerifyIngest {
Text colq = new Text("col_" + String.format("%07d", expectedCol));
try (Scanner scanner = accumuloClient.createScanner("test_ingest",
labelAuths)) {
+ scanner.setConsistencyLevel(cl);
scanner.setBatchSize(1);
Key startKey = new Key(rowKey, colf, colq);
Range range = new Range(startKey,
startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL));
@@ -181,6 +189,7 @@ public class VerifyIngest {
Key startKey = new Key(new Text("row_" + String.format("%010d",
expectedRow)));
try (Scanner scanner = accumuloClient.createScanner(params.tableName,
labelAuths)) {
+ scanner.setConsistencyLevel(cl);
scanner.setRange(new Range(startKey, endKey));
for (int j = 0; j < params.cols; j++) {
scanner.fetchColumn(new Text(params.columnFamily),
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index 44eb5b2374..c4cb9574a9 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -62,6 +62,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -211,18 +212,23 @@ public class ReadWriteIT extends AccumuloClusterHarness {
public static void verify(AccumuloClient accumuloClient, int rows, int cols,
int width,
int offset, String tableName) throws Exception {
- verify(accumuloClient, rows, cols, width, offset, COLF, tableName);
+ verify(accumuloClient, rows, cols, width, offset, COLF, tableName,
ConsistencyLevel.IMMEDIATE);
+ }
+
+ public static void verifyEventual(AccumuloClient accumuloClient, int rows,
int cols, int width,
+ int offset, String tableName) throws Exception {
+ verify(accumuloClient, rows, cols, width, offset, COLF, tableName,
ConsistencyLevel.EVENTUAL);
}
private static void verify(AccumuloClient accumuloClient, int rows, int
cols, int width,
- int offset, String colf, String tableName) throws Exception {
+ int offset, String colf, String tableName, ConsistencyLevel cl) throws
Exception {
VerifyParams params = new VerifyParams(accumuloClient.properties(),
tableName, rows);
params.rows = rows;
params.dataSize = width;
params.startRow = offset;
params.columnFamily = colf;
params.cols = cols;
- VerifyIngest.verifyIngest(accumuloClient, params);
+ VerifyIngest.verifyIngest(accumuloClient, params, cl);
}
public static String[] args(String... args) {
@@ -445,7 +451,7 @@ public class ReadWriteIT extends AccumuloClusterHarness {
to.setLocalityGroups(table, getGroups(cfg));
to.flush(table, null, null, true);
verify(accumuloClient, ROWS * i, 1, 50, 0, table);
- verify(accumuloClient, ROWS * i, 1, 50, 0, "xyz", table);
+ verify(accumuloClient, ROWS * i, 1, 50, 0, "xyz", table,
ConsistencyLevel.IMMEDIATE);
i++;
}
}