This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-classloaders.git
The following commit(s) were added to refs/heads/main by this push:
new f325f87 Use hard-links to facilitate easier cleanup (#58)
f325f87 is described below
commit f325f87a89af1c68b0c1b6e15a2209280f7dc1c4
Author: Christopher Tubbs <[email protected]>
AuthorDate: Wed Feb 4 10:44:29 2026 -0500
Use hard-links to facilitate easier cleanup (#58)
* Remove JMX features (use release version of Accumulo)
* Update README to explain different directories and how they are used
and when they can be cleaned up
* Use concrete class for cache key instead of String
* Use hard links before classloader construction, to ensure the shared
resources directory is cleanable
* Use temp file/dir creation utilities to get unique names, rather than UUID
* Remove redundant NIO options
* Adjust file naming conventions to account for new `working` directory
* Exclude unused native map from test dependencies
* Handle race condition during cleanup by implementing a re-download of
the context files if necessary
* Retain test cases for checking files in use by listing the
hard-linked files
* Add test for automatic cleanup of hard-link dirs
* Remove URLClassLoaderParams and use LocalStore as param
* Use ContextDefinition for creating the hard links
* Discard any failed hard-link directories and start over with a new
hard link directory after ensuring all resources are fetched
* Move log message to where hard-linked paths get added to classpath
---
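The hard-link approach summarized in the commit message can be illustrated with a minimal, self-contained sketch. This is not the code from this commit: the class `HardLinkSketch` and the methods `linkIntoWorkingDir`, `recursiveDelete`, and `demo` are hypothetical names chosen for illustration, and the sketch assumes the temporary directory lives on a filesystem that supports hard links. It shows that a hard-linked "copy" stays readable after the shared original is deleted, and that the per-process link directory can then be removed on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration only; not part of accumulo-classloaders.
class HardLinkSketch {

  // Hard-link `resource` into a new, uniquely named directory under `workingDir`.
  // The PID prefix mirrors the naming idea described in the README changes.
  static Path linkIntoWorkingDir(Path workingDir, Path resource) throws IOException {
    String prefix = "PID_" + ProcessHandle.current().pid() + "_";
    Path linkDir = Files.createTempDirectory(workingDir, prefix);
    Path link = linkDir.resolve(resource.getFileName());
    Files.createLink(link, resource); // throws if hard links are unsupported
    return link;
  }

  // Delete a directory tree, children before parents.
  static void recursiveDelete(Path dir) throws IOException {
    if (!Files.exists(dir)) {
      return;
    }
    List<Path> paths;
    try (Stream<Path> walker = Files.walk(dir)) {
      paths = walker.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
    }
    for (Path p : paths) {
      Files.delete(p);
    }
  }

  // Returns true if the link survived deletion of the original and the
  // link directory could be cleaned up afterwards.
  static boolean demo() {
    try {
      Path base = Files.createTempDirectory("lcc-sketch");
      Path resources = Files.createDirectories(base.resolve("resources"));
      Path working = Files.createDirectories(base.resolve("working"));
      byte[] content = "fake jar bytes".getBytes(StandardCharsets.UTF_8);
      Path jar = Files.write(resources.resolve("example.jar"), content);

      Path link = linkIntoWorkingDir(working, jar);
      Files.delete(jar); // simulate a user cleaning the shared resources directory

      String read = new String(Files.readAllBytes(link), StandardCharsets.UTF_8);
      boolean readable = "fake jar bytes".equals(read);

      recursiveDelete(link.getParent());
      boolean cleaned = !Files.exists(link.getParent());

      recursiveDelete(base);
      return readable && cleaned;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("link survived and was cleaned up: " + demo());
  }
}
```

Because each link directory belongs to a single classloader, deleting it never affects other processes, which is what makes the shared `resources` directory safe to clean.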
.github/workflows/maven-on-demand.yaml | 2 +-
.github/workflows/maven.yaml | 2 +-
modules/local-caching-classloader/README.md | 82 ++++++++----
modules/local-caching-classloader/pom.xml | 6 +
.../lcc/LocalCachingContextClassLoaderFactory.java | 63 ++--------
.../lcc/definition/ContextDefinition.java | 1 +
.../classloader/lcc/jmx/ContextClassLoaders.java | 33 -----
.../lcc/jmx/ContextClassLoadersMXBean.java | 35 ------
.../classloader/lcc/util/ContextCacheKey.java | 67 ++++++++++
.../accumulo/classloader/lcc/util/LccUtils.java | 86 ++++++++++++-
.../accumulo/classloader/lcc/util/LocalStore.java | 138 ++++++++++++++-------
.../LocalCachingContextClassLoaderFactoryTest.java | 16 ++-
.../MiniAccumuloClusterClassLoaderFactoryTest.java | 122 +++++-------------
.../apache/accumulo/classloader/lcc/TestUtils.java | 3 +-
.../classloader/lcc/util/LocalStoreTest.java | 72 ++++++++---
pom.xml | 2 +-
16 files changed, 406 insertions(+), 324 deletions(-)
diff --git a/.github/workflows/maven-on-demand.yaml b/.github/workflows/maven-on-demand.yaml
index ca60d24..5bdd47c 100644
--- a/.github/workflows/maven-on-demand.yaml
+++ b/.github/workflows/maven-on-demand.yaml
@@ -50,7 +50,7 @@ jobs:
timeout-minutes: 345
run: mvn -B -V -e -ntp "-Dstyle.color=always" ${{ github.event.inputs.goals }}
env:
- MAVEN_OPTS: -Djansi.force=true -Dapache.snapshots=true
+ MAVEN_OPTS: -Djansi.force=true
- name: Upload unit test results
if: ${{ failure() }}
uses: actions/upload-artifact@v4
diff --git a/.github/workflows/maven.yaml b/.github/workflows/maven.yaml
index 434a20e..099e04f 100644
--- a/.github/workflows/maven.yaml
+++ b/.github/workflows/maven.yaml
@@ -45,7 +45,7 @@ jobs:
- name: Build with Maven (verify javadoc:jar)
run: mvn -B -V -e -ntp "-Dstyle.color=always" verify javadoc:jar -DskipFormat -DverifyFormat
env:
- MAVEN_OPTS: -Djansi.force=true -Dapache.snapshots=true
+ MAVEN_OPTS: -Djansi.force=true
- name: Upload unit test results
if: ${{ failure() }}
uses: actions/upload-artifact@v4
diff --git a/modules/local-caching-classloader/README.md b/modules/local-caching-classloader/README.md
index 988e947..02651f8 100644
--- a/modules/local-caching-classloader/README.md
+++ b/modules/local-caching-classloader/README.md
@@ -114,10 +114,16 @@ The local storage cache location is configured by the user by setting the
required Accumulo property named `general.custom.classloader.lcc.cache.dir` to
a directory on the local filesystem. This location may be specified as an
absolute path or as a URL representing an absolute path with the `file` scheme.
+The location and its directory structure will be created on first use if they
+do not already exist. Creation fails with an error if the application does not
+have permission to create the directories.
The selected location should be a persistent location with plenty of space to
store downloaded resources (usually jar files), and should be writable by all
-the processes which use this factory to share the same resources.
+the processes which use this factory to share the same resources. You may wish
+to pre-create the base directory specified by the property, and the three
+sub-directories, `contexts`, `resources`, and `working`, to set the appropriate
+permissions and ACLs.
Resources downloaded to this cache may be used by multiple contexts, threads,
and processes, so be very careful when removing old contents to ensure that
@@ -129,7 +135,7 @@ unexpected behavior to classloaders still using the file.
* Do **NOT** use a temporary directory for the local storage cache location.
* The local storage cache location **MUST** use a filesystem that supports
- atomic moves.
+ atomic moves and hard links.
## Security
@@ -217,28 +223,56 @@ constructed at that time.
## Cleanup
Because the cache directory is shared among multiple processes, and one process
-can't know what the other processes are doing, this class cannot clean up the
-shared cache directory of unused resources. It is left to the user to remove
-unused files from the cache. While the context definition JSON files are always
-safe to delete, it is not recommended to do so for any that are still in use,
-because they can be useful for troubleshooting.
-
-To aid in this task, a JMX MXBean has been created to expose the files that are
-still referenced by the classloaders that have been created by this factory and
-currently still exist in the system. For an example of how to use this MXBean,
-please see the test method
-`MiniAccumuloClusterClassLoaderFactoryTest.getReferencedFiles`. This method
-attaches to the local Accumulo JVM processes to get the set of referenced
-files. It should be safe to delete files that are located in the local cache
-directory (set by property `general.custom.classloader.lcc.cache.dir`) that are
-NOT in the set of referenced files, so long as no new classloaders have been
-created that reference the files being deleted.
-
-**IMPORTANT**: as mentioned earlier, it is not safe to delete resource files
-that are still referenced by any `ClassLoader` instances. Each `ClassLoader`
-instance assumes that the locally cached resources exist and can be read. They
-will not attempt to download any files. Downloading files only occurs when
-`ClassLoader` instances are initially created for a context definition.
+can't know what the other processes are doing, this class cannot always clean
+up the shared cache directory of unused resources automatically. It is left to
+the user to remove unused files from the cache. The local storage is organized
+into several directories, which are explained here to aid in understanding when
+unused files can be safely removed.
+
+### Contexts
+
+The contents of the `contexts` directory are always safe to delete. It contains
+only copies of the JSON files from which a ClassLoader was constructed. These
+copies are never used by this factory; they are placed here solely to provide
+more information to a user. Because these files are small text files that
+contain useful information, deleting them is generally not recommended, since
+doing so may impair troubleshooting.
+
+### Resources
+
+The `resources` directory contains a shared pool of remote resource files that
+have been fetched for all contexts (typically, `.jar` files). The files in this
+directory are generally safe to delete any time. However, some considerations
+should be made:
+
+1. Deleting resources that are still needed will cause them to be downloaded
+ again the next time they are needed, which may cause an increase in network
+ activity.
+2. If any of the removed files had hard-linked "copies" in the `working`
+ directory, the newly downloaded copy will increase the total amount of
+ storage (whereas the original would have shared storage space with the
+ hard-linked "copies").
+
+### Working
+
+The `working` directory contains temporary files for files currently being
+downloaded, and temporary directories containing hard-linked "copies" of files
+from the `resources` directory. These files and directories contain the process
+ID (PID) for the process that created them. Normally, these files are
+automatically cleaned up, but if a process is killed before that can happen,
+they may be left behind. The files with the PID in them can safely be removed,
+so long as the process that created them has been terminated.
+
+This directory also contains files that do not contain a PID. These files end
+with the `.downloading` suffix and exist to signal that a resource file is
+currently being downloaded by a process. These files are very small, containing
+only the PID of the most recent process to attempt downloading the file. They
+are removed when a download completes, or the next time the corresponding
+resource file is used, if it has already been successfully downloaded by a
+process that subsequently failed. Removing them won't break the
+application in any way, but doing so may result in a redundant download, which
+can result in increased network activity or storage space (see the previous
+section for considerations regarding the `resources` directory).
## Accumulo Configuration
diff --git a/modules/local-caching-classloader/pom.xml b/modules/local-caching-classloader/pom.xml
index 5f0ad98..bd73a78 100644
--- a/modules/local-caching-classloader/pom.xml
+++ b/modules/local-caching-classloader/pom.xml
@@ -110,6 +110,12 @@
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-test</artifactId>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-native</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactory.java b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactory.java
index f0d54b0..853a683 100644
--- a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactory.java
+++ b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactory.java
@@ -22,15 +22,12 @@ import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.lang.management.ManagementFactory;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -41,16 +38,9 @@ import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.NotCompliantMBeanException;
-
import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
import org.apache.accumulo.classloader.lcc.definition.Resource;
-import org.apache.accumulo.classloader.lcc.jmx.ContextClassLoaders;
-import org.apache.accumulo.classloader.lcc.jmx.ContextClassLoadersMXBean;
+import org.apache.accumulo.classloader.lcc.util.ContextCacheKey;
import org.apache.accumulo.classloader.lcc.util.DeduplicationCache;
import org.apache.accumulo.classloader.lcc.util.LccUtils;
import org.apache.accumulo.classloader.lcc.util.LocalStore;
@@ -113,7 +103,7 @@ public class LocalCachingContextClassLoaderFactory implements ContextClassLoader
// to keep this coherent with the contextDefs, updates to this should be done in the compute
// method of contextDefs
- private static final DeduplicationCache<String,URL[],URLClassLoader> classloaders =
+ private final DeduplicationCache<ContextCacheKey,LocalStore,URLClassLoader> classloaders =
new DeduplicationCache<>(LccUtils::createClassLoader, Duration.ofHours(24), null);
private final AtomicReference<LocalStore> localStore = new AtomicReference<>();
@@ -194,16 +184,6 @@ public class LocalCachingContextClassLoaderFactory implements ContextClassLoader
throw new UncheckedIOException("Unable to create the local storage area at " + baseCacheDir,
e);
}
- try {
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(new ContextClassLoaders(), ContextClassLoadersMXBean.getObjectName());
- } catch (MalformedObjectNameException | MBeanRegistrationException
- | NotCompliantMBeanException e) {
- throw new IllegalStateException("Error registering MBean", e);
- } catch (InstanceAlreadyExistsException e) {
- // instance was re-init'd. This is likely to happen during tests
- // can ignore as no issue here
- }
}
@Override
@@ -243,13 +223,11 @@ public class LocalCachingContextClassLoaderFactory implements ContextClassLoader
} else {
computedDefinition = previousDefinition;
}
- final URLClassLoader classloader = classloaders
- .computeIfAbsent(newCacheKey(contextLocation, computedDefinition), (Supplier<URL[]>) () -> {
- try {
- return localStore.get().storeContextResources(computedDefinition);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ final URLClassLoader classloader = classloaders.computeIfAbsent(
+ new ContextCacheKey(contextLocation, computedDefinition), (Supplier<LocalStore>) () -> {
+ var s = localStore.get();
+ s.storeContextResources(computedDefinition);
+ return s;
});
resultHolder.set(classloader);
return computedDefinition;
@@ -262,18 +240,6 @@ public class LocalCachingContextClassLoaderFactory implements ContextClassLoader
return ContextDefinition.fromRemoteURL(url);
}
- private static String newCacheKey(String contextLocation, ContextDefinition contextDefinition) {
- // the location is between the first left parenthesis and the last right parenthesis
- return contextDefinition.getChecksumAlgorithm() + " (" + contextLocation + ") = "
- + contextDefinition.getChecksum();
- }
-
- private static boolean cacheKeyMatchesContextLocation(String cacheKey, String contextLocation) {
- // extract the location from the parentheses in the cacheKey
- return cacheKey.substring(cacheKey.indexOf('(') + 1, cacheKey.lastIndexOf(')'))
- .equals(contextLocation);
- }
-
private void checkMonitoredLocation(String contextLocation, long interval) {
ContextDefinition currentDef =
contextDefs.compute(contextLocation, (contextLocationKey, previousDefinition) -> {
@@ -286,8 +252,7 @@ public class LocalCachingContextClassLoaderFactory implements ContextClassLoader
}
// check for any classloaders still in the cache that were created for a context
// definition found at this URL
- if (!classloaders
- .anyMatch(cacheKey -> cacheKeyMatchesContextLocation(cacheKey, contextLocation))) {
+ if (!classloaders.anyMatch(cacheKey -> cacheKey.getLocation().equals(contextLocation))) {
LOG.debug("ClassLoader for context {} not present, no longer monitoring for changes",
contextLocation);
return null;
@@ -358,16 +323,4 @@ public class LocalCachingContextClassLoaderFactory implements ContextClassLoader
}
}
- public static Map<String,List<String>> getReferencedFiles() {
- final Map<String,List<String>> referencedContexts = new HashMap<>();
- classloaders.forEach((cacheKey, cl) -> {
- List<String> files = new ArrayList<>();
- for (URL u : cl.getURLs()) {
- files.add(u.toString());
- }
- referencedContexts.put(cacheKey, files);
- });
- return referencedContexts;
- }
-
}
diff --git a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/definition/ContextDefinition.java b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/definition/ContextDefinition.java
index 2e42cec..962abd2 100644
--- a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/definition/ContextDefinition.java
+++ b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/definition/ContextDefinition.java
@@ -113,6 +113,7 @@ public class ContextDefinition implements KeywordExecutable {
justification = "user-supplied URL is the intended functionality")
public static ContextDefinition create(int monitorIntervalSecs, String algorithm, URL... sources)
throws IOException {
+ // use a LinkedHashSet to preserve the order of the context resources
LinkedHashSet<Resource> resources = new LinkedHashSet<>();
for (URL u : sources) {
try (InputStream is = new BufferedInputStream(u.openStream())) {
diff --git a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/jmx/ContextClassLoaders.java b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/jmx/ContextClassLoaders.java
deleted file mode 100644
index 4a2a0a9..0000000
--- a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/jmx/ContextClassLoaders.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.classloader.lcc.jmx;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.accumulo.classloader.lcc.LocalCachingContextClassLoaderFactory;
-
-public class ContextClassLoaders implements ContextClassLoadersMXBean {
-
- @Override
- public Map<String,List<String>> getReferencedFiles() {
- return LocalCachingContextClassLoaderFactory.getReferencedFiles();
- }
-
-}
diff --git a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/jmx/ContextClassLoadersMXBean.java b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/jmx/ContextClassLoadersMXBean.java
deleted file mode 100644
index 753f3b9..0000000
--- a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/jmx/ContextClassLoadersMXBean.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.classloader.lcc.jmx;
-
-import java.util.List;
-import java.util.Map;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-public interface ContextClassLoadersMXBean {
-
- static ObjectName getObjectName() throws MalformedObjectNameException {
- return new ObjectName("org.apache.accumulo.classloader:type=ContextClassLoaders");
- }
-
- Map<String,List<String>> getReferencedFiles();
-
-}
diff --git a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/ContextCacheKey.java b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/ContextCacheKey.java
new file mode 100644
index 0000000..f9f402e
--- /dev/null
+++ b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/ContextCacheKey.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.classloader.lcc.util;
+
+import static java.util.Objects.hash;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Objects;
+
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+
+public class ContextCacheKey {
+ private final String location;
+ private final ContextDefinition definition;
+
+ public ContextCacheKey(String location, ContextDefinition definition) {
+ this.location = requireNonNull(location);
+ this.definition = requireNonNull(definition);
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ public ContextDefinition getContextDefinition() {
+ return definition;
+ }
+
+ @Override
+ public int hashCode() {
+ return hash(location, definition);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof ContextCacheKey) {
+ var o = (ContextCacheKey) obj;
+ return Objects.equals(location, o.location) && Objects.equals(definition, o.definition);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return definition.getChecksumAlgorithm() + " (" + location + ") = " + definition.getChecksum();
+ }
+
+}
diff --git a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LccUtils.java b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LccUtils.java
index a971e81..76bd939 100644
--- a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LccUtils.java
+++ b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LccUtils.java
@@ -18,11 +18,23 @@
*/
package org.apache.accumulo.classloader.lcc.util;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.ref.Cleaner;
+import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.LinkedHashSet;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.accumulo.classloader.lcc.LocalCachingContextClassLoaderFactory;
+import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
+import org.apache.accumulo.classloader.lcc.definition.Resource;
+import org.apache.accumulo.classloader.lcc.util.LocalStore.HardLinkFailedException;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,18 +45,86 @@ public class LccUtils {
private static final Logger LOG = LoggerFactory.getLogger(LccUtils.class);
private static final ConcurrentHashMap<String,DigestUtils> DIGESTERS = new ConcurrentHashMap<>();
+ private static final Cleaner CLEANER = Cleaner.create();
// keep at most one DigestUtils instance for each algorithm
public static DigestUtils getDigester(String algorithm) {
return DIGESTERS.computeIfAbsent(algorithm, DigestUtils::new);
}
+ private static String checksumForFileName(String algorithm, String checksum) {
+ return algorithm.replace('/', '_') + "-" + checksum;
+ }
+
+ public static String checksumForFileName(ContextDefinition definition) {
+ return checksumForFileName(definition.getChecksumAlgorithm(), definition.getChecksum());
+ }
+
+ public static String checksumForFileName(Resource definition) {
+ return checksumForFileName(definition.getAlgorithm(), definition.getChecksum());
+ }
+
@SuppressFBWarnings(value = "DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED",
justification = "doPrivileged is deprecated without replacement and removed in newer Java")
- public static URLClassLoader createClassLoader(String name, URL[] urls) {
- final var cl = new URLClassLoader(name, urls,
+ public static URLClassLoader createClassLoader(ContextCacheKey cacheKey, LocalStore localStore) {
+ // use a LinkedHashSet to preserve the order of the context resources
+ final var hardLinks = new LinkedHashSet<Path>();
+ Path hardLinksDir = null;
+
+ // keep trying to hard-link all the resources if the hard-linking fails
+ while (hardLinksDir == null) {
+ hardLinks.clear();
+ try {
+ hardLinksDir =
+ localStore.createWorkingHardLinks(cacheKey.getContextDefinition(), hardLinks::add);
+ LOG.trace("Created hard links at {} for context {}", hardLinksDir, cacheKey);
+ } catch (HardLinkFailedException e) {
+ var failedHardLinksDir = e.getDestinationDirectory();
+ LOG.warn(
+ "Exception creating a hard link in {} due to missing resource {}; attempting re-download of context resources",
+ failedHardLinksDir, e.getMissingResource(), e);
+ try {
+ LccUtils.recursiveDelete(failedHardLinksDir);
+ } catch (IOException ioe) {
+ LOG.warn(
+ "Saw exception removing directory {} after hard link creation failure; this should be cleaned up manually",
+ failedHardLinksDir, ioe);
+ }
+ localStore.storeContextResources(cacheKey.getContextDefinition());
+ }
+ }
+
+ URL[] urls = hardLinks.stream().map(p -> {
+ try {
+ LOG.trace("Added resource {} to classpath", p);
+ return p.toUri().toURL();
+ } catch (MalformedURLException e) {
+ // shouldn't be possible, since these are file-based URLs
+ throw new UncheckedIOException(e);
+ }
+ }).toArray(URL[]::new);
+
+ final var cl = new URLClassLoader(cacheKey.toString(), urls,
LocalCachingContextClassLoaderFactory.class.getClassLoader());
- LOG.info("New classloader created for {}", name);
+ LOG.info("New classloader created for {}", cacheKey);
+
+ final var cleanDir = hardLinksDir;
+ CLEANER.register(cl, () -> {
+ try {
+ LccUtils.recursiveDelete(cleanDir);
+ } catch (IOException e) {
+ LOG.warn("Saw exception when executing cleaner on directory {}", cleanDir, e);
+ }
+ });
return cl;
}
+
+ public static void recursiveDelete(Path directory) throws IOException {
+ if (Files.exists(directory)) {
+ try (var walker = Files.walk(directory)) {
+ walker.map(Path::toFile).sorted(Comparator.reverseOrder()).forEach(File::delete);
+ }
+ }
+ }
+
}
diff --git a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java
index 40a143a..d3243c8 100644
--- a/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java
+++ b/modules/local-caching-classloader/src/main/java/org/apache/accumulo/classloader/lcc/util/LocalStore.java
@@ -22,9 +22,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.SYNC;
-import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
-import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.Objects.requireNonNull;
+import static org.apache.accumulo.classloader.lcc.util.LccUtils.checksumForFileName;
import static org.apache.accumulo.classloader.lcc.util.LccUtils.getDigester;
import java.io.BufferedInputStream;
@@ -32,21 +31,18 @@ import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
@@ -92,15 +88,16 @@ public final class LocalStore {
private final Path contextsDir;
private final Path resourcesDir;
+ private final Path workingDir;
private final BiConsumer<String,URL> allowedUrlChecker;
public LocalStore(final Path baseDir, final BiConsumer<String,URL> allowedUrlChecker)
throws IOException {
- this.contextsDir = requireNonNull(baseDir).toAbsolutePath().resolve("contexts");
- this.resourcesDir = baseDir.resolve("resources");
+ requireNonNull(baseDir);
this.allowedUrlChecker = requireNonNull(allowedUrlChecker);
- Files.createDirectories(contextsDir);
- Files.createDirectories(resourcesDir);
+ this.contextsDir = Files.createDirectories(baseDir.resolve("contexts"));
+ this.resourcesDir = Files.createDirectories(baseDir.resolve("resources"));
+ this.workingDir = Files.createDirectories(baseDir.resolve("working"));
}
Path contextsDir() {
@@ -111,6 +108,10 @@ public final class LocalStore {
return resourcesDir;
}
+ Path workingDir() {
+ return workingDir;
+ }
+
// pattern to match regular files that have at least one non-dot character preceding a dot and a
// non-zero suffix; these files can be easily converted so the local store retains the original
// file name extension, while non-matching files will not attempt to retain the original file name
@@ -120,7 +121,7 @@ public final class LocalStore {
public static String localResourceName(Resource r) {
requireNonNull(r);
String remoteFileName = r.getFileName();
- String checksum = checksumForFileName(r.getAlgorithm(), r.getChecksum());
+ String checksum = checksumForFileName(r);
var matcher = fileNamesWithExtensionPattern.matcher(remoteFileName);
if (matcher.matches()) {
return String.format("%s-%s.%s", matcher.group(1), checksum, matcher.group(2));
@@ -128,54 +129,57 @@ public final class LocalStore {
return String.format("%s-%s", remoteFileName, checksum);
}
- private static String tempName(String baseName) {
- return "." + requireNonNull(baseName) + "_PID" + PID + "_" + UUID.randomUUID() + ".tmp";
+ // creates a new empty file with a unique name, for use as a temporary file
+ @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
+ justification = "the working directory is intentionally controlled by the user config")
+ private Path createTempFile(String baseName) {
+ try {
+ return Files.createTempFile(workingDir, "PID_" + PID + "_" + baseName + "_", ".tmp");
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
- static String checksumForFileName(String algorithm, String checksum) {
- return algorithm.replace('/', '_') + "-" + checksum;
+ // creates a new empty directory with a unique name, for use as a temporary directory
+ @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN",
+ justification = "the working directory is intentionally controlled by the user config")
+ private Path createTempDirectory(String baseName) {
+ try {
+ return Files.createTempDirectory(workingDir, "PID_" + PID + "_" + baseName + "_");
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
/**
* Save the {@link ContextDefinition} to the contexts directory, and all of its resources to the
* resources directory.
*/
- public URL[] storeContextResources(final ContextDefinition contextDefinition) throws IOException {
+ public void storeContextResources(final ContextDefinition contextDefinition) {
requireNonNull(contextDefinition, "definition must be supplied");
- // use a LinkedHashSet to preserve the order of the context resources
- final Set<Path> localFiles = new LinkedHashSet<>();
- final String destinationName = checksumForFileName(contextDefinition.getChecksumAlgorithm(),
- contextDefinition.getChecksum()) + ".json";
+ final String destinationName = checksumForFileName(contextDefinition) + ".json";
try {
storeContextDefinition(contextDefinition, destinationName);
- boolean successful = false;
- while (!successful) {
- localFiles.clear();
+ boolean waitingOnOtherDownloads;
+ do {
+ waitingOnOtherDownloads = false;
for (Resource resource : contextDefinition.getResources()) {
Path path = storeResource(resource);
if (path == null) {
LOG.trace("Skipped resource {} while another process or thread is
downloading it",
resource.getLocation());
+ waitingOnOtherDownloads = true;
continue;
}
- localFiles.add(path);
- LOG.trace("Added resource {} to classpath", path);
}
- successful = localFiles.size() ==
contextDefinition.getResources().size();
- }
-
- } catch (IOException | RuntimeException e) {
+ } while (waitingOnOtherDownloads);
+ } catch (IOException e) {
+ LOG.error("Error storing resources for context {}", destinationName, e);
+ throw new UncheckedIOException(e);
+ } catch (RuntimeException e) {
LOG.error("Error storing resources for context {}", destinationName, e);
throw e;
}
- return localFiles.stream().map(p -> {
- try {
- return p.toUri().toURL();
- } catch (MalformedURLException e) {
- // this shouldn't happen since these are local file paths
- throw new UncheckedIOException(e);
- }
- }).toArray(URL[]::new);
}
private void storeContextDefinition(final ContextDefinition
contextDefinition,
@@ -184,11 +188,9 @@ public final class LocalStore {
if (Files.exists(destinationPath)) {
return;
}
- Path tempPath = contextsDir.resolve(tempName(destinationName));
- // the temporary file name should be unique for this attempt, but
CREATE_NEW is used here, along
- // with the subsequent ATOMIC_MOVE, to guarantee we don't collide with any
other task saving the
- // same file
- Files.write(tempPath, contextDefinition.toJson().getBytes(UTF_8),
CREATE_NEW);
+ // Avoid colliding with other processes by saving to a unique temp name
first
+ Path tempPath = createTempFile(destinationName);
+ Files.write(tempPath, contextDefinition.toJson().getBytes(UTF_8));
Files.move(tempPath, destinationPath, ATOMIC_MOVE);
}
@@ -208,8 +210,7 @@ public final class LocalStore {
final URL url = resource.getLocation();
final String baseName = localResourceName(resource);
final Path destinationPath = resourcesDir.resolve(baseName);
- final Path tempPath = resourcesDir.resolve(tempName(baseName));
- final Path downloadingProgressPath = resourcesDir.resolve("." + baseName +
".downloading");
+ final Path downloadingProgressPath = workingDir.resolve(baseName +
".downloading");
if (Files.exists(destinationPath)) {
LOG.trace("Resource {} is already cached at {}", url, destinationPath);
@@ -242,6 +243,7 @@ public final class LocalStore {
return null;
}
+ final Path tempPath = createTempFile(baseName);
var task = new FutureTask<Void>(() -> downloadFile(tempPath, resource),
null);
var t = new Thread(task);
t.setDaemon(true);
@@ -253,7 +255,7 @@ public final class LocalStore {
try {
while (!task.isDone()) {
try {
- Files.write(downloadingProgressPath, PID.getBytes(UTF_8),
TRUNCATE_EXISTING);
+ Files.write(downloadingProgressPath, PID.getBytes(UTF_8));
} catch (IOException e) {
LOG.warn(
"Error writing progress file {}. Other processes may attempt to
download the same file concurrently.",
@@ -308,12 +310,10 @@ public final class LocalStore {
URL url = resource.getLocation();
allowedUrlChecker.accept("Resource", url);
- // CREATE_NEW ensures the temporary file name is unique for this attempt
// SYNC ensures file integrity on each write, in case of system failure.
Buffering minimizes
// system calls to read/write data which minimizes the number of syncs.
try (var in = new BufferedInputStream(url.openStream(), DL_BUFF_SIZE);
- var out = new BufferedOutputStream(Files.newOutputStream(tempPath,
CREATE_NEW, WRITE, SYNC),
- DL_BUFF_SIZE)) {
+ var out = new BufferedOutputStream(Files.newOutputStream(tempPath,
SYNC), DL_BUFF_SIZE)) {
in.transferTo(out);
} catch (IOException e) {
throw new UncheckedIOException(e);
@@ -344,4 +344,46 @@ public final class LocalStore {
throw ise;
}
}
+
+ Path createWorkingHardLinks(final ContextDefinition contextDefinition,
Consumer<Path> forEachLink)
+ throws HardLinkFailedException {
+ Path hardLinkDir = createTempDirectory("context-" +
checksumForFileName(contextDefinition));
+ for (Resource r : contextDefinition.getResources()) {
+ String fileName = localResourceName(r);
+ Path p = resourcesDir.resolve(fileName);
+ Path hardLink;
+ try {
+ hardLink = Files.createLink(hardLinkDir.resolve(fileName), p);
+ } catch (NoSuchFileException e) {
+ throw new HardLinkFailedException(hardLinkDir, p, e);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ forEachLink.accept(hardLink);
+ }
+ return hardLinkDir;
+ }
+
+ static class HardLinkFailedException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ private final Path destDir;
+ private Path missingResource;
+
+ public HardLinkFailedException(Path destDir, Path missingResource,
NoSuchFileException cause) {
+ super("Creating hard link in directory " + destDir + " failed", cause);
+ this.destDir = destDir;
+ this.missingResource = missingResource;
+ }
+
+ public Path getDestinationDirectory() {
+ return destDir;
+ }
+
+ public Path getMissingResource() {
+ return missingResource;
+ }
+
+ }
+
}
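
The createWorkingHardLinks change above depends on a file system property: a file's data stays reachable as long as at least one link to it remains, so the shared resources directory can be cleaned without breaking a classloader that reads from its own links. The following is a minimal sketch of that idea, not the code from this commit. It assumes a file system that supports hard links and that both directories are on the same volume; the class name HardLinkSketch and the helper linkAll are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class HardLinkSketch {

  // Hypothetical helper: creates a uniquely named directory under workingDir and
  // hard-links each named file from sharedDir into it, returning the new links.
  static List<Path> linkAll(Path sharedDir, Path workingDir, List<String> names) {
    try {
      Path linkDir = Files.createTempDirectory(workingDir, "context-");
      List<Path> links = new ArrayList<>();
      for (String name : names) {
        // createLink(link, existing) fails if the existing file is missing
        links.add(Files.createLink(linkDir.resolve(name), sharedDir.resolve(name)));
      }
      return links;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path base = Files.createTempDirectory("hardlink-sketch");
    Path shared = Files.createDirectories(base.resolve("resources"));
    Path working = Files.createDirectories(base.resolve("working"));
    Files.writeString(shared.resolve("a.jar"), "pretend jar bytes");

    List<Path> links = linkAll(shared, working, List.of("a.jar"));

    // Deleting the shared copy removes only one name for the file;
    // the hard link still refers to the same data.
    Files.delete(shared.resolve("a.jar"));
    System.out.println(Files.readString(links.get(0)));
  }
}
```

In this sketch a missing source file surfaces as an unchecked exception; the commit instead reports that case through HardLinkFailedException so the caller can fetch the resources again and retry.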
diff --git
a/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactoryTest.java
b/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactoryTest.java
index 694546d..1c6515a 100644
---
a/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactoryTest.java
+++
b/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/LocalCachingContextClassLoaderFactoryTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.classloader.lcc;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static
org.apache.accumulo.classloader.lcc.LocalCachingContextClassLoaderFactory.ALLOWED_URLS_PATTERN;
import static
org.apache.accumulo.classloader.lcc.LocalCachingContextClassLoaderFactory.CACHE_DIR_PROPERTY;
import static
org.apache.accumulo.classloader.lcc.LocalCachingContextClassLoaderFactory.UPDATE_FAILURE_GRACE_PERIOD_MINS;
@@ -40,8 +41,6 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
@@ -138,7 +137,7 @@ public class LocalCachingContextClassLoaderFactoryTest {
// Create local context definition in jar C directory
File localDefFile =
jarCParentDirectory.resolve("allContextDefinition.json").toFile();
- Files.writeString(localDefFile.toPath(), allJarsDefJson,
StandardOpenOption.CREATE);
+ Files.writeString(localDefFile.toPath(), allJarsDefJson);
assertTrue(Files.exists(localDefFile.toPath()));
var hdfsDefFile = new
org.apache.hadoop.fs.Path("/allContextDefinition.json");
@@ -206,8 +205,7 @@ public class LocalCachingContextClassLoaderFactoryTest {
newResources.add(new Resource(new URL(badUrl), "MD5", BAD_MD5));
var context2 = new ContextDefinition(MONITOR_INTERVAL_SECS, newResources);
var disallowedContext =
tempDir.resolve("context-with-disallowed-resource-url.json");
- Files.writeString(disallowedContext, context2.toJson(),
StandardOpenOption.CREATE,
- StandardOpenOption.TRUNCATE_EXISTING);
+ Files.writeString(disallowedContext, context2.toJson());
ex = assertThrows(ContextClassLoaderException.class,
() ->
factory.getClassLoader(disallowedContext.toUri().toURL().toExternalForm()));
assertTrue(ex.getCause() instanceof IllegalStateException);
@@ -325,7 +323,7 @@ public class LocalCachingContextClassLoaderFactoryTest {
assertNotNull(jarAPathParent);
var jarACopy = jarAPathParent.resolve("jarACopy.jar");
assertTrue(!Files.exists(jarACopy));
- Files.copy(jarAPath, jarACopy, StandardCopyOption.REPLACE_EXISTING);
+ Files.copy(jarAPath, jarACopy, REPLACE_EXISTING);
assertTrue(Files.exists(jarACopy));
var def = ContextDefinition.create(MONITOR_INTERVAL_SECS, "SHA-512",
jarACopy.toUri().toURL());
@@ -520,7 +518,7 @@ public class LocalCachingContextClassLoaderFactoryTest {
assertNotNull(jarAPathParent);
var jarACopy = jarAPathParent.resolve("jarACopy.jar");
assertTrue(!Files.exists(jarACopy));
- Files.copy(jarAPath, jarACopy, StandardCopyOption.REPLACE_EXISTING);
+ Files.copy(jarAPath, jarACopy, REPLACE_EXISTING);
assertTrue(Files.exists(jarACopy));
var def2 = ContextDefinition.create(MONITOR_INTERVAL_SECS, "SHA-512",
jarACopy.toUri().toURL());
Files.delete(jarACopy);
@@ -752,7 +750,7 @@ public class LocalCachingContextClassLoaderFactoryTest {
assertNotNull(jarAPathParent);
var jarACopy = jarAPathParent.resolve("jarACopy.jar");
assertTrue(!Files.exists(jarACopy));
- Files.copy(jarAPath, jarACopy, StandardCopyOption.REPLACE_EXISTING);
+ Files.copy(jarAPath, jarACopy, REPLACE_EXISTING);
assertTrue(Files.exists(jarACopy));
var def2 = ContextDefinition.create(MONITOR_INTERVAL_SECS, "SHA-512",
jarACopy.toUri().toURL());
Files.delete(jarACopy);
@@ -810,7 +808,7 @@ public class LocalCachingContextClassLoaderFactoryTest {
assertEquals(2, files.size());
// overwrite one downloaded jar with others content
- Files.copy(files.get(0), files.get(1),
StandardCopyOption.REPLACE_EXISTING);
+ Files.copy(files.get(0), files.get(1), REPLACE_EXISTING);
final var update2 =
createContextDefinitionFile(fs,
"UpdateChangingContextDefinition2.json", def.toJson());
diff --git
a/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/MiniAccumuloClusterClassLoaderFactoryTest.java
b/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/MiniAccumuloClusterClassLoaderFactoryTest.java
index 75b4d95..1ba7dea 100644
---
a/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/MiniAccumuloClusterClassLoaderFactoryTest.java
+++
b/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/MiniAccumuloClusterClassLoaderFactoryTest.java
@@ -19,9 +19,7 @@
package org.apache.accumulo.classloader.lcc;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE;
-import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
-import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static
org.apache.accumulo.classloader.lcc.LocalCachingContextClassLoaderFactory.ALLOWED_URLS_PATTERN;
import static
org.apache.accumulo.classloader.lcc.LocalCachingContextClassLoaderFactory.CACHE_DIR_PROPERTY;
import static
org.apache.accumulo.classloader.lcc.LocalCachingContextClassLoaderFactory.UPDATE_FAILURE_GRACE_PERIOD_MINS;
@@ -36,31 +34,17 @@ import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
-import javax.management.JMX;
-import javax.management.MalformedObjectNameException;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-
import org.apache.accumulo.classloader.lcc.definition.ContextDefinition;
import org.apache.accumulo.classloader.lcc.definition.Resource;
-import org.apache.accumulo.classloader.lcc.jmx.ContextClassLoadersMXBean;
import org.apache.accumulo.classloader.lcc.util.LocalStore;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -90,24 +74,14 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sun.tools.attach.AttachNotSupportedException;
-import com.sun.tools.attach.VirtualMachine;
-import com.sun.tools.attach.VirtualMachineDescriptor;
public class MiniAccumuloClusterClassLoaderFactoryTest extends
SharedMiniClusterBase {
- private static final Logger LOG =
- LoggerFactory.getLogger(MiniAccumuloClusterClassLoaderFactoryTest.class);
-
private static class TestMACConfiguration implements
MiniClusterConfigurationCallback {
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
org.apache.hadoop.conf.Configuration coreSite) {
- cfg.getJvmOptions().add("-XX:-PerfDisableSharedMem");
cfg.setNumTservers(3);
cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED.getKey(), "false");
cfg.setProperty(Property.GENERAL_CONTEXT_CLASSLOADER_FACTORY.getKey(),
@@ -121,10 +95,6 @@ public class MiniAccumuloClusterClassLoaderFactoryTest
extends SharedMiniCluster
@TempDir
private static Path tempDir;
- private static final Set<PosixFilePermission> CACHE_DIR_PERMS =
- EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE);
- private static final FileAttribute<Set<PosixFilePermission>> PERMISSIONS =
- PosixFilePermissions.asFileAttribute(CACHE_DIR_PERMS);
private static final String ITER_CLASS_NAME =
"org.apache.accumulo.classloader.vfs.examples.ExampleIterator";
private static final int MONITOR_INTERVAL_SECS =
@@ -154,17 +124,16 @@ public class MiniAccumuloClusterClassLoaderFactoryTest
extends SharedMiniCluster
@Test
public void testClassLoader() throws Exception {
- final var baseDirPath = tempDir.resolve("base");
- final var resourcesDirPath = baseDirPath.resolve("resources");
+ final var workingDirPath = tempDir.resolve("base").resolve("working");
final var jsonDirPath = tempDir.resolve("simulatedRemoteContextFiles");
- Files.createDirectory(jsonDirPath, PERMISSIONS);
+ Files.createDirectory(jsonDirPath);
// Create a context definition that only references jar A
final var testContextDef =
ContextDefinition.create(MONITOR_INTERVAL_SECS, "SHA-256",
jarAOrigLocation);
final String testContextDefJson = testContextDef.toJson();
final File testContextDefFile =
jsonDirPath.resolve("testContextDefinition.json").toFile();
- Files.writeString(testContextDefFile.toPath(), testContextDefJson,
StandardOpenOption.CREATE);
+ Files.writeString(testContextDefFile.toPath(), testContextDefJson);
assertTrue(Files.exists(testContextDefFile.toPath()));
Resource jarAResource = testContextDef.getResources().iterator().next();
@@ -225,10 +194,9 @@ public class MiniAccumuloClusterClassLoaderFactoryTest
extends SharedMiniCluster
// before applying the iterator
final byte[] jarAValueBytes = "foo".getBytes(UTF_8);
assertEquals(0, countExpectedValues(client, tableName, jarAValueBytes));
- Set<String> refFiles = getReferencedFiles();
- assertEquals(1, refFiles.size());
- assertTrue(refFiles
-
.contains(resourcesDirPath.resolve(jarALocalFileName).toUri().toURL().toString()));
+ Set<Path> refFiles = getReferencedFiles(workingDirPath);
+ assertEquals(1, refFiles.size(), refFiles::toString);
+ assertTrue(refFiles.stream().anyMatch(p ->
p.endsWith(jarALocalFileName)));
// Attach a scan iterator to the table
IteratorSetting is = new IteratorSetting(101, "example",
ITER_CLASS_NAME);
@@ -240,17 +208,15 @@ public class MiniAccumuloClusterClassLoaderFactoryTest
extends SharedMiniCluster
while (count != 1000) {
count = countExpectedValues(client, tableName, jarAValueBytes);
}
- refFiles = getReferencedFiles();
- assertEquals(1, refFiles.size());
- assertTrue(refFiles
-
.contains(resourcesDirPath.resolve(jarALocalFileName).toUri().toURL().toString()));
+ refFiles = getReferencedFiles(workingDirPath);
+ assertEquals(1, refFiles.size(), refFiles::toString);
+ assertTrue(refFiles.stream().anyMatch(p ->
p.endsWith(jarALocalFileName)));
// Update the context definition to point to jar B
final ContextDefinition testContextDefUpdate =
ContextDefinition.create(MONITOR_INTERVAL_SECS, "SHA-512",
jarBOrigLocation);
final String testContextDefUpdateJson = testContextDefUpdate.toJson();
- Files.writeString(testContextDefFile.toPath(), testContextDefUpdateJson,
- StandardOpenOption.TRUNCATE_EXISTING);
+ Files.writeString(testContextDefFile.toPath(), testContextDefUpdateJson);
assertTrue(Files.exists(testContextDefFile.toPath()));
Resource jarBResource =
testContextDefUpdate.getResources().iterator().next();
@@ -264,12 +230,10 @@ public class MiniAccumuloClusterClassLoaderFactoryTest
extends SharedMiniCluster
// by the iterator
final byte[] jarBValueBytes = "bar".getBytes(UTF_8);
assertEquals(1000, countExpectedValues(client, tableName,
jarBValueBytes));
- refFiles = getReferencedFiles();
- assertEquals(2, refFiles.size());
- assertTrue(refFiles
-
.contains(resourcesDirPath.resolve(jarALocalFileName).toUri().toURL().toString()));
- assertTrue(refFiles
-
.contains(resourcesDirPath.resolve(jarBLocalFileName).toUri().toURL().toString()));
+ refFiles = getReferencedFiles(workingDirPath);
+ assertEquals(2, refFiles.size(), refFiles::toString);
+ assertTrue(refFiles.stream().anyMatch(p ->
p.endsWith(jarALocalFileName)));
+ assertTrue(refFiles.stream().anyMatch(p ->
p.endsWith(jarBLocalFileName)));
// Copy jar A, create a context definition using the copy, then
// remove the copy so that it's not found when the context classloader
@@ -279,7 +243,7 @@ public class MiniAccumuloClusterClassLoaderFactoryTest
extends SharedMiniCluster
assertNotNull(jarAPathParent);
var jarACopy = jarAPathParent.resolve("jarACopy.jar");
assertTrue(!Files.exists(jarACopy));
- Files.copy(jarAPath, jarACopy, StandardCopyOption.REPLACE_EXISTING);
+ Files.copy(jarAPath, jarACopy, REPLACE_EXISTING);
assertTrue(Files.exists(jarACopy));
final ContextDefinition testContextDefUpdate2 =
@@ -288,8 +252,7 @@ public class MiniAccumuloClusterClassLoaderFactoryTest
extends SharedMiniCluster
assertTrue(!Files.exists(jarACopy));
final String testContextDefUpdateJson2 = testContextDefUpdate2.toJson();
- Files.writeString(testContextDefFile.toPath(), testContextDefUpdateJson2,
- StandardOpenOption.TRUNCATE_EXISTING);
+ Files.writeString(testContextDefFile.toPath(),
testContextDefUpdateJson2);
assertTrue(Files.exists(testContextDefFile.toPath()));
// Wait 2x the monitor interval
@@ -299,55 +262,26 @@ public class MiniAccumuloClusterClassLoaderFactoryTest
extends SharedMiniCluster
// by the iterator. The previous classloader is still being used after
// the monitor interval because the jar referenced does not exist.
assertEquals(1000, countExpectedValues(client, tableName,
jarBValueBytes));
- refFiles = getReferencedFiles();
- assertEquals(2, refFiles.size());
- assertTrue(refFiles
-
.contains(resourcesDirPath.resolve(jarALocalFileName).toUri().toURL().toString()));
- assertTrue(refFiles
-
.contains(resourcesDirPath.resolve(jarBLocalFileName).toUri().toURL().toString()));
+ refFiles = getReferencedFiles(workingDirPath);
+ assertEquals(2, refFiles.size(), refFiles::toString);
+ assertTrue(refFiles.stream().anyMatch(p ->
p.endsWith(jarALocalFileName)));
+ assertTrue(refFiles.stream().anyMatch(p ->
p.endsWith(jarBLocalFileName)));
// Wait 2 minutes, 2 times the UPDATE_FAILURE_GRACE_PERIOD_MINS
Thread.sleep(120_000);
// Scan of table with iterator setting should now fail.
final Scanner scanner2 = client.createScanner(tableName);
- RuntimeException re =
- assertThrows(RuntimeException.class, () ->
scanner2.iterator().hasNext());
- Throwable cause = re.getCause();
- assertTrue(cause instanceof AccumuloServerException);
+ var re = assertThrows(RuntimeException.class, () ->
scanner2.iterator().hasNext());
+ assertTrue(re.getCause() instanceof AccumuloServerException);
}
}
- private Set<String> getReferencedFiles() {
- final Map<String,List<String>> referencedFiles = new HashMap<>();
- for (VirtualMachineDescriptor vmd : VirtualMachine.list()) {
- if (vmd.displayName().contains("org.apache.accumulo.start.Main")
- && !vmd.displayName().contains("zookeeper")) {
- LOG.info("Attempting to connect to {}", vmd.displayName());
- try {
- var vm = VirtualMachine.attach(vmd);
- String connectorAddress = vm.getAgentProperties()
-
.getProperty("com.sun.management.jmxremote.localConnectorAddress");
- if (connectorAddress == null) {
- connectorAddress = vm.startLocalManagementAgent();
- }
- var url = new JMXServiceURL(connectorAddress);
- try (var connector = JMXConnectorFactory.connect(url)) {
- var mbsc = connector.getMBeanServerConnection();
- var proxy = JMX.newMXBeanProxy(mbsc,
ContextClassLoadersMXBean.getObjectName(),
- ContextClassLoadersMXBean.class);
- referencedFiles.putAll(proxy.getReferencedFiles());
- }
- } catch (MalformedObjectNameException | AttachNotSupportedException |
IOException e) {
- LOG.error("Error getting referenced files from {}",
vmd.displayName(), e);
- }
- }
- }
- Set<String> justTheFiles = new HashSet<>();
- referencedFiles.values().forEach(justTheFiles::addAll);
- LOG.info("Referenced files with contexts: {}", referencedFiles);
- LOG.info("Referenced files: {}", justTheFiles);
- return justTheFiles;
+ private Set<Path> getReferencedFiles(Path workingDirPath) throws IOException
{
+ // get all files in subdirectories in working directory
+ return Files.walk(workingDirPath).filter(p -> p.toFile().isFile())
+ .filter(p -> p.getNameCount() == workingDirPath.getNameCount() +
2).map(Path::getFileName)
+ .collect(Collectors.toSet());
}
private int countExpectedValues(AccumuloClient client, String table, byte[]
expectedValue)
diff --git
a/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/TestUtils.java
b/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/TestUtils.java
index 9811f4e..b0e2d8c 100644
---
a/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/TestUtils.java
+++
b/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/TestUtils.java
@@ -32,7 +32,6 @@ import java.io.InputStreamReader;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
@@ -186,7 +185,7 @@ public class TestUtils {
}
public static long getFileSize(Path p) throws IOException {
- try (InputStream is = Files.newInputStream(p, StandardOpenOption.READ)) {
+ try (InputStream is = Files.newInputStream(p)) {
return IOUtils.consume(is);
}
}
diff --git
a/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/util/LocalStoreTest.java
b/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/util/LocalStoreTest.java
index 74408fb..6749ed7 100644
---
a/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/util/LocalStoreTest.java
+++
b/modules/local-caching-classloader/src/test/java/org/apache/accumulo/classloader/lcc/util/LocalStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.classloader.lcc.util;
import static
org.apache.accumulo.classloader.lcc.TestUtils.testClassFailsToLoad;
import static org.apache.accumulo.classloader.lcc.TestUtils.testClassLoads;
+import static
org.apache.accumulo.classloader.lcc.util.LccUtils.checksumForFileName;
import static
org.apache.accumulo.classloader.lcc.util.LocalStore.localResourceName;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -27,13 +28,13 @@ import static
org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
-import java.io.File;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Comparator;
import java.util.LinkedHashSet;
+import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -123,11 +124,7 @@ public class LocalStoreTest {
@AfterEach
public void cleanBaseDir() throws Exception {
- if (Files.exists(baseCacheDir)) {
- try (var walker = Files.walk(baseCacheDir)) {
-
walker.map(Path::toFile).sorted(Comparator.reverseOrder()).forEach(File::delete);
- }
- }
+ LccUtils.recursiveDelete(baseCacheDir);
}
@Test
@@ -147,6 +144,7 @@ public class LocalStoreTest {
assertTrue(Files.exists(baseCacheDir.resolve("resources")));
assertEquals(baseCacheDir.resolve("contexts"), localStore.contextsDir());
assertEquals(baseCacheDir.resolve("resources"), localStore.resourcesDir());
+ assertEquals(baseCacheDir.resolve("working"), localStore.workingDir());
}
@Test
@@ -225,8 +223,8 @@ public class LocalStoreTest {
// Confirm the 3 jars are cached locally
assertTrue(Files.exists(baseCacheDir));
- assertTrue(Files.exists(baseCacheDir.resolve("contexts").resolve(
- LocalStore.checksumForFileName(def.getChecksumAlgorithm(),
def.getChecksum()) + ".json")));
+ assertTrue(
+
Files.exists(baseCacheDir.resolve("contexts").resolve(checksumForFileName(def)
+ ".json")));
for (Resource r : def.getResources()) {
assertTrue(Files.exists(baseCacheDir.resolve("resources").resolve(localResourceName(r))));
}
@@ -234,8 +232,10 @@ public class LocalStoreTest {
@Test
public void testClassLoader() throws Exception {
- var urls = new LocalStore(baseCacheDir,
ALLOW_ALL_URLS).storeContextResources(def);
- var contextClassLoader = LccUtils.createClassLoader("url", urls);
+ var localStore = new LocalStore(baseCacheDir, ALLOW_ALL_URLS);
+ localStore.storeContextResources(def);
+ var cacheKey = new ContextCacheKey("loc", def);
+ var contextClassLoader = LccUtils.createClassLoader(cacheKey, localStore);
testClassLoads(contextClassLoader, classA);
testClassLoads(contextClassLoader, classB);
@@ -245,8 +245,9 @@ public class LocalStoreTest {
@Test
public void testClassLoaderUpdate() throws Exception {
var localStore = new LocalStore(baseCacheDir, ALLOW_ALL_URLS);
- var urls = localStore.storeContextResources(def);
- final var contextClassLoader = LccUtils.createClassLoader("url", urls);
+ localStore.storeContextResources(def);
+ var cacheKey = new ContextCacheKey("loc", def);
+ final var contextClassLoader = LccUtils.createClassLoader(cacheKey,
localStore);
testClassLoads(contextClassLoader, classA);
testClassLoads(contextClassLoader, classB);
@@ -265,12 +266,11 @@ public class LocalStoreTest {
TestUtils.computeResourceChecksum("SHA-512", jarDOrigLocation)));
var updatedDef = new ContextDefinition(MONITOR_INTERVAL_SECS,
updatedResources);
- urls = localStore.storeContextResources(updatedDef);
+ localStore.storeContextResources(updatedDef);
// Confirm the 3 jars are cached locally
- assertTrue(Files.exists(baseCacheDir.resolve("contexts").resolve(
- LocalStore.checksumForFileName(updatedDef.getChecksumAlgorithm(),
updatedDef.getChecksum())
- + ".json")));
+ assertTrue(Files.exists(
+
baseCacheDir.resolve("contexts").resolve(checksumForFileName(updatedDef) +
".json")));
for (Resource r : updatedDef.getResources()) {
assertFalse(r.getFileName().contains("C"));
assertTrue(Files.exists(baseCacheDir.resolve("resources").resolve(localResourceName(r))));
@@ -281,7 +281,8 @@ public class LocalStoreTest {
assertTrue(Files
.exists(baseCacheDir.resolve("resources").resolve(localResourceName(removedResource))));
- final var updatedContextClassLoader = LccUtils.createClassLoader("url",
urls);
+ cacheKey = new ContextCacheKey("loc", updatedDef);
+ final var updatedContextClassLoader = LccUtils.createClassLoader(cacheKey,
localStore);
assertNotEquals(contextClassLoader, updatedContextClassLoader);
testClassLoads(updatedContextClassLoader, classA);
@@ -290,4 +291,39 @@ public class LocalStoreTest {
testClassLoads(updatedContextClassLoader, classD);
}
+ @Test
+ public void testClassLoaderDirCleanup() throws Exception {
+ var localStore = new LocalStore(baseCacheDir, ALLOW_ALL_URLS);
+ assertEquals(0,
Files.list(localStore.workingDir()).filter(Files::isDirectory).count());
+ localStore.storeContextResources(def);
+ var cacheKey = new ContextCacheKey("loc", def);
+ assertEquals(0,
Files.list(localStore.workingDir()).filter(Files::isDirectory).count());
+
+ final var endBackgroundThread = new CountDownLatch(1);
+ var createdClassLoader = new CountDownLatch(1);
+ Thread t = new Thread(() -> {
+ final var contextClassLoader = LccUtils.createClassLoader(cacheKey,
localStore);
+ createdClassLoader.countDown();
+ try {
+ testClassLoads(contextClassLoader, classA);
+ endBackgroundThread.await();
+ } catch (Exception e) {
+ fail(e);
+ }
+ // hold a strong reference in the background thread long enough to
prevent cleanup
+ assertNotNull(contextClassLoader);
+ });
+ t.start();
+ createdClassLoader.await();
+
+ assertEquals(1,
Files.list(localStore.workingDir()).filter(Files::isDirectory).count());
+
+ endBackgroundThread.countDown();
+
+ // wait for classloader to be garbage collected and its cleaner to run
+ while
(Files.list(localStore.workingDir()).filter(Files::isDirectory).count() != 0) {
+ System.gc();
+ Thread.sleep(50);
+ }
+ }
}
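
The testClassLoaderDirCleanup test above waits for a classloader to be garbage collected "and its cleaner to run", but the registration itself is not part of this section of the diff. The following is a minimal sketch of how such a cleanup could be wired with java.lang.ref.Cleaner. That mechanism is an assumption based on the test comment, and the names CleanupSketch, deleteTree, and deleteWhenUnreachable are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.Cleaner;
import java.lang.ref.Reference;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class CleanupSketch {

  private static final Cleaner CLEANER = Cleaner.create();

  // Deletes a directory tree, children before parents; a missing directory is ignored.
  static void deleteTree(Path dir) {
    if (!Files.exists(dir)) {
      return;
    }
    try (Stream<Path> walk = Files.walk(dir)) {
      walk.sorted(Comparator.reverseOrder()).forEach(p -> {
        try {
          Files.deleteIfExists(p);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Registers dir for deletion once owner becomes unreachable. The action captures
  // only dir; capturing owner would keep it reachable and the action would never run.
  static Cleaner.Cleanable deleteWhenUnreachable(Object owner, Path dir) {
    return CLEANER.register(owner, () -> deleteTree(dir));
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("cleanup-sketch");
    Files.writeString(dir.resolve("a.jar"), "pretend jar bytes");

    Object owner = new Object(); // stands in for a classloader
    Cleaner.Cleanable cleanable = deleteWhenUnreachable(owner, dir);

    // clean() runs the action immediately and at most once, which keeps this demo
    // deterministic; normally the action runs some time after owner is collected.
    cleanable.clean();
    Reference.reachabilityFence(owner);
    System.out.println("directory exists: " + Files.exists(dir));
  }
}
```

Because garbage collection timing is not guaranteed, a test that relies on the cleaner has to poll, as the loop with System.gc() and Thread.sleep(50) in the test above does.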
diff --git a/pom.xml b/pom.xml
index 934a65c..bcab9b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,7 @@ under the License.
<!-- most dependencies will be provided by the accumulo installation
-->
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-project</artifactId>
- <version>2.1.5-SNAPSHOT</version>
+ <version>2.1.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>