This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 1ea29605ee Clear ServerConfigurationFactory cache on ZK notifications (#3245)
1ea29605ee is described below

commit 1ea29605ee953e8769f6493aac7fdeb1f6f80559
Author: EdColeman <[email protected]>
AuthorDate: Wed Apr 5 13:32:02 2023 -0400

    Clear ServerConfigurationFactory cache on ZK notifications (#3245)
    
    * Clear ServerConfigurationFactory cache on any detected ZooKeeper change
    * Improve ShellServerIT test
---
 .../server/conf/ServerConfigurationFactory.java    |  23 ++-
 .../apache/accumulo/test/shell/ShellServerIT.java  | 175 ++++++++++++---------
 2 files changed, 113 insertions(+), 85 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
index 287e9bca44..5453764091 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
@@ -64,7 +64,7 @@ public class ServerConfigurationFactory extends ServerConfiguration {
 
   private final ServerContext context;
   private final SiteConfiguration siteConfig;
-  private final DeleteWatcher deleteWatcher = new DeleteWatcher();
+  private final ChangeWatcher changeWatcher = new ChangeWatcher();
 
   private static final int REFRESH_PERIOD_MINUTES = 15;
 
@@ -102,7 +102,7 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   public TableConfiguration getTableConfiguration(TableId tableId) {
     return tableConfigs.computeIfAbsent(tableId, key -> {
       if (context.tableNodeExists(tableId)) {
-        context.getPropStore().registerAsListener(TablePropKey.of(context, tableId), deleteWatcher);
+        context.getPropStore().registerAsListener(TablePropKey.of(context, tableId), changeWatcher);
         var conf =
             new TableConfiguration(context, tableId, 
getNamespaceConfigurationForTable(tableId));
         ConfigCheckUtil.validate(conf);
@@ -127,34 +127,41 @@ public class ServerConfigurationFactory extends ServerConfiguration {
   public NamespaceConfiguration getNamespaceConfiguration(NamespaceId 
namespaceId) {
     return namespaceConfigs.computeIfAbsent(namespaceId, key -> {
       context.getPropStore().registerAsListener(NamespacePropKey.of(context, 
namespaceId),
-          deleteWatcher);
+          changeWatcher);
       var conf = new NamespaceConfiguration(context, namespaceId, 
getSystemConfiguration());
       ConfigCheckUtil.validate(conf);
       return conf;
     });
   }
 
-  private class DeleteWatcher implements PropChangeListener {
+  private class ChangeWatcher implements PropChangeListener {
 
     @Override
     public void zkChangeEvent(PropStoreKey<?> propStoreKey) {
-      // no-op. changes handled by prop store impl
+      clearLocalOnEvent(propStoreKey);
     }
 
     @Override
     public void cacheChangeEvent(PropStoreKey<?> propStoreKey) {
-      // no-op. changes handled by prop store impl
+      clearLocalOnEvent(propStoreKey);
     }
 
     @Override
     public void deleteEvent(PropStoreKey<?> propStoreKey) {
+      clearLocalOnEvent(propStoreKey);
+    }
+
+    private void clearLocalOnEvent(PropStoreKey<?> propStoreKey) {
+      // clearing the local secondary cache stored in this class forces a re-read from the prop
+      // store
+      // to guarantee that the updated value(s) are re-read on a ZooKeeper change.
       if (propStoreKey instanceof NamespacePropKey) {
-        log.trace("configuration snapshot refresh: Handle namespace delete for {}", propStoreKey);
+        log.trace("configuration snapshot refresh: Handle namespace change for {}", propStoreKey);
         namespaceConfigs.remove(((NamespacePropKey) propStoreKey).getId());
         return;
       }
       if (propStoreKey instanceof TablePropKey) {
-        log.trace("configuration snapshot refresh: Handle table delete for {}", propStoreKey);
+        log.trace("configuration snapshot refresh: Handle table change for {}", propStoreKey);
         tableConfigs.remove(((TablePropKey) propStoreKey).getId());
         tableParentConfigs.remove(((TablePropKey) propStoreKey).getId());
       }
diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
index d24363a169..1aa4b36368 100644
--- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
@@ -52,7 +52,6 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.sample.RowColumnSampler;
 import org.apache.accumulo.core.client.sample.RowSampler;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
@@ -80,6 +79,7 @@ import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.compaction.TestCompactionStrategy;
 import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -167,63 +167,93 @@ public class ShellServerIT extends SharedMiniClusterBase {
         getCluster().createAccumuloClient(getPrincipal(), new 
PasswordToken(getRootPassword()))) {
       client.securityOperations().grantNamespacePermission(getPrincipal(), "",
           NamespacePermission.ALTER_NAMESPACE);
+
+      final String tableBase = getUniqueNames(1)[0];
+      final String table = tableBase + "_export_src";
+      final String table2 = tableBase + "_import_tgt";
+
+      // exporttable / importtable
+      ts.exec("createtable " + table + " -evc", true);
+      make10();
+      ts.exec("addsplits row5", true);
+      ts.exec("config -t " + table + " -s table.split.threshold=345M", true);
+      ts.exec("offline " + table, true);
+      File exportDir = new File(rootPath, "ShellServerIT.export");
+      String exportUri = "file://" + exportDir;
+      String localTmp = "file://" + new File(rootPath, "ShellServerIT.tmp");
+      ts.exec("exporttable -t " + table + " " + exportUri, true);
+      DistCp cp = new DistCp(new Configuration(false), null);
+      String import_ = "file://" + new File(rootPath, "ShellServerIT.import");
+      ClientInfo info = ClientInfo.from(getCluster().getClientProperties());
+      if (info.saslEnabled()) {
+        // DistCp bugs out trying to get a fs delegation token to perform the 
cp. Just copy it
+        // ourselves by hand.
+        FileSystem fs = getCluster().getFileSystem();
+        FileSystem localFs = FileSystem.getLocal(new Configuration(false));
+
+        // Path on local fs to cp into
+        Path localTmpPath = new Path(localTmp);
+        localFs.mkdirs(localTmpPath);
+
+        // Path in remote fs to importtable from
+        Path importDir = new Path(import_);
+        fs.mkdirs(importDir);
+
+        // Implement a poor-man's DistCp
+        try (BufferedReader reader =
+            new BufferedReader(new FileReader(new File(exportDir, 
"distcp.txt"), UTF_8))) {
+          for (String line; (line = reader.readLine()) != null;) {
+            Path exportedFile = new Path(line);
+            // There isn't a cp on FileSystem??
+            log.info("Copying {} to {}", line, localTmpPath);
+            fs.copyToLocalFile(exportedFile, localTmpPath);
+            Path tmpFile = new Path(localTmpPath, exportedFile.getName());
+            log.info("Moving {} to the import directory {}", tmpFile, 
importDir);
+            fs.moveFromLocalFile(tmpFile, importDir);
+          }
+        }
+      } else {
+        String[] distCpArgs = {"-f", exportUri + "/distcp.txt", import_};
+        assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " + 
Arrays.toString(distCpArgs));
+      }
+      Thread.sleep(20);
+      ts.exec("importtable " + table2 + " " + import_, true);
+      ts.exec("config -t " + table2 + " -np", true, "345M", true);
+      ts.exec("getsplits -t " + table2, true, "row5", true);
+      ts.exec("constraint --list -t " + table2, true, 
"VisibilityConstraint=2", true);
+      ts.exec("online " + table, true);
+      ts.exec("deletetable -f " + table, true);
+      ts.exec("deletetable -f " + table2, true);
     }
+  }
 
-    final String table = getUniqueNames(1)[0];
-    final String table2 = table + "2";
+  @Test
+  public void propStressTest() throws Exception {
+    try (AccumuloClient client =
+        getCluster().createAccumuloClient(getPrincipal(), new 
PasswordToken(getRootPassword()))) {
+      client.securityOperations().grantNamespacePermission(getPrincipal(), "",
+          NamespacePermission.ALTER_NAMESPACE);
 
-    // exporttable / importtable
-    ts.exec("createtable " + table + " -evc", true);
-    make10();
-    ts.exec("addsplits row5", true);
-    ts.exec("config -t " + table + " -s table.split.threshold=345M", true);
-    ts.exec("offline " + table, true);
-    File exportDir = new File(rootPath, "ShellServerIT.export");
-    String exportUri = "file://" + exportDir;
-    String localTmp = "file://" + new File(rootPath, "ShellServerIT.tmp");
-    ts.exec("exporttable -t " + table + " " + exportUri, true);
-    DistCp cp = new DistCp(new Configuration(false), null);
-    String import_ = "file://" + new File(rootPath, "ShellServerIT.import");
-    ClientInfo info = ClientInfo.from(getCluster().getClientProperties());
-    if (info.saslEnabled()) {
-      // DistCp bugs out trying to get a fs delegation token to perform the 
cp. Just copy it
-      // ourselves by hand.
-      FileSystem fs = getCluster().getFileSystem();
-      FileSystem localFs = FileSystem.getLocal(new Configuration(false));
-
-      // Path on local fs to cp into
-      Path localTmpPath = new Path(localTmp);
-      localFs.mkdirs(localTmpPath);
-
-      // Path in remote fs to importtable from
-      Path importDir = new Path(import_);
-      fs.mkdirs(importDir);
-
-      // Implement a poor-man's DistCp
-      try (BufferedReader reader =
-          new BufferedReader(new FileReader(new File(exportDir, "distcp.txt"), 
UTF_8))) {
-        for (String line; (line = reader.readLine()) != null;) {
-          Path exportedFile = new Path(line);
-          // There isn't a cp on FileSystem??
-          log.info("Copying {} to {}", line, localTmpPath);
-          fs.copyToLocalFile(exportedFile, localTmpPath);
-          Path tmpFile = new Path(localTmpPath, exportedFile.getName());
-          log.info("Moving {} to the import directory {}", tmpFile, importDir);
-          fs.moveFromLocalFile(tmpFile, importDir);
-        }
+      final String table = getUniqueNames(1)[0];
+
+      ts.exec("createtable " + table + " -evc", true);
+      make10();
+      ts.exec("addsplits row5", true);
+
+      ts.exec("config -t " + table + " -s table.split.threshold=345M", true);
+      for (int i = 0; i < 50; i++) {
+        String expected = (100 + i) + "M";
+        ts.exec("config -t " + table + " -s table.split.threshold=" + 
expected, true);
+        ts.exec("config -t " + table + " -np -f table.split.threshold", true, 
expected, true);
+
+        ts.exec("config -t " + table + " -s table.scan.max.memory=" + 
expected, true);
+        ts.exec("config -t " + table + " -np -f table.scan.max.memory", true, 
expected, true);
+
+        String bExpected = ((i % 2) == 0) ? "true" : "false";
+        ts.exec("config -t " + table + " -s table.bloom.enabled=" + bExpected, 
true);
+        ts.exec("config -t " + table + " -np -f table.bloom.enabled", true, 
bExpected, true);
       }
-    } else {
-      String[] distCpArgs = {"-f", exportUri + "/distcp.txt", import_};
-      assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " + 
Arrays.toString(distCpArgs));
     }
-    ts.exec("importtable " + table2 + " " + import_, true);
-    Thread.sleep(100);
-    ts.exec("config -t " + table2 + " -np", true, "345M", true);
-    ts.exec("getsplits -t " + table2, true, "row5", true);
-    ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", 
true);
-    ts.exec("online " + table, true);
-    ts.exec("deletetable -f " + table, true);
-    ts.exec("deletetable -f " + table2, true);
   }
 
   @Test
@@ -426,8 +456,7 @@ public class ShellServerIT extends SharedMiniClusterBase {
 
       String expectedKey = "table.iterator.scan.cfcounter";
       String expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR;
-      TableOperations tops = client.tableOperations();
-      checkTableForProperty(tops, tableName0, expectedKey, expectedValue);
+      checkTableForProperty(client, tableName0, expectedKey, expectedValue);
 
       ts.exec("deletetable " + tableName0, true);
 
@@ -441,7 +470,7 @@ public class ShellServerIT extends SharedMiniClusterBase {
       ts.exec("setiter -scan -class " + COLUMN_FAMILY_COUNTER_ITERATOR + " -p 
30", true);
       expectedKey = "table.iterator.scan.customcfcounter";
       expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR;
-      checkTableForProperty(tops, tableName1, expectedKey, expectedValue);
+      checkTableForProperty(client, tableName1, expectedKey, expectedValue);
 
       ts.exec("deletetable " + tableName1, true);
 
@@ -453,15 +482,16 @@ public class ShellServerIT extends SharedMiniClusterBase {
 
       // Name on the CLI should override OptionDescriber (or user input name, 
in this case)
       ts.exec("setiter -scan -class " + COLUMN_FAMILY_COUNTER_ITERATOR + " -p 
30", true);
+
       expectedKey = "table.iterator.scan.customcfcounter";
       expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR;
-      checkTableForProperty(tops, tableName2, expectedKey, expectedValue);
+      checkTableForProperty(client, tableName2, expectedKey, expectedValue);
       expectedKey = "table.iterator.scan.customcfcounter.opt.name1";
       expectedValue = "value1";
-      checkTableForProperty(tops, tableName2, expectedKey, expectedValue);
+      checkTableForProperty(client, tableName2, expectedKey, expectedValue);
       expectedKey = "table.iterator.scan.customcfcounter.opt.name2";
       expectedValue = "value2";
-      checkTableForProperty(tops, tableName2, expectedKey, expectedValue);
+      checkTableForProperty(client, tableName2, expectedKey, expectedValue);
 
       ts.exec("deletetable " + tableName2, true);
 
@@ -476,33 +506,24 @@ public class ShellServerIT extends SharedMiniClusterBase {
           true);
       expectedKey = "table.iterator.scan.cfcounter";
       expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR;
-      checkTableForProperty(tops, tableName3, expectedKey, expectedValue);
+      checkTableForProperty(client, tableName3, expectedKey, expectedValue);
       expectedKey = "table.iterator.scan.cfcounter.opt.name1";
       expectedValue = "value1.1,value1.2,value1.3";
-      checkTableForProperty(tops, tableName3, expectedKey, expectedValue);
+      checkTableForProperty(client, tableName3, expectedKey, expectedValue);
       expectedKey = "table.iterator.scan.cfcounter.opt.name2";
       expectedValue = "value2";
-      checkTableForProperty(tops, tableName3, expectedKey, expectedValue);
+      checkTableForProperty(client, tableName3, expectedKey, expectedValue);
 
       ts.exec("deletetable " + tableName3, true);
-
     }
   }
 
-  protected void checkTableForProperty(TableOperations tops, String tableName, 
String expectedKey,
-      String expectedValue) throws Exception {
-    for (int i = 0; i < 5; i++) {
-      for (Entry<String,String> entry : tops.getProperties(tableName)) {
-        if (expectedKey.equals(entry.getKey())) {
-          assertEquals(expectedValue, entry.getValue());
-          return;
-        }
-      }
-      Thread.sleep(500);
-    }
-
-    fail("Failed to find expected property on " + tableName + ": " + 
expectedKey + "="
-        + expectedValue);
+  protected void checkTableForProperty(final AccumuloClient client, final String tableName,
+      final String expectedKey, final String expectedValue) throws Exception {
+    assertTrue(
+        Wait.waitFor(() -> client.tableOperations().getConfiguration(tableName).get(expectedKey)
+            .equals(expectedValue), 5000, 500),
+        "Failed to find expected value for key: " + expectedKey);
   }
 
   @Test

Reply via email to