This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new 3e47a0abde Fixed scan server bugs found when testing offline scans (#3164)
3e47a0abde is described below

commit 3e47a0abde87740cf7305d203dbd3390ed51edba
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Jan 21 15:23:05 2023 -0500

    Fixed scan server bugs found when testing offline scans (#3164)
---
 .../org/apache/accumulo/tserver/ScanServer.java    | 26 +++++++++++++++++-----
 .../apache/accumulo/tserver/ScanServerTest.java    |  2 +-
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 5a025a156e..19698afad0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -459,7 +460,7 @@ public class ScanServer extends AbstractServer
     /* This constructor is called when continuing a scan */
     ScanReservation(Collection<StoredTabletFile> files, long myReservationId) {
       this.tabletsMetadata = null;
-      this.failures = null;
+      this.failures = Map.of();
       this.files = files;
       this.myReservationId = myReservationId;
     }
@@ -525,6 +526,10 @@ public class ScanServer extends AbstractServer
           LOG.info("RFFS {} extent unable to load {} as AssignmentHandler returned false",
               myReservationId, extent);
           failures.add(extent);
+          if (!(tabletsMetadata instanceof HashMap)) {
+            // the map returned by getTabletMetadata may not be mutable
+            tabletsMetadata = new HashMap<>(tabletsMetadata);
+          }
           tabletsMetadata.remove(extent);
         }
       }
@@ -612,12 +617,15 @@ public class ScanServer extends AbstractServer
           LOG.info("RFFS {} extent unable to load {} as metadata no longer referencing files",
               myReservationId, extent);
           failures.add(extent);
+          if (!(tabletsMetadata instanceof HashMap)) {
+            // the map returned by getTabletMetadata may not be mutable
+            tabletsMetadata = new HashMap<>(tabletsMetadata);
+          }
           tabletsMetadata.remove(extent);
         } else {
           // remove files that are still referenced
           filesToReserve.removeAll(metadataAfter.getFiles());
         }
-
       }
 
       // if this is not empty it means some files that we reserved are no longer referenced by
@@ -728,9 +736,15 @@ public class ScanServer extends AbstractServer
       return Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet());
     } else if (session instanceof MultiScanSession) {
       var mss = (MultiScanSession) session;
-      return mss.exents.stream()
-          .flatMap(e -> mss.getTabletResolver().getTablet(e).getDatafiles().keySet().stream())
-          .collect(Collectors.toUnmodifiableSet());
+      return mss.exents.stream().flatMap(e -> {
+        var tablet = mss.getTabletResolver().getTablet(e);
+        if (tablet == null) {
+          // not all tablets passed to a multiscan are present in the metadata table
+          return Stream.empty();
+        } else {
+          return tablet.getDatafiles().keySet().stream();
+        }
+      }).collect(Collectors.toUnmodifiableSet());
     } else {
       throw new IllegalArgumentException("Unknown session type " + session.getClass().getName());
     }
@@ -894,6 +908,7 @@ public class ScanServer extends AbstractServer
     LOG.debug("continue scan: {}", scanID);
 
     try (ScanReservation reservation = reserveFiles(scanID)) {
+      Preconditions.checkState(reservation.getFailures().isEmpty());
       return delegate.continueScan(tinfo, scanID, busyTimeout);
     }
   }
@@ -955,6 +970,7 @@ public class ScanServer extends AbstractServer
     LOG.debug("continue multi scan: {}", scanID);
 
     try (ScanReservation reservation = reserveFiles(scanID)) {
+      Preconditions.checkState(reservation.getFailures().isEmpty());
       return delegate.continueMultiScan(tinfo, scanID, busyTimeout);
     }
   }
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
index d2fd4f5cda..fdb79e1b00 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
@@ -125,7 +125,7 @@ public class ScanServerTest {
     TabletResolver resolver = createMock(TabletResolver.class);
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
-    expect(reservation.getFailures()).andReturn(Map.of());
+    expect(reservation.getFailures()).andReturn(Map.of()).times(2);
     expect(reservation.newTablet(ss, sextent)).andReturn(tablet);
     reservation.close();
     reservation.close();