This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e78f648023 SeekableStreamSupervisor: Don't enqueue duplicate notices. (#13334)
e78f648023 is described below

commit e78f648023466ff3edd8482d6b67c211c5ade0e5
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Nov 11 01:54:01 2022 -0800

    SeekableStreamSupervisor: Don't enqueue duplicate notices. (#13334)
    
    * SeekableStreamSupervisor: Don't enqueue duplicate notices.
    
    Similar goal to #12018, but more aggressive. Don't enqueue a notice at
    all if it is equal to one currently in the queue.
    
    * Adjustments from review.
    
    * Update indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/NoticesQueueTest.java
    
    Co-authored-by: Kashif Faraz <[email protected]>
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 .../seekablestream/supervisor/NoticesQueue.java    | 82 ++++++++++++++++++++++
 .../supervisor/SeekableStreamSupervisor.java       | 36 ++++------
 .../overlord/supervisor/NoticesQueueTest.java      | 81 +++++++++++++++++++++
 3 files changed, 176 insertions(+), 23 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/NoticesQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/NoticesQueue.java
new file mode 100644
index 0000000000..49bf71fb25
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/NoticesQueue.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.base.Preconditions;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+
+/**
+ * Queue that de-duplicates items on addition using {@link Object#equals}.
+ */
+public class NoticesQueue<T>
+{
+  @GuardedBy("this")
+  private final LinkedHashSet<T> queue = new LinkedHashSet<>();
+
+  /**
+   * Adds an item. Throws {@link NullPointerException} if the item is null.
+   */
+  public void add(final T item)
+  {
+    Preconditions.checkNotNull(item, "item");
+
+    synchronized (this) {
+      queue.add(item);
+      this.notifyAll();
+    }
+  }
+
+  /**
+   * Retrieves the head of the queue (eldest item). Returns null if the queue is empty and the timeout has elapsed.
+   */
+  @Nullable
+  public T poll(final long timeoutMillis) throws InterruptedException
+  {
+    synchronized (this) {
+      final long timeoutAt = System.currentTimeMillis() + timeoutMillis;
+
+      long waitMillis = timeoutMillis;
+      while (queue.isEmpty() && waitMillis > 0) {
+        wait(waitMillis);
+        waitMillis = timeoutAt - System.currentTimeMillis();
+      }
+
+      final Iterator<T> it = queue.iterator();
+      if (it.hasNext()) {
+        final T item = it.next();
+        it.remove();
+        return item;
+      } else {
+        return null;
+      }
+    }
+  }
+
+  public int size()
+  {
+    synchronized (this) {
+      return queue.size();
+    }
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index d96b6b17e9..6f00ef7fbf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -112,13 +112,11 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -306,15 +304,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     String getType();
 
     void handle() throws ExecutionException, InterruptedException, TimeoutException;
-
-    /**
-     * Whether this notice can also handle the work of another notice. Used to coalesce notices and avoid
-     * redundant work.
-     */
-    default boolean canAlsoHandle(Notice otherNotice)
-    {
-      return false;
-    }
   }
 
   private static class StatsFromTaskResult
@@ -383,7 +372,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
-  private class RunNotice implements Notice
+  private final class RunNotice implements Notice
   {
     private static final String TYPE = "run_notice";
 
@@ -406,9 +395,16 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
 
     @Override
-    public boolean canAlsoHandle(Notice otherNotice)
+    public int hashCode()
     {
-      return otherNotice.getType().equals(TYPE);
+      return 0;
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      // All RunNotices are the same. They are de-duplicated on insertion into the NoticesQueue "notices".
+      return obj != null && obj.getClass().equals(RunNotice.class);
     }
   }
 
@@ -725,7 +721,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private final ScheduledExecutorService scheduledExec;
   private final ScheduledExecutorService reportingExec;
   private final ListeningExecutorService workerExec;
-  private final BlockingDeque<Notice> notices = new LinkedBlockingDeque<>();
+  private final NoticesQueue<Notice> notices = new NoticesQueue<>();
   private final Object stopLock = new Object();
   private final Object stateChangeLock = new Object();
   private final ReentrantLock recordSupplierLock = new ReentrantLock();
@@ -1037,17 +1033,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
               try {
                 long pollTimeout = Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS);
                 while (!Thread.currentThread().isInterrupted() && !stopped) {
-                  final Notice notice = notices.poll(pollTimeout, TimeUnit.MILLISECONDS);
+                  final Notice notice = notices.poll(pollTimeout);
                   if (notice == null) {
                     continue;
                   }
 
-                  // Coalesce notices.
-                  Notice nextNotice;
-                  while ((nextNotice = notices.peek()) != null && notice.canAlsoHandle(nextNotice)) {
-                    notices.removeFirst();
-                  }
-
                   try {
                     Instant handleNoticeStartTime = Instant.now();
                     notice.handle();
@@ -1104,7 +1094,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
   {
-    return () -> notices.add(new DynamicAllocationTasksNotice(scaleAction));
+    return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction));
   }
 
   private Runnable buildRunTask()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/NoticesQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/NoticesQueueTest.java
new file mode 100644
index 0000000000..7cc3c07ed9
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/NoticesQueueTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor;
+
+import org.apache.druid.indexing.seekablestream.supervisor.NoticesQueue;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+public class NoticesQueueTest
+{
+  @Test
+  public void testQueue() throws InterruptedException
+  {
+    final NoticesQueue<String> queue = new NoticesQueue<>();
+
+    for (int i = 0; i < 3; i++) {
+      Assert.assertEquals(0, queue.size());
+      queue.add("xyz");
+      Assert.assertEquals(1, queue.size());
+
+      queue.add("xyz");
+      Assert.assertEquals(1, queue.size());
+
+      queue.add("foo");
+      Assert.assertEquals(2, queue.size());
+
+      queue.add("xyz");
+      Assert.assertEquals(2, queue.size());
+
+      queue.add("bar");
+      Assert.assertEquals(3, queue.size());
+
+      Assert.assertEquals("xyz", queue.poll(10));
+      Assert.assertEquals("foo", queue.poll(10));
+      Assert.assertEquals("bar", queue.poll(10));
+      Assert.assertNull(queue.poll(10));
+      Assert.assertEquals(0, queue.size());
+    }
+  }
+
+  @Test
+  public void testQueueConcurrent() throws InterruptedException, ExecutionException
+  {
+    final NoticesQueue<String> queue = new NoticesQueue<>();
+    final ExecutorService exec = Execs.singleThreaded(getClass().getSimpleName());
+
+    try {
+      final Future<String> item = exec.submit(() -> queue.poll(60_000));
+
+      // Imperfect test: ideally we "add" after "poll", but we can't tell if "poll" has started yet.
+      // Don't want to add a sleep, to avoid adding additional time to the test case, so we live with the imperfection.
+      queue.add("xyz");
+      Assert.assertEquals("xyz", item.get());
+    }
+    finally {
+      exec.shutdownNow();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to