BewareMyPower commented on a change in pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#discussion_r809955217
##########
File path:
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -73,72 +73,74 @@ public void registerFunctionInstance(String fid,
throw new NullPointerException("FunctionID not set");
}
- synchronized (cacheFunctions) {
- FunctionCacheEntry entry = cacheFunctions.get(fid);
-
- if (null == entry) {
- URL[] urls = new URL[requiredJarFiles.size() +
requiredClasspaths.size()];
- int count = 0;
- try {
- // add jar files to urls
- for (String jarFile : requiredJarFiles) {
- urls[count++] = new File(jarFile).toURI().toURL();
- }
-
- // add classpaths
- for (URL url : requiredClasspaths) {
- urls[count++] = url;
- }
-
- cacheFunctions.put(
- fid,
- new FunctionCacheEntry(
- requiredJarFiles,
- requiredClasspaths,
- urls,
- eid, rootClassLoader));
- } catch (Throwable cause) {
- Exceptions.rethrowIOException(cause);
+ final AtomicBoolean computed = new AtomicBoolean(false);
+ final AtomicReference<Throwable> throwable = new AtomicReference<>();
+ FunctionCacheEntry entry = cacheFunctions.computeIfAbsent(fid, __ -> {
+ URL[] urls = new URL[requiredJarFiles.size() +
requiredClasspaths.size()];
+ int count = 0;
+ try {
+ // add jar files to urls
+ for (String jarFile : requiredJarFiles) {
+ urls[count++] = new File(jarFile).toURI().toURL();
+ }
+ // add classpaths
+ for (URL url : requiredClasspaths) {
+ urls[count++] = url;
}
- } else {
- entry.register(
+ final FunctionCacheEntry cacheEntry = new FunctionCacheEntry(
+ requiredJarFiles, requiredClasspaths, urls, eid,
rootClassLoader);
+ computed.set(true);
+ return cacheEntry;
+ } catch (Throwable cause) {
+ throwable.set(cause);
+ return null;
+ }
+ });
+ if (throwable.get() != null) {
+ Exceptions.rethrowIOException(throwable.get());
+ }
+ if (!computed.get() && entry != null) { // the key already exists
+ entry.register(
eid,
requiredJarFiles,
requiredClasspaths);
- }
}
}
@Override
public void registerFunctionInstanceWithArchive(String fid, String eid,
- String narArchive, String
narExtractionDirectory) throws IOException {
+ String narArchive,
+ String
narExtractionDirectory) throws IOException {
if (fid == null) {
throw new NullPointerException("FunctionID not set");
}
-
- synchronized (cacheFunctions) {
- FunctionCacheEntry entry = cacheFunctions.get(fid);
-
- if (null != entry) {
- entry.register(eid, Collections.singleton(narArchive),
Collections.emptyList());
- return;
- }
-
- // Create new cache entry
+ // Create new cache entry.
+ final AtomicBoolean computed = new AtomicBoolean(false);
+ final AtomicReference<Throwable> throwable = new AtomicReference<>();
+ FunctionCacheEntry entry = cacheFunctions.computeIfAbsent(fid, __ -> {
try {
- cacheFunctions.put(fid, new FunctionCacheEntry(narArchive,
eid, rootClassLoader, narExtractionDirectory));
+ final FunctionCacheEntry cacheEntry =
+ new FunctionCacheEntry(narArchive, eid,
rootClassLoader, narExtractionDirectory);
+ computed.set(true);
+ return cacheEntry;
} catch (Throwable cause) {
- Exceptions.rethrowIOException(cause);
+ throwable.set(cause);
+ return null;
}
+ });
+ if (throwable.get() != null) {
+ Exceptions.rethrowIOException(throwable.get());
+ }
+ if (null != entry && computed.get()) {
+ entry.register(eid, Collections.singleton(narArchive),
Collections.emptyList());
}
+
}
@Override
- public void unregisterFunctionInstance(String fid,
- String eid) {
+ public void unregisterFunctionInstance(String fid, String eid) {
synchronized (cacheFunctions) {
FunctionCacheEntry entry = cacheFunctions.get(fid);
-
if (null != entry) {
if (entry.unregister(eid)) {
cacheFunctions.remove(fid);
Review comment:
For a concurrent container, we cannot use the object's own lock to make
composite operations on the container atomic, because the thread safety of a
concurrent container is not achieved by locking the object itself.
You can run the following code:
```java
final ConcurrentHashMap<String, String> map = new
ConcurrentHashMap<>();
map.put("A", "Value");
final ExecutorService executorService =
Executors.newSingleThreadExecutor();
executorService.execute(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
map.remove("A");
});
synchronized (map) { // NOTE: it's meaningless and cannot guarantee
// any thread safety of the code block
final String value = map.get("A");
if (value != null) {
Thread.sleep(1000);
final String value1 = map.remove("A");
System.out.println("value: " + value + ", value1: " +
value1);
}
}
executorService.shutdown();
```
The output is:
```
value: Value, value1: null
```
If you changed `Thread.sleep(500);` to `Thread.sleep(1500);` in the thread
function, the output would become:
```
value: Value, value1: Value
```
Therefore, I think that even if `cacheFunctions` is a concurrent hash map,
all access to the field should still be protected by `synchronized`. Actually,
`cacheFunctions` should just be a regular hash map.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]