This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
     new 39bc7a0524 Add base objects for Tablet and TabletServer for the upcoming ScanServer feature (#2661)

39bc7a0524 is described below

commit 39bc7a0524862a3d39a818e9ac7748874dc06949
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Apr 28 16:06:57 2022 -0400

    Add base objects for Tablet and TabletServer for the upcoming ScanServer feature (#2661)

    This commit introduces TabletBase, a base class for Tablets, and TabletHostingServer, an interface for server components that host Tablets. These changes will be used by the ScanServer feature that is in the works.

    Related to #2411

    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../system/SourceSwitchingIterator.java | 2 +
 .../accumulo/tserver/TabletClientHandler.java | 10 +-
 .../accumulo/tserver/TabletHostingServer.java | 57 +++
 .../org/apache/accumulo/tserver/TabletServer.java | 27 +-
 .../accumulo/tserver/ThriftScanClientHandler.java | 51 +--
 .../apache/accumulo/tserver/scan/LookupTask.java | 4 +-
 .../accumulo/tserver/scan/NextBatchTask.java | 4 +-
 .../org/apache/accumulo/tserver/scan/ScanTask.java | 6 +-
 .../accumulo/tserver/tablet/ScanDataSource.java | 33 +-
 .../apache/accumulo/tserver/tablet/Scanner.java | 8 +-
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 455 +++----------------
 .../apache/accumulo/tserver/tablet/TabletBase.java | 462 +++++++++++++++++++++
 12 files changed, 670 insertions(+), 449 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SourceSwitchingIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SourceSwitchingIterator.java
index 27bf74e6b4..92bffc6555 100644
--- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SourceSwitchingIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/SourceSwitchingIterator.java
@@ -54,6 +54,8 @@ public class SourceSwitchingIterator implements InterruptibleIterator {
SortedKeyValueIterator<Key,Value> iterator() throws IOException; void setInterruptFlag(AtomicBoolean flag); + + default void close(boolean sawErrors) {} } private DataSource source; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 5ffa3a7311..242b52ba55 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -1040,7 +1040,7 @@ public class TabletClientHandler implements TabletClientService.Iface { } static void checkPermission(SecurityOperation security, ServerContext context, - TabletServer server, TCredentials credentials, String lock, final String request) + TabletHostingServer server, TCredentials credentials, String lock, final String request) throws ThriftSecurityException { try { log.trace("Got {} message from user: {}", request, credentials.getPrincipal()); @@ -1069,7 +1069,7 @@ public class TabletClientHandler implements TabletClientService.Iface { Halt.halt(1, () -> { log.info("Tablet server no longer holds lock during checkPermission() : {}, exiting", request); - server.gcLogger.logGCInfo(server.getConfiguration()); + server.getGcLogger().logGCInfo(server.getConfiguration()); }); } @@ -1078,11 +1078,11 @@ public class TabletClientHandler implements TabletClientService.Iface { new ZooUtil.LockID(context.getZooKeeperRoot() + Constants.ZMANAGER_LOCK, lock); try { - if (!ServiceLock.isLockHeld(server.managerLockCache, lid)) { + if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) { // maybe the cache is out of date and a new manager holds the // lock? 
- server.managerLockCache.clear(); - if (!ServiceLock.isLockHeld(server.managerLockCache, lid)) { + server.getManagerLockCache().clear(); + if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) { log.warn("Got {} message from a manager that does not hold the current lock {}", request, lock); throw new RuntimeException("bad manager lock"); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java new file mode 100644 index 0000000000..15c43d6625 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.tserver; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.fate.zookeeper.ServiceLock; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.server.GarbageCollectionLogger; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; +import org.apache.accumulo.tserver.session.Session; +import org.apache.accumulo.tserver.session.SessionManager; +import org.apache.accumulo.tserver.tablet.Tablet; + +public interface TabletHostingServer { + + ServerContext getContext(); + + AccumuloConfiguration getConfiguration(); + + Tablet getOnlineTablet(KeyExtent extent); + + SessionManager getSessionManager(); + + TabletServerResourceManager getResourceManager(); + + TabletServerScanMetrics getScanMetrics(); + + Session getSession(long scanID); + + TableConfiguration getTableConfiguration(KeyExtent threadPoolExtent); + + ServiceLock getLock(); + + ZooCache getManagerLockCache(); + + GarbageCollectionLogger getGcLogger(); + +} diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 910c4e9303..6508d9e718 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -170,7 +170,7 @@ import com.google.common.collect.Iterators; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; -public class TabletServer extends AbstractServer { +public class TabletServer extends AbstractServer implements TabletHostingServer { private static final SecureRandom random = new SecureRandom(); private static final Logger log = LoggerFactory.getLogger(TabletServer.class); @@ -188,6 +188,7 
@@ public class TabletServer extends AbstractServer { TabletServerMinCMetrics mincMetrics; CompactionExecutorsMetrics ceMetrics; + @Override public TabletServerScanMetrics getScanMetrics() { return scanMetrics; } @@ -422,6 +423,7 @@ public class TabletServer extends AbstractServer { return totalQueuedMutationSize.addAndGet(additionalMutationSize); } + @Override public Session getSession(long sessionId) { return sessionManager.getSession(sessionId); } @@ -651,10 +653,21 @@ public class TabletServer extends AbstractServer { } } + @Override public ServiceLock getLock() { return tabletServerLock; } + @Override + public ZooCache getManagerLockCache() { + return managerLockCache; + } + + @Override + public GarbageCollectionLogger getGcLogger() { + return gcLogger; + } + private void announceExistence() { ZooReaderWriter zoo = getContext().getZooReaderWriter(); try { @@ -1204,6 +1217,7 @@ public class TabletServer extends AbstractServer { return logId; } + @Override public TableConfiguration getTableConfiguration(KeyExtent extent) { return getContext().getTableConfiguration(extent.tableId()); } @@ -1227,10 +1241,21 @@ public class TabletServer extends AbstractServer { return onlineTablets.snapshot(); } + @Override public Tablet getOnlineTablet(KeyExtent extent) { return onlineTablets.snapshot().get(extent); } + @Override + public SessionManager getSessionManager() { + return sessionManager; + } + + @Override + public TabletServerResourceManager getResourceManager() { + return resourceManager; + } + public VolumeManager getVolumeManager() { return getContext().getVolumeManager(); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java index 4e071cba5e..7c0c47fea2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java +++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java @@ -88,13 +88,13 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { private static final Logger log = LoggerFactory.getLogger(ThriftScanClientHandler.class); - private final TabletServer server; + private final TabletHostingServer server; protected final ServerContext context; protected final SecurityOperation security; private final WriteTracker writeTracker; private final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS; - public ThriftScanClientHandler(TabletServer server, WriteTracker writeTracker) { + public ThriftScanClientHandler(TabletHostingServer server, WriteTracker writeTracker) { this.server = server; this.context = server.getContext(); this.writeTracker = writeTracker; @@ -185,7 +185,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { scanSession.scanner = tablet.createScanner(new Range(range), scanParams, scanSession.interruptFlag); - long sid = server.sessionManager.createSession(scanSession, true); + long sid = server.getSessionManager().createSession(scanSession, true); ScanResult scanResult; try { @@ -194,7 +194,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { log.error("The impossible happened", e); throw new RuntimeException(); } finally { - server.sessionManager.unreserveSession(sid); + server.getSessionManager().unreserveSession(sid); } return new InitialScan(sid, scanResult); @@ -205,7 +205,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { NotServingTabletException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException, TSampleNotPresentException { SingleScanSession scanSession = - (SingleScanSession) server.sessionManager.reserveSession(scanID); + (SingleScanSession) server.getSessionManager().reserveSession(scanID); if (scanSession == null) { throw new NoSuchScanIDException(); } @@ -213,7 +213,7 @@ public class 
ThriftScanClientHandler implements TabletScanClientService.Iface { try { return continueScan(tinfo, scanID, scanSession); } finally { - server.sessionManager.unreserveSession(scanSession); + server.getSessionManager().unreserveSession(scanSession); } } @@ -224,7 +224,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { if (scanSession.nextBatchTask == null) { scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag); - server.resourceManager.executeReadAhead(scanSession.extent, + server.getResourceManager().executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask); } @@ -234,7 +234,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { TimeUnit.MILLISECONDS); scanSession.nextBatchTask = null; } catch (ExecutionException e) { - server.sessionManager.removeSession(scanID); + server.getSessionManager().removeSession(scanID); if (e.getCause() instanceof NotServingTabletException) { throw (NotServingTabletException) e.getCause(); } else if (e.getCause() instanceof TooManyFilesException) { @@ -251,7 +251,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { throw new RuntimeException(e); } } catch (CancellationException ce) { - server.sessionManager.removeSession(scanID); + server.getSessionManager().removeSession(scanID); Tablet tablet = server.getOnlineTablet(scanSession.extent); if (tablet == null || tablet.isClosed()) { throw new NotServingTabletException(scanSession.extent.toThrift()); @@ -261,10 +261,10 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { } catch (TimeoutException e) { List<TKeyValue> param = Collections.emptyList(); long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT); - server.sessionManager.removeIfNotAccessed(scanID, timeout); + server.getSessionManager().removeIfNotAccessed(scanID, timeout); return 
new ScanResult(param, true); } catch (Exception t) { - server.sessionManager.removeSession(scanID); + server.getSessionManager().removeSession(scanID); log.warn("Failed to get next batch", t); throw new RuntimeException(t); } @@ -279,7 +279,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { // start reading next batch while current batch is transmitted // to client scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag); - server.resourceManager.executeReadAhead(scanSession.extent, + server.getResourceManager().executeReadAhead(scanSession.extent, getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask); } @@ -292,7 +292,8 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { @Override public void closeScan(TInfo tinfo, long scanID) { - final SingleScanSession ss = (SingleScanSession) server.sessionManager.removeSession(scanID); + final SingleScanSession ss = + (SingleScanSession) server.getSessionManager().removeSession(scanID); if (ss != null) { long t2 = System.currentTimeMillis(); @@ -302,8 +303,8 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { (t2 - ss.startTime) / 1000.0, ss.runStats.toString())); } - server.scanMetrics.addScan(t2 - ss.startTime); - server.scanMetrics.addResult(ss.entriesReturned); + server.getScanMetrics().addScan(t2 - ss.startTime); + server.getScanMetrics().addResult(ss.entriesReturned); } } @@ -373,13 +374,13 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { mss.numRanges += ranges.size(); } - long sid = server.sessionManager.createSession(mss, true); + long sid = server.getSessionManager().createSession(mss, true); MultiScanResult result; try { result = continueMultiScan(sid, mss); } finally { - server.sessionManager.unreserveSession(sid); + server.getSessionManager().unreserveSession(sid); } return new InitialMultiScan(sid, result); @@ -389,7 +390,7 @@ 
public class ThriftScanClientHandler implements TabletScanClientService.Iface { public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, TSampleNotPresentException { - MultiScanSession session = (MultiScanSession) server.sessionManager.reserveSession(scanID); + MultiScanSession session = (MultiScanSession) server.getSessionManager().reserveSession(scanID); if (session == null) { throw new NoSuchScanIDException(); @@ -398,7 +399,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { try { return continueMultiScan(scanID, session); } finally { - server.sessionManager.unreserveSession(session); + server.getSessionManager().unreserveSession(session); } } @@ -407,7 +408,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { if (session.lookupTask == null) { session.lookupTask = new LookupTask(server, scanID); - server.resourceManager.executeReadAhead(session.threadPoolExtent, + server.getResourceManager().executeReadAhead(session.threadPoolExtent, getScanDispatcher(session.threadPoolExtent), session, session.lookupTask); } @@ -417,7 +418,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { session.lookupTask = null; return scanResult; } catch (ExecutionException e) { - server.sessionManager.removeSession(scanID); + server.getSessionManager().removeSession(scanID); if (e.getCause() instanceof SampleNotPresentException) { throw new TSampleNotPresentException(); } else { @@ -426,13 +427,13 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { } } catch (TimeoutException e1) { long timeout = server.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT); - server.sessionManager.removeIfNotAccessed(scanID, timeout); + server.getSessionManager().removeIfNotAccessed(scanID, timeout); List<TKeyValue> results = Collections.emptyList(); Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap(); 
List<TKeyExtent> fullScans = Collections.emptyList(); return new MultiScanResult(results, failures, fullScans, null, null, false, true); } catch (Exception t) { - server.sessionManager.removeSession(scanID); + server.getSessionManager().removeSession(scanID); log.warn("Failed to get multiscan result", t); throw new RuntimeException(t); } @@ -440,7 +441,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { @Override public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException { - MultiScanSession session = (MultiScanSession) server.sessionManager.removeSession(scanID); + MultiScanSession session = (MultiScanSession) server.getSessionManager().removeSession(scanID); if (session == null) { throw new NoSuchScanIDException(); } @@ -466,7 +467,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { throw e; } - return server.sessionManager.getActiveScans(); + return server.getSessionManager().getActiveScans(); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java index 10786f7061..3131196fb9 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java @@ -40,7 +40,7 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyValue; import org.apache.accumulo.core.dataImpl.thrift.TRange; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.session.MultiScanSession; import org.apache.accumulo.tserver.tablet.KVEntry; import org.apache.accumulo.tserver.tablet.Tablet; @@ -54,7 +54,7 @@ public class LookupTask extends ScanTask<MultiScanResult> 
{ private final long scanID; - public LookupTask(TabletServer server, long scanID) { + public LookupTask(TabletHostingServer server, long scanID) { super(server); this.scanID = scanID; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java index 14ecf68c02..6a1030dfaf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.server.fs.TooManyFilesException; -import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.session.SingleScanSession; import org.apache.accumulo.tserver.tablet.ScanBatch; import org.apache.accumulo.tserver.tablet.Tablet; @@ -38,7 +38,7 @@ public class NextBatchTask extends ScanTask<ScanBatch> { private final long scanID; - public NextBatchTask(TabletServer server, long scanID, AtomicBoolean interruptFlag) { + public NextBatchTask(TabletHostingServer server, long scanID, AtomicBoolean interruptFlag) { super(server); this.scanID = scanID; this.interruptFlag = interruptFlag; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java index d814410ec2..a8469e04cc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java @@ -28,11 +28,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.concurrent.atomic.AtomicReference; -import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.TabletHostingServer; public abstract class ScanTask<T> implements RunnableFuture<T> { - protected final TabletServer server; + protected final TabletHostingServer server; protected AtomicBoolean interruptFlag; protected ArrayBlockingQueue<Object> resultQueue; protected AtomicInteger state; @@ -42,7 +42,7 @@ public abstract class ScanTask<T> implements RunnableFuture<T> { private static final int ADDED = 2; private static final int CANCELED = 3; - ScanTask(TabletServer server) { + ScanTask(TabletHostingServer server) { this.server = server; interruptFlag = new AtomicBoolean(false); runState = new AtomicReference<>(ScanRunState.QUEUED); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 54129ccee5..e2411c3458 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -57,7 +57,7 @@ class ScanDataSource implements DataSource { private static final Logger log = LoggerFactory.getLogger(ScanDataSource.class); // data source state - private final Tablet tablet; + private final TabletBase tablet; private ScanFileManager fileManager; private SortedKeyValueIterator<Key,Value> iter; private long expectedDeletionCount; @@ -70,7 +70,7 @@ class ScanDataSource implements DataSource { private final boolean loadIters; private final byte[] defaultLabels; - ScanDataSource(Tablet tablet, ScanParameters scanParams, boolean loadIters, + ScanDataSource(TabletBase tablet, ScanParameters scanParams, boolean loadIters, AtomicBoolean interruptFlag) { this.tablet = tablet; this.expectedDeletionCount = tablet.getDataSourceDeletions(); @@ -91,14 +91,14 @@ class ScanDataSource implements DataSource { 
else { // log.debug("Switching data sources during a scan"); if (memIters != null) { - tablet.getTabletMemory().returnIterators(memIters); + tablet.returnMemIterators(memIters); memIters = null; - tablet.getDatafileManager().returnFilesForScan(fileReservationId); + tablet.returnFilesForScan(fileReservationId); fileReservationId = -1; } if (fileManager != null) { - tablet.getTabletServer().getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles()); + tablet.getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles()); fileManager.releaseOpenFiles(false); } @@ -142,7 +142,7 @@ class ScanDataSource implements DataSource { // only acquire the file manager when we know the tablet is open if (fileManager == null) { fileManager = tablet.getTabletResources().newScanFileManager(scanParams.getScanDispatch()); - tablet.getTabletServer().getScanMetrics().incrementOpenFiles(fileManager.getNumOpenFiles()); + tablet.getScanMetrics().incrementOpenFiles(fileManager.getNumOpenFiles()); tablet.addActiveScans(this); } @@ -153,9 +153,8 @@ class ScanDataSource implements DataSource { // getIterators() throws an exception expectedDeletionCount = tablet.getDataSourceDeletions(); - memIters = tablet.getTabletMemory().getIterators(samplerConfig); - Pair<Long,Map<TabletFile,DataFileValue>> reservation = - tablet.getDatafileManager().reserveFilesForScan(); + memIters = tablet.getMemIterators(samplerConfig); + Pair<Long,Map<TabletFile,DataFileValue>> reservation = tablet.reserveFilesForScan(); fileReservationId = reservation.getFirst(); files = reservation.getSecond(); } @@ -173,10 +172,9 @@ class ScanDataSource implements DataSource { MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent()); - TabletIteratorEnvironment iterEnv = - new TabletIteratorEnvironment(tablet.getTabletServer().getContext(), IteratorScope.scan, - tablet.getTableConfiguration(), tablet.getExtent().tableId(), fileManager, files, - scanParams.getAuthorizations(), samplerConfig, new 
ArrayList<>()); + TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(tablet.getContext(), + IteratorScope.scan, tablet.getTableConfiguration(), tablet.getExtent().tableId(), + fileManager, files, scanParams.getAuthorizations(), samplerConfig, new ArrayList<>()); statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter()); @@ -229,12 +227,13 @@ class ScanDataSource implements DataSource { } } - void close(boolean sawErrors) { + @Override + public void close(boolean sawErrors) { if (memIters != null) { - tablet.getTabletMemory().returnIterators(memIters); + tablet.returnMemIterators(memIters); memIters = null; - tablet.getDatafileManager().returnFilesForScan(fileReservationId); + tablet.returnFilesForScan(fileReservationId); fileReservationId = -1; } @@ -244,7 +243,7 @@ class ScanDataSource implements DataSource { } if (fileManager != null) { - tablet.getTabletServer().getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles()); + tablet.getScanMetrics().decrementOpenFiles(fileManager.getNumOpenFiles()); fileManager.releaseOpenFiles(sawErrors); fileManager = null; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index b09407774b..d25068bf23 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; public class Scanner { private static final Logger log = LoggerFactory.getLogger(Scanner.class); - private final Tablet tablet; + private final TabletBase tablet; private final ScanParameters scanParams; private Range range; private SortedKeyValueIterator<Key,Value> isolatedIter; @@ -55,7 +55,7 @@ public class Scanner { private AtomicBoolean interruptFlag; - Scanner(Tablet tablet, Range range, ScanParameters scanParams, AtomicBoolean 
interruptFlag) { + Scanner(TabletBase tablet, Range range, ScanParameters scanParams, AtomicBoolean interruptFlag) { this.tablet = tablet; this.range = range; this.scanParams = scanParams; @@ -87,10 +87,10 @@ public class Scanner { if (scanParams.isIsolated()) { if (isolatedDataSource == null) - isolatedDataSource = new ScanDataSource(tablet, scanParams, true, interruptFlag); + isolatedDataSource = tablet.createDataSource(scanParams, true, interruptFlag); dataSource = isolatedDataSource; } else { - dataSource = new ScanDataSource(tablet, scanParams, true, interruptFlag); + dataSource = tablet.createDataSource(scanParams, true, interruptFlag); } SortedKeyValueIterator<Key,Value> iter; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index eefc71cfa5..027af6a408 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -19,7 +19,6 @@ package org.apache.accumulo.tserver.tablet; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; @@ -56,7 +55,6 @@ import org.apache.accumulo.core.clientImpl.UserCompactionUtils; import org.apache.accumulo.core.conf.AccumuloConfiguration.Deriver; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.constraints.Violations; -import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -66,8 +64,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo; import org.apache.accumulo.core.file.FileOperations; import 
org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.iterators.YieldCallback; -import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator; import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.state.tables.TableState; @@ -84,21 +80,17 @@ import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.protobuf.ProtobufUtil; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; import org.apache.accumulo.core.spi.scan.ScanDispatch; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.ShutdownUtil; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionStats; -import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.fs.TooManyFilesException; import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles; @@ -124,6 +116,7 @@ import org.apache.accumulo.tserver.compactions.Compactable; import org.apache.accumulo.tserver.constraints.ConstraintChecker; import org.apache.accumulo.tserver.log.DfsLogger; import 
org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; +import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; import org.apache.accumulo.tserver.scan.ScanParameters; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; @@ -144,17 +137,12 @@ import io.opentelemetry.context.Scope; /** * Provide access to a single row range in a living TabletServer. */ -public class Tablet { +public class Tablet extends TabletBase { private static final Logger log = LoggerFactory.getLogger(Tablet.class); - private static final byte[] EMPTY_BYTES = new byte[0]; - private final TabletServer tabletServer; - private final ServerContext context; - private final KeyExtent extent; private final TabletResourceManager tabletResources; private final DatafileManager datafileManager; - private final TableConfiguration tableConfiguration; private final String dirName; private final TabletMemory tabletMemory; @@ -168,12 +156,11 @@ public class Tablet { private final AtomicLong dataSourceDeletions = new AtomicLong(0); + @Override public long getDataSourceDeletions() { return dataSourceDeletions.get(); } - private final Set<ScanDataSource> activeScans = new HashSet<>(); - private enum CloseState { OPEN, CLOSING, CLOSED, COMPLETE } @@ -182,8 +169,8 @@ public class Tablet { private boolean updatingFlushID = false; - private long lastFlushID = -1; - private long lastCompactID = -1; + private AtomicLong lastFlushID = new AtomicLong(-1); + private AtomicLong lastCompactID = new AtomicLong(-1); private static class CompactionWaitInfo { long flushID = -1; @@ -212,12 +199,8 @@ public class Tablet { * Counts are maintained in this object and reported out with the Micrometer metrics via * TabletServerMetricsUtil */ - private long lookupCount = 0; - private long queryResultCount = 0; - private long queryResultBytes = 0; private long ingestCount = 0; private long ingestBytes = 0; - private final AtomicLong scannedCount = new AtomicLong(0); /** * Rates 
are calculated here in the Tablet for use in the Monitor but we do not emit them as @@ -229,8 +212,6 @@ public class Tablet { private final Rate ingestByteRate = new Rate(0.95); private final Rate scannedRate = new Rate(0.95); - private final Deriver<byte[]> defaultSecurityLabel; - private long lastMinorCompactionFinishTime = 0; private long lastMapFileImportTime = 0; @@ -303,27 +284,18 @@ public class Tablet { final TabletResourceManager trm, TabletData data) throws IOException, IllegalArgumentException { + super(tabletServer.getContext(), extent); + this.tabletServer = tabletServer; - this.context = tabletServer.getContext(); - this.extent = extent; this.tabletResources = trm; this.lastLocation = data.getLastLocation(); - this.lastFlushID = data.getFlushID(); - this.lastCompactID = data.getCompactID(); + this.lastFlushID.set(data.getFlushID()); + this.lastCompactID.set(data.getCompactID()); this.splitCreationTime = data.getSplitTime(); this.tabletTime = TabletTime.getInstance(data.getTime()); this.persistedTime = tabletTime.getTime(); this.logId = tabletServer.createLogId(); - TableConfiguration tblConf = tabletServer.getTableConfiguration(extent); - if (tblConf == null) { - tabletServer.getContext().clearTableListCache(); - tblConf = tabletServer.getTableConfiguration(extent); - requireNonNull(tblConf, "Could not get table configuration for " + extent.tableId()); - } - - this.tableConfiguration = tblConf; - // translate any volume changes @SuppressWarnings("deprecation") boolean replicationEnabled = org.apache.accumulo.core.replication.ReplicationConfigurationUtil @@ -344,14 +316,6 @@ public class Tablet { constraintChecker = tableConfiguration.newDeriver(ConstraintChecker::new); - if (extent.isMeta()) { - defaultSecurityLabel = () -> EMPTY_BYTES; - } else { - defaultSecurityLabel = tableConfiguration.newDeriver( - conf -> new ColumnVisibility(conf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY)) - .getExpression()); - } - tabletMemory = new TabletMemory(this); 
// don't bother examining WALs for recovery if Table is being deleted @@ -446,10 +410,6 @@ public class Tablet { data.getExternalCompactions()); } - public ServerContext getContext() { - return context; - } - private void removeOldTemporaryFiles( Map<ExternalCompactionId,ExternalCompactionMetadata> externalCompactions) { // remove any temporary files created by a previous tablet server @@ -481,152 +441,6 @@ public class Tablet { } } - private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, - List<KVEntry> results, ScanParameters scanParams, long maxResultsSize) throws IOException { - - LookupResult lookupResult = new LookupResult(); - - boolean exceededMemoryUsage = false; - boolean tabletClosed = false; - - Set<ByteSequence> cfset = null; - if (!scanParams.getColumnSet().isEmpty()) { - cfset = LocalityGroupUtil.families(scanParams.getColumnSet()); - } - - long batchTimeOut = scanParams.getBatchTimeOut(); - - long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut); - long startNanos = System.nanoTime(); - - if (batchTimeOut <= 0 || batchTimeOut == Long.MAX_VALUE) { - batchTimeOut = 0; - } - - // determine if the iterator supported yielding - YieldCallback<Key> yield = new YieldCallback<>(); - mmfi.enableYielding(yield); - boolean yielded = false; - - for (Range range : ranges) { - - boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun; - - if (exceededMemoryUsage || tabletClosed || timesUp || yielded) { - lookupResult.unfinishedRanges.add(range); - continue; - } - - int entriesAdded = 0; - - try { - if (cfset != null) { - mmfi.seek(range, cfset, true); - } else { - mmfi.seek(range, Set.of(), false); - } - - while (mmfi.hasTop()) { - if (yield.hasYielded()) { - throw new IOException("Coding error: hasTop returned true but has yielded at " - + yield.getPositionAndReset()); - } - Key key = mmfi.getTopKey(); - - KVEntry kve = new KVEntry(key, mmfi.getTopValue()); - results.add(kve); - entriesAdded++; - 
lookupResult.bytesAdded += kve.estimateMemoryUsed(); - lookupResult.dataSize += kve.numBytes(); - - exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize; - - timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun; - - if (exceededMemoryUsage || timesUp) { - addUnfinishedRange(lookupResult, range, key); - break; - } - - mmfi.next(); - } - - if (yield.hasYielded()) { - yielded = true; - Key yieldPosition = yield.getPositionAndReset(); - if (!range.contains(yieldPosition)) { - throw new IOException("Underlying iterator yielded to a position outside of its range: " - + yieldPosition + " not in " + range); - } - if (!results.isEmpty() - && yieldPosition.compareTo(results.get(results.size() - 1).getKey()) <= 0) { - throw new IOException("Underlying iterator yielded to a position" - + " that does not follow the last key returned: " + yieldPosition + " <= " - + results.get(results.size() - 1).getKey()); - } - addUnfinishedRange(lookupResult, range, yieldPosition); - - log.debug("Scan yield detected at position " + yieldPosition); - getTabletServer().getScanMetrics().addYield(1); - } - } catch (TooManyFilesException tmfe) { - // treat this as a closed tablet, and let the client retry - log.warn("Tablet {} has too many files, batch lookup can not run", getExtent()); - handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, - entriesAdded); - tabletClosed = true; - } catch (IOException ioe) { - if (ShutdownUtil.isShutdownInProgress()) { - // assume HDFS shutdown hook caused this exception - log.debug("IOException while shutdown in progress", ioe); - handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, - entriesAdded); - tabletClosed = true; - } else { - throw ioe; - } - } catch (IterationInterruptedException iie) { - if (isClosed()) { - handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, - entriesAdded); - tabletClosed = true; - } else { - throw iie; - } - } catch 
(TabletClosedException tce) { - handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, - entriesAdded); - tabletClosed = true; - } - - } - - return lookupResult; - } - - private void handleTabletClosedDuringScan(List<KVEntry> results, LookupResult lookupResult, - boolean exceededMemoryUsage, Range range, int entriesAdded) { - if (exceededMemoryUsage) { - throw new IllegalStateException( - "Tablet " + extent + "should not exceed memory usage or close, not both"); - } - - if (entriesAdded > 0) { - addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).getKey()); - } else { - lookupResult.unfinishedRanges.add(range); - } - - lookupResult.closed = true; - } - - private void addUnfinishedRange(LookupResult lookupResult, Range range, Key key) { - if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) { - Range nlur = new Range(new Key(key), false, range.getEndKey(), range.isEndKeyInclusive()); - lookupResult.unfinishedRanges.add(nlur); - } - } - public void checkConditions(ConditionChecker checker, Authorizations authorizations, AtomicBoolean iFlag) throws IOException { @@ -634,7 +448,7 @@ public class Tablet { null, false, null, -1, null); scanParams.setScanDispatch(ScanDispatch.builder().build()); - ScanDataSource dataSource = new ScanDataSource(this, scanParams, false, iFlag); + ScanDataSource dataSource = createDataSource(scanParams, false, iFlag); try { SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource); @@ -649,149 +463,6 @@ public class Tablet { } } - public LookupResult lookup(List<Range> ranges, List<KVEntry> results, ScanParameters scanParams, - long maxResultSize, AtomicBoolean interruptFlag) throws IOException { - - if (ranges.isEmpty()) { - return new LookupResult(); - } - - ranges = Range.mergeOverlapping(ranges); - if (ranges.size() > 1) { - Collections.sort(ranges); - } - - Range tabletRange = extent.toDataRange(); - for (Range range : ranges) { - // do a test to see 
if this range falls within the tablet, if it does not - // then clip will throw an exception - tabletRange.clip(range); - } - - ScanDataSource dataSource = new ScanDataSource(this, scanParams, true, interruptFlag); - - LookupResult result = null; - - try { - SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource); - lookupCount++; - result = lookup(iter, ranges, results, scanParams, maxResultSize); - return result; - } catch (IOException ioe) { - dataSource.close(true); - throw ioe; - } finally { - // code in finally block because always want - // to return mapfiles, even when exception is thrown - dataSource.close(false); - - synchronized (this) { - queryResultCount += results.size(); - if (result != null) { - queryResultBytes += result.dataSize; - } - } - } - } - - Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams) - throws IOException { - - // log.info("In nextBatch.."); - - long batchTimeOut = scanParams.getBatchTimeOut(); - - long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut); - long startNanos = System.nanoTime(); - - if (batchTimeOut == Long.MAX_VALUE || batchTimeOut <= 0) { - batchTimeOut = 0; - } - List<KVEntry> results = new ArrayList<>(); - Key key = null; - - Value value; - long resultSize = 0L; - long resultBytes = 0L; - - long maxResultsSize = tableConfiguration.getAsBytes(Property.TABLE_SCAN_MAXMEM); - - Key continueKey = null; - boolean skipContinueKey = false; - - YieldCallback<Key> yield = new YieldCallback<>(); - - // we cannot yield if we are in isolation mode - if (!scanParams.isIsolated()) { - iter.enableYielding(yield); - } - - if (scanParams.getColumnSet().isEmpty()) { - iter.seek(range, Set.of(), false); - } else { - iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), true); - } - - while (iter.hasTop()) { - if (yield.hasYielded()) { - throw new IOException( - "Coding error: hasTop returned true but has yielded at " + 
yield.getPositionAndReset()); - } - value = iter.getTopValue(); - key = iter.getTopKey(); - - KVEntry kvEntry = new KVEntry(key, value); // copies key and value - results.add(kvEntry); - resultSize += kvEntry.estimateMemoryUsed(); - resultBytes += kvEntry.numBytes(); - - boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun; - - if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) { - continueKey = new Key(key); - skipContinueKey = true; - break; - } - - iter.next(); - } - - if (yield.hasYielded()) { - continueKey = new Key(yield.getPositionAndReset()); - skipContinueKey = true; - if (!range.contains(continueKey)) { - throw new IOException("Underlying iterator yielded to a position outside of its range: " - + continueKey + " not in " + range); - } - if (!results.isEmpty() - && continueKey.compareTo(results.get(results.size() - 1).getKey()) <= 0) { - throw new IOException( - "Underlying iterator yielded to a position that does not follow the last key returned: " - + continueKey + " <= " + results.get(results.size() - 1).getKey()); - } - - log.debug("Scan yield detected at position " + continueKey); - getTabletServer().getScanMetrics().addYield(1); - } else if (!iter.hasTop()) { - // end of tablet has been reached - continueKey = null; - if (results.isEmpty()) { - results = null; - } - } - - return new Batch(skipContinueKey, results, continueKey, resultBytes); - } - - public Scanner createScanner(Range range, ScanParameters scanParams, - AtomicBoolean interruptFlag) { - // do a test to see if this range falls within the tablet, if it does not - // then clip will throw an exception - extent.toDataRange().clip(range); - - return new Scanner(this, range, scanParams, interruptFlag); - } - DataFileValue minorCompact(InMemoryMap memTable, TabletFile tmpDatafile, TabletFile newDatafile, long queued, CommitSession commitSession, long flushId, MinorCompactionReason mincReason) { boolean failed = false; @@ 
-879,7 +550,7 @@ public class Tablet { return; } - if (lastFlushID >= tableFlushID) { + if (lastFlushID.get() >= tableFlushID) { return; } @@ -889,7 +560,7 @@ public class Tablet { } if (getTabletMemory().getMemTable().getNumEntries() == 0) { - lastFlushID = tableFlushID; + lastFlushID.set(tableFlushID); updatingFlushID = true; updateMetadata = true; } else { @@ -1216,6 +887,7 @@ public class Tablet { * Closes the mapfiles associated with a Tablet. If saveState is true, a minor compaction is * performed. */ + @Override public void close(boolean saveState) throws IOException { initiateClose(saveState); completeClose(saveState, true); @@ -1402,7 +1074,7 @@ public class Tablet { } tabletMeta.getFlushId().ifPresent(flushId -> { - if (flushId != lastFlushID) { + if (flushId != lastFlushID.get()) { String msg = "Closed tablet " + extent + " lastFlushID is inconsistent with metadata : " + flushId + " != " + lastFlushID; log.error(msg); @@ -1411,7 +1083,7 @@ public class Tablet { }); tabletMeta.getCompactId().ifPresent(compactId -> { - if (compactId != lastCompactID) { + if (compactId != lastCompactID.get()) { String msg = "Closed tablet " + extent + " lastCompactID is inconsistent with metadata : " + compactId + " != " + lastCompactID; log.error(msg); @@ -1672,10 +1344,6 @@ public class Tablet { return findSplitRow(getDatafileManager().getFiles()) != null; } - public KeyExtent getExtent() { - return extent; - } - synchronized void computeNumEntries() { Collection<DataFileValue> vals = getDatafileManager().getDatafileSizes().values(); @@ -1704,6 +1372,7 @@ public class Tablet { return closeState == CloseState.CLOSING; } + @Override public boolean isClosed() { // Assign to a local var to avoid race conditions since closeState is volatile and two // comparisons are done. 
@@ -1822,17 +1491,17 @@ public class Tablet { MetadataTableUtil.splitTablet(high, extent.prevEndRow(), splitRatio, getTabletServer().getContext(), getTabletServer().getLock(), ecids); ManagerMetadataUtil.addNewTablet(getTabletServer().getContext(), low, lowDirectoryName, - getTabletServer().getTabletSession(), lowDatafileSizes, bulkImported, time, lastFlushID, - lastCompactID, getTabletServer().getLock()); + getTabletServer().getTabletSession(), lowDatafileSizes, bulkImported, time, + lastFlushID.get(), lastCompactID.get(), getTabletServer().getLock()); MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove, getTabletServer().getContext(), getTabletServer().getLock()); TabletLogger.split(extent, low, high, getTabletServer().getTabletSession()); - newTablets.put(high, new TabletData(dirName, highDatafileSizes, time, lastFlushID, - lastCompactID, lastLocation, bulkImported)); - newTablets.put(low, new TabletData(lowDirectoryName, lowDatafileSizes, time, lastFlushID, - lastCompactID, lastLocation, bulkImported)); + newTablets.put(high, new TabletData(dirName, highDatafileSizes, time, lastFlushID.get(), + lastCompactID.get(), lastLocation, bulkImported)); + newTablets.put(low, new TabletData(lowDirectoryName, lowDatafileSizes, time, + lastFlushID.get(), lastCompactID.get(), lastLocation, bulkImported)); long t2 = System.currentTimeMillis(); @@ -1843,10 +1512,16 @@ public class Tablet { } } + @Override public SortedMap<StoredTabletFile,DataFileValue> getDatafiles() { return getDatafileManager().getDatafileSizes(); } + @Override + public void addToYieldMetric(int i) { + getTabletServer().getScanMetrics().addYield(i); + } + public double queryRate() { return queryRate.rate(); } @@ -1868,7 +1543,7 @@ public class Tablet { } public long totalQueriesResults() { - return this.queryResultCount; + return this.queryResultCount.get(); } public long totalIngest() { @@ -1880,7 +1555,7 @@ public class Tablet { } public long totalQueryResultsBytes() { - return 
this.queryResultBytes; + return this.queryResultBytes.get(); } public long totalScannedCount() { @@ -1888,13 +1563,13 @@ public class Tablet { } public long totalLookupCount() { - return this.lookupCount; + return this.lookupCount.get(); } // synchronized? public void updateRates(long now) { - queryRate.update(now, queryResultCount); - queryByteRate.update(now, queryResultBytes); + queryRate.update(now, queryResultCount.get()); + queryByteRate.update(now, queryResultBytes.get()); ingestRate.update(now, ingestCount); ingestByteRate.update(now, ingestBytes); scannedRate.update(now, scannedCount.get()); @@ -2206,19 +1881,19 @@ public class Tablet { public void compactAll(long compactionId, CompactionConfig compactionConfig) { synchronized (this) { - if (lastCompactID >= compactionId) { + if (lastCompactID.get() >= compactionId) { return; } if (isMinorCompactionRunning()) { // want to wait for running minc to finish before starting majc, see ACCUMULO-3041 if (compactionWaitInfo.compactionID == compactionId) { - if (lastFlushID == compactionWaitInfo.flushID) { + if (lastFlushID.get() == compactionWaitInfo.flushID) { return; } } else { compactionWaitInfo.compactionID = compactionId; - compactionWaitInfo.flushID = lastFlushID; + compactionWaitInfo.flushID = lastFlushID.get(); return; } } @@ -2232,10 +1907,6 @@ public class Tablet { compactable.initiateUserCompaction(compactionId, compactionConfig); } - public TableConfiguration getTableConfiguration() { - return tableConfiguration; - } - public Durability getDurability() { return DurabilityImpl.fromString(getTableConfiguration().get(Property.TABLE_DURABILITY)); } @@ -2248,11 +1919,6 @@ public class Tablet { return dataSourceDeletions.incrementAndGet(); } - public synchronized void updateQueryStats(int size, long numBytes) { - queryResultCount += size; - queryResultBytes += numBytes; - } - public void updateTimer(Operation operation, long queued, long start, long count, boolean failed) { timer.updateTime(operation, queued, 
start, count, failed); @@ -2298,14 +1964,30 @@ public class Tablet { } + @Override TabletResourceManager getTabletResources() { return tabletResources; } + @Override + public TabletServerScanMetrics getScanMetrics() { + return getTabletServer().getScanMetrics(); + } + DatafileManager getDatafileManager() { return datafileManager; } + @Override + public Pair<Long,Map<TabletFile,DataFileValue>> reserveFilesForScan() { + return getDatafileManager().reserveFilesForScan(); + } + + @Override + public void returnFilesForScan(long scanId) { + getDatafileManager().returnFilesForScan(scanId); + } + public MetadataUpdateCount getUpdateCount() { return getDatafileManager().getUpdateCount(); } @@ -2314,19 +1996,25 @@ public class Tablet { return tabletMemory; } - public long getAndUpdateTime() { - return tabletTime.getAndUpdateTime(); + @Override + public List<InMemoryMap.MemoryIterator> getMemIterators(SamplerConfigurationImpl samplerConfig) { + return getTabletMemory().getIterators(samplerConfig); } - public byte[] getDefaultSecurityLabels() { - return defaultSecurityLabel.derive(); + @Override + public void returnMemIterators(List<InMemoryMap.MemoryIterator> iters) { + getTabletMemory().returnIterators(iters); + } + + public long getAndUpdateTime() { + return tabletTime.getAndUpdateTime(); } public void flushComplete(long flushId) { lastLocation = null; dataSourceDeletions.incrementAndGet(); tabletMemory.finishedMinC(); - lastFlushID = flushId; + lastFlushID.set(flushId); computeNumEntries(); } @@ -2336,18 +2024,9 @@ public class Tablet { return result; } - public synchronized void addActiveScans(ScanDataSource scanDataSource) { - activeScans.add(scanDataSource); - } - - public int removeScan(ScanDataSource scanDataSource) { - activeScans.remove(scanDataSource); - return activeScans.size(); - } - public synchronized void setLastCompactionID(Long compactionId) { if (compactionId != null) { - this.lastCompactID = compactionId; + this.lastCompactID.set(compactionId); } } @@ 
-2367,10 +2046,6 @@ public class Tablet { return timer.getTabletStats(); } - public AtomicLong getScannedCounter() { - return scannedCount; - } - private static String createTabletDirectoryName(ServerContext context, Text endRow) { if (endRow == null) { return ServerColumnFamily.DEFAULT_TABLET_DIR_NAME; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java new file mode 100644 index 0000000000..eb6cf8fc83 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.tserver.tablet; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.YieldCallback; +import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; +import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.ShutdownUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.fs.TooManyFilesException; +import org.apache.accumulo.tserver.InMemoryMap; +import org.apache.accumulo.tserver.TabletServerResourceManager; +import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; +import org.apache.accumulo.tserver.scan.ScanParameters; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +/** + * This class exists to share code for scanning a tablet between TabletHostingServer implementations + */ +public abstract class TabletBase { + + private static final Logger log = LoggerFactory.getLogger(TabletBase.class); + + private static final byte[] EMPTY_BYTES = new byte[0]; + + protected final KeyExtent extent; + protected final ServerContext context; + + protected AtomicLong lookupCount = new AtomicLong(0); + protected AtomicLong queryResultCount = new AtomicLong(0); + protected AtomicLong queryResultBytes = new AtomicLong(0); + + protected final Set<ScanDataSource> activeScans = new HashSet<>(); + + private final AccumuloConfiguration.Deriver<byte[]> defaultSecurityLabel; + + protected final TableConfiguration tableConfiguration; + + protected final AtomicLong scannedCount = new AtomicLong(0); + + public TabletBase(ServerContext context, KeyExtent extent) { + this.context = context; + this.extent = extent; + + TableConfiguration tblConf = context.getTableConfiguration(extent.tableId()); + if (tblConf == null) { + context.clearTableListCache(); + tblConf = context.getTableConfiguration(extent.tableId()); + requireNonNull(tblConf, "Could not get table configuration for " + extent.tableId()); + } + + this.tableConfiguration = tblConf; + + if (extent.isMeta()) { + defaultSecurityLabel = () -> EMPTY_BYTES; + } else { + defaultSecurityLabel = tableConfiguration.newDeriver( + conf -> new ColumnVisibility(conf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY)) + .getExpression()); + } + } + + public abstract boolean isClosed(); + + public abstract SortedMap<StoredTabletFile,DataFileValue> getDatafiles(); + + public abstract void addToYieldMetric(int i); + + public abstract long getDataSourceDeletions(); + + abstract TabletServerResourceManager.TabletResourceManager getTabletResources(); + + public abstract List<InMemoryMap.MemoryIterator> + getMemIterators(SamplerConfigurationImpl samplerConfig); + + public abstract void 
returnMemIterators(List<InMemoryMap.MemoryIterator> iters); + + public abstract Pair<Long,Map<TabletFile,DataFileValue>> reserveFilesForScan(); + + public abstract void returnFilesForScan(long scanId); + + public abstract TabletServerScanMetrics getScanMetrics(); + + protected ScanDataSource createDataSource(ScanParameters scanParams, boolean loadIters, + AtomicBoolean interruptFlag) { + return new ScanDataSource(this, scanParams, loadIters, interruptFlag); + } + + public Scanner createScanner(Range range, ScanParameters scanParams, + AtomicBoolean interruptFlag) { + // do a test to see if this range falls within the tablet, if it does not + // then clip will throw an exception + extent.toDataRange().clip(range); + + return new Scanner(this, range, scanParams, interruptFlag); + } + + public AtomicLong getScannedCounter() { + return scannedCount; + } + + public ServerContext getContext() { + return context; + } + + public TableConfiguration getTableConfiguration() { + return tableConfiguration; + } + + public KeyExtent getExtent() { + return extent; + } + + public byte[] getDefaultSecurityLabels() { + return defaultSecurityLabel.derive(); + } + + public synchronized void addActiveScans(ScanDataSource scanDataSource) { + activeScans.add(scanDataSource); + } + + public int removeScan(ScanDataSource scanDataSource) { + activeScans.remove(scanDataSource); + return activeScans.size(); + } + + public abstract void close(boolean b) throws IOException; + + public Tablet.LookupResult lookup(List<Range> ranges, List<KVEntry> results, + ScanParameters scanParams, long maxResultSize, AtomicBoolean interruptFlag) + throws IOException { + + if (ranges.isEmpty()) { + return new Tablet.LookupResult(); + } + + ranges = Range.mergeOverlapping(ranges); + if (ranges.size() > 1) { + Collections.sort(ranges); + } + + Range tabletRange = getExtent().toDataRange(); + for (Range range : ranges) { + // do a test to see if this range falls within the tablet, if it does not + // then clip will 
throw an exception + tabletRange.clip(range); + } + + SourceSwitchingIterator.DataSource dataSource = + createDataSource(scanParams, true, interruptFlag); + + Tablet.LookupResult result = null; + + try { + SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource); + lookupCount.incrementAndGet(); + result = lookup(iter, ranges, results, scanParams, maxResultSize); + return result; + } catch (IOException ioe) { + dataSource.close(true); + throw ioe; + } finally { + // code in finally block because always want + // to return mapfiles, even when exception is thrown + dataSource.close(false); + + synchronized (this) { + queryResultCount.addAndGet(results.size()); + if (result != null) { + queryResultBytes.addAndGet(result.dataSize); + } + } + } + } + + Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams) + throws IOException { + + // log.info("In nextBatch.."); + + long batchTimeOut = scanParams.getBatchTimeOut(); + + long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut); + long startNanos = System.nanoTime(); + + if (batchTimeOut == Long.MAX_VALUE || batchTimeOut <= 0) { + batchTimeOut = 0; + } + List<KVEntry> results = new ArrayList<>(); + Key key = null; + + Value value; + long resultSize = 0L; + long resultBytes = 0L; + + long maxResultsSize = getTableConfiguration().getAsBytes(Property.TABLE_SCAN_MAXMEM); + + Key continueKey = null; + boolean skipContinueKey = false; + + YieldCallback<Key> yield = new YieldCallback<>(); + + // we cannot yield if we are in isolation mode + if (!scanParams.isIsolated()) { + iter.enableYielding(yield); + } + + if (scanParams.getColumnSet().isEmpty()) { + iter.seek(range, Set.of(), false); + } else { + iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), true); + } + + while (iter.hasTop()) { + if (yield.hasYielded()) { + throw new IOException( + "Coding error: hasTop returned true but has yielded at " + yield.getPositionAndReset()); + } + 
value = iter.getTopValue(); + key = iter.getTopKey(); + + KVEntry kvEntry = new KVEntry(key, value); // copies key and value + results.add(kvEntry); + resultSize += kvEntry.estimateMemoryUsed(); + resultBytes += kvEntry.numBytes(); + + boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun; + + if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) { + continueKey = new Key(key); + skipContinueKey = true; + break; + } + + iter.next(); + } + + if (yield.hasYielded()) { + continueKey = new Key(yield.getPositionAndReset()); + skipContinueKey = true; + if (!range.contains(continueKey)) { + throw new IOException("Underlying iterator yielded to a position outside of its range: " + + continueKey + " not in " + range); + } + if (!results.isEmpty() + && continueKey.compareTo(results.get(results.size() - 1).getKey()) <= 0) { + throw new IOException( + "Underlying iterator yielded to a position that does not follow the last key returned: " + + continueKey + " <= " + results.get(results.size() - 1).getKey()); + } + + log.debug("Scan yield detected at position " + continueKey); + addToYieldMetric(1); + } else if (!iter.hasTop()) { + // end of tablet has been reached + continueKey = null; + if (results.isEmpty()) { + results = null; + } + } + + return new Batch(skipContinueKey, results, continueKey, resultBytes); + } + + private Tablet.LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges, + List<KVEntry> results, ScanParameters scanParams, long maxResultsSize) throws IOException { + + Tablet.LookupResult lookupResult = new Tablet.LookupResult(); + + boolean exceededMemoryUsage = false; + boolean tabletClosed = false; + + Set<ByteSequence> cfset = null; + if (!scanParams.getColumnSet().isEmpty()) { + cfset = LocalityGroupUtil.families(scanParams.getColumnSet()); + } + + long batchTimeOut = scanParams.getBatchTimeOut(); + + long timeToRun = TimeUnit.MILLISECONDS.toNanos(batchTimeOut); + 
long startNanos = System.nanoTime(); + + if (batchTimeOut <= 0 || batchTimeOut == Long.MAX_VALUE) { + batchTimeOut = 0; + } + + // determine if the iterator supported yielding + YieldCallback<Key> yield = new YieldCallback<>(); + mmfi.enableYielding(yield); + boolean yielded = false; + + for (Range range : ranges) { + + boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun; + + if (exceededMemoryUsage || tabletClosed || timesUp || yielded) { + lookupResult.unfinishedRanges.add(range); + continue; + } + + int entriesAdded = 0; + + try { + if (cfset != null) { + mmfi.seek(range, cfset, true); + } else { + mmfi.seek(range, Set.of(), false); + } + + while (mmfi.hasTop()) { + if (yield.hasYielded()) { + throw new IOException("Coding error: hasTop returned true but has yielded at " + + yield.getPositionAndReset()); + } + Key key = mmfi.getTopKey(); + + KVEntry kve = new KVEntry(key, mmfi.getTopValue()); + results.add(kve); + entriesAdded++; + lookupResult.bytesAdded += kve.estimateMemoryUsed(); + lookupResult.dataSize += kve.numBytes(); + + exceededMemoryUsage = lookupResult.bytesAdded > maxResultsSize; + + timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) > timeToRun; + + if (exceededMemoryUsage || timesUp) { + addUnfinishedRange(lookupResult, range, key); + break; + } + + mmfi.next(); + } + + if (yield.hasYielded()) { + yielded = true; + Key yieldPosition = yield.getPositionAndReset(); + if (!range.contains(yieldPosition)) { + throw new IOException("Underlying iterator yielded to a position outside of its range: " + + yieldPosition + " not in " + range); + } + if (!results.isEmpty() + && yieldPosition.compareTo(results.get(results.size() - 1).getKey()) <= 0) { + throw new IOException("Underlying iterator yielded to a position" + + " that does not follow the last key returned: " + yieldPosition + " <= " + + results.get(results.size() - 1).getKey()); + } + addUnfinishedRange(lookupResult, range, yieldPosition); + + 
log.debug("Scan yield detected at position " + yieldPosition); + addToYieldMetric(1); + } + } catch (TooManyFilesException tmfe) { + // treat this as a closed tablet, and let the client retry + log.warn("Tablet {} has too many files, batch lookup can not run", getExtent()); + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, + entriesAdded); + tabletClosed = true; + } catch (IOException ioe) { + if (ShutdownUtil.isShutdownInProgress()) { + // assume HDFS shutdown hook caused this exception + log.debug("IOException while shutdown in progress", ioe); + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, + entriesAdded); + tabletClosed = true; + } else { + throw ioe; + } + } catch (IterationInterruptedException iie) { + if (isClosed()) { + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, + entriesAdded); + tabletClosed = true; + } else { + throw iie; + } + } catch (TabletClosedException tce) { + handleTabletClosedDuringScan(results, lookupResult, exceededMemoryUsage, range, + entriesAdded); + tabletClosed = true; + } + + } + + return lookupResult; + } + + private void handleTabletClosedDuringScan(List<KVEntry> results, Tablet.LookupResult lookupResult, + boolean exceededMemoryUsage, Range range, int entriesAdded) { + if (exceededMemoryUsage) { + throw new IllegalStateException( + "Tablet " + getExtent() + "should not exceed memory usage or close, not both"); + } + + if (entriesAdded > 0) { + addUnfinishedRange(lookupResult, range, results.get(results.size() - 1).getKey()); + } else { + lookupResult.unfinishedRanges.add(range); + } + + lookupResult.closed = true; + } + + private void addUnfinishedRange(Tablet.LookupResult lookupResult, Range range, Key key) { + if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) { + Range nlur = new Range(new Key(key), false, range.getEndKey(), range.isEndKeyInclusive()); + lookupResult.unfinishedRanges.add(nlur); + } + } + + 
public synchronized void updateQueryStats(int size, long numBytes) { + queryResultCount.addAndGet(size); + queryResultBytes.addAndGet(numBytes); + } +}