This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 1ea29605ee Clear ServerConfigurationFactory cache on ZK notifications
(#3245)
1ea29605ee is described below
commit 1ea29605ee953e8769f6493aac7fdeb1f6f80559
Author: EdColeman <[email protected]>
AuthorDate: Wed Apr 5 13:32:02 2023 -0400
Clear ServerConfigurationFactory cache on ZK notifications (#3245)
* Clear ServerConfigurationFactory cache on any detected ZooKeeper change
* Improve ShellServerIT test
---
.../server/conf/ServerConfigurationFactory.java | 23 ++-
.../apache/accumulo/test/shell/ShellServerIT.java | 175 ++++++++++++---------
2 files changed, 113 insertions(+), 85 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
index 287e9bca44..5453764091 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
@@ -64,7 +64,7 @@ public class ServerConfigurationFactory extends
ServerConfiguration {
private final ServerContext context;
private final SiteConfiguration siteConfig;
- private final DeleteWatcher deleteWatcher = new DeleteWatcher();
+ private final ChangeWatcher changeWatcher = new ChangeWatcher();
private static final int REFRESH_PERIOD_MINUTES = 15;
@@ -102,7 +102,7 @@ public class ServerConfigurationFactory extends
ServerConfiguration {
public TableConfiguration getTableConfiguration(TableId tableId) {
return tableConfigs.computeIfAbsent(tableId, key -> {
if (context.tableNodeExists(tableId)) {
- context.getPropStore().registerAsListener(TablePropKey.of(context,
tableId), deleteWatcher);
+ context.getPropStore().registerAsListener(TablePropKey.of(context,
tableId), changeWatcher);
var conf =
new TableConfiguration(context, tableId,
getNamespaceConfigurationForTable(tableId));
ConfigCheckUtil.validate(conf);
@@ -127,34 +127,41 @@ public class ServerConfigurationFactory extends
ServerConfiguration {
public NamespaceConfiguration getNamespaceConfiguration(NamespaceId
namespaceId) {
return namespaceConfigs.computeIfAbsent(namespaceId, key -> {
context.getPropStore().registerAsListener(NamespacePropKey.of(context,
namespaceId),
- deleteWatcher);
+ changeWatcher);
var conf = new NamespaceConfiguration(context, namespaceId,
getSystemConfiguration());
ConfigCheckUtil.validate(conf);
return conf;
});
}
- private class DeleteWatcher implements PropChangeListener {
+ private class ChangeWatcher implements PropChangeListener {
@Override
public void zkChangeEvent(PropStoreKey<?> propStoreKey) {
- // no-op. changes handled by prop store impl
+ clearLocalOnEvent(propStoreKey);
}
@Override
public void cacheChangeEvent(PropStoreKey<?> propStoreKey) {
- // no-op. changes handled by prop store impl
+ clearLocalOnEvent(propStoreKey);
}
@Override
public void deleteEvent(PropStoreKey<?> propStoreKey) {
+ clearLocalOnEvent(propStoreKey);
+ }
+
+ private void clearLocalOnEvent(PropStoreKey<?> propStoreKey) {
+ // clearing the local secondary cache stored in this class forces a
re-read from the prop
+ // store
+ // to guarantee that the updated value(s) are re-read on a ZooKeeper
change.
if (propStoreKey instanceof NamespacePropKey) {
- log.trace("configuration snapshot refresh: Handle namespace delete for
{}", propStoreKey);
+ log.trace("configuration snapshot refresh: Handle namespace change for
{}", propStoreKey);
namespaceConfigs.remove(((NamespacePropKey) propStoreKey).getId());
return;
}
if (propStoreKey instanceof TablePropKey) {
- log.trace("configuration snapshot refresh: Handle table delete for
{}", propStoreKey);
+ log.trace("configuration snapshot refresh: Handle table change for
{}", propStoreKey);
tableConfigs.remove(((TablePropKey) propStoreKey).getId());
tableParentConfigs.remove(((TablePropKey) propStoreKey).getId());
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
index d24363a169..1aa4b36368 100644
--- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
@@ -52,7 +52,6 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.sample.RowColumnSampler;
import org.apache.accumulo.core.client.sample.RowSampler;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
@@ -80,6 +79,7 @@ import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.compaction.TestCompactionStrategy;
import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -167,63 +167,93 @@ public class ShellServerIT extends SharedMiniClusterBase {
getCluster().createAccumuloClient(getPrincipal(), new
PasswordToken(getRootPassword()))) {
client.securityOperations().grantNamespacePermission(getPrincipal(), "",
NamespacePermission.ALTER_NAMESPACE);
+
+ final String tableBase = getUniqueNames(1)[0];
+ final String table = tableBase + "_export_src";
+ final String table2 = tableBase + "_import_tgt";
+
+ // exporttable / importtable
+ ts.exec("createtable " + table + " -evc", true);
+ make10();
+ ts.exec("addsplits row5", true);
+ ts.exec("config -t " + table + " -s table.split.threshold=345M", true);
+ ts.exec("offline " + table, true);
+ File exportDir = new File(rootPath, "ShellServerIT.export");
+ String exportUri = "file://" + exportDir;
+ String localTmp = "file://" + new File(rootPath, "ShellServerIT.tmp");
+ ts.exec("exporttable -t " + table + " " + exportUri, true);
+ DistCp cp = new DistCp(new Configuration(false), null);
+ String import_ = "file://" + new File(rootPath, "ShellServerIT.import");
+ ClientInfo info = ClientInfo.from(getCluster().getClientProperties());
+ if (info.saslEnabled()) {
+ // DistCp bugs out trying to get a fs delegation token to perform the
cp. Just copy it
+ // ourselves by hand.
+ FileSystem fs = getCluster().getFileSystem();
+ FileSystem localFs = FileSystem.getLocal(new Configuration(false));
+
+ // Path on local fs to cp into
+ Path localTmpPath = new Path(localTmp);
+ localFs.mkdirs(localTmpPath);
+
+ // Path in remote fs to importtable from
+ Path importDir = new Path(import_);
+ fs.mkdirs(importDir);
+
+ // Implement a poor-man's DistCp
+ try (BufferedReader reader =
+ new BufferedReader(new FileReader(new File(exportDir,
"distcp.txt"), UTF_8))) {
+ for (String line; (line = reader.readLine()) != null;) {
+ Path exportedFile = new Path(line);
+ // There isn't a cp on FileSystem??
+ log.info("Copying {} to {}", line, localTmpPath);
+ fs.copyToLocalFile(exportedFile, localTmpPath);
+ Path tmpFile = new Path(localTmpPath, exportedFile.getName());
+ log.info("Moving {} to the import directory {}", tmpFile,
importDir);
+ fs.moveFromLocalFile(tmpFile, importDir);
+ }
+ }
+ } else {
+ String[] distCpArgs = {"-f", exportUri + "/distcp.txt", import_};
+ assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " +
Arrays.toString(distCpArgs));
+ }
+ Thread.sleep(20);
+ ts.exec("importtable " + table2 + " " + import_, true);
+ ts.exec("config -t " + table2 + " -np", true, "345M", true);
+ ts.exec("getsplits -t " + table2, true, "row5", true);
+ ts.exec("constraint --list -t " + table2, true,
"VisibilityConstraint=2", true);
+ ts.exec("online " + table, true);
+ ts.exec("deletetable -f " + table, true);
+ ts.exec("deletetable -f " + table2, true);
}
+ }
- final String table = getUniqueNames(1)[0];
- final String table2 = table + "2";
+ @Test
+ public void propStressTest() throws Exception {
+ try (AccumuloClient client =
+ getCluster().createAccumuloClient(getPrincipal(), new
PasswordToken(getRootPassword()))) {
+ client.securityOperations().grantNamespacePermission(getPrincipal(), "",
+ NamespacePermission.ALTER_NAMESPACE);
- // exporttable / importtable
- ts.exec("createtable " + table + " -evc", true);
- make10();
- ts.exec("addsplits row5", true);
- ts.exec("config -t " + table + " -s table.split.threshold=345M", true);
- ts.exec("offline " + table, true);
- File exportDir = new File(rootPath, "ShellServerIT.export");
- String exportUri = "file://" + exportDir;
- String localTmp = "file://" + new File(rootPath, "ShellServerIT.tmp");
- ts.exec("exporttable -t " + table + " " + exportUri, true);
- DistCp cp = new DistCp(new Configuration(false), null);
- String import_ = "file://" + new File(rootPath, "ShellServerIT.import");
- ClientInfo info = ClientInfo.from(getCluster().getClientProperties());
- if (info.saslEnabled()) {
- // DistCp bugs out trying to get a fs delegation token to perform the
cp. Just copy it
- // ourselves by hand.
- FileSystem fs = getCluster().getFileSystem();
- FileSystem localFs = FileSystem.getLocal(new Configuration(false));
-
- // Path on local fs to cp into
- Path localTmpPath = new Path(localTmp);
- localFs.mkdirs(localTmpPath);
-
- // Path in remote fs to importtable from
- Path importDir = new Path(import_);
- fs.mkdirs(importDir);
-
- // Implement a poor-man's DistCp
- try (BufferedReader reader =
- new BufferedReader(new FileReader(new File(exportDir, "distcp.txt"),
UTF_8))) {
- for (String line; (line = reader.readLine()) != null;) {
- Path exportedFile = new Path(line);
- // There isn't a cp on FileSystem??
- log.info("Copying {} to {}", line, localTmpPath);
- fs.copyToLocalFile(exportedFile, localTmpPath);
- Path tmpFile = new Path(localTmpPath, exportedFile.getName());
- log.info("Moving {} to the import directory {}", tmpFile, importDir);
- fs.moveFromLocalFile(tmpFile, importDir);
- }
+ final String table = getUniqueNames(1)[0];
+
+ ts.exec("createtable " + table + " -evc", true);
+ make10();
+ ts.exec("addsplits row5", true);
+
+ ts.exec("config -t " + table + " -s table.split.threshold=345M", true);
+ for (int i = 0; i < 50; i++) {
+ String expected = (100 + i) + "M";
+ ts.exec("config -t " + table + " -s table.split.threshold=" +
expected, true);
+ ts.exec("config -t " + table + " -np -f table.split.threshold", true,
expected, true);
+
+ ts.exec("config -t " + table + " -s table.scan.max.memory=" +
expected, true);
+ ts.exec("config -t " + table + " -np -f table.scan.max.memory", true,
expected, true);
+
+ String bExpected = ((i % 2) == 0) ? "true" : "false";
+ ts.exec("config -t " + table + " -s table.bloom.enabled=" + bExpected,
true);
+ ts.exec("config -t " + table + " -np -f table.bloom.enabled", true,
bExpected, true);
}
- } else {
- String[] distCpArgs = {"-f", exportUri + "/distcp.txt", import_};
- assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " +
Arrays.toString(distCpArgs));
}
- ts.exec("importtable " + table2 + " " + import_, true);
- Thread.sleep(100);
- ts.exec("config -t " + table2 + " -np", true, "345M", true);
- ts.exec("getsplits -t " + table2, true, "row5", true);
- ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2",
true);
- ts.exec("online " + table, true);
- ts.exec("deletetable -f " + table, true);
- ts.exec("deletetable -f " + table2, true);
}
@Test
@@ -426,8 +456,7 @@ public class ShellServerIT extends SharedMiniClusterBase {
String expectedKey = "table.iterator.scan.cfcounter";
String expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR;
- TableOperations tops = client.tableOperations();
- checkTableForProperty(tops, tableName0, expectedKey, expectedValue);
+ checkTableForProperty(client, tableName0, expectedKey, expectedValue);
ts.exec("deletetable " + tableName0, true);
@@ -441,7 +470,7 @@ public class ShellServerIT extends SharedMiniClusterBase {
ts.exec("setiter -scan -class " + COLUMN_FAMILY_COUNTER_ITERATOR + " -p
30", true);
expectedKey = "table.iterator.scan.customcfcounter";
expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR;
- checkTableForProperty(tops, tableName1, expectedKey, expectedValue);
+ checkTableForProperty(client, tableName1, expectedKey, expectedValue);
ts.exec("deletetable " + tableName1, true);
@@ -453,15 +482,16 @@ public class ShellServerIT extends SharedMiniClusterBase {
// Name on the CLI should override OptionDescriber (or user input name,
in this case)
ts.exec("setiter -scan -class " + COLUMN_FAMILY_COUNTER_ITERATOR + " -p
30", true);
+
expectedKey = "table.iterator.scan.customcfcounter";
expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR;
- checkTableForProperty(tops, tableName2, expectedKey, expectedValue);
+ checkTableForProperty(client, tableName2, expectedKey, expectedValue);
expectedKey = "table.iterator.scan.customcfcounter.opt.name1";
expectedValue = "value1";
- checkTableForProperty(tops, tableName2, expectedKey, expectedValue);
+ checkTableForProperty(client, tableName2, expectedKey, expectedValue);
expectedKey = "table.iterator.scan.customcfcounter.opt.name2";
expectedValue = "value2";
- checkTableForProperty(tops, tableName2, expectedKey, expectedValue);
+ checkTableForProperty(client, tableName2, expectedKey, expectedValue);
ts.exec("deletetable " + tableName2, true);
@@ -476,33 +506,24 @@ public class ShellServerIT extends SharedMiniClusterBase {
true);
expectedKey = "table.iterator.scan.cfcounter";
expectedValue = "30," + COLUMN_FAMILY_COUNTER_ITERATOR;
- checkTableForProperty(tops, tableName3, expectedKey, expectedValue);
+ checkTableForProperty(client, tableName3, expectedKey, expectedValue);
expectedKey = "table.iterator.scan.cfcounter.opt.name1";
expectedValue = "value1.1,value1.2,value1.3";
- checkTableForProperty(tops, tableName3, expectedKey, expectedValue);
+ checkTableForProperty(client, tableName3, expectedKey, expectedValue);
expectedKey = "table.iterator.scan.cfcounter.opt.name2";
expectedValue = "value2";
- checkTableForProperty(tops, tableName3, expectedKey, expectedValue);
+ checkTableForProperty(client, tableName3, expectedKey, expectedValue);
ts.exec("deletetable " + tableName3, true);
-
}
}
- protected void checkTableForProperty(TableOperations tops, String tableName,
String expectedKey,
- String expectedValue) throws Exception {
- for (int i = 0; i < 5; i++) {
- for (Entry<String,String> entry : tops.getProperties(tableName)) {
- if (expectedKey.equals(entry.getKey())) {
- assertEquals(expectedValue, entry.getValue());
- return;
- }
- }
- Thread.sleep(500);
- }
-
- fail("Failed to find expected property on " + tableName + ": " +
expectedKey + "="
- + expectedValue);
+ protected void checkTableForProperty(final AccumuloClient client, final
String tableName,
+ final String expectedKey, final String expectedValue) throws Exception {
+ assertTrue(
+ Wait.waitFor(() ->
client.tableOperations().getConfiguration(tableName).get(expectedKey)
+ .equals(expectedValue), 5000, 500),
+ "Failed to find expected value for key: " + expectedKey);
}
@Test