BewareMyPower commented on a change in pull request #14346:
URL: https://github.com/apache/pulsar/pull/14346#discussion_r809955217



##########
File path: 
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
##########
@@ -73,72 +73,74 @@ public void registerFunctionInstance(String fid,
             throw new NullPointerException("FunctionID not set");
         }
 
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null == entry) {
-                URL[] urls = new URL[requiredJarFiles.size() + 
requiredClasspaths.size()];
-                int count = 0;
-                try {
-                    // add jar files to urls
-                    for (String jarFile : requiredJarFiles) {
-                        urls[count++] = new File(jarFile).toURI().toURL();
-                    }
-
-                    // add classpaths
-                    for (URL url : requiredClasspaths) {
-                        urls[count++] = url;
-                    }
-
-                    cacheFunctions.put(
-                        fid,
-                        new FunctionCacheEntry(
-                            requiredJarFiles,
-                            requiredClasspaths,
-                            urls,
-                            eid, rootClassLoader));
-                } catch (Throwable cause) {
-                    Exceptions.rethrowIOException(cause);
+        final AtomicBoolean computed = new AtomicBoolean(false);
+        final AtomicReference<Throwable> throwable = new AtomicReference<>();
+        FunctionCacheEntry entry = cacheFunctions.computeIfAbsent(fid, __ -> {
+            URL[] urls = new URL[requiredJarFiles.size() + 
requiredClasspaths.size()];
+            int count = 0;
+            try {
+                // add jar files to urls
+                for (String jarFile : requiredJarFiles) {
+                    urls[count++] = new File(jarFile).toURI().toURL();
+                }
+                // add classpaths
+                for (URL url : requiredClasspaths) {
+                    urls[count++] = url;
                 }
-            } else {
-                entry.register(
+                final FunctionCacheEntry cacheEntry = new FunctionCacheEntry(
+                        requiredJarFiles, requiredClasspaths, urls, eid, 
rootClassLoader);
+                computed.set(true);
+                return cacheEntry;
+            } catch (Throwable cause) {
+                throwable.set(cause);
+                return null;
+            }
+        });
+        if (throwable.get() != null) {
+            Exceptions.rethrowIOException(throwable.get());
+        }
+        if (!computed.get() && entry != null) { // the key already exists
+            entry.register(
                     eid,
                     requiredJarFiles,
                     requiredClasspaths);
-            }
         }
     }
 
     @Override
     public void registerFunctionInstanceWithArchive(String fid, String eid,
-                                                    String narArchive, String 
narExtractionDirectory) throws IOException {
+                                                    String narArchive,
+                                                    String 
narExtractionDirectory) throws IOException {
         if (fid == null) {
             throw new NullPointerException("FunctionID not set");
         }
-
-        synchronized (cacheFunctions) {
-            FunctionCacheEntry entry = cacheFunctions.get(fid);
-
-            if (null != entry) {
-                entry.register(eid, Collections.singleton(narArchive), 
Collections.emptyList());
-                return;
-            }
-
-            // Create new cache entry
+        // Create new cache entry.
+        final AtomicBoolean computed = new AtomicBoolean(false);
+        final AtomicReference<Throwable> throwable = new AtomicReference<>();
+        FunctionCacheEntry entry = cacheFunctions.computeIfAbsent(fid, __ -> {
             try {
-                cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, 
eid, rootClassLoader, narExtractionDirectory));
+                final FunctionCacheEntry cacheEntry =
+                        new FunctionCacheEntry(narArchive, eid, 
rootClassLoader, narExtractionDirectory);
+                computed.set(true);
+                return cacheEntry;
             } catch (Throwable cause) {
-                Exceptions.rethrowIOException(cause);
+                throwable.set(cause);
+                return null;
             }
+        });
+        if (throwable.get() != null) {
+            Exceptions.rethrowIOException(throwable.get());
+        }
+        if (null != entry && computed.get()) {
+            entry.register(eid, Collections.singleton(narArchive), 
Collections.emptyList());
         }
+
     }
 
     @Override
-    public void unregisterFunctionInstance(String fid,
-                                           String eid) {
+    public void unregisterFunctionInstance(String fid, String eid) {
         synchronized (cacheFunctions) {
             FunctionCacheEntry entry = cacheFunctions.get(fid);
-
             if (null != entry) {
                 if (entry.unregister(eid)) {
                     cacheFunctions.remove(fid);

Review comment:
       For a concurrent container, we cannot use the object's own lock to make 
compound operations on the container atomic, because the thread safety of a 
concurrent container is not achieved by locking the object itself. Other 
threads that call the container's methods directly never wait for that lock.
   
   You can run the following code:
   
   ```java
           final ConcurrentHashMap<String, String> map = new 
ConcurrentHashMap<>();
           map.put("A", "Value");
   
           final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
           executorService.execute(() -> {
               try {
                   Thread.sleep(500);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
               map.remove("A");
           });
           synchronized (map) { // NOTE: it's meaningless and cannot guarantee 
any thread safety of the code block
               final String value = map.get("A");
               if (value != null) {
                   Thread.sleep(1000);
                   final String value1 = map.remove("A");
                   System.out.println("value: " + value + ", value1: " + 
value1);
               }
           }
           executorService.shutdown();
   ```
   
   The output is:
   
   ```
   value: Value, value1: null
   ```
   
   If you change `Thread.sleep(500);` to `Thread.sleep(1500);` in the task 
submitted to the executor, the output becomes:
   
   ```
   value: Value, value1: Value
   ```
   
   Therefore, I think that even though `cacheFunctions` is a concurrent hash 
map, every access to the field should be protected by `synchronized`. Given 
that, `cacheFunctions` should just be a regular hash map.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to