Reamer commented on code in PR #4374:
URL: https://github.com/apache/zeppelin/pull/4374#discussion_r884561932


##########
zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java:
##########
@@ -158,4 +166,117 @@ public void testLruCache() throws IOException {
     assertFalse(noteManager.containsNote(noteNew2.getPath()));
     assertEquals(cacheThreshold, noteManager.getCacheSize());
   }
+
+  @Test
+  public void testConcurrentOperation() throws Exception {
+    int threshold = 10, noteNum = 150;
+    Map<Integer, String> notes = new ConcurrentHashMap<>();
+    ExecutorService threadPool = Executors.newFixedThreadPool(threshold);
+    // Save note concurrently
+    ConcurrentTask saveNote = new ConcurrentTaskSaveNote(threadPool, noteNum, 
notes, "/prod/note%s");
+    saveNote.exec();
+    // Move note concurrently
+    ConcurrentTask moveNote = new ConcurrentTaskMoveNote(threadPool, noteNum, 
notes, "/dev/project_%s/my_note%s");
+    moveNote.exec();
+    // Move folder concurrently
+    ConcurrentTask moveFolder = new ConcurrentTaskMoveFolder(threadPool, 
noteNum, notes, "/staging/note_%s/my_note%s");
+    moveFolder.exec();
+    // Remove note concurrently
+    ConcurrentTask removeNote = new ConcurrentTaskRemoveNote(threadPool, 
noteNum, notes, null);
+    removeNote.exec();
+    threadPool.shutdown();
+  }
+
+  abstract class ConcurrentTask {
+    private ExecutorService threadPool;
+    private int noteNum;
+    private Map<Integer, String> notes;
+    private String pathPattern;
+
+    public ConcurrentTask(ExecutorService threadPool, int noteNum, Map notes, 
String pathPattern) {

Review Comment:
   ```suggestion
       public ConcurrentTask(ExecutorService threadPool, int noteNum, 
Map<Integer, String> notes, String pathPattern) {
   ```



##########
zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteManager.java:
##########
@@ -181,7 +182,9 @@ public void saveNote(Note note, AuthenticationInfo subject) 
throws IOException {
     } else {
       addOrUpdateNoteNode(new NoteInfo(note));
       noteCache.putNote(note);
-      this.notebookRepo.save(note, subject);
+      synchronized (this) {

Review Comment:
   A comment at this point why a `synchronized` has to be used here would be 
very helpful.



##########
zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NoteManagerTest.java:
##########
@@ -158,4 +166,117 @@ public void testLruCache() throws IOException {
     assertFalse(noteManager.containsNote(noteNew2.getPath()));
     assertEquals(cacheThreshold, noteManager.getCacheSize());
   }
+
+  @Test
+  public void testConcurrentOperation() throws Exception {
+    int threshold = 10, noteNum = 150;
+    Map<Integer, String> notes = new ConcurrentHashMap<>();
+    ExecutorService threadPool = Executors.newFixedThreadPool(threshold);
+    // Save note concurrently
+    ConcurrentTask saveNote = new ConcurrentTaskSaveNote(threadPool, noteNum, 
notes, "/prod/note%s");
+    saveNote.exec();
+    // Move note concurrently
+    ConcurrentTask moveNote = new ConcurrentTaskMoveNote(threadPool, noteNum, 
notes, "/dev/project_%s/my_note%s");
+    moveNote.exec();
+    // Move folder concurrently
+    ConcurrentTask moveFolder = new ConcurrentTaskMoveFolder(threadPool, 
noteNum, notes, "/staging/note_%s/my_note%s");
+    moveFolder.exec();
+    // Remove note concurrently
+    ConcurrentTask removeNote = new ConcurrentTaskRemoveNote(threadPool, 
noteNum, notes, null);
+    removeNote.exec();
+    threadPool.shutdown();
+  }
+
+  abstract class ConcurrentTask {
+    private ExecutorService threadPool;
+    private int noteNum;
+    private Map<Integer, String> notes;
+    private String pathPattern;
+
+    public ConcurrentTask(ExecutorService threadPool, int noteNum, Map notes, 
String pathPattern) {
+      this.threadPool = threadPool;
+      this.noteNum = noteNum;
+      this.notes = notes;
+      this.pathPattern = pathPattern;
+    }
+
+    public abstract void run(int index) throws IOException;
+
+    public void exec() throws Exception {
+      // Simulate concurrent operation
+      CountDownLatch latch = new CountDownLatch(noteNum);
+      for (int i = 0; i < noteNum; i++) {
+        int index = i;
+        threadPool.execute(() -> {
+          try {
+            this.run(index);
+            latch.countDown();
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        });
+      }
+      latch.await(); //wait till all tasks are completed

Review Comment:
   ```suggestion
         assertTrue(latch.await(5, TimeUnit.SECONDS)); //wait till all tasks 
are completed
   ```
   An additional comment at this point would be good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@zeppelin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to