sijie closed pull request #838: Issue-326:  Replace observer/observable with a 
simplified watcher/watchable implementation
URL: https://github.com/apache/bookkeeper/pull/838
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git a/.travis.yml b/.travis.yml
index bd39e3e17..234a60077 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -27,6 +27,9 @@ matrix:
       osx_image: xcode8
     - os: linux
       env: CUSTOM_JDK="oraclejdk8"
+    - os: linux
+      dist: trusty
+      env: CUSTOM_JDK="openjdk8"
 
 before_install:
   - echo "MAVEN_OPTS='-Xmx3072m -XX:MaxPermSize=512m'" > ~/.mavenrc
diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 7726da144..ea751fb83 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -36,6 +36,11 @@
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-common</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
similarity index 97%
rename from 
bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
rename to 
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
index 3e45c550a..7dd663feb 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
@@ -15,7 +15,7 @@
  * under the License.
  */
 
-package org.apache.bookkeeper.util.collections;
+package org.apache.bookkeeper.common.collections;
 
 import io.netty.util.Recycler.Handle;
 import java.util.ArrayList;
diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
new file mode 100644
index 000000000..0ed3c7390
--- /dev/null
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Bookkeeper common collections.
+ */
+package org.apache.bookkeeper.common.collections;
diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
new file mode 100644
index 000000000..edbaa34a1
--- /dev/null
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+/**
+ * An interface that represents an object that is recyclable.
+ */
+public interface Recyclable {
+
+    /**
+     * Recycle the instance.
+     */
+    void recycle();
+
+}
diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
new file mode 100644
index 000000000..a74f5f578
--- /dev/null
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.function.Function;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+
+/**
+ * This class represents a watchable object, or "data"
+ * in the model-view paradigm. It can be subclassed to represent an
+ * object that the application wants to have watched.
+ *
+ * <p>A watchable object can have one or more watchers. A watcher
+ * may be any object that implements interface <tt>Watcher</tt>. After a
+ * watchable instance changes, an application calling the
+ * <code>Watchable</code>'s <code>notifyWatchers</code> method
+ * causes all of its watchers to be notified of the change by a call
+ * to their <code>update</code> method.
+ *
+ * <p>A watcher is automatically removed from the watchers list once an event
+ * is fired to the watcher.
+ *
+ * <p>Note that this notification mechanism has nothing to do with threads
+ * and is completely separate from the <tt>wait</tt> and <tt>notify</tt>
+ * mechanism of class <tt>Object</tt>.
+ *
+ * <p>When a watchable object is newly created, its set of watchers is
+ * empty. If the same watcher is added multiple times to this watchable, it will
+ * receive the notifications multiple times.
+ */
+public class Watchable<T> implements Recyclable {
+
+    private final Recycler<Watcher<T>> recycler;
+    private RecyclableArrayList<Watcher<T>> watchers;
+
+    /** Construct a Watchable with zero watchers. */
+
+    public Watchable(Recycler<Watcher<T>> recycler) {
+        this.recycler = recycler;
+        this.watchers = recycler.newInstance();
+    }
+
+    synchronized int getNumWatchers() {
+        return this.watchers.size();
+    }
+
+    /**
+     * Adds a watcher to the list of watchers for this object. The same
+     * watcher may be added more than once, in which case it is notified
+     * once per registration. The order in which notifications will be
+     * delivered to multiple watchers is not specified. See the class comment.
+     *
+     * @param  w a watcher to be added.
+     * @return true if a watcher is added to the list successfully, otherwise 
false.
+     * @throws NullPointerException   if the parameter w is null.
+     */
+    public synchronized boolean addWatcher(Watcher<T> w) {
+        checkNotNull(w, "Null watcher is provided");
+        return watchers.add(w);
+    }
+
+    /**
+     * Deletes a watcher from the set of watchers of this object.
+     * Passing <CODE>null</CODE> to this method will have no effect.
+     * @param w the watcher to be deleted.
+     */
+    public synchronized boolean deleteWatcher(Watcher<T> w) {
+        return watchers.remove(w);
+    }
+
+    /**
+     * Notify the watchers with the update produced by <code>valueFn.apply(value)</code>.
+     *
+     * @param value input to <code>valueFn</code>, whose result is delivered to each watcher
+     */
+    public <R> void notifyWatchers(Function<R, T> valueFn, R value) {
+        RecyclableArrayList<Watcher<T>> watchersLocal;
+        synchronized (this) {
+            watchersLocal = watchers;
+            watchers = recycler.newInstance();
+        }
+
+        for (Watcher<T> watcher : watchersLocal) {
+            watcher.update(valueFn.apply(value));
+        }
+        watchersLocal.recycle();
+    }
+
+    /**
+     * Clears the watcher list so that this object no longer has any watchers.
+     */
+    public synchronized void deleteWatchers() {
+        watchers.clear();
+    }
+
+    @Override
+    public synchronized void recycle() {
+        watchers.recycle();
+    }
+}
diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
new file mode 100644
index 000000000..220a942b7
--- /dev/null
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+/**
+ * A class can implement the <code>Watcher</code> interface when it
+ * wants to be informed of <i>one-time</i> changes in watchable objects.
+ */
+public interface Watcher<T> {
+
+    /**
+     * This method is called whenever the watched object is changed. An
+     * application calls a <tt>Watchable</tt> object's
+     * <code>notifyWatchers</code> method to have all the object's
+     * watchers notified of the change.
+     *
+     * @param value the updated value of a watchable
+     */
+    void update(T value);
+
+}
diff --git 
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
new file mode 100644
index 000000000..9037d2193
--- /dev/null
+++ 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.collections;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link RecyclableArrayList}.
+ */
+public class RecyclableArrayListTest {
+
+    private final Recycler<Integer> recycler;
+
+    public RecyclableArrayListTest() {
+        this.recycler = new Recycler<>();
+    }
+
+    @Test
+    public void testRecycle() {
+        RecyclableArrayList<Integer> array = recycler.newInstance();
+        for (int i = 0; i < 5; i++) {
+            array.add(i);
+        }
+        assertEquals(5, array.size());
+        array.recycle();
+        assertEquals(0, array.size());
+    }
+
+}
diff --git 
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
new file mode 100644
index 000000000..697d0dacd
--- /dev/null
+++ 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.function.Function;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link Watchable}.
+ */
+public class TestWatchable {
+
+    private final Recycler<Watcher<Integer>> recycler;
+    private final Watchable<Integer> watchable;
+
+    public TestWatchable() {
+        this.recycler = new Recycler<>();
+        this.watchable = new Watchable<>(recycler);
+    }
+
+    @After
+    public void teardown() {
+        this.watchable.recycle();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAddWatcher() {
+        Watcher<Integer> watcher = mock(Watcher.class);
+        assertTrue(watchable.addWatcher(watcher));
+        assertEquals(1, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher, times(1)).update(eq(123));
+
+        // after the watcher is fired, watcher should be removed from watcher 
list.
+        assertEquals(0, watchable.getNumWatchers());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testDeleteWatcher() {
+        Watcher<Integer> watcher = mock(Watcher.class);
+        assertTrue(watchable.addWatcher(watcher));
+        assertEquals(1, watchable.getNumWatchers());
+        assertTrue(watchable.deleteWatcher(watcher));
+        assertEquals(0, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher, times(0)).update(anyInt());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testMultipleWatchers() {
+        Watcher<Integer> watcher1 = mock(Watcher.class);
+        Watcher<Integer> watcher2 = mock(Watcher.class);
+
+        assertTrue(watchable.addWatcher(watcher1));
+        assertTrue(watchable.addWatcher(watcher2));
+        assertEquals(2, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher1, times(1)).update(eq(123));
+        verify(watcher2, times(1)).update(eq(123));
+        assertEquals(0, watchable.getNumWatchers());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAddWatchMultipleTimes() {
+        Watcher<Integer> watcher = mock(Watcher.class);
+
+        int numTimes = 3;
+        for (int i = 0; i < numTimes; i++) {
+            assertTrue(watchable.addWatcher(watcher));
+        }
+        assertEquals(numTimes, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher, times(numTimes)).update(eq(123));
+        assertEquals(0, watchable.getNumWatchers());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testDeleteWatchers() {
+        Watcher<Integer> watcher1 = mock(Watcher.class);
+        Watcher<Integer> watcher2 = mock(Watcher.class);
+
+        assertTrue(watchable.addWatcher(watcher1));
+        assertTrue(watchable.addWatcher(watcher2));
+        assertEquals(2, watchable.getNumWatchers());
+        watchable.deleteWatchers();
+        assertEquals(0, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher1, times(0)).update(anyInt());
+        verify(watcher2, times(0)).update(anyInt());
+    }
+
+}
diff --git a/bookkeeper-common/src/test/resources/log4j.properties 
b/bookkeeper-common/src/test/resources/log4j.properties
new file mode 100644
index 000000000..10ae6bfcb
--- /dev/null
+++ b/bookkeeper-common/src/test/resources/log4j.properties
@@ -0,0 +1,42 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+#
+# Bookkeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+"
+
+# DEFAULT: console appender only, level INFO
+bookkeeper.root.logger=INFO,CONSOLE
+log4j.rootLogger=${bookkeeper.root.logger}
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - 
[%t:%C{1}@%L] - %m%n
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.apache.bookkeeper.bookie=INFO
+log4j.logger.org.apache.bookkeeper.meta=INFO
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 79cd2f930..bc6b4d242 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -53,8 +53,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -73,6 +71,7 @@
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.discover.ZKRegistrationManager;
@@ -1431,10 +1430,12 @@ public long readLastAddConfirmed(long ledgerId) throws 
IOException {
         return handle.getLastAddConfirmed();
     }
 
-    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long 
previoisLAC, Observer observer)
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                 long previousLAC,
+                                                 
Watcher<LastAddConfirmedUpdateNotification> watcher)
             throws IOException {
         LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
-        return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+        return handle.waitForLastAddConfirmedUpdate(previousLAC, watcher);
     }
 
     // The rest of the code is test stuff
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index c14be5fe0..38edbda18 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static 
org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
@@ -32,9 +33,9 @@
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +59,7 @@
  * in entry loggers.
  * </p>
  */
-class FileInfo extends Observable {
+class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
     private static final Logger LOG = LoggerFactory.getLogger(FileInfo.class);
 
     static final int NO_MASTER_KEY = -1;
@@ -93,8 +94,9 @@
     protected String mode;
 
     public FileInfo(File lf, byte[] masterKey) throws IOException {
-        this.lf = lf;
+        super(WATCHER_RECYCLER);
 
+        this.lf = lf;
         this.masterKey = masterKey;
         mode = "rw";
     }
@@ -105,10 +107,11 @@ synchronized Long getLastAddConfirmed() {
 
     long setLastAddConfirmed(long lac) {
         long lacToReturn;
+        boolean changed = false;
         synchronized (this) {
             if (null == this.lac || this.lac < lac) {
                 this.lac = lac;
-                setChanged();
+                changed = true;
             }
             lacToReturn = this.lac;
         }
@@ -116,22 +119,24 @@ long setLastAddConfirmed(long lac) {
             LOG.trace("Updating LAC {} , {}", lacToReturn, lac);
         }
 
-
-        notifyObservers(new LastAddConfirmedUpdateNotification(lacToReturn));
+        if (changed) {
+            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, 
lacToReturn);
+        }
         return lacToReturn;
     }
 
-    synchronized Observable waitForLastAddConfirmedUpdate(long previousLAC, 
Observer observe) {
+    synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
+                                                       
Watcher<LastAddConfirmedUpdateNotification> watcher) {
         if ((null != lac && lac > previousLAC)
                 || isClosed || ((stateBits & STATE_FENCED_BIT) == 
STATE_FENCED_BIT)) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Wait For LAC {} , {}", this.lac, previousLAC);
             }
-            return null;
+            return false;
         }
 
-        addObserver(observe);
-        return this;
+        addWatcher(watcher);
+        return true;
     }
 
     public synchronized File getLf() {
@@ -283,6 +288,7 @@ public synchronized boolean isFenced() throws IOException {
      */
     public boolean setFenced() throws IOException {
         boolean returnVal = false;
+        boolean changed = false;
         synchronized (this) {
             checkOpen(false);
             if (LOG.isDebugEnabled()) {
@@ -293,12 +299,14 @@ public boolean setFenced() throws IOException {
                 stateBits |= STATE_FENCED_BIT;
                 needFlushHeader = true;
                 synchronized (this) {
-                    setChanged();
+                    changed = true;
                 }
                 returnVal = true;
             }
         }
-        notifyObservers(new 
LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
+        if (changed) {
+            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, 
Long.MAX_VALUE);
+        }
         return returnVal;
     }
 
@@ -379,20 +387,36 @@ private int readAbsolute(ByteBuffer bb, long start, 
boolean bestEffort)
      *          if set to false, the index is not forced to create.
      */
     public void close(boolean force) throws IOException {
-        synchronized (this) {
-            isClosed = true;
-            checkOpen(force, true);
-            // Any time when we force close a file, we should try to flush 
header. otherwise, we might lose fence bit.
-            if (force) {
-                flushHeader();
+        boolean closing = false;
+        try {
+            boolean changed = false;
+            synchronized (this) {
+                if (isClosed) {
+                    return;
+                }
+                isClosed = true;
+                closing = true;
+                checkOpen(force, true);
+                // Any time when we force close a file, we should try to flush 
header.
+                // otherwise, we might lose fence bit.
+                if (force) {
+                    flushHeader();
+                }
+                changed = true;
+                if (useCount.get() == 0 && fc != null) {
+                    fc.close();
+                    fc = null;
+                }
             }
-            setChanged();
-            if (useCount.get() == 0 && fc != null) {
-                fc.close();
-                fc = null;
+            if (changed) {
+                notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, 
Long.MAX_VALUE);
+            }
+        } finally {
+            if (closing) {
+                // recycle this watchable after the FileInfo is closed.
+                recycle();
             }
         }
-        notifyObservers(new 
LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
     }
 
     public synchronized long write(ByteBuffer[] buffs, long position) throws 
IOException {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 3120fa4a5..8598bc4b6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -31,7 +31,6 @@
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.UncheckedExecutionException;
-
 import io.netty.buffer.ByteBuf;
 import java.io.File;
 import java.io.IOException;
@@ -40,17 +39,15 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import 
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
@@ -435,11 +432,13 @@ Long getLastAddConfirmed(long ledgerId) throws 
IOException {
         }
     }
 
-    Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, 
Observer observer) throws IOException {
+    boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                          long previousLAC,
+                                          
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
         FileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
-            return fi.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+            return fi.waitForLastAddConfirmedUpdate(previousLAC, watcher);
         } finally {
             if (null != fi) {
                 fi.release();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 2b357eee4..e1c85fbfe 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -26,22 +26,18 @@
 import static 
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFFSET;
 
 import com.google.common.collect.Lists;
-
 import io.netty.buffer.ByteBuf;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -253,9 +249,11 @@ public long getLastAddConfirmed(long ledgerId) throws IOException {
     }
 
     @Override
-    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                 long previousLAC,
+                                                 Watcher<LastAddConfirmedUpdateNotification> watcher)
             throws IOException {
-        return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer);
+        return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
 
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index faac1b443..4ea47da6a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -42,6 +42,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
@@ -52,7 +53,6 @@
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.collections.GrowableArrayBlockingQueue;
-import org.apache.bookkeeper.util.collections.RecyclableArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
index a0c112d62..71cbd6144 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
@@ -20,17 +20,54 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.util.function.Function;
+import lombok.Getter;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.util.Recyclable;
+import org.apache.bookkeeper.common.util.Watcher;
+
 /**
  * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
  *
  * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
  */
-public class LastAddConfirmedUpdateNotification {
-    public long lastAddConfirmed;
-    public long timestamp;
+@Getter
+public class LastAddConfirmedUpdateNotification implements Recyclable {
+
+    public static final Function<Long, LastAddConfirmedUpdateNotification> FUNC = lac -> of(lac);
+
+    public static final RecyclableArrayList.Recycler<Watcher<LastAddConfirmedUpdateNotification>> WATCHER_RECYCLER =
+        new RecyclableArrayList.Recycler<>();
+
+    public static LastAddConfirmedUpdateNotification of(long lastAddConfirmed) {
+        LastAddConfirmedUpdateNotification lac = RECYCLER.get();
+        lac.lastAddConfirmed = lastAddConfirmed;
+        lac.timestamp = System.currentTimeMillis();
+        return lac;
+    }
+
+    private static final Recycler<LastAddConfirmedUpdateNotification> RECYCLER =
+        new Recycler<LastAddConfirmedUpdateNotification>() {
+            @Override
+            protected LastAddConfirmedUpdateNotification newObject(Handle<LastAddConfirmedUpdateNotification> handle) {
+                return new LastAddConfirmedUpdateNotification(handle);
+            }
+        };
+
+    private final Handle<LastAddConfirmedUpdateNotification> handle;
+    private long lastAddConfirmed;
+    private long timestamp;
+
+    public LastAddConfirmedUpdateNotification(Handle<LastAddConfirmedUpdateNotification> handle) {
+        this.handle = handle;
+    }
 
-    public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
-        this.lastAddConfirmed = lastAddConfirmed;
-        this.timestamp = System.currentTimeMillis();
+    @Override
+    public void recycle() {
+        this.lastAddConfirmed = -1L;
+        this.timestamp = -1L;
+        handle.recycle(this);
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index 26d5245a1..14d48255d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -24,8 +24,7 @@
 import io.netty.buffer.ByteBuf;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
 
 /**
  * This class maps a ledger entry number into a location (entrylogid, offset) in
@@ -49,7 +48,9 @@
 
     Long getLastAddConfirmed(long ledgerId) throws IOException;
     long updateLastAddConfirmed(long ledgerId, long lac) throws IOException;
-    Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException;
+    boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                          long previousLAC,
+                                          Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
 
     void deleteLedger(long ledgerId) throws IOException;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index 0b63e3266..1db7d4704 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -23,8 +23,7 @@
 
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -85,12 +84,13 @@ public long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
     }
 
     @Override
-    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                 long previousLAC,
+                                                 Watcher<LastAddConfirmedUpdateNotification> watcher)
             throws IOException {
-        return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer);
+        return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
 
-
     @Override
     public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
         indexPageManager.putEntryOffset(ledger, entry, offset);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 970cec088..f4db7a689 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -27,8 +27,7 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
 
 /**
  * Implements a ledger inside a bookie. In particular, it implements operations
@@ -78,7 +77,9 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) {
     abstract ByteBuf readEntry(long entryId) throws IOException;
 
     abstract long getLastAddConfirmed() throws IOException;
-    abstract Observable waitForLastAddConfirmedUpdate(long previoisLAC, Observer observer) throws IOException;
+    abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
+                                                   Watcher<LastAddConfirmedUpdateNotification> watcher)
+        throws IOException;
 
     abstract void setExplicitLac(ByteBuf entry) throws IOException;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index af55f6a73..d126c4cc0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -25,9 +25,8 @@
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -156,7 +155,8 @@ long getLastAddConfirmed() throws IOException {
     }
 
     @Override
-    Observable waitForLastAddConfirmedUpdate(long previousLAC, Observer observer) throws IOException {
-        return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, observer);
+    boolean waitForLastAddConfirmedUpdate(long previousLAC,
+                                          Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+        return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index a0553a34a..ac0df001b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -23,9 +23,8 @@
 
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -119,12 +118,14 @@ void initialize(ServerConfiguration conf,
     /**
      * Wait for last add confirmed update.
      *
-     * @param previoisLAC - The threshold beyond which we would wait for the update
-     * @param observer  - Observer to notify on update
+     * @param previousLAC - The threshold beyond which we would wait for the update
+     * @param watcher  - Watcher to notify on update
      * @return
      * @throws IOException
      */
-    Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException;
+    boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                          long previousLAC,
+                                          Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
 
     /**
      * Flushes all data in the storage. Once this is called,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index 163817480..97bc08b58 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -19,22 +19,17 @@
 
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
-
 import io.netty.channel.Channel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
@@ -44,7 +39,7 @@
 /**
  * Processor handling long poll read entry request.
  */
-class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Observer {
+class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watcher<LastAddConfirmedUpdateNotification> {
 
     private static final Logger logger = LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
 
@@ -145,9 +140,9 @@ private ReadResponse getLongPollReadResponse() {
 
             final Stopwatch startTimeSw = Stopwatch.createStarted();
 
-            final Observable observable;
+            final boolean watched;
             try {
-                observable = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
+                watched = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
             } catch (Bookie.NoLedgerException e) {
                 logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.",
                         ledgerId, previousLAC);
@@ -161,19 +156,16 @@ private ReadResponse getLongPollReadResponse() {
             registerSuccessfulEvent(requestProcessor.longPollPreWaitStats, startTimeSw);
             lastPhaseStartTime.reset().start();
 
-            if (null != observable) {
-                // successfully registered observable to lac updates
+            if (watched) {
+                // successfully registered watcher to lac updates
                 if (logger.isTraceEnabled()) {
                     logger.trace("Waiting For LAC Update {}: Timeout {}", previousLAC, readRequest.getTimeOut());
                 }
                 synchronized (this) {
-                    expirationTimerTask = requestTimer.newTimeout(new TimerTask() {
-                        @Override
-                        public void run(Timeout timeout) throws Exception {
-                            // When the timeout expires just get whatever is the current
-                            // readLastConfirmed
-                            LongPollReadEntryProcessorV3.this.scheduleDeferredRead(observable, true);
-                        }
+                    expirationTimerTask = requestTimer.newTimeout(timeout -> {
+                        // When the timeout expires just get whatever is the current
+                        // readLastConfirmed
+                        LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true);
                     }, readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
                 }
                 return null;
@@ -192,26 +184,25 @@ protected void executeOp() {
     }
 
     @Override
-    public void update(Observable observable, Object o) {
-        LastAddConfirmedUpdateNotification newLACNotification = (LastAddConfirmedUpdateNotification) o;
-        if (newLACNotification.lastAddConfirmed > previousLAC) {
-            if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE && !lastAddConfirmedUpdateTime.isPresent()) {
-                lastAddConfirmedUpdateTime = Optional.of(newLACNotification.timestamp);
+    public void update(LastAddConfirmedUpdateNotification newLACNotification) {
+        if (newLACNotification.getLastAddConfirmed() > previousLAC) {
+            if (newLACNotification.getLastAddConfirmed() != Long.MAX_VALUE && !lastAddConfirmedUpdateTime.isPresent()) {
+                lastAddConfirmedUpdateTime = Optional.of(newLACNotification.getTimestamp());
             }
             if (logger.isTraceEnabled()) {
                 logger.trace("Last Add Confirmed Advanced to {} for request {}",
-                        newLACNotification.lastAddConfirmed, request);
+                        newLACNotification.getLastAddConfirmed(), request);
             }
-            scheduleDeferredRead(observable, false);
+            scheduleDeferredRead(false);
         }
+        newLACNotification.recycle();
     }
 
-    private synchronized void scheduleDeferredRead(Observable observable, boolean timeout) {
+    private synchronized void scheduleDeferredRead(boolean timeout) {
         if (null == deferredTask) {
             if (logger.isTraceEnabled()) {
                 logger.trace("Deferred Task, expired: {}, request: {}", timeout, request);
             }
-            observable.deleteObserver(this);
             try {
                 shouldReadEntry = true;
                 deferredTask = longPollThreadPool.submit(this);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index d729be4e1..974bde296 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -28,9 +28,7 @@
 import static org.junit.Assert.fail;
 
 import java.io.File;
-import java.util.Observable;
-import java.util.Observer;
-
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
@@ -239,29 +237,27 @@ public void testReadFileInfoCacheEviction() throws Exception {
     @Test
     public void testEvictionShouldNotAffectLongPollRead() throws Exception {
         IndexPersistenceMgr indexPersistenceMgr = null;
-        Observer observer = (obs, obj) -> {
-            //no-ops
-        };
+        Watcher<LastAddConfirmedUpdateNotification> watcher = notification -> notification.recycle();
         try {
             indexPersistenceMgr = createIndexPersistenceManager(1);
             indexPersistenceMgr.getFileInfo(lid, masterKey);
             indexPersistenceMgr.getFileInfo(lid, null);
             indexPersistenceMgr.updateLastAddConfirmed(lid, 1);
-            Observable observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
-            // observer shouldn't be null because ledger is not evicted or closed
-            assertNotNull("Observer should not be null", observable);
+            // watch should succeed because ledger is not evicted or closed
+            assertTrue(
+                indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher));
             // now evict ledger 1 from write cache
             indexPersistenceMgr.getFileInfo(lid + 1, masterKey);
-            observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
-            // even if ledger 1 is evicted from write cache, observer still shouldn't be null
-            assertNotNull("Observer should not be null", observable);
+            // even if ledger 1 is evicted from write cache, watcher should still succeed
+            assertTrue(
+                indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher));
             // now evict ledger 1 from read cache
             indexPersistenceMgr.getFileInfo(lid + 2, masterKey);
             indexPersistenceMgr.getFileInfo(lid + 2, null);
-            // even if ledger 1 is evicted from both cache, observer still shouldn't be null because it
+            // even if ledger 1 is evicted from both cache, watcher should still succeed because it
             // will create a new FileInfo when cache miss
-            observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
-            assertNotNull("Observer should not be null", observable);
+            assertTrue(
+                indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher));
         } finally {
             if (null != indexPersistenceMgr) {
                 indexPersistenceMgr.close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java
new file mode 100644
index 000000000..90da463b6
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Unit test of {@link LastAddConfirmedUpdateNotification}.
+ */
+public class LastAddConfirmedUpdateNotificationTest {
+
+    @Test
+    public void testGetters() {
+        long lac = System.currentTimeMillis();
+        LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.of(lac);
+
+        long timestamp = System.currentTimeMillis();
+        assertEquals(lac, notification.getLastAddConfirmed());
+        assertTrue(notification.getTimestamp() <= timestamp);
+
+        notification.recycle();
+    }
+
+    @Test
+    public void testRecycle() {
+        long lac = System.currentTimeMillis();
+        LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.of(lac);
+        notification.recycle();
+
+        assertEquals(-1L, notification.getLastAddConfirmed());
+        assertEquals(-1L, notification.getTimestamp());
+    }
+
+    @Test
+    public void testFunc() {
+        long lac = System.currentTimeMillis();
+        LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.FUNC.apply(lac);
+
+        assertEquals(lac, notification.getLastAddConfirmed());
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index b2bc916a1..ea60aa839 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -26,11 +26,8 @@
 import static org.junit.Assert.fail;
 
 import io.netty.buffer.ByteBuf;
-
 import java.io.File;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -38,10 +35,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
@@ -348,9 +345,11 @@ public ByteBuf getExplicitLac(long ledgerId) {
         }
 
         @Override
-        public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+        public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                     long previousLAC,
+                                                     Watcher<LastAddConfirmedUpdateNotification> watcher)
                 throws IOException {
-            return null;
+            return false;
         }
 
         @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index f74289d59..5c234d174 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,7 +21,6 @@
 
 package org.apache.bookkeeper.meta;
 
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -37,8 +36,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
@@ -55,11 +52,13 @@
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.GarbageCollector;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
@@ -425,9 +424,11 @@ public void flushEntriesLocationsIndex() throws IOException {
         }
 
         @Override
-        public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+        public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                     long previousLAC,
+                                                     Watcher<LastAddConfirmedUpdateNotification> watcher)
                 throws IOException {
-            return null;
+            return false;
         }
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index cbd504b3b..ea02a3055 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -27,8 +27,6 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -36,7 +34,9 @@
 import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -213,9 +213,11 @@ public void flushEntriesLocationsIndex() throws IOException {
         }
 
         @Override
-        public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+        public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                     long previousLAC,
+                                                     Watcher<LastAddConfirmedUpdateNotification> watcher)
                 throws IOException {
-            return null;
+            return false;
         }
 
         @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to