This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new f682971dd0 Added Eventual Scan test in ComprehensiveIT (#4754)
f682971dd0 is described below
commit f682971dd0c90e6512061b5f00b6d83424140962
Author: Arbaaz Khan <[email protected]>
AuthorDate: Thu Aug 28 17:22:37 2025 -0400
Added Eventual Scan test in ComprehensiveIT (#4754)
* Added Eventual Scan test
---
.../accumulo/test/ComprehensiveFlakyAmpleIT.java | 21 +++++--
.../accumulo/test/ComprehensiveFlakyFateIT.java | 17 +++++-
...iveIT_SimpleSuite.java => ComprehensiveIT.java} | 20 ++++++-
.../apache/accumulo/test/ComprehensiveITBase.java | 68 +++++++++++++++++++++-
4 files changed, 116 insertions(+), 10 deletions(-)
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyAmpleIT.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyAmpleIT.java
index 100985e34e..71429006ff 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyAmpleIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyAmpleIT.java
@@ -20,8 +20,11 @@ package org.apache.accumulo.test;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.ample.FlakyAmpleManager;
import org.apache.accumulo.test.ample.FlakyAmpleTserver;
import org.junit.jupiter.api.AfterAll;
@@ -33,12 +36,22 @@ import org.junit.jupiter.api.BeforeAll;
* metadata updates using Ample.
*/
public class ComprehensiveFlakyAmpleIT extends ComprehensiveITBase {
- @BeforeAll
- public static void setup() throws Exception {
- SharedMiniClusterBase.startMiniClusterWithConfig((cfg, coreSite) -> {
+
+ private static class ComprehensiveFlakyAmpleITConfiguration
+ implements MiniClusterConfigurationCallback {
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+ org.apache.hadoop.conf.Configuration coreSite) {
+ cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "5s");
cfg.setServerClass(ServerType.MANAGER, r -> FlakyAmpleManager.class);
cfg.setServerClass(ServerType.TABLET_SERVER, r ->
FlakyAmpleTserver.class);
- });
+ }
+ }
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ ComprehensiveFlakyAmpleITConfiguration c = new
ComprehensiveFlakyAmpleITConfiguration();
+ SharedMiniClusterBase.startMiniClusterWithConfig(c);
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
client.securityOperations().changeUserAuthorizations("root",
AUTHORIZATIONS);
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyFateIT.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyFateIT.java
index 7c31118d20..0e4691ea30 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveFlakyFateIT.java
@@ -20,8 +20,11 @@ package org.apache.accumulo.test;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.fate.FlakyFateManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -31,10 +34,20 @@ import org.junit.jupiter.api.BeforeAll;
* {@link org.apache.accumulo.test.fate.FlakyFate} because it will run a lot of FATE operations.
*/
public class ComprehensiveFlakyFateIT extends ComprehensiveITBase {
+ private static class ComprehensiveFlakyFateITConfiguration
+ implements MiniClusterConfigurationCallback {
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+ org.apache.hadoop.conf.Configuration coreSite) {
+ cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "5s");
+ cfg.setServerClass(ServerType.MANAGER, r -> FlakyFateManager.class);
+ }
+ }
+
@BeforeAll
public static void setup() throws Exception {
- SharedMiniClusterBase.startMiniClusterWithConfig(
- (cfg, coreSite) -> cfg.setServerClass(ServerType.MANAGER, r ->
FlakyFateManager.class));
+ ComprehensiveFlakyFateITConfiguration c = new
ComprehensiveFlakyFateITConfiguration();
+ SharedMiniClusterBase.startMiniClusterWithConfig(c);
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
client.securityOperations().changeUserAuthorizations("root",
AUTHORIZATIONS);
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT_SimpleSuite.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
similarity index 64%
rename from test/src/main/java/org/apache/accumulo/test/ComprehensiveIT_SimpleSuite.java
rename to test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
index 0cd25ea302..3ffb5cf003 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT_SimpleSuite.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java
@@ -19,20 +19,34 @@
package org.apache.accumulo.test;
import static org.apache.accumulo.harness.AccumuloITBase.SUNNY_DAY;
+import static org.apache.accumulo.harness.SharedMiniClusterBase.getClientProps;
+import static org.apache.accumulo.test.ComprehensiveITBase.AUTHORIZATIONS;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
@Tag(SUNNY_DAY)
-public class ComprehensiveIT_SimpleSuite extends ComprehensiveITBase {
+public class ComprehensiveIT extends ComprehensiveITBase {
+
+ private static class ComprehensiveITConfiguration implements
MiniClusterConfigurationCallback {
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+ org.apache.hadoop.conf.Configuration coreSite) {
+ cfg.setProperty(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION, "5s");
+ }
+ }
+
@BeforeAll
public static void setup() throws Exception {
- SharedMiniClusterBase.startMiniCluster();
-
+ ComprehensiveITConfiguration c = new ComprehensiveITConfiguration();
+ SharedMiniClusterBase.startMiniClusterWithConfig(c);
try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
client.securityOperations().changeUserAuthorizations("root",
AUTHORIZATIONS);
}
diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
index 9517229ba2..a6406a7556 100644
--- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveITBase.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toMap;
+import static
org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.EVENTUAL;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -33,10 +34,12 @@ import java.io.InputStreamReader;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -77,6 +80,7 @@ import org.apache.accumulo.core.client.security.SecurityErrorCode;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.client.summary.CountingSummarizer;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Condition;
@@ -97,7 +101,9 @@ import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.core.security.NamespacePermission;
import org.apache.accumulo.core.security.SystemPermission;
import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.spi.scan.ConfigurableScanServerSelector;
import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -119,7 +125,67 @@ public abstract class ComprehensiveITBase extends SharedMiniClusterBase {
public static final String DOG_AND_CAT = "DOG&CAT";
static final Authorizations AUTHORIZATIONS = new Authorizations("CAT",
"DOG");
- private static final Logger log =
LoggerFactory.getLogger(ComprehensiveIT_SimpleSuite.class);
+ private static final Logger log =
LoggerFactory.getLogger(ComprehensiveIT.class);
+
+ @Test
+ public void testEventualScan() throws Exception {
+ Properties props = new Properties();
+ props.putAll(getClientProps());
+ props.put(ClientProperty.SCAN_SERVER_SELECTOR.getKey(),
+ ConfigurableScanServerSelector.class.getName());
+ props.put(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() +
"profiles",
+ "[{'isDefault':true,'timeToWaitForScanServers' :
'10d','maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'attemptPlans':[{\"servers\":\"100%\",
\"busyTimeout\":\"3ms\"}]}]");
+
+ try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+ String table = getUniqueNames(1)[0];
+ client.tableOperations().create(table);
+ getCluster().getClusterControl().start(ServerType.SCAN_SERVER);
+ Wait.waitFor(
+ () ->
!client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).isEmpty());
+
+ write(client, table, generateMutations(0, 100, tr -> true));
+ verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 100), scanner
-> {});
+ verifyData(client, table, AUTHORIZATIONS, Collections.emptySortedMap(),
+ scanner -> scanner.setConsistencyLevel(EVENTUAL));
+
+ client.tableOperations().flush(table, null, null, true);
+ Wait.waitFor(() -> {
+ try (var scanner = client.createScanner(table, AUTHORIZATIONS)) {
+ scanner.setConsistencyLevel(EVENTUAL);
+ return scan(scanner).size() >= 100;
+ }
+ });
+
+ // should see all data that was flushed in eventual scan
+ verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 100),
+ scanner -> scanner.setConsistencyLevel(EVENTUAL));
+ // should not see data with col vis set
+ verifyData(client, table, Authorizations.EMPTY, generateKeys(0, 100, tr
-> tr.vis.isEmpty()),
+ scanner -> scanner.setConsistencyLevel(EVENTUAL));
+
+ // write some more rows after 100 and verify those are not seen by eventual scan until table
+ // is flushed.
+ write(client, table, generateMutations(100, 200, tr -> true));
+ verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 100),
+ scanner -> scanner.setConsistencyLevel(EVENTUAL));
+
+ client.tableOperations().flush(table, null, null, true);
+ // wait for the eventual scan to see the new data
+ final int initialSize = generateKeys(0, 100).size();
+ Wait.waitFor(() -> {
+ try (var scanner = client.createScanner(table, AUTHORIZATIONS)) {
+ scanner.setConsistencyLevel(EVENTUAL);
+ return scan(scanner).size() > initialSize;
+ }
+ });
+
+ verifyData(client, table, AUTHORIZATIONS, generateKeys(0, 200),
+ scanner -> scanner.setConsistencyLevel(EVENTUAL));
+
+ } finally {
+ getCluster().getClusterControl().stop(ServerType.SCAN_SERVER);
+ }
+ }
@Test
public void testBulkImport() throws Exception {