This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 7947c2c2df adds PluginEnv support to client side iterators (#4283)
7947c2c2df is described below

commit 7947c2c2dfb80fb280420b729b4197547be22124
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Mar 20 18:25:43 2024 -0400

    adds PluginEnv support to client side iterators (#4283)
    
    Accumulo code that ran user iterators client side would throw an
    UnsupportedOperationException when attempting to access the PluginEnv. This
    commit adds support for PluginEnv in situations where iterators are run
    client side.
---
 .../core/client/ClientSideIteratorScanner.java     |  46 ++++++++-
 .../accumulo/core/clientImpl/OfflineIterator.java  |  34 ++++++-
 .../accumulo/core/clientImpl/ScannerImpl.java      |  10 ++
 .../apache/accumulo/test/ClientSideIteratorIT.java | 108 +++++++++++++++++++++
 4 files changed, 192 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
index 542273d470..362c4e85d6 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
@@ -29,15 +29,20 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl;
+import org.apache.accumulo.core.clientImpl.ScannerImpl;
 import org.apache.accumulo.core.clientImpl.ScannerOptions;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
 import org.apache.accumulo.core.iterators.IteratorAdapter;
@@ -47,6 +52,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder;
 import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -70,6 +76,7 @@ import org.apache.hadoop.io.Text;
  * server side) and to the client side scanner (which will execute client 
side).
  */
 public class ClientSideIteratorScanner extends ScannerOptions implements 
Scanner {
+
   private int size;
 
   private Range range;
@@ -77,6 +84,9 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
   private long readaheadThreshold = 
Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
   private SamplerConfiguration iteratorSamplerConfig;
 
+  private final Supplier<ClientContext> context;
+  private final Supplier<TableId> tableId;
+
   private class ClientSideIteratorEnvironment implements IteratorEnvironment {
 
     private SamplerConfiguration samplerConfig;
@@ -94,7 +104,9 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
 
     @Override
     public boolean isFullMajorCompaction() {
-      return false;
+      // The javadocs state this method will throw an ISE when scope is not 
majc
+      throw new IllegalStateException(
+          "Asked about major compaction type when scope is " + 
getIteratorScope());
     }
 
     @Override
@@ -121,6 +133,22 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
     public SamplerConfiguration getSamplerConfiguration() {
       return samplerConfig;
     }
+
+    @Deprecated(since = "2.1.0")
+    @Override
+    public ServiceEnvironment getServiceEnv() {
+      return new ClientServiceEnvironmentImpl(context.get());
+    }
+
+    @Override
+    public PluginEnvironment getPluginEnv() {
+      return new ClientServiceEnvironmentImpl(context.get());
+    }
+
+    @Override
+    public TableId getTableId() {
+      return tableId.get();
+    }
   }
 
   /**
@@ -220,6 +248,22 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner
     if (samplerConfig != null) {
       setSamplerConfiguration(samplerConfig);
     }
+
+    if (scanner instanceof ScannerImpl) {
+      var scannerImpl = (ScannerImpl) scanner;
+      this.context = () -> scannerImpl.getClientContext();
+      this.tableId = () -> scannerImpl.getTableId();
+    } else {
+      // These may never be used, so only fail if an attempt is made to use 
them.
+      this.context = () -> {
+        throw new UnsupportedOperationException(
+            "Do not know how to obtain client context from " + 
scanner.getClass().getName());
+      };
+      this.tableId = () -> {
+        throw new UnsupportedOperationException(
+            "Do not know how to obtain tableId from " + 
scanner.getClass().getName());
+      };
+    }
   }
 
   /**
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
index e24724d066..a03cc811ab 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
@@ -34,6 +34,7 @@ import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.PluginEnvironment;
 import org.apache.accumulo.core.client.SampleNotPresentException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
@@ -65,6 +66,7 @@ import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
 import org.apache.hadoop.conf.Configuration;
@@ -79,9 +81,13 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
     private final AccumuloConfiguration conf;
     private final boolean useSample;
     private final SamplerConfiguration sampleConf;
+    private final ClientContext context;
+    private final TableId tableId;
 
-    public OfflineIteratorEnvironment(Authorizations auths, 
AccumuloConfiguration acuTableConf,
-        boolean useSample, SamplerConfiguration samplerConf) {
+    public OfflineIteratorEnvironment(ClientContext context, TableId tableId, 
Authorizations auths,
+        AccumuloConfiguration acuTableConf, boolean useSample, 
SamplerConfiguration samplerConf) {
+      this.context = context;
+      this.tableId = tableId;
       this.authorizations = auths;
       this.conf = acuTableConf;
       this.useSample = useSample;
@@ -147,7 +153,24 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
       if (sampleConf == null) {
         throw new SampleNotPresentException();
       }
-      return new OfflineIteratorEnvironment(authorizations, conf, true, 
sampleConf);
+      return new OfflineIteratorEnvironment(context, tableId, authorizations, 
conf, true,
+          sampleConf);
+    }
+
+    @Deprecated(since = "2.1.0")
+    @Override
+    public ServiceEnvironment getServiceEnv() {
+      return new ClientServiceEnvironmentImpl(context);
+    }
+
+    @Override
+    public PluginEnvironment getPluginEnv() {
+      return new ClientServiceEnvironmentImpl(context);
+    }
+
+    @Override
+    public TableId getTableId() {
+      return tableId;
     }
   }
 
@@ -322,8 +345,9 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
 
     MultiIterator multiIter = new MultiIterator(readers, extent);
 
-    OfflineIteratorEnvironment iterEnv = new 
OfflineIteratorEnvironment(authorizations, tableCC,
-        false, samplerConfImpl == null ? null : 
samplerConfImpl.toSamplerConfiguration());
+    OfflineIteratorEnvironment iterEnv =
+        new OfflineIteratorEnvironment(context, tableId, authorizations, 
tableCC, false,
+            samplerConfImpl == null ? null : 
samplerConfImpl.toSamplerConfiguration());
 
     byte[] defaultSecurityLabel;
     ColumnVisibility cv =
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
index c418e5a412..a1f77184ba 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
@@ -120,6 +120,16 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
     this.size = Constants.SCAN_BATCH_SIZE;
   }
 
+  public ClientContext getClientContext() {
+    ensureOpen();
+    return context;
+  }
+
+  public TableId getTableId() {
+    ensureOpen();
+    return tableId;
+  }
+
   @Override
   public synchronized void setRange(Range range) {
     ensureOpen();
diff --git a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java
index e71fc20e43..c7f56fb71f 100644
--- a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java
@@ -22,9 +22,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -32,10 +39,15 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.ClientSideIteratorScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.clientImpl.OfflineScanner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.user.IntersectingIterator;
 import org.apache.accumulo.core.iterators.user.VersioningIterator;
 import org.apache.accumulo.core.security.Authorizations;
@@ -154,4 +166,100 @@ public class ClientSideIteratorIT extends AccumuloClusterHarness {
       assertFalse(csis.iterator().hasNext());
     }
   }
+
+  private static final AtomicBoolean initCalled = new AtomicBoolean(false);
+
+  public static class TestPropFilter extends Filter {
+
+    private Predicate<Key> keyPredicate;
+
+    private Predicate<Key> createRegexPredicate(String regex) {
+      Predicate<Key> kp = k -> true;
+      if (regex != null) {
+        var pattern = Pattern.compile(regex);
+        kp = k -> pattern.matcher(k.getRowData().toString()).matches();
+      }
+
+      return kp;
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options,
+        IteratorEnvironment env) throws IOException {
+      super.init(source, options, env);
+      Predicate<Key> generalPredicate =
+          
createRegexPredicate(env.getPluginEnv().getConfiguration().getCustom("testRegex"));
+      Predicate<Key> tablePredicate = createRegexPredicate(
+          
env.getPluginEnv().getConfiguration(env.getTableId()).getTableCustom("testRegex"));
+      keyPredicate = generalPredicate.and(tablePredicate);
+      initCalled.set(true);
+    }
+
+    @Override
+    public boolean accept(Key k, Value v) {
+      return keyPredicate.test(k);
+    }
+  }
+
+  private void runPluginEnvTest(Set<String> expected) throws Exception {
+    try (var scanner = client.createScanner(tableName)) {
+      initCalled.set(false);
+      var csis = new ClientSideIteratorScanner(scanner);
+      csis.addScanIterator(new IteratorSetting(100, "filter", 
TestPropFilter.class));
+      assertEquals(expected,
+          csis.stream().map(e -> 
e.getKey().getRowData().toString()).collect(Collectors.toSet()));
+      // this check is here to ensure the iterator executed client side and 
not server side
+      assertTrue(initCalled.get());
+    }
+
+    // The offline scanner also runs iterators client side, so test its client 
side access to
+    // accumulo config from iterators also.
+    client.tableOperations().offline(tableName, true);
+    var context = (ClientContext) client;
+    try (OfflineScanner offlineScanner =
+        new OfflineScanner(context, context.getTableId(tableName), 
Authorizations.EMPTY)) {
+      initCalled.set(false);
+      offlineScanner.addScanIterator(new IteratorSetting(100, "filter", 
TestPropFilter.class));
+      assertEquals(expected, offlineScanner.stream().map(e -> 
e.getKey().getRowData().toString())
+          .collect(Collectors.toSet()));
+      assertTrue(initCalled.get());
+    }
+    client.tableOperations().online(tableName, true);
+  }
+
+  /**
+   * Test an iterators ability to access accumulo config in an iterator 
running client side.
+   */
+  @Test
+  public void testPluginEnv() throws Exception {
+    Set<String> rows = Set.of("1234", "abc", "xyz789");
+
+    client.tableOperations().create(tableName);
+    try (BatchWriter bw = client.createBatchWriter(tableName)) {
+      for (var row : rows) {
+        Mutation m = new Mutation(row);
+        m.put("f", "q", "v");
+        bw.addMutation(m);
+      }
+    }
+
+    runPluginEnvTest(rows);
+
+    // The iterator should see the following system property and filter based 
on it
+    client.instanceOperations().setProperty("general.custom.testRegex", 
".*[a-z]+.*");
+    runPluginEnvTest(Set.of("abc", "xyz789"));
+
+    // The iterator should see the following table property and filter based 
on the table and system
+    // property
+    client.tableOperations().setProperty(tableName, "table.custom.testRegex", 
".*[0-9]+.*");
+    runPluginEnvTest(Set.of("xyz789"));
+
+    // Remove the system property, so filtering should only happen based on 
the table property
+    client.instanceOperations().removeProperty("general.custom.testRegex");
+    runPluginEnvTest(Set.of("1234", "xyz789"));
+
+    // Iterator should do no filtering after removing this property
+    client.tableOperations().removeProperty(tableName, 
"table.custom.testRegex");
+    runPluginEnvTest(rows);
+  }
 }

Reply via email to