This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 13ce0d1f27 Cache Hadoop config in getVolumeManagerConfiguration (#3706)
13ce0d1f27 is described below
commit 13ce0d1f272950dd5a1a1f882d4ab6b65b1e1419
Author: Dave Marion <[email protected]>
AuthorDate: Mon Aug 21 14:04:00 2023 -0400
Cache Hadoop config in getVolumeManagerConfiguration (#3706)
Cache the Hadoop Configuration in getVolumeManagerConfiguration
to avoid calling new Configuration(Configuration) frequently. Calling
this copy constructor creates a synchronization point shared across
many threads.
Co-authored-by: Christopher Tubbs <[email protected]>
---
.../accumulo/server/fs/VolumeManagerImpl.java | 43 ++++++++++++++++------
.../accumulo/server/fs/VolumeManagerImplTest.java | 39 ++++++++++++++++++++
2 files changed, 71 insertions(+), 11 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 29e1f03d44..51ba4d4c9e 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -38,12 +38,14 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.spi.fs.VolumeChooser;
+import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.core.volume.VolumeConfiguration;
@@ -66,6 +68,8 @@ import
org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
@@ -75,6 +79,9 @@ public class VolumeManagerImpl implements VolumeManager {
private static final HashSet<String> WARNED_ABOUT_SYNCONCLOSE = new
HashSet<>();
+ private static final Cache<Pair<Configuration,String>,Configuration>
HDFS_CONFIGS_FOR_VOLUME =
+ Caffeine.newBuilder().expireAfterWrite(24, TimeUnit.HOURS).build();
+
private final Map<String,Volume> volumesByName;
private final Multimap<URI,Volume> volumesByFileSystemUri;
private final VolumeChooser chooser;
@@ -377,27 +384,41 @@ public class VolumeManagerImpl implements VolumeManager {
* </pre>
*
* We will use these properties to return a new Configuration object that
can be used with the
- * FileSystem URI.
+ * FileSystem URI to override properties in the original Configuration. If
these properties are
+ * not set for a volume, then the original Configuration is returned. If
they are set, a new
+ * Configuration is created with the overridden properties set. In either
case, the returned
+ * Configuration is cached, to avoid unnecessary recomputation. This works
because these override
+ * properties are instance properties and cannot change while the system is
running.
*
* @param conf AccumuloConfiguration object
* @param hadoopConf Hadoop Configuration object
* @param filesystemURI Volume Filesystem URI
* @return Hadoop Configuration with custom overrides for this FileSystem
*/
- private static Configuration
getVolumeManagerConfiguration(AccumuloConfiguration conf,
+ protected static Configuration
getVolumeManagerConfiguration(AccumuloConfiguration conf,
final Configuration hadoopConf, final String filesystemURI) {
- final Configuration volumeConfig = new Configuration(hadoopConf);
+ final var cacheKey = new Pair<>(hadoopConf, filesystemURI);
+ return HDFS_CONFIGS_FOR_VOLUME.get(cacheKey, (key) -> {
+
+ Map<String,String> volumeHdfsConfigOverrides =
+
conf.getAllPropertiesWithPrefixStripped(Property.INSTANCE_VOLUME_CONFIG_PREFIX).entrySet()
+ .stream().filter(e -> e.getKey().startsWith(filesystemURI + "."))
+ .collect(Collectors.toUnmodifiableMap(
+ e -> e.getKey().substring(filesystemURI.length() + 1),
Entry::getValue));
-
conf.getAllPropertiesWithPrefixStripped(Property.INSTANCE_VOLUME_CONFIG_PREFIX).entrySet()
- .stream().filter(e -> e.getKey().startsWith(filesystemURI +
".")).forEach(e -> {
- String key = e.getKey().substring(filesystemURI.length() + 1);
- String value = e.getValue();
- log.info("Overriding property {} for volume {}", key, value,
filesystemURI);
- volumeConfig.set(key, value);
- });
+ // use the original if no overrides exist
+ if (volumeHdfsConfigOverrides.isEmpty()) {
+ return hadoopConf;
+ }
- return volumeConfig;
+ Configuration volumeConfig = new Configuration(hadoopConf);
+ volumeHdfsConfigOverrides.forEach((k, v) -> {
+ log.info("Overriding property {}={} for volume {}", k, v,
filesystemURI);
+ volumeConfig.set(k, v);
+ });
+ return volumeConfig;
+ });
}
protected static Stream<Entry<String,String>>
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
index a112a3c906..980526dc3e 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
@@ -24,7 +24,9 @@ import static
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION
import static
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
@@ -37,6 +39,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
@@ -142,6 +145,42 @@ public class VolumeManagerImplTest {
assertEquals("20", e.getValue());
}
+ @Test
+ public void testGetVolumeManagerConfiguration() throws Exception {
+
+ final ConfigurationCopy accumuloConf =
+ new ConfigurationCopy(DefaultConfiguration.getInstance());
+ final Configuration hadoopConf = new Configuration();
+ final String fileSystem = "file://127.0.0.1/vol1/";
+
+ final Configuration volumeConfig =
+ VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf,
hadoopConf, fileSystem);
+ assertSame(volumeConfig, hadoopConf);
+
+ accumuloConf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() +
fileSystem + "."
+ + DFS_CLIENT_CACHE_DROP_BEHIND_READS, "true");
+
+ Configuration hadoopConf2 = new Configuration(hadoopConf);
+ final Configuration volumeConfig2 =
+ VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf,
hadoopConf2, fileSystem);
+
+ assertNotSame(volumeConfig2, hadoopConf); // false because of the
additional property
+ assertNotSame(volumeConfig, volumeConfig2); // false because of the
additional property
+ assertEquals("true",
volumeConfig2.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+
+ final Configuration volumeConfig3 =
+ VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf,
volumeConfig2, fileSystem);
+ assertEquals("true",
volumeConfig3.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+ // false because of the different Hadoop configuration input
+ assertNotSame(volumeConfig3, volumeConfig2);
+
+ final Configuration volumeConfig4 =
+ VolumeManagerImpl.getVolumeManagerConfiguration(accumuloConf,
hadoopConf2, fileSystem);
+ assertEquals("true",
volumeConfig4.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+ // true because of the same hadoop configuration input
+ assertSame(volumeConfig4, volumeConfig2);
+ }
+
@Test
public void testConfigurationOverrides() throws Exception {