This is an automated email from the ASF dual-hosted git repository. nightowl888 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/lucenenet.git
commit 2bd1f2c2ea6853d8d46da3ba9760fb90409edeea Author: Shad Storhaug <[email protected]> AuthorDate: Sat Nov 6 16:55:11 2021 +0700 Lucene.Net.Replicator.ReplicationClient: Refactored to use ThreadJob so exceptions are re-thrown on the calling thread. Added missing locks. --- src/Lucene.Net.Replicator/LocalReplicator.cs | 30 +-- src/Lucene.Net.Replicator/ReplicationClient.cs | 304 +++++++++++-------------- 2 files changed, 151 insertions(+), 183 deletions(-) diff --git a/src/Lucene.Net.Replicator/LocalReplicator.cs b/src/Lucene.Net.Replicator/LocalReplicator.cs index aee5c18..7405e13 100644 --- a/src/Lucene.Net.Replicator/LocalReplicator.cs +++ b/src/Lucene.Net.Replicator/LocalReplicator.cs @@ -123,7 +123,7 @@ namespace Lucene.Net.Replicator private long expirationThreshold = DEFAULT_SESSION_EXPIRATION_THRESHOLD; - private readonly object padlock = new object(); + private readonly object syncLock = new object(); private volatile RefCountedRevision currentRevision; private volatile bool disposed = false; @@ -160,7 +160,7 @@ namespace Lucene.Net.Replicator /// <exception cref="ObjectDisposedException">This replicator has already been disposed.</exception> protected void EnsureOpen() { - UninterruptableMonitor.Enter(padlock); + UninterruptableMonitor.Enter(syncLock); try { if (!disposed) @@ -170,13 +170,13 @@ namespace Lucene.Net.Replicator } finally { - UninterruptableMonitor.Exit(padlock); + UninterruptableMonitor.Exit(syncLock); } } public virtual SessionToken CheckForUpdate(string currentVersion) { - UninterruptableMonitor.Enter(padlock); + UninterruptableMonitor.Enter(syncLock); try { EnsureOpen(); @@ -196,7 +196,7 @@ namespace Lucene.Net.Replicator } finally { - UninterruptableMonitor.Exit(padlock); + UninterruptableMonitor.Exit(syncLock); } } @@ -205,7 +205,7 @@ namespace Lucene.Net.Replicator if (disposed || !disposing) return; - UninterruptableMonitor.Enter(padlock); + UninterruptableMonitor.Enter(syncLock); try { foreach (ReplicationSession session in sessions.Values) @@ -214,7 +214,7 @@ namespace Lucene.Net.Replicator } finally { - UninterruptableMonitor.Exit(padlock); + UninterruptableMonitor.Exit(syncLock); } disposed = true; } @@ -237,7 +237,7 @@ namespace Lucene.Net.Replicator get => expirationThreshold; set { - UninterruptableMonitor.Enter(padlock); + UninterruptableMonitor.Enter(syncLock); try { EnsureOpen(); @@ -246,14 +246,14 @@ namespace Lucene.Net.Replicator } finally { - UninterruptableMonitor.Exit(padlock); + UninterruptableMonitor.Exit(syncLock); } } } public virtual Stream ObtainFile(string sessionId, string source, string fileName) { - UninterruptableMonitor.Enter(padlock); + UninterruptableMonitor.Enter(syncLock); try { EnsureOpen(); @@ -273,13 +273,13 @@ namespace Lucene.Net.Replicator } finally { - UninterruptableMonitor.Exit(padlock); + UninterruptableMonitor.Exit(syncLock); } } public virtual void Publish(IRevision revision) { - UninterruptableMonitor.Enter(padlock); + UninterruptableMonitor.Enter(syncLock); try { EnsureOpen(); @@ -310,14 +310,14 @@ namespace Lucene.Net.Replicator } finally { - UninterruptableMonitor.Exit(padlock); + UninterruptableMonitor.Exit(syncLock); } } /// <exception cref="InvalidOperationException"></exception> public virtual void Release(string sessionId) { - UninterruptableMonitor.Enter(padlock); + UninterruptableMonitor.Enter(syncLock); try { EnsureOpen(); @@ -325,7 +325,7 @@ namespace Lucene.Net.Replicator } finally { - UninterruptableMonitor.Exit(padlock); + UninterruptableMonitor.Exit(syncLock); } } } diff --git a/src/Lucene.Net.Replicator/ReplicationClient.cs b/src/Lucene.Net.Replicator/ReplicationClient.cs index 7604fa1..4546d28 100644 --- a/src/Lucene.Net.Replicator/ReplicationClient.cs +++ b/src/Lucene.Net.Replicator/ReplicationClient.cs @@ -1,15 +1,16 @@ -using Lucene.Net.Store; +using J2N; +using J2N.Collections.Generic.Extensions; +using J2N.Threading; +using Lucene.Net.Diagnostics; +using Lucene.Net.Store; using Lucene.Net.Support.Threading; using Lucene.Net.Util; using System; using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.Diagnostics; using System.IO; using System.Threading; -using JCG = J2N.Collections.Generic; using Directory = Lucene.Net.Store.Directory; -using Lucene.Net.Diagnostics; +using JCG = J2N.Collections.Generic; namespace Lucene.Net.Replicator { @@ -44,25 +45,15 @@ namespace Lucene.Net.Replicator /// </remarks> public class ReplicationClient : IDisposable { - // LUCENENET TODO: Check to ensure ThreadInterruptException is being passed from the background worker to the current thread, as it is required by IndexWriter - - //Note: LUCENENET specific, .NET does not work with Threads in the same way as Java does, so we mimic the same behavior using the ThreadPool instead. - private class ReplicationThread + private class ReplicationThread : ThreadJob { + private readonly long intervalMillis; + private readonly ReentrantLock updateLock; private readonly Action doUpdate; - private readonly Action<Exception> handleException; - private readonly ReentrantLock @lock; - private readonly object controlLock = new object(); - - private readonly long interval; - private readonly AutoResetEvent handle = new AutoResetEvent(false); + private readonly Action<Exception> handleUpdateException; - private AutoResetEvent stopHandle; - - /// <summary> - /// Gets or sets the name - /// </summary> - public string Name { get; private set; } + // client uses this to stop us + internal readonly CountdownEvent stop = new CountdownEvent(1); /// <summary> /// @@ -70,117 +61,59 @@ namespace Lucene.Net.Replicator /// <param name="intervalMillis">The interval in milliseconds.</param> /// <param name="threadName">The thread name.</param> /// <param name="doUpdate">A delegate to call to perform the update.</param> - /// <param name="handleException">A delegate to call to handle an exception.</param> - /// <param name="lock"></param> - public ReplicationThread(long intervalMillis, string threadName, Action doUpdate, Action<Exception> handleException, ReentrantLock @lock) + /// <param name="handleUpdateException">A delegate to call to handle an exception.</param> + /// <param name="updateLock"></param> + public ReplicationThread(long intervalMillis, string threadName, Action doUpdate, Action<Exception> handleUpdateException, ReentrantLock updateLock) + : base(threadName) { - this.doUpdate = doUpdate; - this.handleException = handleException; - this.@lock = @lock; - Name = threadName; - this.interval = intervalMillis; + this.intervalMillis = intervalMillis; + this.doUpdate = doUpdate ?? throw new ArgumentNullException(nameof(doUpdate)); + this.handleUpdateException = handleUpdateException ?? throw new ArgumentNullException(nameof(handleUpdateException)); + this.updateLock = updateLock ?? throw new ArgumentNullException(nameof(updateLock)); } - /// <summary> - /// - /// </summary> - public bool IsAlive { get; private set; } - - /// <summary> - /// - /// </summary> - public void Start() - { - UninterruptableMonitor.Enter(controlLock); - try - { - if (IsAlive) - return; - IsAlive = true; - } - finally - { - UninterruptableMonitor.Exit(controlLock); - } - RegisterWait(interval); - } - - /// <summary> - /// - /// </summary> - public void Stop() + public override void Run() { - UninterruptableMonitor.Enter(controlLock); - try + while (true) { - if (!IsAlive) - return; - IsAlive = false; - } - finally - { - UninterruptableMonitor.Exit(controlLock); - } - stopHandle = new AutoResetEvent(false); - - //NOTE: Execute any outstanding, this execution will terminate almost instantaniously if it's not already running. - ExecuteImmediately(); - - stopHandle.WaitOne(); - stopHandle = null; - } - - /// <summary> - /// Executes the next cycle of work immediately - /// </summary> - public void ExecuteImmediately() - { - handle.Set(); - } - - private void RegisterWait(long timeout) - { - //NOTE: We don't care about timedout as it can either be because we was requested to run immidiately or stop. - if (IsAlive) - ThreadPool.RegisterWaitForSingleObject(handle, (state, timedout) => Run(), null, timeout, true); - else - SignalStop(); - } - - private void SignalStop() - { - if (stopHandle != null) - stopHandle.Set(); - } - - private void Run() - { - if (!IsAlive) - { - SignalStop(); - return; - } + long time = Time.NanoTime() / Time.MillisecondsPerNanosecond; + updateLock.Lock(); + try + { + doUpdate(); + } + catch (Exception t) when (t.IsThrowable()) + { + handleUpdateException(t); + } + finally + { + updateLock.Unlock(); + } + time = Time.NanoTime() / Time.MillisecondsPerNanosecond - time; - Stopwatch timer = Stopwatch.StartNew(); - @lock.Lock(); - try - { - doUpdate(); - } - catch (Exception t) when (t.IsThrowable()) - { - handleException(t); - } - finally - { - @lock.Unlock(); - - timer.Stop(); - long driftAdjusted = Math.Max(interval - timer.ElapsedMilliseconds, 0); - if (IsAlive) - RegisterWait(driftAdjusted); - else - SignalStop(); + // adjust timeout to compensate the time spent doing the replication. + long timeout = intervalMillis - time; + if (timeout > 0) + { + try + { + // this will return immediately if we were ordered to stop (count=0) + // or the timeout has elapsed. if it returns true, it means count=0, + // so terminate. + if (stop.Wait(TimeSpan.FromMilliseconds(timeout))) // await(timeout, TimeUnit.MILLISECONDS)) + { + return; + } + } + catch (Exception e) when (e.IsInterruptedException()) + { + // if we were interruted, somebody wants to terminate us, so just + // throw the exception further. + Thread.CurrentThread.Interrupt(); + throw new Util.ThreadInterruptedException(e); + } + } } } } @@ -199,6 +132,7 @@ namespace Lucene.Net.Replicator private readonly byte[] copyBuffer = new byte[16384]; private readonly ReentrantLock updateLock = new ReentrantLock(); + private readonly object syncLock = new object(); // LUCENENET specific to avoid lock (this) private ReplicationThread updateThread; private bool disposed = false; @@ -239,13 +173,13 @@ namespace Lucene.Net.Replicator string version = handler.CurrentVersion; session = replicator.CheckForUpdate(version); - WriteToInfoStream(string.Format("doUpdate(): handlerVersion={0} session={1}", version, session)); + WriteToInfoStream(string.Format("DoUpdate(): handlerVersion={0} session={1}", version, session)); if (session == null) return; IDictionary<string, IList<RevisionFile>> requiredFiles = RequiredFiles(session.SourceFiles); - WriteToInfoStream(string.Format("doUpdate(): handlerVersion={0} session={1}", version, session)); + WriteToInfoStream(string.Format("DoUpdate(): handlerVersion={0} session={1}", version, session)); foreach (KeyValuePair<string, IList<RevisionFile>> pair in requiredFiles) { @@ -260,7 +194,7 @@ namespace Lucene.Net.Replicator if (disposed) { // if we're closed, abort file copy - WriteToInfoStream("doUpdate(): detected client was closed); abort file copy"); + WriteToInfoStream("DoUpdate(): detected client was closed); abort file copy"); return; } @@ -281,9 +215,9 @@ namespace Lucene.Net.Replicator IOUtils.Dispose(input, output); } } - // only notify if all required files were successfully obtained. - notify = true; } + // only notify if all required files were successfully obtained. + notify = true; } finally { @@ -312,7 +246,8 @@ namespace Lucene.Net.Replicator { if (notify && !disposed) { // no use to notify if we are closed already - handler.RevisionReady(session.Version, session.SourceFiles, new ReadOnlyDictionary<string, IList<string>>(copiedFiles), sourceDirectory); + // LUCENENET specific - pass the copiedFiles as read only + handler.RevisionReady(session.Version, session.SourceFiles, copiedFiles.AsReadOnly(), sourceDirectory); } } finally @@ -398,12 +333,20 @@ namespace Lucene.Net.Replicator protected virtual void Dispose(bool disposing) { - if (disposed || !disposing) - return; + UninterruptableMonitor.Enter(syncLock); + try + { + if (disposed || !disposing) + return; - StopUpdateThread(); - infoStream.Dispose(); // LUCENENET specific - disposed = true; + StopUpdateThread(); + infoStream.Dispose(); // LUCENENET specific + disposed = true; + } + finally + { + UninterruptableMonitor.Exit(syncLock); + } } public void Dispose() @@ -415,21 +358,29 @@ namespace Lucene.Net.Replicator /// <summary> /// Start the update thread with the specified interval in milliseconds. For /// debugging purposes, you can optionally set the name to set on - /// <see cref="ReplicationThread.Name"/>. If you pass <c>null</c>, a default name + /// <see cref="ThreadJob.Name"/>. If you pass <c>null</c>, a default name /// will be set. /// </summary> /// <exception cref="InvalidOperationException"> if the thread has already been started </exception> - public virtual void StartUpdateThread(long intervalMillis, string threadName) + public virtual void StartUpdateThread(long intervalInMilliseconds, string threadName) { - EnsureOpen(); - if (updateThread != null && updateThread.IsAlive) - throw IllegalStateException.Create("cannot start an update thread when one is running, must first call 'stopUpdateThread()'"); - - threadName = threadName == null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName; - updateThread = new ReplicationThread(intervalMillis, threadName, DoUpdate, HandleUpdateException, updateLock); - updateThread.Start(); - // we rely on isAlive to return true in isUpdateThreadAlive, assert to be on the safe side - if (Debugging.AssertsEnabled) Debugging.Assert(updateThread.IsAlive, "updateThread started but not alive?"); + UninterruptableMonitor.Enter(syncLock); + try + { + EnsureOpen(); + if (updateThread != null && updateThread.IsAlive) + throw IllegalStateException.Create("cannot start an update thread when one is running, must first call 'stopUpdateThread()'"); + + threadName = threadName == null ? INFO_STREAM_COMPONENT : "ReplicationThread-" + threadName; + updateThread = new ReplicationThread(intervalInMilliseconds, threadName, DoUpdate, HandleUpdateException, updateLock); + updateThread.Start(); + // we rely on isAlive to return true in isUpdateThreadAlive, assert to be on the safe side + if (Debugging.AssertsEnabled) Debugging.Assert(updateThread.IsAlive, "updateThread started but not alive?"); + } + finally + { + UninterruptableMonitor.Exit(syncLock); + } } /// <summary> @@ -438,21 +389,31 @@ namespace Lucene.Net.Replicator /// </summary> public virtual void StopUpdateThread() { - // this will trigger the thread to terminate if it awaits the lock. - // otherwise, if it's in the middle of replication, we wait for it to - // stop. - if (updateThread != null) + UninterruptableMonitor.Enter(syncLock); + try { - try - { - updateThread.Stop(); - } - catch (Exception ie) when (ie.IsInterruptedException()) + if (updateThread != null) { - throw new Util.ThreadInterruptedException(ie); + // this will trigger the thread to terminate if it awaits the lock. + // otherwise, if it's in the middle of replication, we wait for it to + // stop. + updateThread.stop.Signal(); + try + { + updateThread.Join(); + } + catch (Exception ie) when (ie.IsInterruptedException()) + { + Thread.CurrentThread.Interrupt(); + throw new Util.ThreadInterruptedException(ie); + } } + updateThread = null; + } + finally + { + UninterruptableMonitor.Exit(syncLock); } - updateThread = null; } /// <summary> @@ -462,11 +423,25 @@ namespace Lucene.Net.Replicator /// caused it to terminate (i.e. <see cref="HandleUpdateException"/> /// threw the exception further). /// </summary> - public virtual bool IsUpdateThreadAlive => updateThread != null && updateThread.IsAlive; + public virtual bool IsUpdateThreadAlive + { + get + { + UninterruptableMonitor.Enter(syncLock); + try + { + return updateThread != null && updateThread.IsAlive; + } + finally + { + UninterruptableMonitor.Exit(syncLock); + } + } + } public override string ToString() { - if (updateThread == null) + if (updateThread is null) return "ReplicationClient"; return string.Format("ReplicationClient ({0})", updateThread.Name); } @@ -479,13 +454,6 @@ namespace Lucene.Net.Replicator public virtual void UpdateNow() { EnsureOpen(); - if (updateThread != null) - { - //NOTE: We have a worker running, we use that to perform the work instead by requesting it to run - // it's cycle immidiately. - updateThread.ExecuteImmediately(); - return; - } //NOTE: We don't have a worker running, so we just do the work. updateLock.Lock();
