This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 3e47a0abde Fixed scan server bugs found when testing offline scans (#3164)
3e47a0abde is described below

commit 3e47a0abde87740cf7305d203dbd3390ed51edba
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Jan 21 15:23:05 2023 -0500

    Fixed scan server bugs found when testing offline scans (#3164)
---
 .../org/apache/accumulo/tserver/ScanServer.java    | 26 +++++++++++++++++-----
 .../apache/accumulo/tserver/ScanServerTest.java    |  2 +-
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 5a025a156e..19698afad0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -459,7 +460,7 @@ public class ScanServer extends AbstractServer
     /* This constructor is called when continuing a scan */
     ScanReservation(Collection<StoredTabletFile> files, long myReservationId) {
       this.tabletsMetadata = null;
-      this.failures = null;
+      this.failures = Map.of();
       this.files = files;
       this.myReservationId = myReservationId;
     }
@@ -525,6 +526,10 @@ public class ScanServer extends AbstractServer
         LOG.info("RFFS {} extent unable to load {} as AssignmentHandler returned false",
             myReservationId, extent);
         failures.add(extent);
+        if (!(tabletsMetadata instanceof HashMap)) {
+          // the map returned by getTabletMetadata may not be mutable
+          tabletsMetadata = new HashMap<>(tabletsMetadata);
+        }
         tabletsMetadata.remove(extent);
       }
     }
@@ -612,12 +617,15 @@ public class ScanServer extends AbstractServer
             LOG.info("RFFS {} extent unable to load {} as metadata no longer referencing files",
                 myReservationId, extent);
             failures.add(extent);
+            if (!(tabletsMetadata instanceof HashMap)) {
+              // the map returned by getTabletMetadata may not be mutable
+              tabletsMetadata = new HashMap<>(tabletsMetadata);
+            }
             tabletsMetadata.remove(extent);
           } else {
             // remove files that are still referenced
             filesToReserve.removeAll(metadataAfter.getFiles());
           }
-
         }
 
         // if this is not empty it means some files that we reserved are no longer referenced by
@@ -728,9 +736,15 @@ public class ScanServer extends AbstractServer
       return Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet());
     } else if (session instanceof MultiScanSession) {
       var mss = (MultiScanSession) session;
-      return mss.exents.stream()
-          .flatMap(e -> mss.getTabletResolver().getTablet(e).getDatafiles().keySet().stream())
-          .collect(Collectors.toUnmodifiableSet());
+      return mss.exents.stream().flatMap(e -> {
+        var tablet = mss.getTabletResolver().getTablet(e);
+        if (tablet == null) {
+          // not all tablets passed to a multiscan are present in the metadata table
+          return Stream.empty();
+        } else {
+          return tablet.getDatafiles().keySet().stream();
+        }
+      }).collect(Collectors.toUnmodifiableSet());
     } else {
       throw new IllegalArgumentException("Unknown session type " + session.getClass().getName());
     }
@@ -894,6 +908,7 @@ public class ScanServer extends AbstractServer
     LOG.debug("continue scan: {}", scanID);
 
     try (ScanReservation reservation = reserveFiles(scanID)) {
+      Preconditions.checkState(reservation.getFailures().isEmpty());
       return delegate.continueScan(tinfo, scanID, busyTimeout);
     }
   }
@@ -955,6 +970,7 @@ public class ScanServer extends AbstractServer
     LOG.debug("continue multi scan: {}", scanID);
 
     try (ScanReservation reservation = reserveFiles(scanID)) {
+      Preconditions.checkState(reservation.getFailures().isEmpty());
       return delegate.continueMultiScan(tinfo, scanID, busyTimeout);
     }
   }
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
index d2fd4f5cda..fdb79e1b00 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java
@@ -125,7 +125,7 @@ public class ScanServerTest {
     TabletResolver resolver = createMock(TabletResolver.class);
 
     TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock();
-    expect(reservation.getFailures()).andReturn(Map.of());
+    expect(reservation.getFailures()).andReturn(Map.of()).times(2);
     expect(reservation.newTablet(ss, sextent)).andReturn(tablet);
     reservation.close();
     reservation.close();

Reply via email to