This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-4.6 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 18b87b569c77da2d52ad8651e4a992be9cf5b0d7 Author: Sijie Guo <[email protected]> AuthorDate: Fri Dec 15 00:43:44 2017 -0800 ISSUE #326: Replace observer/observable with a simplified watcher/watchable implementation Descriptions of the changes in this PR: - long poll only needs a one-time notification on LAC updates - replace observer/observable with a simplified watcher/watchable implementation. watchers are removed after they are fired. - add `RecyclableArrayList` for `Watchable` to keep the list of watchers - object pooling on `Watchable` and `LastAddConfirmedUpdateNotification` Author: Sijie Guo <[email protected]> Reviewers: Ivan Kelly <[email protected]>, Enrico Olivelli <[email protected]> This closes #838 from sijie/rxjava, closes #326 --- .travis.yml | 3 + bookkeeper-common/pom.xml | 5 + .../common/collections/RecyclableArrayList.java | 57 +++++++++ .../common/collections/package-info.java | 19 +-- .../apache/bookkeeper/common/util/Recyclable.java | 24 ++-- .../apache/bookkeeper/common/util/Watchable.java | 120 +++++++++++++++++++ .../org/apache/bookkeeper/common/util/Watcher.java | 30 ++--- .../collections/RecyclableArrayListTest.java | 38 +++--- .../bookkeeper/common/util/TestWatchable.java | 129 +++++++++++++++++++++ .../src/test/resources/log4j.properties | 42 +++++++ .../java/org/apache/bookkeeper/bookie/Bookie.java | 9 +- .../org/apache/bookkeeper/bookie/FileInfo.java | 72 ++++++++---- .../bookkeeper/bookie/IndexPersistenceMgr.java | 11 +- .../bookie/InterleavedLedgerStorage.java | 9 +- .../java/org/apache/bookkeeper/bookie/Journal.java | 1 + .../bookie/LastAddConfirmedUpdateNotification.java | 49 +++++++- .../org/apache/bookkeeper/bookie/LedgerCache.java | 7 +- .../apache/bookkeeper/bookie/LedgerCacheImpl.java | 10 +- .../apache/bookkeeper/bookie/LedgerDescriptor.java | 7 +- .../bookkeeper/bookie/LedgerDescriptorImpl.java | 8 +- .../apache/bookkeeper/bookie/LedgerStorage.java | 11 +- .../proto/LongPollReadEntryProcessorV3.java | 43 +++---- 
.../bookkeeper/bookie/IndexPersistenceMgrTest.java | 38 +++--- .../LastAddConfirmedUpdateNotificationTest.java | 62 ++++++++++ .../apache/bookkeeper/bookie/TestSyncThread.java | 17 +-- .../org/apache/bookkeeper/meta/GcLedgersTest.java | 12 +- .../bookkeeper/meta/LedgerManagerTestCase.java | 11 +- 27 files changed, 663 insertions(+), 181 deletions(-) diff --git a/.travis.yml b/.travis.yml index bea17d5..718d388 100644 --- a/.travis.yml +++ b/.travis.yml @@ -27,6 +27,9 @@ matrix: osx_image: xcode8 - os: linux env: CUSTOM_JDK="oraclejdk8" + - os: linux + dist: trusty + env: CUSTOM_JDK="openjdk8" before_install: - echo "MAVEN_OPTS='-Xmx3072m -XX:MaxPermSize=512m'" > ~/.mavenrc diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml index 1e502ca..5398664 100644 --- a/bookkeeper-common/pom.xml +++ b/bookkeeper-common/pom.xml @@ -36,6 +36,11 @@ <version>${guava.version}</version> </dependency> <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> <version>${google.code.version}</version> diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java new file mode 100644 index 0000000..7dd663f --- /dev/null +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java @@ -0,0 +1,57 @@ +// Originally copied from netty project, version 4.1.17-Final, heavily modified +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.common.collections; + +import io.netty.util.Recycler.Handle; +import java.util.ArrayList; + +/** + * A simple list which is recyclable. + */ +public final class RecyclableArrayList<T> extends ArrayList<T> { + + private static final int DEFAULT_INITIAL_CAPACITY = 8; + + /** + * An ArrayList recycler. + */ + public static class Recycler<X> + extends io.netty.util.Recycler<RecyclableArrayList<X>> { + @Override + protected RecyclableArrayList<X> newObject( + Handle<RecyclableArrayList<X>> handle) { + return new RecyclableArrayList<X>(handle, DEFAULT_INITIAL_CAPACITY); + } + + public RecyclableArrayList<X> newInstance() { + return get(); + } + } + + private final Handle<RecyclableArrayList<T>> handle; + + private RecyclableArrayList(Handle<RecyclableArrayList<T>> handle, int initialCapacity) { + super(initialCapacity); + this.handle = handle; + } + + public void recycle() { + clear(); + handle.recycle(this); + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java similarity index 58% copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java index a0c112d..0ed3c73 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java +++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,21 +15,9 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ -package org.apache.bookkeeper.bookie; /** - * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced. - * - * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced. + * Bookkeeper common collections. */ -public class LastAddConfirmedUpdateNotification { - public long lastAddConfirmed; - public long timestamp; - - public LastAddConfirmedUpdateNotification(long lastAddConfirmed) { - this.lastAddConfirmed = lastAddConfirmed; - this.timestamp = System.currentTimeMillis(); - } -} +package org.apache.bookkeeper.common.collections; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java similarity index 59% copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java index a0c112d..edbaa34 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -16,21 +15,18 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ -package org.apache.bookkeeper.bookie; + +package org.apache.bookkeeper.common.util; /** - * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced. - * - * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced. + * An interface represents an object that is recyclable. */ -public class LastAddConfirmedUpdateNotification { - public long lastAddConfirmed; - public long timestamp; +public interface Recyclable { + + /** + * Recycle the instance. + */ + void recycle(); - public LastAddConfirmedUpdateNotification(long lastAddConfirmed) { - this.lastAddConfirmed = lastAddConfirmed; - this.timestamp = System.currentTimeMillis(); - } } diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java new file mode 100644 index 0000000..a74f5f5 --- /dev/null +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.common.util; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.function.Function; +import org.apache.bookkeeper.common.collections.RecyclableArrayList; +import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler; + +/** + * This class represents a watchable object, or "data" + * in the model-view paradigm. It can be subclassed to represent an + * object that the application wants to have watched. + * + * <p>A watchable object can have one or more watchers. A watcher + * may be any object that implements interface <tt>Watcher</tt>. After a + * watchable instance changes, an application calling the + * <code>Watchable</code>'s <code>notifyWatchers</code> method + * causes all of its watchers to be notified of the change by a call + * to their <code>update</code> method. + * + * <p>A watcher is automatically removed from the watchers list once an event + * is fired to the watcher. + * + * <p>Note that this notification mechanism has nothing to do with threads + * and is completely separate from the <tt>wait</tt> and <tt>notify</tt> + * mechanism of class <tt>Object</tt>. + * + * <p>When a watchable object is newly created, its list of watchers is + * empty. If the same watcher is added multiple times to this watchable, it will + * receive the notifications multiple times. 
+ */ +public class Watchable<T> implements Recyclable { + + private final Recycler<Watcher<T>> recycler; + private RecyclableArrayList<Watcher<T>> watchers; + + /** Construct a Watchable with zero watchers. */ + + public Watchable(Recycler<Watcher<T>> recycler) { + this.recycler = recycler; + this.watchers = recycler.newInstance(); + } + + synchronized int getNumWatchers() { + return this.watchers.size(); + } + + /** + * Adds a watcher to the list of watchers for this object. A watcher + * that is added more than once is registered once per call. + * The order in which notifications will be delivered to multiple + * watchers is not specified. See the class comment. + * + * @param w a watcher to be added. + * @return true if a watcher is added to the list successfully, otherwise false. + * @throws NullPointerException if the parameter w is null. + */ + public synchronized boolean addWatcher(Watcher<T> w) { + checkNotNull(w, "Null watcher is provided"); + return watchers.add(w); + } + + /** + * Deletes a watcher from the list of watchers of this object. + * Passing <CODE>null</CODE> to this method will have no effect. + * @param w the watcher to be deleted. + */ + public synchronized boolean deleteWatcher(Watcher<T> w) { + return watchers.remove(w); + } + + /** + * Notify the watchers with the result of applying <i>valueFn</i> to <i>value</i>. + * + * @param value value passed to valueFn to produce the notification + */ + public <R> void notifyWatchers(Function<R, T> valueFn, R value) { + RecyclableArrayList<Watcher<T>> watchersLocal; + synchronized (this) { + watchersLocal = watchers; + watchers = recycler.newInstance(); + } + + for (Watcher<T> watcher : watchersLocal) { + watcher.update(valueFn.apply(value)); + } + watchersLocal.recycle(); + } + + /** + * Clears the watcher list so that this object no longer has any watchers. 
+ */ + public synchronized void deleteWatchers() { + watchers.clear(); + } + + @Override + public synchronized void recycle() { + watchers.recycle(); + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java similarity index 59% copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java index a0c112d..220a942 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,21 +15,24 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ -package org.apache.bookkeeper.bookie; + +package org.apache.bookkeeper.common.util; /** - * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced. - * - * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced. + * A class can implement the <code>Watcher</code> interface when it + * wants to be informed of <i>one-time</i> changes in watchable objects. */ -public class LastAddConfirmedUpdateNotification { - public long lastAddConfirmed; - public long timestamp; +public interface Watcher<T> { + + /** + * This method is called whenever the watched object is changed. An + * application calls an <tt>Watchable</tt> object's + * <code>notifyWatchers</code> method to have all the object's + * watchers notified of the change. 
+ * + * @param value the updated value of a watchable + */ + void update(T value); - public LastAddConfirmedUpdateNotification(long lastAddConfirmed) { - this.lastAddConfirmed = lastAddConfirmed; - this.timestamp = System.currentTimeMillis(); - } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java similarity index 53% copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java copy to bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java index a0c112d..9037d21 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,21 +15,34 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ -package org.apache.bookkeeper.bookie; +package org.apache.bookkeeper.common.collections; + +import static org.junit.Assert.assertEquals; + +import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler; +import org.junit.Test; /** - * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced. - * - * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced. + * Unit test of {@link RecyclableArrayList}. 
*/ -public class LastAddConfirmedUpdateNotification { - public long lastAddConfirmed; - public long timestamp; +public class RecyclableArrayListTest { + + private final Recycler<Integer> recycler; - public LastAddConfirmedUpdateNotification(long lastAddConfirmed) { - this.lastAddConfirmed = lastAddConfirmed; - this.timestamp = System.currentTimeMillis(); + public RecyclableArrayListTest() { + this.recycler = new Recycler<>(); } + + @Test + public void testRecycle() { + RecyclableArrayList<Integer> array = recycler.newInstance(); + for (int i = 0; i < 5; i++) { + array.add(i); + } + assertEquals(5, array.size()); + array.recycle(); + assertEquals(0, array.size()); + } + } diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java new file mode 100644 index 0000000..697d0da --- /dev/null +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.bookkeeper.common.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.function.Function; +import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler; +import org.junit.After; +import org.junit.Test; + +/** + * Unit test of {@link Watchable}. + */ +public class TestWatchable { + + private final Recycler<Watcher<Integer>> recycler; + private final Watchable<Integer> watchable; + + public TestWatchable() { + this.recycler = new Recycler<>(); + this.watchable = new Watchable<>(recycler); + } + + @After + public void teardown() { + this.watchable.recycle(); + } + + @Test + @SuppressWarnings("unchecked") + public void testAddWatcher() { + Watcher<Integer> watcher = mock(Watcher.class); + assertTrue(watchable.addWatcher(watcher)); + assertEquals(1, watchable.getNumWatchers()); + + watchable.notifyWatchers(Function.identity(), 123); + verify(watcher, times(1)).update(eq(123)); + + // after the watcher is fired, watcher should be removed from watcher list. 
+ assertEquals(0, watchable.getNumWatchers()); + } + + @Test + @SuppressWarnings("unchecked") + public void testDeleteWatcher() { + Watcher<Integer> watcher = mock(Watcher.class); + assertTrue(watchable.addWatcher(watcher)); + assertEquals(1, watchable.getNumWatchers()); + assertTrue(watchable.deleteWatcher(watcher)); + assertEquals(0, watchable.getNumWatchers()); + + watchable.notifyWatchers(Function.identity(), 123); + verify(watcher, times(0)).update(anyInt()); + } + + @Test + @SuppressWarnings("unchecked") + public void testMultipleWatchers() { + Watcher<Integer> watcher1 = mock(Watcher.class); + Watcher<Integer> watcher2 = mock(Watcher.class); + + assertTrue(watchable.addWatcher(watcher1)); + assertTrue(watchable.addWatcher(watcher2)); + assertEquals(2, watchable.getNumWatchers()); + + watchable.notifyWatchers(Function.identity(), 123); + verify(watcher1, times(1)).update(eq(123)); + verify(watcher2, times(1)).update(eq(123)); + assertEquals(0, watchable.getNumWatchers()); + } + + @Test + @SuppressWarnings("unchecked") + public void testAddWatchMultipleTimes() { + Watcher<Integer> watcher = mock(Watcher.class); + + int numTimes = 3; + for (int i = 0; i < numTimes; i++) { + assertTrue(watchable.addWatcher(watcher)); + } + assertEquals(numTimes, watchable.getNumWatchers()); + + watchable.notifyWatchers(Function.identity(), 123); + verify(watcher, times(numTimes)).update(eq(123)); + assertEquals(0, watchable.getNumWatchers()); + } + + @Test + @SuppressWarnings("unchecked") + public void testDeleteWatchers() { + Watcher<Integer> watcher1 = mock(Watcher.class); + Watcher<Integer> watcher2 = mock(Watcher.class); + + assertTrue(watchable.addWatcher(watcher1)); + assertTrue(watchable.addWatcher(watcher2)); + assertEquals(2, watchable.getNumWatchers()); + watchable.deleteWatchers(); + assertEquals(0, watchable.getNumWatchers()); + + watchable.notifyWatchers(Function.identity(), 123); + verify(watcher1, times(0)).update(anyInt()); + verify(watcher2, 
times(0)).update(anyInt()); + } + +} diff --git a/bookkeeper-common/src/test/resources/log4j.properties b/bookkeeper-common/src/test/resources/log4j.properties new file mode 100644 index 0000000..10ae6bf --- /dev/null +++ b/bookkeeper-common/src/test/resources/log4j.properties @@ -0,0 +1,42 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +# + +# +# Bookkeeper Logging Configuration +# + +# Format is "<default threshold> (, <appender>)+ + +# DEFAULT: console appender only, level INFO +bookkeeper.root.logger=INFO,CONSOLE +log4j.rootLogger=${bookkeeper.root.logger} + +# +# Log INFO level and above messages to the console +# +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n + +#disable zookeeper logging +log4j.logger.org.apache.zookeeper=OFF +log4j.logger.org.apache.bookkeeper.bookie=INFO +log4j.logger.org.apache.bookkeeper.meta=INFO diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index c08ff46..074c29c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -53,8 +53,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Observable; -import java.util.Observer; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -73,6 +71,7 @@ import org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException; import org.apache.bookkeeper.bookie.Journal.JournalScanner; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.discover.RegistrationManager; import org.apache.bookkeeper.discover.ZKRegistrationManager; @@ -1433,10 +1432,12 @@ public class Bookie extends BookieCriticalThread { return handle.getLastAddConfirmed(); } - public Observable 
waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) + public boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException { LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId); - return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer); + return handle.waitForLastAddConfirmedUpdate(previousLAC, watcher); } // The rest of the code is test stuff diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java index c14be5f..38edbda 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java @@ -22,6 +22,7 @@ package org.apache.bookkeeper.bookie; import static com.google.common.base.Charsets.UTF_8; +import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; @@ -32,9 +33,9 @@ import java.io.RandomAccessFile; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.Observable; -import java.util.Observer; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.common.util.Watchable; +import org.apache.bookkeeper.common.util.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory; * in entry loggers. 
* </p> */ -class FileInfo extends Observable { +class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> { private static final Logger LOG = LoggerFactory.getLogger(FileInfo.class); static final int NO_MASTER_KEY = -1; @@ -93,8 +94,9 @@ class FileInfo extends Observable { protected String mode; public FileInfo(File lf, byte[] masterKey) throws IOException { - this.lf = lf; + super(WATCHER_RECYCLER); + this.lf = lf; this.masterKey = masterKey; mode = "rw"; } @@ -105,10 +107,11 @@ class FileInfo extends Observable { long setLastAddConfirmed(long lac) { long lacToReturn; + boolean changed = false; synchronized (this) { if (null == this.lac || this.lac < lac) { this.lac = lac; - setChanged(); + changed = true; } lacToReturn = this.lac; } @@ -116,22 +119,24 @@ class FileInfo extends Observable { LOG.trace("Updating LAC {} , {}", lacToReturn, lac); } - - notifyObservers(new LastAddConfirmedUpdateNotification(lacToReturn)); + if (changed) { + notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lacToReturn); + } return lacToReturn; } - synchronized Observable waitForLastAddConfirmedUpdate(long previousLAC, Observer observe) { + synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) { if ((null != lac && lac > previousLAC) || isClosed || ((stateBits & STATE_FENCED_BIT) == STATE_FENCED_BIT)) { if (LOG.isTraceEnabled()) { LOG.trace("Wait For LAC {} , {}", this.lac, previousLAC); } - return null; + return false; } - addObserver(observe); - return this; + addWatcher(watcher); + return true; } public synchronized File getLf() { @@ -283,6 +288,7 @@ class FileInfo extends Observable { */ public boolean setFenced() throws IOException { boolean returnVal = false; + boolean changed = false; synchronized (this) { checkOpen(false); if (LOG.isDebugEnabled()) { @@ -293,12 +299,14 @@ class FileInfo extends Observable { stateBits |= STATE_FENCED_BIT; needFlushHeader = true; synchronized (this) { - 
setChanged(); + changed = true; } returnVal = true; } } - notifyObservers(new LastAddConfirmedUpdateNotification(Long.MAX_VALUE)); + if (changed) { + notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, Long.MAX_VALUE); + } return returnVal; } @@ -379,20 +387,36 @@ class FileInfo extends Observable { * if set to false, the index is not forced to create. */ public void close(boolean force) throws IOException { - synchronized (this) { - isClosed = true; - checkOpen(force, true); - // Any time when we force close a file, we should try to flush header. otherwise, we might lose fence bit. - if (force) { - flushHeader(); + boolean closing = false; + try { + boolean changed = false; + synchronized (this) { + if (isClosed) { + return; + } + isClosed = true; + closing = true; + checkOpen(force, true); + // Any time when we force close a file, we should try to flush header. + // otherwise, we might lose fence bit. + if (force) { + flushHeader(); + } + changed = true; + if (useCount.get() == 0 && fc != null) { + fc.close(); + fc = null; + } } - setChanged(); - if (useCount.get() == 0 && fc != null) { - fc.close(); - fc = null; + if (changed) { + notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, Long.MAX_VALUE); + } + } finally { + if (closing) { + // recycle this watchable after the FileInfo is closed. 
+ recycle(); } } - notifyObservers(new LastAddConfirmedUpdateNotification(Long.MAX_VALUE)); } public synchronized long write(ByteBuffer[] buffs, long position) throws IOException { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java index 3120fa4..8598bc4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java @@ -31,7 +31,6 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.util.concurrent.UncheckedExecutionException; - import io.netty.buffer.ByteBuf; import java.io.File; import java.io.IOException; @@ -40,17 +39,15 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Observable; -import java.util.Observer; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; @@ -435,11 +432,13 @@ public class IndexPersistenceMgr { } } - Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { + boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + 
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException { FileInfo fi = null; try { fi = getFileInfo(ledgerId, null); - return fi.waitForLastAddConfirmedUpdate(previoisLAC, observer); + return fi.waitForLastAddConfirmedUpdate(previousLAC, watcher); } finally { if (null != fi) { fi.release(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 6db3e6d..b654b0e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -32,14 +32,13 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; import java.util.NavigableMap; -import java.util.Observable; -import java.util.Observer; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.Bookie.NoLedgerException; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.proto.BookieProtocol; @@ -251,9 +250,11 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry } @Override - public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) + public boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException { - return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer); + return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, 
previousLAC, watcher); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 1a3635c..11468d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -38,6 +38,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; +import org.apache.bookkeeper.common.collections.RecyclableArrayList; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.Counter; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java index a0c112d..71cbd61 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java @@ -20,17 +20,54 @@ */ package org.apache.bookkeeper.bookie; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; +import java.util.function.Function; +import lombok.Getter; +import org.apache.bookkeeper.common.collections.RecyclableArrayList; +import org.apache.bookkeeper.common.util.Recyclable; +import org.apache.bookkeeper.common.util.Watcher; + /** * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced. * * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced. 
*/ -public class LastAddConfirmedUpdateNotification { - public long lastAddConfirmed; - public long timestamp; +@Getter +public class LastAddConfirmedUpdateNotification implements Recyclable { + + public static final Function<Long, LastAddConfirmedUpdateNotification> FUNC = lac -> of(lac); + + public static final RecyclableArrayList.Recycler<Watcher<LastAddConfirmedUpdateNotification>> WATCHER_RECYCLER = + new RecyclableArrayList.Recycler<>(); + + public static LastAddConfirmedUpdateNotification of(long lastAddConfirmed) { + LastAddConfirmedUpdateNotification lac = RECYCLER.get(); + lac.lastAddConfirmed = lastAddConfirmed; + lac.timestamp = System.currentTimeMillis(); + return lac; + } + + private static final Recycler<LastAddConfirmedUpdateNotification> RECYCLER = + new Recycler<LastAddConfirmedUpdateNotification>() { + @Override + protected LastAddConfirmedUpdateNotification newObject(Handle<LastAddConfirmedUpdateNotification> handle) { + return new LastAddConfirmedUpdateNotification(handle); + } + }; + + private final Handle<LastAddConfirmedUpdateNotification> handle; + private long lastAddConfirmed; + private long timestamp; + + public LastAddConfirmedUpdateNotification(Handle<LastAddConfirmedUpdateNotification> handle) { + this.handle = handle; + } - public LastAddConfirmedUpdateNotification(long lastAddConfirmed) { - this.lastAddConfirmed = lastAddConfirmed; - this.timestamp = System.currentTimeMillis(); + @Override + public void recycle() { + this.lastAddConfirmed = -1L; + this.timestamp = -1L; + handle.recycle(this); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java index 26d5245..14d4825 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java @@ -24,8 +24,7 @@ package org.apache.bookkeeper.bookie; import 
io.netty.buffer.ByteBuf; import java.io.Closeable; import java.io.IOException; -import java.util.Observable; -import java.util.Observer; +import org.apache.bookkeeper.common.util.Watcher; /** * This class maps a ledger entry number into a location (entrylogid, offset) in @@ -49,7 +48,9 @@ interface LedgerCache extends Closeable { Long getLastAddConfirmed(long ledgerId) throws IOException; long updateLastAddConfirmed(long ledgerId, long lac) throws IOException; - Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException; + boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException; void deleteLedger(long ledgerId) throws IOException; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java index 0b63e32..1db7d47 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java @@ -23,8 +23,7 @@ package org.apache.bookkeeper.bookie; import io.netty.buffer.ByteBuf; import java.io.IOException; -import java.util.Observable; -import java.util.Observer; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -85,12 +84,13 @@ public class LedgerCacheImpl implements LedgerCache { } @Override - public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) + public boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException { - return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer); + return 
indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher); } - @Override public void putEntryOffset(long ledger, long entry, long offset) throws IOException { indexPageManager.putEntryOffset(ledger, entry, offset); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java index 970cec0..f4db7a6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java @@ -27,8 +27,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.IOException; -import java.util.Observable; -import java.util.Observer; +import org.apache.bookkeeper.common.util.Watcher; /** * Implements a ledger inside a bookie. In particular, it implements operations @@ -78,7 +77,9 @@ public abstract class LedgerDescriptor { abstract ByteBuf readEntry(long entryId) throws IOException; abstract long getLastAddConfirmed() throws IOException; - abstract Observable waitForLastAddConfirmedUpdate(long previoisLAC, Observer observer) throws IOException; + abstract boolean waitForLastAddConfirmedUpdate(long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) + throws IOException; abstract void setExplicitLac(ByteBuf entry) throws IOException; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java index af55f6a..d126c4c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java @@ -25,9 +25,8 @@ import com.google.common.util.concurrent.SettableFuture; import io.netty.buffer.ByteBuf; 
import java.io.IOException; import java.util.Arrays; -import java.util.Observable; -import java.util.Observer; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.common.util.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,7 +155,8 @@ public class LedgerDescriptorImpl extends LedgerDescriptor { } @Override - Observable waitForLastAddConfirmedUpdate(long previousLAC, Observer observer) throws IOException { - return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, observer); + boolean waitForLastAddConfirmedUpdate(long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException { + return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index d2055a8..3e2f7dc 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -23,9 +23,8 @@ package org.apache.bookkeeper.bookie; import io.netty.buffer.ByteBuf; import java.io.IOException; -import java.util.Observable; -import java.util.Observer; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager; import org.apache.bookkeeper.stats.StatsLogger; @@ -119,12 +118,14 @@ public interface LedgerStorage { /** * Wait for last add confirmed update. 
* - * @param previoisLAC - The threshold beyond which we would wait for the update - * @param observer - Observer to notify on update + * @param previousLAC - The threshold beyond which we would wait for the update + * @param watcher - Watcher to notify on update * @return * @throws IOException */ - Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException; + boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException; /** * Flushes all data in the storage. Once this is called, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java index 342e788..f60dac7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java @@ -22,16 +22,14 @@ import com.google.common.base.Stopwatch; import io.netty.channel.Channel; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; -import io.netty.util.TimerTask; import java.io.IOException; -import java.util.Observable; -import java.util.Observer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.proto.BookkeeperProtocol.Request; import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; @@ -41,7 +39,7 @@ import org.slf4j.LoggerFactory; /** * Processor handling long poll read entry request. 
*/ -class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Observer { +class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watcher<LastAddConfirmedUpdateNotification> { private final static Logger logger = LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class); @@ -141,9 +139,9 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Obser final Stopwatch startTimeSw = Stopwatch.createStarted(); - final Observable observable; + final boolean watched; try { - observable = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this); + watched = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this); } catch (Bookie.NoLedgerException e) { logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.", ledgerId, previousLAC); @@ -157,19 +155,16 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Obser registerSuccessfulEvent(requestProcessor.longPollPreWaitStats, startTimeSw); lastPhaseStartTime.reset().start(); - if (null != observable) { - // successfully registered observable to lac updates + if (watched) { + // successfully registered watcher to lac updates if (logger.isTraceEnabled()) { logger.trace("Waiting For LAC Update {}: Timeout {}", previousLAC, readRequest.getTimeOut()); } synchronized (this) { - expirationTimerTask = requestTimer.newTimeout(new TimerTask() { - @Override - public void run(Timeout timeout) throws Exception { - // When the timeout expires just get whatever is the current - // readLastConfirmed - LongPollReadEntryProcessorV3.this.scheduleDeferredRead(observable, true); - } + expirationTimerTask = requestTimer.newTimeout(timeout -> { + // When the timeout expires just get whatever is the current + // readLastConfirmed + LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true); }, readRequest.getTimeOut(), TimeUnit.MILLISECONDS); } return null; @@ -188,27 +183,25 @@ class 
LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Obser } @Override - public void update(Observable observable, Object o) { - LastAddConfirmedUpdateNotification newLACNotification = (LastAddConfirmedUpdateNotification)o; - if (newLACNotification.lastAddConfirmed > previousLAC) { - if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE && - !lastAddConfirmedUpdateTime.isPresent()) { - lastAddConfirmedUpdateTime = Optional.of(newLACNotification.timestamp); + public void update(LastAddConfirmedUpdateNotification newLACNotification) { + if (newLACNotification.getLastAddConfirmed() > previousLAC) { + if (newLACNotification.getLastAddConfirmed() != Long.MAX_VALUE && !lastAddConfirmedUpdateTime.isPresent()) { + lastAddConfirmedUpdateTime = Optional.of(newLACNotification.getTimestamp()); } if (logger.isTraceEnabled()) { logger.trace("Last Add Confirmed Advanced to {} for request {}", - newLACNotification.lastAddConfirmed, request); + newLACNotification.getLastAddConfirmed(), request); } - scheduleDeferredRead(observable, false); + scheduleDeferredRead(false); } + newLACNotification.recycle(); } - private synchronized void scheduleDeferredRead(Observable observable, boolean timeout) { + private synchronized void scheduleDeferredRead(boolean timeout) { if (null == deferredTask) { if (logger.isTraceEnabled()) { logger.trace("Deferred Task, expired: {}, request: {}", timeout, request); } - observable.deleteObserver(this); try { shouldReadEntry = true; deferredTask = longPollThreadPool.submit(this); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java index 1c63d39..a179745 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java @@ -20,6 +20,15 @@ */ package 
org.apache.bookkeeper.bookie; +import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.DiskChecker; @@ -31,13 +40,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.util.Observable; -import java.util.Observer; - -import static com.google.common.base.Charsets.UTF_8; -import static org.junit.Assert.*; - /** * Test cases for IndexPersistenceMgr */ @@ -235,29 +237,27 @@ public class IndexPersistenceMgrTest { @Test public void testEvictionShouldNotAffectLongPollRead() throws Exception { IndexPersistenceMgr indexPersistenceMgr = null; - Observer observer = (obs, obj) -> { - //no-ops - }; + Watcher<LastAddConfirmedUpdateNotification> watcher = notification -> notification.recycle(); try { indexPersistenceMgr = createIndexPersistenceManager(1); indexPersistenceMgr.getFileInfo(lid, masterKey); indexPersistenceMgr.getFileInfo(lid, null); indexPersistenceMgr.updateLastAddConfirmed(lid, 1); - Observable observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer); - // observer shouldn't be null because ledger is not evicted or closed - assertNotNull("Observer should not be null", observable); + // watch should succeed because ledger is not evicted or closed + assertTrue( + indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher)); // now evict ledger 1 from write cache indexPersistenceMgr.getFileInfo(lid + 1, masterKey); - observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer); - // even if ledger 1 is evicted from write cache, 
observer still shouldn't be null - assertNotNull("Observer should not be null", observable); + // even if ledger 1 is evicted from write cache, watcher should still succeed + assertTrue( + indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher)); // now evict ledger 1 from read cache indexPersistenceMgr.getFileInfo(lid + 2, masterKey); indexPersistenceMgr.getFileInfo(lid + 2, null); - // even if ledger 1 is evicted from both cache, observer still shouldn't be null because it + // even if ledger 1 is evicted from both cache, watcher should still succeed because it // will create a new FileInfo when cache miss - observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer); - assertNotNull("Observer should not be null", observable); + assertTrue( + indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher)); } finally { if (null != indexPersistenceMgr) { indexPersistenceMgr.close(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java new file mode 100644 index 0000000..90da463 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.bookie; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Unit test of {@link LastAddConfirmedUpdateNotification}. + */ +public class LastAddConfirmedUpdateNotificationTest { + + @Test + public void testGetters() { + long lac = System.currentTimeMillis(); + LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.of(lac); + + long timestamp = System.currentTimeMillis(); + assertEquals(lac, notification.getLastAddConfirmed()); + assertTrue(notification.getTimestamp() <= timestamp); + + notification.recycle(); + } + + @Test + public void testRecycle() { + long lac = System.currentTimeMillis(); + LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.of(lac); + notification.recycle(); + + assertEquals(-1L, notification.getLastAddConfirmed()); + assertEquals(-1L, notification.getTimestamp()); + } + + @Test + public void testFunc() { + long lac = System.currentTimeMillis(); + LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.FUNC.apply(lac); + + assertEquals(lac, notification.getLastAddConfirmed()); + } + +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java index 5f30557..d02f067 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java +++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java @@ -23,8 +23,7 @@ package org.apache.bookkeeper.bookie; import io.netty.buffer.ByteBuf; import java.io.File; import java.io.IOException; -import java.util.Observable; -import java.util.Observer; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Callable; @@ -33,13 +32,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.bookkeeper.conf.TestBKConfiguration; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; +import org.apache.bookkeeper.common.util.Watcher; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.stats.StatsLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.Test; @@ -341,8 +341,11 @@ public class TestSyncThread { } @Override - public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { - return null; + public boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) + throws IOException { + return false; } @Override diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java index ad21fc1..8cec701 100644 --- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java @@ -21,7 +21,6 @@ package org.apache.bookkeeper.meta; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -37,8 +36,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.Observable; -import java.util.Observer; import java.util.Queue; import java.util.Random; import java.util.Set; @@ -55,11 +52,13 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.EntryLogger; import org.apache.bookkeeper.bookie.GarbageCollector; +import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerMetadata; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerManager.LedgerRange; import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator; @@ -425,8 +424,11 @@ public class GcLedgersTest extends LedgerManagerTestCase { } @Override - public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { - return null; + public boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) + throws IOException { + return false; } } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index 5d357c3..492320c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -27,8 +27,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.NavigableMap; -import java.util.Observable; -import java.util.Observer; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.CheckpointSource; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; @@ -36,7 +34,9 @@ import org.apache.bookkeeper.bookie.Checkpointer; import org.apache.bookkeeper.bookie.CompactableLedgerStorage; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.EntryLogger; +import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification; import org.apache.bookkeeper.bookie.LedgerDirsManager; +import org.apache.bookkeeper.common.util.Watcher; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; @@ -210,8 +210,11 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase { } @Override - public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException { - return null; + public boolean waitForLastAddConfirmedUpdate(long ledgerId, + long previousLAC, + Watcher<LastAddConfirmedUpdateNotification> watcher) + throws IOException { + return false; } @Override -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
