This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 7947c2c2df adds PluginEnv support to client side iterators (#4283) 7947c2c2df is described below commit 7947c2c2dfb80fb280420b729b4197547be22124 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Mar 20 18:25:43 2024 -0400 adds PluginEnv support to client side iterators (#4283) Accumulo code that ran user iterators client side would throw an unsupported operations exception when attempting to access the PluginEnv. This commit adds support for PluginEnv in situations where iterators are run client side. --- .../core/client/ClientSideIteratorScanner.java | 46 ++++++++- .../accumulo/core/clientImpl/OfflineIterator.java | 34 ++++++- .../accumulo/core/clientImpl/ScannerImpl.java | 10 ++ .../apache/accumulo/test/ClientSideIteratorIT.java | 108 +++++++++++++++++++++ 4 files changed, 192 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java index 542273d470..362c4e85d6 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java @@ -29,15 +29,20 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.function.Supplier; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl; +import org.apache.accumulo.core.clientImpl.ScannerImpl; import org.apache.accumulo.core.clientImpl.ScannerOptions; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.thrift.IterInfo; import org.apache.accumulo.core.iterators.IteratorAdapter; @@ -47,6 +52,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder; import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.hadoop.io.Text; /** @@ -70,6 +76,7 @@ import org.apache.hadoop.io.Text; * server side) and to the client side scanner (which will execute client side). */ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner { + private int size; private Range range; @@ -77,6 +84,9 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD; private SamplerConfiguration iteratorSamplerConfig; + private final Supplier<ClientContext> context; + private final Supplier<TableId> tableId; + private class ClientSideIteratorEnvironment implements IteratorEnvironment { private SamplerConfiguration samplerConfig; @@ -94,7 +104,9 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner @Override public boolean isFullMajorCompaction() { - return false; + // The javadocs state this method will throw an ISE when scope is not majc + throw new IllegalStateException( + "Asked about major compaction type when scope is " + getIteratorScope()); } @Override @@ -121,6 +133,22 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner public SamplerConfiguration getSamplerConfiguration() { return samplerConfig; } + + @Deprecated(since = "2.1.0") + @Override + public ServiceEnvironment getServiceEnv() { + return new ClientServiceEnvironmentImpl(context.get()); + } + + @Override + public PluginEnvironment getPluginEnv() { + return new ClientServiceEnvironmentImpl(context.get()); + } + + @Override + public TableId getTableId() { + return tableId.get(); + } } /** @@ -220,6 +248,22 @@ public class ClientSideIteratorScanner extends ScannerOptions implements Scanner if (samplerConfig != null) { setSamplerConfiguration(samplerConfig); } + + if (scanner instanceof ScannerImpl) { + var scannerImpl = (ScannerImpl) scanner; + this.context = () -> scannerImpl.getClientContext(); + this.tableId = () -> scannerImpl.getTableId(); + } else { + // These may never be used, so only fail if an attempt is made to use them. + this.context = () -> { + throw new UnsupportedOperationException( + "Do not know how to obtain client context from " + scanner.getClass().getName()); + }; + this.tableId = () -> { + throw new UnsupportedOperationException( + "Do not know how to obtain tableId from " + scanner.getClass().getName()); + }; + } } /** diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java index e24724d066..a03cc811ab 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java @@ -34,6 +34,7 @@ import java.util.Map.Entry; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.PluginEnvironment; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -65,6 +66,7 @@ import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.hadoop.conf.Configuration; @@ -79,9 +81,13 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> { private final AccumuloConfiguration conf; private final boolean useSample; private final SamplerConfiguration sampleConf; + private final ClientContext context; + private final TableId tableId; - public OfflineIteratorEnvironment(Authorizations auths, AccumuloConfiguration acuTableConf, - boolean useSample, SamplerConfiguration samplerConf) { + public OfflineIteratorEnvironment(ClientContext context, TableId tableId, Authorizations auths, + AccumuloConfiguration acuTableConf, boolean useSample, SamplerConfiguration samplerConf) { + this.context = context; + this.tableId = tableId; this.authorizations = auths; this.conf = acuTableConf; this.useSample = useSample; @@ -147,7 +153,24 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> { if (sampleConf == null) { throw new SampleNotPresentException(); } - return new OfflineIteratorEnvironment(authorizations, conf, true, sampleConf); + return new OfflineIteratorEnvironment(context, tableId, authorizations, conf, true, + sampleConf); + } + + @Deprecated(since = "2.1.0") + @Override + public ServiceEnvironment getServiceEnv() { + return new ClientServiceEnvironmentImpl(context); + } + + @Override + public PluginEnvironment getPluginEnv() { + return new ClientServiceEnvironmentImpl(context); + } + + @Override + public TableId getTableId() { + return tableId; } } @@ -322,8 +345,9 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> { MultiIterator multiIter = new MultiIterator(readers, extent); - OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations, tableCC, - false, samplerConfImpl == null ? null : samplerConfImpl.toSamplerConfiguration()); + OfflineIteratorEnvironment iterEnv = + new OfflineIteratorEnvironment(context, tableId, authorizations, tableCC, false, + samplerConfImpl == null ? null : samplerConfImpl.toSamplerConfiguration()); byte[] defaultSecurityLabel; ColumnVisibility cv = diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java index c418e5a412..a1f77184ba 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java @@ -120,6 +120,16 @@ public class ScannerImpl extends ScannerOptions implements Scanner { this.size = Constants.SCAN_BATCH_SIZE; } + public ClientContext getClientContext() { + ensureOpen(); + return context; + } + + public TableId getTableId() { + ensureOpen(); + return tableId; + } + @Override public synchronized void setRange(Range range) { ensureOpen(); diff --git a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java index e71fc20e43..c7f56fb71f 100644 --- a/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java @@ -22,9 +22,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -32,10 +39,15 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.ClientSideIteratorScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.OfflineScanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.user.IntersectingIterator; import org.apache.accumulo.core.iterators.user.VersioningIterator; import org.apache.accumulo.core.security.Authorizations; @@ -154,4 +166,100 @@ public class ClientSideIteratorIT extends AccumuloClusterHarness { assertFalse(csis.iterator().hasNext()); } } + + private static final AtomicBoolean initCalled = new AtomicBoolean(false); + + public static class TestPropFilter extends Filter { + + private Predicate<Key> keyPredicate; + + private Predicate<Key> createRegexPredicate(String regex) { + Predicate<Key> kp = k -> true; + if (regex != null) { + var pattern = Pattern.compile(regex); + kp = k -> pattern.matcher(k.getRowData().toString()).matches(); + } + + return kp; + } + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, + IteratorEnvironment env) throws IOException { + super.init(source, options, env); + Predicate<Key> generalPredicate = + createRegexPredicate(env.getPluginEnv().getConfiguration().getCustom("testRegex")); + Predicate<Key> tablePredicate = createRegexPredicate( + env.getPluginEnv().getConfiguration(env.getTableId()).getTableCustom("testRegex")); + keyPredicate = generalPredicate.and(tablePredicate); + initCalled.set(true); + } + + @Override + public boolean accept(Key k, Value v) { + return keyPredicate.test(k); + } + } + + private void runPluginEnvTest(Set<String> expected) throws Exception { + try (var scanner = client.createScanner(tableName)) { + initCalled.set(false); + var csis = new ClientSideIteratorScanner(scanner); + csis.addScanIterator(new IteratorSetting(100, "filter", TestPropFilter.class)); + assertEquals(expected, + csis.stream().map(e -> e.getKey().getRowData().toString()).collect(Collectors.toSet())); + // this check is here to ensure the iterator executed client side and not server side + assertTrue(initCalled.get()); + } + + // The offline scanner also runs iterators client side, so test its client side access to + // accumulo config from iterators also. + client.tableOperations().offline(tableName, true); + var context = (ClientContext) client; + try (OfflineScanner offlineScanner = + new OfflineScanner(context, context.getTableId(tableName), Authorizations.EMPTY)) { + initCalled.set(false); + offlineScanner.addScanIterator(new IteratorSetting(100, "filter", TestPropFilter.class)); + assertEquals(expected, offlineScanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(Collectors.toSet())); + assertTrue(initCalled.get()); + } + client.tableOperations().online(tableName, true); + } + + /** + * Test an iterators ability to access accumulo config in an iterator running client side. + */ + @Test + public void testPluginEnv() throws Exception { + Set<String> rows = Set.of("1234", "abc", "xyz789"); + + client.tableOperations().create(tableName); + try (BatchWriter bw = client.createBatchWriter(tableName)) { + for (var row : rows) { + Mutation m = new Mutation(row); + m.put("f", "q", "v"); + bw.addMutation(m); + } + } + + runPluginEnvTest(rows); + + // The iterator should see the following system property and filter based on it + client.instanceOperations().setProperty("general.custom.testRegex", ".*[a-z]+.*"); + runPluginEnvTest(Set.of("abc", "xyz789")); + + // The iterator should see the following table property and filter based on the table and system + // property + client.tableOperations().setProperty(tableName, "table.custom.testRegex", ".*[0-9]+.*"); + runPluginEnvTest(Set.of("xyz789")); + + // Remove the system property, so filtering should only happen based on the table property + client.instanceOperations().removeProperty("general.custom.testRegex"); + runPluginEnvTest(Set.of("1234", "xyz789")); + + // Iterator should do no filtering after removing this property + client.tableOperations().removeProperty(tableName, "table.custom.testRegex"); + runPluginEnvTest(rows); + } }