This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new af958faa0c Updates to ScanServerAttempts (#2912)
af958faa0c is described below

commit af958faa0c67d17a32f808507ca82728a3e592a3
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Tue Oct 18 14:08:53 2022 -0400

    Updates to ScanServerAttempts (#2912)
    
    As a follow-up to #2880:
    
    * Remove unused and undocumented endTime in ScanAttempt API
    * Separate out more inner classes/interfaces into their own files
    * Simplify concurrency by synchronizing on the attempts and preparing
      the snapshot using immutable collections
    * IDE changes also converted some anonymous inner classes to lambdas
---
 .../ScanServerAttemptImpl.java}                    |  35 ++++---
 .../ScanServerAttemptReporter.java}                |  23 +----
 .../core/clientImpl/ScanServerAttemptsImpl.java    | 104 +++------------------
 .../TabletServerBatchReaderIterator.java           |  11 +--
 .../accumulo/core/spi/scan/ScanServerAttempt.java  |   2 -
 .../core/clientImpl/ScanAttemptsImplTest.java      |   2 +-
 .../scan/ConfigurableScanServerSelectorTest.java   |  11 +--
 7 files changed, 47 insertions(+), 141 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
similarity index 61%
copy from core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java
copy to core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
index dde2ec4021..bda2f26880 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java
@@ -16,25 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.spi.scan;
+package org.apache.accumulo.core.clientImpl;
 
-/**
- * This object is used to communicate what previous actions were attempted, when they were
- * attempted, and the result of those attempts
- *
- * @since 2.1.0
- */
-public interface ScanServerAttempt {
+import java.util.Objects;
 
-  // represents reasons that previous attempts to scan failed
-  enum Result {
-    BUSY, ERROR
-  }
+import org.apache.accumulo.core.spi.scan.ScanServerAttempt;
+
+class ScanServerAttemptImpl implements ScanServerAttempt {
 
-  String getServer();
+  private final String server;
+  private final Result result;
 
-  long getEndTime();
+  ScanServerAttemptImpl(Result result, String server) {
+    this.result = result;
+    this.server = Objects.requireNonNull(server);
+  }
+
+  @Override
+  public String getServer() {
+    return server;
+  }
 
-  ScanServerAttempt.Result getResult();
+  @Override
+  public Result getResult() {
+    return result;
+  }
 
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptReporter.java
similarity index 65%
copy from core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java
copy to core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptReporter.java
index dde2ec4021..37983d3688 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptReporter.java
@@ -16,25 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.accumulo.core.spi.scan;
+package org.apache.accumulo.core.clientImpl;
 
-/**
- * This object is used to communicate what previous actions were attempted, when they were
- * attempted, and the result of those attempts
- *
- * @since 2.1.0
- */
-public interface ScanServerAttempt {
-
-  // represents reasons that previous attempts to scan failed
-  enum Result {
-    BUSY, ERROR
-  }
-
-  String getServer();
-
-  long getEndTime();
-
-  ScanServerAttempt.Result getResult();
+import org.apache.accumulo.core.spi.scan.ScanServerAttempt;
 
+interface ScanServerAttemptReporter {
+  void report(ScanServerAttempt.Result result);
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
index fc0896aeb2..48ff72b532 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java
@@ -18,12 +18,14 @@
  */
 package org.apache.accumulo.core.clientImpl;
 
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import java.util.Map.Entry;
 
 import org.apache.accumulo.core.data.TabletId;
 import org.apache.accumulo.core.spi.scan.ScanServerAttempt;
@@ -41,71 +43,14 @@ public class ScanServerAttemptsImpl {
 
   private static final Logger LOG = LoggerFactory.getLogger(ScanServerAttemptsImpl.class);
 
-  static class ScanServerAttemptImpl implements ScanServerAttempt {
-
-    private final String server;
-    private final long time;
-    private final Result result;
-    private volatile long mutationCount = Long.MAX_VALUE;
-
-    ScanServerAttemptImpl(Result result, String server, long time) {
-      this.result = result;
-      this.server = Objects.requireNonNull(server);
-      this.time = time;
-    }
-
-    @Override
-    public String getServer() {
-      return server;
-    }
-
-    @Override
-    public long getEndTime() {
-      return time;
-    }
-
-    @Override
-    public Result getResult() {
-      return result;
-    }
-
-    private void setMutationCount(long mc) {
-      this.mutationCount = mc;
-    }
-
-    public long getMutationCount() {
-      return mutationCount;
-    }
-  }
-
-  private final Map<TabletId,Collection<ScanServerAttemptImpl>> attempts =
-      new ConcurrentHashMap<>();
-  private long mutationCounter = 0;
+  private final Map<TabletId,Collection<ScanServerAttemptImpl>> attempts = new HashMap<>();
 
-  private void add(TabletId tablet, ScanServerAttempt.Result result, String server, long endTime) {
-
-    ScanServerAttemptImpl sa = new ScanServerAttemptImpl(result, server, endTime);
-
-    attempts.computeIfAbsent(tablet, k -> ConcurrentHashMap.newKeySet()).add(sa);
-
-    synchronized (this) {
-      // now that the scan attempt obj is added to all concurrent data structs, make it visible
-      // need to atomically increment the counter AND set the counter on the object
-      sa.setMutationCount(mutationCounter++);
-    }
-
-  }
-
-  public interface ScanAttemptReporter {
-    void report(ScanServerAttempt.Result result);
-  }
-
-  ScanAttemptReporter createReporter(String server, TabletId tablet) {
-    return new ScanAttemptReporter() {
-      @Override
-      public void report(ScanServerAttempt.Result result) {
-        LOG.trace("Received result: {}", result);
-        add(tablet, result, server, System.currentTimeMillis());
+  ScanServerAttemptReporter createReporter(String server, TabletId tablet) {
+    return result -> {
+      LOG.trace("Received result: {}", result);
+      synchronized (attempts) {
+        attempts.computeIfAbsent(tablet, k -> new ArrayList<>())
+            .add(new ScanServerAttemptImpl(result, server));
       }
     };
   }
@@ -118,28 +63,9 @@ public class ScanServerAttemptsImpl {
    *         that TabletId
    */
   Map<TabletId,Collection<ScanServerAttemptImpl>> snapshot() {
-
-    final long mutationCounterSnapshot;
-    synchronized (ScanServerAttemptsImpl.this) {
-      mutationCounterSnapshot = mutationCounter;
+    synchronized (attempts) {
+      return attempts.entrySet().stream()
+          .collect(toUnmodifiableMap(Entry::getKey, entry -> List.copyOf(entry.getValue())));
     }
-
-    Map<TabletId,Collection<ScanServerAttemptImpl>> result = new ConcurrentHashMap<>();
-
-    attempts.forEach((tabletId, scanAttempts) -> {
-
-      // filter out ScanServerScanAttempt objects that were added after this call
-      List<ScanServerAttemptImpl> filteredScanAttempts = scanAttempts.stream()
-          .filter(scanAttempt -> scanAttempt.getMutationCount() < mutationCounterSnapshot)
-          .collect(Collectors.toList());
-
-      // only add an entry to the map if there are ScanServerScanAttempt objects for the current
-      // TabletId
-      if (!filteredScanAttempts.isEmpty())
-        result.put(tabletId, filteredScanAttempts);
-
-    });
-
-    return result;
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 63eb186a49..796a4aa886 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -344,13 +344,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
     private List<Column> columns;
     private int semaphoreSize;
     private final long busyTimeout;
-    private final ScanServerAttemptsImpl.ScanAttemptReporter reporter;
+    private final ScanServerAttemptReporter reporter;
     private final Duration scanServerSelectorDelay;
 
     QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges,
         Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns,
-        long busyTimeout, ScanServerAttemptsImpl.ScanAttemptReporter reporter,
-        Duration scanServerSelectorDelay) {
+        long busyTimeout, ScanServerAttemptReporter reporter, Duration scanServerSelectorDelay) {
       this.tsLocation = tsLocation;
       this.tabletsRanges = tabletsRanges;
       this.receiver = receiver;
@@ -487,7 +486,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
 
     long busyTimeout = 0;
     Duration scanServerSelectorDelay = null;
-    Map<String,ScanServerAttemptsImpl.ScanAttemptReporter> reporters = Map.of();
+    Map<String,ScanServerAttemptReporter> reporters = Map.of();
 
     if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
       var scanServerData = rebinToScanServers(binnedRanges);
@@ -580,7 +579,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
   private static class ScanServerData {
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges;
     ScanServerSelections actions;
-    Map<String,ScanServerAttemptsImpl.ScanAttemptReporter> reporters;
+    Map<String,ScanServerAttemptReporter> reporters;
   }
 
   private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) {
@@ -624,7 +623,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value
 
     Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
 
-    Map<String,ScanServerAttemptsImpl.ScanAttemptReporter> reporters = new HashMap<>();
+    Map<String,ScanServerAttemptReporter> reporters = new HashMap<>();
 
     for (TabletIdImpl tabletId : tabletIds) {
       KeyExtent extent = tabletId.toKeyExtent();
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java
index dde2ec4021..b886e3b26d 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java
@@ -33,8 +33,6 @@ public interface ScanServerAttempt {
 
   String getServer();
 
-  long getEndTime();
-
   ScanServerAttempt.Result getResult();
 
 }
diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java
index 829f2d3f5a..eee7d2ce77 100644
--- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java
@@ -34,7 +34,7 @@ import org.junit.jupiter.api.Test;
 public class ScanAttemptsImplTest {
 
   private Map<TabletId,Collection<String>>
-      simplify(Map<TabletId,Collection<ScanServerAttemptsImpl.ScanServerAttemptImpl>> map) {
+      simplify(Map<TabletId,Collection<ScanServerAttemptImpl>> map) {
     Map<TabletId,Collection<String>> ret = new HashMap<>();
 
     map.forEach((tabletId, scanAttempts) -> {
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
index dd22c2913a..d4448fb08d 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
@@ -131,12 +131,10 @@ public class ConfigurableScanServerSelectorTest {
   static class TestScanServerAttempt implements ScanServerAttempt {
 
     private final String server;
-    private final long endTime;
     private final Result result;
 
-    TestScanServerAttempt(String server, long endTime, Result result) {
+    TestScanServerAttempt(String server, Result result) {
       this.server = server;
-      this.endTime = endTime;
       this.result = result;
     }
 
@@ -145,11 +143,6 @@ public class ConfigurableScanServerSelectorTest {
       return server;
     }
 
-    @Override
-    public long getEndTime() {
-      return endTime;
-    }
-
     @Override
     public Result getResult() {
       return result;
@@ -204,7 +197,7 @@ public class ConfigurableScanServerSelectorTest {
     var tabletId = nti("1", "m");
 
     var tabletAttempts = Stream.iterate(1, i -> i <= busyAttempts, i -> i + 1)
-        .map(i -> (new TestScanServerAttempt("ss" + i + ":" + i, i, ScanServerAttempt.Result.BUSY)))
+        .map(i -> (new TestScanServerAttempt("ss" + i + ":" + i, ScanServerAttempt.Result.BUSY)))
         .collect(Collectors.toList());
 
     Map<TabletId,Collection<? extends ScanServerAttempt>> attempts = new HashMap<>();

Reply via email to