This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3ba7050 ISSUE #326: Replace observer/observable with a simplified
watcher/watchable implementation
3ba7050 is described below
commit 3ba705057ecafb0529676603ae1d2c3f266362a0
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Dec 15 00:43:44 2017 -0800
ISSUE #326: Replace observer/observable with a simplified watcher/watchable
implementation
Descriptions of the changes in this PR:
- long poll only need one-time notification on lac updates
- replace observer/observable with a simplified watcher/watchable
implementation. watchers are removed after they are fired.
- add `RecycableHashSet` for `Watchable` keeping the list of watchers
- object pooling on `Watchable` and `LastAddConfirmedUpdateNotification`
Author: Sijie Guo <[email protected]>
Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli
<[email protected]>
This closes #838 from sijie/rxjava, closes #326
---
.travis.yml | 3 +
bookkeeper-common/pom.xml | 5 +
.../common}/collections/RecyclableArrayList.java | 2 +-
.../common/collections/package-info.java | 19 +--
.../apache/bookkeeper/common/util/Recyclable.java | 24 ++--
.../apache/bookkeeper/common/util/Watchable.java | 120 +++++++++++++++++++
.../org/apache/bookkeeper/common/util/Watcher.java | 30 ++---
.../collections/RecyclableArrayListTest.java | 38 +++---
.../bookkeeper/common/util/TestWatchable.java | 129 +++++++++++++++++++++
.../src/test/resources/log4j.properties | 42 +++++++
.../java/org/apache/bookkeeper/bookie/Bookie.java | 9 +-
.../org/apache/bookkeeper/bookie/FileInfo.java | 72 ++++++++----
.../bookkeeper/bookie/IndexPersistenceMgr.java | 11 +-
.../bookie/InterleavedLedgerStorage.java | 12 +-
.../java/org/apache/bookkeeper/bookie/Journal.java | 2 +-
.../bookie/LastAddConfirmedUpdateNotification.java | 49 +++++++-
.../org/apache/bookkeeper/bookie/LedgerCache.java | 7 +-
.../apache/bookkeeper/bookie/LedgerCacheImpl.java | 10 +-
.../apache/bookkeeper/bookie/LedgerDescriptor.java | 7 +-
.../bookkeeper/bookie/LedgerDescriptorImpl.java | 8 +-
.../apache/bookkeeper/bookie/LedgerStorage.java | 11 +-
.../proto/LongPollReadEntryProcessorV3.java | 45 +++----
.../bookkeeper/bookie/IndexPersistenceMgrTest.java | 26 ++---
.../LastAddConfirmedUpdateNotificationTest.java | 62 ++++++++++
.../apache/bookkeeper/bookie/TestSyncThread.java | 11 +-
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 11 +-
.../bookkeeper/meta/LedgerManagerTestCase.java | 10 +-
27 files changed, 592 insertions(+), 183 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index bd39e3e..234a600 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -27,6 +27,9 @@ matrix:
osx_image: xcode8
- os: linux
env: CUSTOM_JDK="oraclejdk8"
+ - os: linux
+ dist: trusty
+ env: CUSTOM_JDK="openjdk8"
before_install:
- echo "MAVEN_OPTS='-Xmx3072m -XX:MaxPermSize=512m'" > ~/.mavenrc
diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 7726da1..ea751fb 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -37,6 +37,11 @@
<version>${guava.version}</version>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>${google.code.version}</version>
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
similarity index 97%
rename from
bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
rename to
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
index 3e45c55..7dd663f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/RecyclableArrayList.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
@@ -15,7 +15,7 @@
* under the License.
*/
-package org.apache.bookkeeper.util.collections;
+package org.apache.bookkeeper.common.collections;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
similarity index 58%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
index a0c112d..0ed3c73 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,21 +15,9 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.bookkeeper.bookie;
/**
- * A signal object is used for notifying the observers when the {@code
LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when
the {@code LastAddConfirmed} is advanced.
+ * Bookkeeper common collections.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
-
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
- }
-}
+package org.apache.bookkeeper.common.collections;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
similarity index 59%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
index a0c112d..edbaa34 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,21 +15,18 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.bookkeeper.bookie;
+
+package org.apache.bookkeeper.common.util;
/**
- * A signal object is used for notifying the observers when the {@code
LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when
the {@code LastAddConfirmed} is advanced.
+ * An interface represents an object that is recyclable.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
+public interface Recyclable {
+
+ /**
+ * Recycle the instance.
+ */
+ void recycle();
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
- }
}
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
new file mode 100644
index 0000000..a74f5f5
--- /dev/null
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.function.Function;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+
+/**
+ * This class represents an watchable object, or "data"
+ * in the model-view paradigm. It can be subclassed to represent an
+ * object that the application wants to have watched.
+ *
+ * <p>An watchable object can have one or more watchers. An watcher
+ * may be any object that implements interface <tt>Watcher</tt>. After an
+ * watchable instance changes, an application calling the
+ * <code>Watchable</code>'s <code>notifyWatchers</code> method
+ * causes all of its watchers to be notified of the change by a call
+ * to their <code>update</code> method.
+ *
+ * <p>A watcher is automatically removed from the watchers list once an event
+ * is fired to the watcher.
+ *
+ * <p>Note that this notification mechanism has nothing to do with threads
+ * and is completely separate from the <tt>wait</tt> and <tt>notify</tt>
+ * mechanism of class <tt>Object</tt>.
+ *
+ * <p>When an watchable object is newly created, its set of watchers is
+ * empty. If a same watcher is added multiple times to this watchable, it will
+ * receive the notifications multiple times.
+ */
+public class Watchable<T> implements Recyclable {
+
+ private final Recycler<Watcher<T>> recycler;
+ private RecyclableArrayList<Watcher<T>> watchers;
+
+ /** Construct an Watchable with zero watchers. */
+
+ public Watchable(Recycler<Watcher<T>> recycler) {
+ this.recycler = recycler;
+ this.watchers = recycler.newInstance();
+ }
+
+ synchronized int getNumWatchers() {
+ return this.watchers.size();
+ }
+
+ /**
+ * Adds an watcher to the set of watchers for this object, provided
+ * that it is not the same as some watcher already in the set.
+ * The order in which notifications will be delivered to multiple
+ * watchers is not specified. See the class comment.
+ *
+ * @param w an watcher to be added.
+ * @return true if a watcher is added to the list successfully, otherwise
false.
+ * @throws NullPointerException if the parameter o is null.
+ */
+ public synchronized boolean addWatcher(Watcher<T> w) {
+ checkNotNull(w, "Null watcher is provided");
+ return watchers.add(w);
+ }
+
+ /**
+ * Deletes an watcher from the set of watcher of this object.
+ * Passing <CODE>null</CODE> to this method will have no effect.
+ * @param w the watcher to be deleted.
+ */
+ public synchronized boolean deleteWatcher(Watcher<T> w) {
+ return watchers.remove(w);
+ }
+
+ /**
+ * Notify the watchers with the update <i>value</i>.
+ *
+ * @param value value to notify
+ */
+ public <R> void notifyWatchers(Function<R, T> valueFn, R value) {
+ RecyclableArrayList<Watcher<T>> watchersLocal;
+ synchronized (this) {
+ watchersLocal = watchers;
+ watchers = recycler.newInstance();
+ }
+
+ for (Watcher<T> watcher : watchersLocal) {
+ watcher.update(valueFn.apply(value));
+ }
+ watchersLocal.recycle();
+ }
+
+ /**
+ * Clears the watcher list so that this object no longer has any watchers.
+ */
+ public synchronized void deleteWatchers() {
+ watchers.clear();
+ }
+
+ @Override
+ public synchronized void recycle() {
+ watchers.recycle();
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
similarity index 59%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to
bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
index a0c112d..220a942 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,21 +15,24 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.bookkeeper.bookie;
+
+package org.apache.bookkeeper.common.util;
/**
- * A signal object is used for notifying the observers when the {@code
LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when
the {@code LastAddConfirmed} is advanced.
+ * A class can implement the <code>Watcher</code> interface when it
+ * wants to be informed of <i>one-time</i> changes in watchable objects.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
+public interface Watcher<T> {
+
+ /**
+ * This method is called whenever the watched object is changed. An
+ * application calls an <tt>Watchable</tt> object's
+ * <code>notifyWatchers</code> method to have all the object's
+ * watchers notified of the change.
+ *
+ * @param value the updated value of a watchable
+ */
+ void update(T value);
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
- }
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
similarity index 53%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to
bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
index a0c112d..9037d21 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,21 +15,34 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.bookkeeper.bookie;
+package org.apache.bookkeeper.common.collections;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+import org.junit.Test;
/**
- * A signal object is used for notifying the observers when the {@code
LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when
the {@code LastAddConfirmed} is advanced.
+ * Unit test of {@link RecyclableArrayList}.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
+public class RecyclableArrayListTest {
+
+ private final Recycler<Integer> recycler;
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
+ public RecyclableArrayListTest() {
+ this.recycler = new Recycler<>();
}
+
+ @Test
+ public void testRecycle() {
+ RecyclableArrayList<Integer> array = recycler.newInstance();
+ for (int i = 0; i < 5; i++) {
+ array.add(i);
+ }
+ assertEquals(5, array.size());
+ array.recycle();
+ assertEquals(0, array.size());
+ }
+
}
diff --git
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
new file mode 100644
index 0000000..697d0da
--- /dev/null
+++
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.function.Function;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link Watchable}.
+ */
+public class TestWatchable {
+
+ private final Recycler<Watcher<Integer>> recycler;
+ private final Watchable<Integer> watchable;
+
+ public TestWatchable() {
+ this.recycler = new Recycler<>();
+ this.watchable = new Watchable<>(recycler);
+ }
+
+ @After
+ public void teardown() {
+ this.watchable.recycle();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAddWatcher() {
+ Watcher<Integer> watcher = mock(Watcher.class);
+ assertTrue(watchable.addWatcher(watcher));
+ assertEquals(1, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher, times(1)).update(eq(123));
+
+ // after the watcher is fired, watcher should be removed from watcher
list.
+ assertEquals(0, watchable.getNumWatchers());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testDeleteWatcher() {
+ Watcher<Integer> watcher = mock(Watcher.class);
+ assertTrue(watchable.addWatcher(watcher));
+ assertEquals(1, watchable.getNumWatchers());
+ assertTrue(watchable.deleteWatcher(watcher));
+ assertEquals(0, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher, times(0)).update(anyInt());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultipleWatchers() {
+ Watcher<Integer> watcher1 = mock(Watcher.class);
+ Watcher<Integer> watcher2 = mock(Watcher.class);
+
+ assertTrue(watchable.addWatcher(watcher1));
+ assertTrue(watchable.addWatcher(watcher2));
+ assertEquals(2, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher1, times(1)).update(eq(123));
+ verify(watcher2, times(1)).update(eq(123));
+ assertEquals(0, watchable.getNumWatchers());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAddWatchMultipleTimes() {
+ Watcher<Integer> watcher = mock(Watcher.class);
+
+ int numTimes = 3;
+ for (int i = 0; i < numTimes; i++) {
+ assertTrue(watchable.addWatcher(watcher));
+ }
+ assertEquals(numTimes, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher, times(numTimes)).update(eq(123));
+ assertEquals(0, watchable.getNumWatchers());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testDeleteWatchers() {
+ Watcher<Integer> watcher1 = mock(Watcher.class);
+ Watcher<Integer> watcher2 = mock(Watcher.class);
+
+ assertTrue(watchable.addWatcher(watcher1));
+ assertTrue(watchable.addWatcher(watcher2));
+ assertEquals(2, watchable.getNumWatchers());
+ watchable.deleteWatchers();
+ assertEquals(0, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher1, times(0)).update(anyInt());
+ verify(watcher2, times(0)).update(anyInt());
+ }
+
+}
diff --git a/bookkeeper-common/src/test/resources/log4j.properties
b/bookkeeper-common/src/test/resources/log4j.properties
new file mode 100644
index 0000000..10ae6bf
--- /dev/null
+++ b/bookkeeper-common/src/test/resources/log4j.properties
@@ -0,0 +1,42 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+#
+# Bookkeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+
+
+# DEFAULT: console appender only, level INFO
+bookkeeper.root.logger=INFO,CONSOLE
+log4j.rootLogger=${bookkeeper.root.logger}
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p -
[%t:%C{1}@%L] - %m%n
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.apache.bookkeeper.bookie=INFO
+log4j.logger.org.apache.bookkeeper.meta=INFO
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 2c526db..812942d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -53,8 +53,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -73,6 +71,7 @@ import
org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.discover.ZKRegistrationManager;
@@ -1431,10 +1430,12 @@ public class Bookie extends BookieCriticalThread {
return handle.getLastAddConfirmed();
}
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer)
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
- return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+ return handle.waitForLastAddConfirmedUpdate(previousLAC, watcher);
}
@VisibleForTesting
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index c14be5f..38edbda 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -22,6 +22,7 @@
package org.apache.bookkeeper.bookie;
import static com.google.common.base.Charsets.UTF_8;
+import static
org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
@@ -32,9 +33,9 @@ import java.io.RandomAccessFile;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.common.util.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
* in entry loggers.
* </p>
*/
-class FileInfo extends Observable {
+class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
private static final Logger LOG = LoggerFactory.getLogger(FileInfo.class);
static final int NO_MASTER_KEY = -1;
@@ -93,8 +94,9 @@ class FileInfo extends Observable {
protected String mode;
public FileInfo(File lf, byte[] masterKey) throws IOException {
- this.lf = lf;
+ super(WATCHER_RECYCLER);
+ this.lf = lf;
this.masterKey = masterKey;
mode = "rw";
}
@@ -105,10 +107,11 @@ class FileInfo extends Observable {
long setLastAddConfirmed(long lac) {
long lacToReturn;
+ boolean changed = false;
synchronized (this) {
if (null == this.lac || this.lac < lac) {
this.lac = lac;
- setChanged();
+ changed = true;
}
lacToReturn = this.lac;
}
@@ -116,22 +119,24 @@ class FileInfo extends Observable {
LOG.trace("Updating LAC {} , {}", lacToReturn, lac);
}
-
- notifyObservers(new LastAddConfirmedUpdateNotification(lacToReturn));
+ if (changed) {
+ notifyWatchers(LastAddConfirmedUpdateNotification.FUNC,
lacToReturn);
+ }
return lacToReturn;
}
- synchronized Observable waitForLastAddConfirmedUpdate(long previousLAC,
Observer observe) {
+ synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher) {
if ((null != lac && lac > previousLAC)
|| isClosed || ((stateBits & STATE_FENCED_BIT) ==
STATE_FENCED_BIT)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Wait For LAC {} , {}", this.lac, previousLAC);
}
- return null;
+ return false;
}
- addObserver(observe);
- return this;
+ addWatcher(watcher);
+ return true;
}
public synchronized File getLf() {
@@ -283,6 +288,7 @@ class FileInfo extends Observable {
*/
public boolean setFenced() throws IOException {
boolean returnVal = false;
+ boolean changed = false;
synchronized (this) {
checkOpen(false);
if (LOG.isDebugEnabled()) {
@@ -293,12 +299,14 @@ class FileInfo extends Observable {
stateBits |= STATE_FENCED_BIT;
needFlushHeader = true;
synchronized (this) {
- setChanged();
+ changed = true;
}
returnVal = true;
}
}
- notifyObservers(new
LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
+ if (changed) {
+ notifyWatchers(LastAddConfirmedUpdateNotification.FUNC,
Long.MAX_VALUE);
+ }
return returnVal;
}
@@ -379,20 +387,36 @@ class FileInfo extends Observable {
* if set to false, the index is not forced to create.
*/
public void close(boolean force) throws IOException {
- synchronized (this) {
- isClosed = true;
- checkOpen(force, true);
- // Any time when we force close a file, we should try to flush
header. otherwise, we might lose fence bit.
- if (force) {
- flushHeader();
+ boolean closing = false;
+ try {
+ boolean changed = false;
+ synchronized (this) {
+ if (isClosed) {
+ return;
+ }
+ isClosed = true;
+ closing = true;
+ checkOpen(force, true);
+ // Any time when we force close a file, we should try to flush
header.
+ // otherwise, we might lose fence bit.
+ if (force) {
+ flushHeader();
+ }
+ changed = true;
+ if (useCount.get() == 0 && fc != null) {
+ fc.close();
+ fc = null;
+ }
}
- setChanged();
- if (useCount.get() == 0 && fc != null) {
- fc.close();
- fc = null;
+ if (changed) {
+ notifyWatchers(LastAddConfirmedUpdateNotification.FUNC,
Long.MAX_VALUE);
+ }
+ } finally {
+ if (closing) {
+ // recycle this watchable after the FileInfo is closed.
+ recycle();
}
}
- notifyObservers(new
LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
}
public synchronized long write(ByteBuffer[] buffs, long position) throws
IOException {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 3120fa4..8598bc4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -31,7 +31,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.UncheckedExecutionException;
-
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
@@ -40,17 +39,15 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
@@ -435,11 +432,13 @@ public class IndexPersistenceMgr {
}
}
- Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC,
Observer observer) throws IOException {
+ boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
FileInfo fi = null;
try {
fi = getFileInfo(ledgerId, null);
- return fi.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+ return fi.waitForLastAddConfirmedUpdate(previousLAC, watcher);
} finally {
if (null != fi) {
fi.release();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index b5c9513..bcb5dc5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -27,22 +27,18 @@ import static
org.apache.bookkeeper.bookie.BookKeeperServerStats.STORAGE_GET_OFF
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
-
import io.netty.buffer.ByteBuf;
-
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
-
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -255,9 +251,11 @@ public class InterleavedLedgerStorage implements
CompactableLedgerStorage, Entry
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer)
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
- return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId,
previoisLAC, observer);
+ return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId,
previousLAC, watcher);
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index d54961e..b979b27 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -42,6 +42,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
@@ -52,7 +53,6 @@ import org.apache.bookkeeper.util.DaemonThreadFactory;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.collections.GrowableArrayBlockingQueue;
-import org.apache.bookkeeper.util.collections.RecyclableArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
index a0c112d..71cbd61 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
@@ -20,17 +20,54 @@
*/
package org.apache.bookkeeper.bookie;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.util.function.Function;
+import lombok.Getter;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.util.Recyclable;
+import org.apache.bookkeeper.common.util.Watcher;
+
/**
* A signal object is used for notifying the observers when the {@code
LastAddConfirmed} is advanced.
*
* <p>The signal object contains the latest {@code LastAddConfirmed} and when
the {@code LastAddConfirmed} is advanced.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
+@Getter
+public class LastAddConfirmedUpdateNotification implements Recyclable {
+
+ public static final Function<Long, LastAddConfirmedUpdateNotification>
FUNC = lac -> of(lac);
+
+ public static final
RecyclableArrayList.Recycler<Watcher<LastAddConfirmedUpdateNotification>>
WATCHER_RECYCLER =
+ new RecyclableArrayList.Recycler<>();
+
+ public static LastAddConfirmedUpdateNotification of(long lastAddConfirmed)
{
+ LastAddConfirmedUpdateNotification lac = RECYCLER.get();
+ lac.lastAddConfirmed = lastAddConfirmed;
+ lac.timestamp = System.currentTimeMillis();
+ return lac;
+ }
+
+ private static final Recycler<LastAddConfirmedUpdateNotification> RECYCLER
=
+ new Recycler<LastAddConfirmedUpdateNotification>() {
+ @Override
+ protected LastAddConfirmedUpdateNotification
newObject(Handle<LastAddConfirmedUpdateNotification> handle) {
+ return new LastAddConfirmedUpdateNotification(handle);
+ }
+ };
+
+ private final Handle<LastAddConfirmedUpdateNotification> handle;
+ private long lastAddConfirmed;
+ private long timestamp;
+
+ public
LastAddConfirmedUpdateNotification(Handle<LastAddConfirmedUpdateNotification>
handle) {
+ this.handle = handle;
+ }
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
+ @Override
+ public void recycle() {
+ this.lastAddConfirmed = -1L;
+ this.timestamp = -1L;
+ handle.recycle(this);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index 26d5245..14d4825 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -24,8 +24,7 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
/**
* This class maps a ledger entry number into a location (entrylogid, offset)
in
@@ -49,7 +48,9 @@ interface LedgerCache extends Closeable {
Long getLastAddConfirmed(long ledgerId) throws IOException;
long updateLastAddConfirmed(long ledgerId, long lac) throws IOException;
- Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC,
Observer observer) throws IOException;
+ boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
void deleteLedger(long ledgerId) throws IOException;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index 0b63e32..1db7d47 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -23,8 +23,7 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -85,12 +84,13 @@ public class LedgerCacheImpl implements LedgerCache {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer)
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
- return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId,
previoisLAC, observer);
+ return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId,
previousLAC, watcher);
}
-
@Override
public void putEntryOffset(long ledger, long entry, long offset) throws
IOException {
indexPageManager.putEntryOffset(ledger, entry, offset);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 970cec0..f4db7a6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -27,8 +27,7 @@ import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
/**
* Implements a ledger inside a bookie. In particular, it implements operations
@@ -78,7 +77,9 @@ public abstract class LedgerDescriptor {
abstract ByteBuf readEntry(long entryId) throws IOException;
abstract long getLastAddConfirmed() throws IOException;
- abstract Observable waitForLastAddConfirmedUpdate(long previoisLAC,
Observer observer) throws IOException;
+ abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException;
abstract void setExplicitLac(ByteBuf entry) throws IOException;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index 4a4be4d..c327afe 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -25,9 +25,8 @@ import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.common.util.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -156,7 +155,8 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
}
@Override
- Observable waitForLastAddConfirmedUpdate(long previousLAC, Observer
observer) throws IOException {
- return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId,
previousLAC, observer);
+ boolean waitForLastAddConfirmedUpdate(long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+ return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId,
previousLAC, watcher);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index a0553a3..ac0df00 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -23,9 +23,8 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -119,12 +118,14 @@ public interface LedgerStorage {
/**
* Wait for last add confirmed update.
*
- * @param previoisLAC - The threshold beyond which we would wait for the
update
- * @param observer - Observer to notify on update
+ * @param previousLAC - The threshold beyond which we would wait for the
update
+ * @param watcher - Watcher to notify on update
* @return
* @throws IOException
*/
- Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC,
Observer observer) throws IOException;
+ boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
/**
* Flushes all data in the storage. Once this is called,
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index faeb849..fdbbd35 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -19,22 +19,17 @@ package org.apache.bookkeeper.proto;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
-
import io.netty.channel.Channel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
@@ -44,7 +39,7 @@ import org.slf4j.LoggerFactory;
/**
* Processor handling long poll read entry request.
*/
-class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements
Observer {
+class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements
Watcher<LastAddConfirmedUpdateNotification> {
private static final Logger logger =
LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
@@ -145,9 +140,9 @@ class LongPollReadEntryProcessorV3 extends
ReadEntryProcessorV3 implements Obser
final Stopwatch startTimeSw = Stopwatch.createStarted();
- final Observable observable;
+ final boolean watched;
try {
- observable =
requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC,
this);
+ watched =
requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC,
this);
} catch (Bookie.NoLedgerException e) {
logger.info("No ledger found while longpoll reading ledger {},
previous lac = {}.",
ledgerId, previousLAC);
@@ -161,19 +156,16 @@ class LongPollReadEntryProcessorV3 extends
ReadEntryProcessorV3 implements Obser
registerSuccessfulEvent(requestProcessor.longPollPreWaitStats,
startTimeSw);
lastPhaseStartTime.reset().start();
- if (null != observable) {
- // successfully registered observable to lac updates
+ if (watched) {
+ // successfully registered watcher to lac updates
if (logger.isTraceEnabled()) {
logger.trace("Waiting For LAC Update {}: Timeout {}",
previousLAC, readRequest.getTimeOut());
}
synchronized (this) {
- expirationTimerTask = requestTimer.newTimeout(new
TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- // When the timeout expires just get whatever is
the current
- // readLastConfirmed
-
LongPollReadEntryProcessorV3.this.scheduleDeferredRead(observable, true);
- }
+ expirationTimerTask = requestTimer.newTimeout(timeout -> {
+ // When the timeout expires just get whatever is the
current
+ // readLastConfirmed
+
LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true);
}, readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
}
return null;
@@ -192,26 +184,25 @@ class LongPollReadEntryProcessorV3 extends
ReadEntryProcessorV3 implements Obser
}
@Override
- public void update(Observable observable, Object o) {
- LastAddConfirmedUpdateNotification newLACNotification =
(LastAddConfirmedUpdateNotification) o;
- if (newLACNotification.lastAddConfirmed > previousLAC) {
- if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE &&
!lastAddConfirmedUpdateTime.isPresent()) {
- lastAddConfirmedUpdateTime =
Optional.of(newLACNotification.timestamp);
+ public void update(LastAddConfirmedUpdateNotification newLACNotification) {
+ if (newLACNotification.getLastAddConfirmed() > previousLAC) {
+ if (newLACNotification.getLastAddConfirmed() != Long.MAX_VALUE &&
!lastAddConfirmedUpdateTime.isPresent()) {
+ lastAddConfirmedUpdateTime =
Optional.of(newLACNotification.getTimestamp());
}
if (logger.isTraceEnabled()) {
logger.trace("Last Add Confirmed Advanced to {} for request
{}",
- newLACNotification.lastAddConfirmed, request);
+ newLACNotification.getLastAddConfirmed(), request);
}
- scheduleDeferredRead(observable, false);
+ scheduleDeferredRead(false);
}
+ newLACNotification.recycle();
}
- private synchronized void scheduleDeferredRead(Observable observable,
boolean timeout) {
+ private synchronized void scheduleDeferredRead(boolean timeout) {
if (null == deferredTask) {
if (logger.isTraceEnabled()) {
logger.trace("Deferred Task, expired: {}, request: {}",
timeout, request);
}
- observable.deleteObserver(this);
try {
shouldReadEntry = true;
deferredTask = longPollThreadPool.submit(this);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index d729be4..974bde2 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -28,9 +28,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
-import java.util.Observable;
-import java.util.Observer;
-
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
@@ -239,29 +237,27 @@ public class IndexPersistenceMgrTest {
@Test
public void testEvictionShouldNotAffectLongPollRead() throws Exception {
IndexPersistenceMgr indexPersistenceMgr = null;
- Observer observer = (obs, obj) -> {
- //no-ops
- };
+ Watcher<LastAddConfirmedUpdateNotification> watcher = notification ->
notification.recycle();
try {
indexPersistenceMgr = createIndexPersistenceManager(1);
indexPersistenceMgr.getFileInfo(lid, masterKey);
indexPersistenceMgr.getFileInfo(lid, null);
indexPersistenceMgr.updateLastAddConfirmed(lid, 1);
- Observable observable =
indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
- // observer shouldn't be null because ledger is not evicted or
closed
- assertNotNull("Observer should not be null", observable);
+ // watch should succeed because ledger is not evicted or closed
+ assertTrue(
+ indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1,
watcher));
// now evict ledger 1 from write cache
indexPersistenceMgr.getFileInfo(lid + 1, masterKey);
- observable =
indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
- // even if ledger 1 is evicted from write cache, observer still
shouldn't be null
- assertNotNull("Observer should not be null", observable);
+ // even if ledger 1 is evicted from write cache, watcher should
still succeed
+ assertTrue(
+ indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1,
watcher));
// now evict ledger 1 from read cache
indexPersistenceMgr.getFileInfo(lid + 2, masterKey);
indexPersistenceMgr.getFileInfo(lid + 2, null);
- // even if ledger 1 is evicted from both cache, observer still
shouldn't be null because it
+ // even if ledger 1 is evicted from both cache, watcher should
still succeed because it
// will create a new FileInfo when cache miss
- observable =
indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
- assertNotNull("Observer should not be null", observable);
+ assertTrue(
+ indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1,
watcher));
} finally {
if (null != indexPersistenceMgr) {
indexPersistenceMgr.close();
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java
new file mode 100644
index 0000000..90da463
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Unit test of {@link LastAddConfirmedUpdateNotification}.
+ */
+public class LastAddConfirmedUpdateNotificationTest {
+
+ @Test
+ public void testGetters() {
+ long lac = System.currentTimeMillis();
+ LastAddConfirmedUpdateNotification notification =
LastAddConfirmedUpdateNotification.of(lac);
+
+ long timestamp = System.currentTimeMillis();
+ assertEquals(lac, notification.getLastAddConfirmed());
+ assertTrue(notification.getTimestamp() <= timestamp);
+
+ notification.recycle();
+ }
+
+ @Test
+ public void testRecycle() {
+ long lac = System.currentTimeMillis();
+ LastAddConfirmedUpdateNotification notification =
LastAddConfirmedUpdateNotification.of(lac);
+ notification.recycle();
+
+ assertEquals(-1L, notification.getLastAddConfirmed());
+ assertEquals(-1L, notification.getTimestamp());
+ }
+
+ @Test
+ public void testFunc() {
+ long lac = System.currentTimeMillis();
+ LastAddConfirmedUpdateNotification notification =
LastAddConfirmedUpdateNotification.FUNC.apply(lac);
+
+ assertEquals(lac, notification.getLastAddConfirmed());
+ }
+
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index b2bc916..ea60aa8 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -26,11 +26,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
-
import java.io.File;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -38,10 +35,10 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import
org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
@@ -348,9 +345,11 @@ public class TestSyncThread {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer)
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
- return null;
+ return false;
}
@Override
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index f74289d..5c234d1 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.meta;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -37,8 +36,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
@@ -55,11 +52,13 @@ import
org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollector;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
@@ -425,9 +424,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer)
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
- return null;
+ return false;
}
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index cbd504b..ea02a30 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -27,8 +27,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -36,7 +34,9 @@ import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -213,9 +213,11 @@ public abstract class LedgerManagerTestCase extends
BookKeeperClusterTestCase {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long
previoisLAC, Observer observer)
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
- return null;
+ return false;
}
@Override
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].